Coverage for amqtt/mqtt/subscribe.py: 88%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2from typing_extensions import Self 

3 

4from amqtt.adapters import ReaderAdapter 

5from amqtt.codecs_amqtt import bytes_to_int, decode_string, encode_string, int_to_bytes, read_or_raise 

6from amqtt.errors import AMQTTError, NoDataError 

7from amqtt.mqtt.packet import SUBSCRIBE, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader 

8 

9 

10class SubscribePayload(MQTTPayload[MQTTVariableHeader]): 

11 __slots__ = ("topics",) 

12 

13 def __init__(self, topics: list[tuple[str, int]] | None = None) -> None: 

14 super().__init__() 

15 self.topics = topics or [] 

16 

17 def to_bytes( 

18 self, 

19 fixed_header: MQTTFixedHeader | None = None, 

20 variable_header: MQTTVariableHeader | None = None, 

21 ) -> bytes: 

22 out = b"" 

23 for topic in self.topics: 

24 out += encode_string(topic[0]) 

25 out += int_to_bytes(topic[1], 1) 

26 return out 

27 

28 @classmethod 

29 async def from_stream( 

30 cls, 

31 reader: asyncio.StreamReader | ReaderAdapter, 

32 fixed_header: MQTTFixedHeader | None, 

33 variable_header: MQTTVariableHeader | None, 

34 ) -> Self: 

35 topics = [] 

36 if fixed_header is None or variable_header is None: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true

37 msg = "Fixed header or variable header cannot be None" 

38 raise ValueError(msg) 

39 

40 payload_length = fixed_header.remaining_length - variable_header.bytes_length 

41 read_bytes = 0 

42 while read_bytes < payload_length: 

43 try: 

44 topic = await decode_string(reader) 

45 qos_byte = await read_or_raise(reader, 1) 

46 qos = bytes_to_int(qos_byte) 

47 topics.append((topic, qos)) 

48 read_bytes += 2 + len(topic.encode("utf-8")) + 1 

49 except NoDataError: 

50 break 

51 return cls(topics) 

52 

53 def __repr__(self) -> str: 

54 """Return a string representation of the SubscribePayload object.""" 

55 return type(self).__name__ + f"(topics={self.topics!r})" 

56 

57 

58class SubscribePacket(MQTTPacket[PacketIdVariableHeader, SubscribePayload, MQTTFixedHeader]): 

59 VARIABLE_HEADER = PacketIdVariableHeader 

60 PAYLOAD = SubscribePayload 

61 

62 def __init__( 

63 self, 

64 fixed: MQTTFixedHeader | None = None, 

65 variable_header: PacketIdVariableHeader | None = None, 

66 payload: SubscribePayload | None = None, 

67 ) -> None: 

68 if fixed is None: 

69 header = MQTTFixedHeader(SUBSCRIBE, 0x02) # [MQTT-3.8.1-1] 

70 else: 

71 if fixed.packet_type is not SUBSCRIBE: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 msg = f"Invalid fixed packet type {fixed.packet_type} for SubscribePacket init" 

73 raise AMQTTError(msg) 

74 header = fixed 

75 

76 super().__init__(header) 

77 self.variable_header = variable_header 

78 self.payload = payload 

79 

80 @classmethod 

81 def build(cls, topics: list[tuple[str, int]], packet_id: int) -> Self: 

82 v_header = PacketIdVariableHeader(packet_id) 

83 payload = SubscribePayload(topics) 

84 return cls(variable_header=v_header, payload=payload)