Coverage for amqtt/mqtt/pingresp.py: 87%

19 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from typing_extensions import Self 

2 

3from amqtt.errors import AMQTTError 

4from amqtt.mqtt.packet import PINGRESP, MQTTFixedHeader, MQTTPacket 

5 

6 

7class PingRespPacket(MQTTPacket[None, None, MQTTFixedHeader]): 

8 VARIABLE_HEADER = None 

9 PAYLOAD = None 

10 

11 def __init__(self, fixed: MQTTFixedHeader | None = None) -> None: 

12 if fixed is None: 

13 header = MQTTFixedHeader(PINGRESP, 0x00) 

14 else: 

15 if fixed.packet_type is not PINGRESP: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true

16 msg = f"Invalid fixed packet type {fixed.packet_type} for PingRespPacket init" 

17 raise AMQTTError(msg) 

18 header = fixed 

19 super().__init__(header) 

20 self.variable_header = None 

21 self.payload = None 

22 

23 @classmethod 

24 def build(cls) -> Self: 

25 return cls()