Coverage for amqtt/mqtt/suback.py: 88%
55 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2from typing_extensions import Self
4from amqtt.adapters import ReaderAdapter
5from amqtt.codecs_amqtt import bytes_to_int, int_to_bytes, read_or_raise
6from amqtt.errors import AMQTTError, NoDataError
7from amqtt.mqtt.packet import SUBACK, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader
class SubackPayload(MQTTPayload[MQTTVariableHeader]):
    """Payload of a SUBACK packet: an ordered list of one-byte return codes."""

    __slots__ = ("return_codes",)

    # NOTE(review): presumably the MQTT 3.1.1 SUBACK return codes (granted
    # QoS 0/1/2 and failure) — confirm against the specification.
    RETURN_CODE_00 = 0x00
    RETURN_CODE_01 = 0x01
    RETURN_CODE_02 = 0x02
    RETURN_CODE_80 = 0x80

    def __init__(self, return_codes: list[int] | None = None) -> None:
        """Store the given return codes, substituting a fresh list for a falsy value."""
        super().__init__()
        self.return_codes = return_codes if return_codes else []

    def __repr__(self) -> str:
        """Return a string representation of the SubackPayload object."""
        return f"{type(self).__name__}(return_codes={self.return_codes!r})"

    def to_bytes(
        self,
        fixed_header: MQTTFixedHeader | None = None,
        variable_header: MQTTVariableHeader | None = None,
    ) -> bytes:
        """Encode every return code as one byte and concatenate them in order.

        Both header arguments are accepted but not read by this method.
        """
        return b"".join(int_to_bytes(code, 1) for code in self.return_codes)

    @classmethod
    async def from_stream(
        cls,
        reader: asyncio.StreamReader | ReaderAdapter,
        fixed_header: MQTTFixedHeader | None,
        variable_header: MQTTVariableHeader | None,
    ) -> Self:
        """Read the return codes of a SUBACK payload from ``reader``.

        The number of bytes to read is ``fixed_header.remaining_length`` minus
        ``variable_header.bytes_length``; each byte becomes one return code.

        Raises:
            AMQTTError: if either header is ``None``.
        """
        if fixed_header is None or variable_header is None:
            msg = "Fixed header or variable header cannot be None"
            raise AMQTTError(msg)

        codes: list[int] = []
        remaining = fixed_header.remaining_length - variable_header.bytes_length
        try:
            for _ in range(remaining):
                raw = await read_or_raise(reader, 1)
                codes.append(bytes_to_int(raw))
        except NoDataError:
            # Best effort: keep the codes read so far when a read raises NoDataError.
            pass
        return cls(codes)
class SubackPacket(MQTTPacket[PacketIdVariableHeader, SubackPayload, MQTTFixedHeader]):
    """SUBACK packet: a packet-id variable header plus a ``SubackPayload``."""

    VARIABLE_HEADER = PacketIdVariableHeader
    PAYLOAD = SubackPayload

    def __init__(
        self,
        fixed: MQTTFixedHeader | None = None,
        variable_header: PacketIdVariableHeader | None = None,
        payload: SubackPayload | None = None,
    ) -> None:
        """Initialise the packet from optional header and payload parts.

        Args:
            fixed: fixed header to use; when ``None`` a default
                ``MQTTFixedHeader(SUBACK, 0x00)`` is created.
            variable_header: variable header stored on the packet as given.
            payload: payload stored on the packet as given.

        Raises:
            AMQTTError: if ``fixed`` is supplied with a packet type other
                than ``SUBACK``.
        """
        if fixed is None:
            header = MQTTFixedHeader(SUBACK, 0x00)
        else:
            # Compare by value, not identity: ``is not`` on a numeric constant
            # only works by accident of object interning.
            if fixed.packet_type != SUBACK:
                msg = f"Invalid fixed packet type {fixed.packet_type} for SubackPacket init"
                raise AMQTTError(msg)
            header = fixed

        super().__init__(header)
        self.variable_header = variable_header
        self.payload = payload

    @classmethod
    def build(cls, packet_id: int, return_codes: list[int]) -> Self:
        """Create a packet from a packet id and a list of return codes.

        The variable header and payload are instantiated through the
        ``VARIABLE_HEADER`` and ``PAYLOAD`` class attributes; the fixed
        header is left to the constructor's default.
        """
        variable_header = cls.VARIABLE_HEADER(packet_id)
        payload = cls.PAYLOAD(return_codes)
        return cls(variable_header=variable_header, payload=payload)