Coverage for amqtt/mqtt/pubrel.py: 95%

32 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from typing_extensions import Self 

2 

3from amqtt.errors import AMQTTError 

4from amqtt.mqtt.packet import PUBREL, MQTTFixedHeader, MQTTPacket, PacketIdVariableHeader 

5 

6 

class PubrelPacket(MQTTPacket[PacketIdVariableHeader, None, MQTTFixedHeader]):
    """PUBREL packet: a fixed header plus a packet-id variable header, with no payload."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = None

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
    ) -> None:
        """Initialise the packet from an optional fixed header and variable header.

        Args:
            fixed: Fixed header to use. When ``None``, a PUBREL header with
                flags ``0x02`` is created.
            variable_header: Variable header carrying the packet id. May be
                ``None``, in which case ``packet_id`` raises ``ValueError``
                until a header is assigned.

        Raises:
            AMQTTError: If ``fixed`` is given and its packet type is not PUBREL.
        """
        if fixed is None:
            header = MQTTFixedHeader(PUBREL, 0x02)  # [MQTT-3.6.1-1]
        else:
            # Compare by value, not identity: `is not` only gives the right
            # answer when both sides happen to be the very same object.
            if fixed.packet_type != PUBREL:
                msg = f"Invalid fixed packet type {fixed.packet_type} for PubrelPacket init"
                raise AMQTTError(msg)
            header = fixed
        super().__init__(header)
        self.variable_header = variable_header
        self.payload = None

    @classmethod
    def build(cls, packet_id: int) -> Self:
        """Create a packet with a default fixed header and the given packet id."""
        variable_header = PacketIdVariableHeader(packet_id)
        return cls(variable_header=variable_header)

    @property
    def packet_id(self) -> int:
        """Packet id stored in the variable header.

        Raises:
            ValueError: If no variable header is set.
        """
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        return self.variable_header.packet_id

    @packet_id.setter
    def packet_id(self, val: int) -> None:
        """Store ``val`` as the packet id on the existing variable header.

        Raises:
            ValueError: If no variable header is set.
        """
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        self.variable_header.packet_id = val