Coverage for amqtt/mqtt/unsubscribe.py: 87%
50 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from asyncio import StreamReader
2from typing_extensions import Self
4from amqtt.adapters import ReaderAdapter
5from amqtt.codecs_amqtt import decode_string, encode_string
6from amqtt.errors import AMQTTError, NoDataError
7from amqtt.mqtt.packet import UNSUBSCRIBE, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader
class UnubscribePayload(MQTTPayload[MQTTVariableHeader]):
    """Payload of an UNSUBSCRIBE packet: the topics to unsubscribe from.

    The (misspelled) class name is kept as-is because ``UnsubscribePacket``
    in this module refers to it by this name.
    """

    __slots__ = ("topics",)

    def __init__(self, topics: list[str] | None = None) -> None:
        """Store ``topics``; ``None`` or an empty list yields a fresh empty list."""
        super().__init__()
        self.topics = topics or []

    def to_bytes(self, fixed_header: MQTTFixedHeader | None = None, variable_header: MQTTVariableHeader | None = None) -> bytes:
        """Return every topic passed through ``encode_string``, concatenated in order.

        ``fixed_header`` and ``variable_header`` are accepted for interface
        compatibility but are not used here.
        """
        # A single join avoids re-copying the accumulated buffer on every topic.
        return b"".join(encode_string(topic) for topic in self.topics)

    @classmethod
    async def from_stream(
        cls: type[Self],
        reader: StreamReader | ReaderAdapter,
        fixed_header: MQTTFixedHeader | None,
        variable_header: MQTTVariableHeader | None,
    ) -> Self:
        """Read topics from ``reader`` until the payload length is consumed.

        The payload length is ``fixed_header.remaining_length`` minus
        ``variable_header.bytes_length``.

        Raises:
            ValueError: if ``fixed_header`` or ``variable_header`` is ``None``.

        Reading stops early, keeping the topics decoded so far, when
        ``decode_string`` raises ``NoDataError``.
        """
        if fixed_header is None or variable_header is None:
            msg = "Fixed header or Value header is not set."
            raise ValueError(msg)

        topics: list[str] = []
        payload_length = fixed_header.remaining_length - variable_header.bytes_length
        consumed = 0
        while consumed < payload_length:
            # Only the read itself is expected to signal "no more data".
            try:
                topic = await decode_string(reader)
            except NoDataError:
                break
            topics.append(topic)
            # 2 extra bytes per topic: presumably the length prefix that
            # precedes each encoded string on the wire.
            consumed += 2 + len(topic.encode("utf-8"))
        return cls(topics)
class UnsubscribePacket(MQTTPacket[PacketIdVariableHeader, UnubscribePayload, MQTTFixedHeader]):
    """MQTT UNSUBSCRIBE packet: packet-id variable header plus a topic payload."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = UnubscribePayload

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
        payload: UnubscribePayload | None = None,
    ) -> None:
        """Create the packet, building a default fixed header when none is given.

        Args:
            fixed: fixed header to use; when ``None`` an UNSUBSCRIBE header
                with flags ``0x02`` is created.
            variable_header: variable header carrying the packet id.
            payload: payload carrying the topics.

        Raises:
            AMQTTError: if ``fixed`` is given and its packet type is not
                ``UNSUBSCRIBE``.
        """
        if fixed is None:
            header = MQTTFixedHeader(UNSUBSCRIBE, 0x02)  # [MQTT-3.10.1-1]
        elif fixed.packet_type != UNSUBSCRIBE:
            # Compare by value: identity (`is`) on a plain constant only
            # works by accident of interpreter object caching.
            msg = f"Invalid fixed packet type {fixed.packet_type} for UnsubscribePacket init"
            raise AMQTTError(msg)
        else:
            header = fixed

        super().__init__(header)
        self.variable_header = variable_header
        self.payload = payload

    @classmethod
    def build(cls, topics: list[str], packet_id: int) -> "UnsubscribePacket":
        """Build a packet for ``topics`` identified by ``packet_id``.

        The fixed header is left to ``__init__``'s default. The instance is
        created through ``cls`` so subclasses get instances of themselves.
        """
        v_header = PacketIdVariableHeader(packet_id)
        payload = UnubscribePayload(topics)
        return cls(variable_header=v_header, payload=payload)