Coverage for amqtt/mqtt/puback.py: 95%
30 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from typing_extensions import Self
3from amqtt.errors import AMQTTError
4from amqtt.mqtt.packet import PUBACK, MQTTFixedHeader, MQTTPacket, PacketIdVariableHeader
class PubackPacket(MQTTPacket[PacketIdVariableHeader, None, MQTTFixedHeader]):
    """PUBACK packet: a fixed header plus a packet-id variable header, no payload."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = None

    @property
    def packet_id(self) -> int:
        """Return the packet id stored in the variable header.

        Raises:
            ValueError: if the packet has no variable header.

        """
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        return self.variable_header.packet_id

    @packet_id.setter
    def packet_id(self, val: int) -> None:
        """Store ``val`` as the packet id of the variable header.

        Raises:
            ValueError: if the packet has no variable header.

        """
        if self.variable_header is None:
            msg = "Variable header is not set"
            raise ValueError(msg)
        self.variable_header.packet_id = val

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
    ) -> None:
        """Initialise the packet, creating a default PUBACK fixed header if needed.

        Args:
            fixed: fixed header to use; when ``None`` a new
                ``MQTTFixedHeader(PUBACK, 0x00)`` is created.
            variable_header: variable header carrying the packet id; may be
                ``None`` (``packet_id`` then raises until one is set).

        Raises:
            AMQTTError: if ``fixed`` is given and its packet type is not PUBACK.

        """
        if fixed is None:
            header = MQTTFixedHeader(PUBACK, 0x00)
        elif fixed.packet_type != PUBACK:
            # Compare by value, not identity: ``is not`` is only reliable for
            # singletons, and equal packet-type values need not be the same object.
            msg = f"Invalid fixed packet type {fixed.packet_type} for PubackPacket init"
            raise AMQTTError(msg)
        else:
            header = fixed

        super().__init__(header, variable_header, None)

    @classmethod
    def build(cls, packet_id: int) -> Self:
        """Create a packet with a default fixed header and the given packet id."""
        v_header = PacketIdVariableHeader(packet_id)
        return cls(variable_header=v_header)