Coverage for amqtt/mqtt/disconnect.py: 84%

15 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from amqtt.errors import AMQTTError 

2from amqtt.mqtt.packet import DISCONNECT, MQTTFixedHeader, MQTTPacket 

3 

4 

5class DisconnectPacket(MQTTPacket[None, None, MQTTFixedHeader]): 

6 VARIABLE_HEADER = None 

7 PAYLOAD = None 

8 

9 def __init__(self, fixed: MQTTFixedHeader | None = None) -> None: 

10 if fixed is None: 

11 header = MQTTFixedHeader(DISCONNECT, 0x00) 

12 else: 

13 if fixed.packet_type is not DISCONNECT: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true

14 msg = f"Invalid fixed packet type {fixed.packet_type} for DisconnectPacket init" 

15 raise AMQTTError(msg) 

16 header = fixed 

17 super().__init__(header) 

18 self.variable_header = None 

19 self.payload = None