Coverage for amqtt/mqtt/pubcomp.py: 95%
32 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from typing_extensions import Self
3from amqtt.errors import AMQTTError
4from amqtt.mqtt.packet import PUBCOMP, MQTTFixedHeader, MQTTPacket, PacketIdVariableHeader
7class PubcompPacket(MQTTPacket[PacketIdVariableHeader, None, MQTTFixedHeader]):
8 VARIABLE_HEADER = PacketIdVariableHeader
9 PAYLOAD = None
11 def __init__(
12 self,
13 fixed: MQTTFixedHeader | None = None,
14 variable_header: PacketIdVariableHeader | None = None,
15 ) -> None:
16 if fixed is None:
17 header = MQTTFixedHeader(PUBCOMP, 0x00)
18 else:
19 if fixed.packet_type is not PUBCOMP:
20 msg = f"Invalid fixed packet type {fixed.packet_type} for PubcompPacket init"
21 raise AMQTTError(
22 msg,
23 )
24 header = fixed
25 super().__init__(header)
26 self.variable_header = variable_header
27 self.payload = None
29 @classmethod
30 def build(cls, packet_id: int) -> Self:
31 v_header = PacketIdVariableHeader(packet_id)
32 return cls(variable_header=v_header)
34 @property
35 def packet_id(self) -> int:
36 if self.variable_header is None:
37 msg = "Variable header is not set"
38 raise ValueError(msg)
39 return self.variable_header.packet_id
41 @packet_id.setter
42 def packet_id(self, val: int) -> None:
43 if self.variable_header is None: 43 ↛ 46line 43 didn't jump to line 46 because the condition on line 43 was always true
44 msg = "Variable header is not set"
45 raise ValueError(msg)
46 self.variable_header.packet_id = val