Coverage for amqtt/mqtt/pubcomp.py: 95%

32 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from typing_extensions import Self 

2 

3from amqtt.errors import AMQTTError 

4from amqtt.mqtt.packet import PUBCOMP, MQTTFixedHeader, MQTTPacket, PacketIdVariableHeader 

5 

6 

class PubcompPacket(MQTTPacket[PacketIdVariableHeader, None, MQTTFixedHeader]):
    """Packet of type PUBCOMP: a packet-id variable header and no payload."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = None

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
    ) -> None:
        """Initialise the packet from an optional fixed and variable header.

        Args:
            fixed: Fixed header to reuse. When ``None``, a new
                ``MQTTFixedHeader(PUBCOMP, 0x00)`` is created.
            variable_header: Variable header carrying the packet id; may be
                ``None`` and assigned later.

        Raises:
            AMQTTError: If ``fixed`` is given and its ``packet_type`` is not
                equal to ``PUBCOMP``.
        """
        if fixed is None:
            header = MQTTFixedHeader(PUBCOMP, 0x00)
        else:
            # Compare by value, not identity: an `is` test on a type code
            # (presumably a plain int) only succeeds for equal values when the
            # interpreter happens to share the same object.
            if fixed.packet_type != PUBCOMP:
                msg = f"Invalid fixed packet type {fixed.packet_type} for PubcompPacket init"
                raise AMQTTError(
                    msg,
                )
            header = fixed
        super().__init__(header)
        self.variable_header = variable_header
        self.payload = None

    @classmethod
    def build(cls, packet_id: int) -> Self:
        """Return a new packet whose variable header holds ``packet_id``."""
        v_header = PacketIdVariableHeader(packet_id)
        return cls(variable_header=v_header)

    @property
    def packet_id(self) -> int:
        """Packet id stored in the variable header.

        Raises:
            ValueError: If no variable header is set (on read or on assignment).
        """
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        return self.variable_header.packet_id

    @packet_id.setter
    def packet_id(self, val: int) -> None:
        # Assignment needs an existing variable header to write into; none is
        # created implicitly here.
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        self.variable_header.packet_id = val