Coverage for amqtt/mqtt/subscribe.py: 88%
55 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2from typing_extensions import Self
4from amqtt.adapters import ReaderAdapter
5from amqtt.codecs_amqtt import bytes_to_int, decode_string, encode_string, int_to_bytes, read_or_raise
6from amqtt.errors import AMQTTError, NoDataError
7from amqtt.mqtt.packet import SUBSCRIBE, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader
class SubscribePayload(MQTTPayload[MQTTVariableHeader]):
    """Payload of a SUBSCRIBE packet: a list of ``(topic, qos)`` pairs."""

    __slots__ = ("topics",)

    def __init__(self, topics: list[tuple[str, int]] | None = None) -> None:
        """Store the requested subscriptions.

        :param topics: ``(topic, qos)`` pairs. ``None`` (or any falsy value)
            results in a fresh empty list.
        """
        super().__init__()
        self.topics = topics or []

    def to_bytes(
        self,
        fixed_header: MQTTFixedHeader | None = None,
        variable_header: MQTTVariableHeader | None = None,
    ) -> bytes:
        """Serialize every ``(topic, qos)`` pair, in order, into one bytes object.

        Each entry is written as ``encode_string(topic)`` followed by
        ``int_to_bytes(qos, 1)``. Both header arguments are accepted for
        signature compatibility but are not read here.

        :return: the concatenated encoding; ``b""`` when there are no topics.
        """
        # Collect the pieces and join once: repeated ``bytes +=`` copies the
        # whole accumulated buffer on every iteration.
        parts: list[bytes] = []
        for topic in self.topics:
            parts.extend((encode_string(topic[0]), int_to_bytes(topic[1], 1)))
        return b"".join(parts)

    @classmethod
    async def from_stream(
        cls,
        reader: asyncio.StreamReader | ReaderAdapter,
        fixed_header: MQTTFixedHeader | None,
        variable_header: MQTTVariableHeader | None,
    ) -> Self:
        """Read ``(topic, qos)`` pairs from ``reader`` and build a payload.

        The payload length is ``fixed_header.remaining_length`` minus
        ``variable_header.bytes_length``; entries are read until that many
        bytes have been accounted for.

        :param reader: stream positioned at the first byte of the payload.
        :param fixed_header: fixed header of the packet being decoded.
        :param variable_header: variable header of the packet being decoded.
        :return: a new instance holding the topics that were read.
        :raises ValueError: if either header is ``None``.
        """
        if fixed_header is None or variable_header is None:
            msg = "Fixed header or variable header cannot be None"
            raise ValueError(msg)

        topics = []
        payload_length = fixed_header.remaining_length - variable_header.bytes_length
        read_bytes = 0
        while read_bytes < payload_length:
            try:
                topic = await decode_string(reader)
                qos_byte = await read_or_raise(reader, 1)
                qos = bytes_to_int(qos_byte)
                topics.append((topic, qos))
                # Bytes accounted for by this entry: 2 for the string length
                # prefix, the UTF-8 encoded topic, and 1 for the QoS byte.
                read_bytes += 2 + len(topic.encode("utf-8")) + 1
            except NoDataError:
                # Stream ran dry before the declared length was reached:
                # stop and keep the entries decoded so far.
                break
        return cls(topics)

    def __repr__(self) -> str:
        """Return a string representation of the SubscribePayload object."""
        return f"{type(self).__name__}(topics={self.topics!r})"
class SubscribePacket(MQTTPacket[PacketIdVariableHeader, SubscribePayload, MQTTFixedHeader]):
    """SUBSCRIBE packet: packet-id variable header plus a ``SubscribePayload``."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = SubscribePayload

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
        payload: SubscribePayload | None = None,
    ) -> None:
        """Create a SUBSCRIBE packet.

        :param fixed: fixed header to use. When ``None`` a header of type
            ``SUBSCRIBE`` with flags ``0x02`` is created.
        :param variable_header: variable header carrying the packet id.
        :param payload: the topics being subscribed to.
        :raises AMQTTError: if ``fixed`` is given and its ``packet_type`` is
            not ``SUBSCRIBE``.
        """
        if fixed is None:
            header = MQTTFixedHeader(SUBSCRIBE, 0x02)  # [MQTT-3.8.1-1]
        else:
            # Compare by value: an identity test (``is not``) only matches
            # equal ints thanks to interpreter-specific small-int caching.
            if fixed.packet_type != SUBSCRIBE:
                msg = f"Invalid fixed packet type {fixed.packet_type} for SubscribePacket init"
                raise AMQTTError(msg)
            header = fixed

        super().__init__(header)
        self.variable_header = variable_header
        self.payload = payload

    @classmethod
    def build(cls, topics: list[tuple[str, int]], packet_id: int) -> Self:
        """Build a SUBSCRIBE packet for ``topics`` using ``packet_id``.

        :param topics: ``(topic, qos)`` pairs placed in the payload.
        :param packet_id: identifier stored in the variable header.
        :return: a packet with a default fixed header (see ``__init__``).
        """
        return cls(
            variable_header=PacketIdVariableHeader(packet_id),
            payload=SubscribePayload(topics),
        )