Coverage for amqtt/mqtt/unsubscribe.py: 87%

50 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from asyncio import StreamReader 

2from typing_extensions import Self 

3 

4from amqtt.adapters import ReaderAdapter 

5from amqtt.codecs_amqtt import decode_string, encode_string 

6from amqtt.errors import AMQTTError, NoDataError 

7from amqtt.mqtt.packet import UNSUBSCRIBE, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader 

8 

9 

10class UnubscribePayload(MQTTPayload[MQTTVariableHeader]): 

11 __slots__ = ("topics",) 

12 

13 def __init__(self, topics: list[str] | None = None) -> None: 

14 super().__init__() 

15 self.topics = topics or [] 

16 

17 def to_bytes(self, fixed_header: MQTTFixedHeader | None = None, variable_header: MQTTVariableHeader | None = None) -> bytes: 

18 out = b"" 

19 for topic in self.topics: 

20 out += encode_string(topic) 

21 return out 

22 

23 @classmethod 

24 async def from_stream( 

25 cls: type[Self], 

26 reader: StreamReader | ReaderAdapter, 

27 fixed_header: MQTTFixedHeader | None, 

28 variable_header: MQTTVariableHeader | None, 

29 ) -> Self: 

30 if fixed_header is None or variable_header is None: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true

31 msg = "Fixed header or Value header is not set." 

32 raise ValueError(msg) 

33 

34 topics = [] 

35 payload_length = fixed_header.remaining_length - variable_header.bytes_length 

36 read_bytes = 0 

37 while read_bytes < payload_length: 

38 try: 

39 topic = await decode_string(reader) 

40 topics.append(topic) 

41 read_bytes += 2 + len(topic.encode("utf-8")) 

42 except NoDataError: 

43 break 

44 return cls(topics) 

45 

46 

47class UnsubscribePacket(MQTTPacket[PacketIdVariableHeader, UnubscribePayload, MQTTFixedHeader]): 

48 VARIABLE_HEADER = PacketIdVariableHeader 

49 PAYLOAD = UnubscribePayload 

50 

51 def __init__( 

52 self, 

53 fixed: MQTTFixedHeader | None = None, 

54 variable_header: PacketIdVariableHeader | None = None, 

55 payload: UnubscribePayload | None = None, 

56 ) -> None: 

57 if fixed is None: 

58 header = MQTTFixedHeader(UNSUBSCRIBE, 0x02) # [MQTT-3.10.1-1] 

59 else: 

60 if fixed.packet_type is not UNSUBSCRIBE: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true

61 msg = f"Invalid fixed packet type {fixed.packet_type} for UnsubscribePacket init" 

62 raise AMQTTError(msg) 

63 header = fixed 

64 

65 super().__init__(header) 

66 self.variable_header = variable_header 

67 self.payload = payload 

68 

69 @classmethod 

70 def build(cls, topics: list[str], packet_id: int) -> "UnsubscribePacket": 

71 v_header = PacketIdVariableHeader(packet_id) 

72 payload = UnubscribePayload(topics) 

73 return UnsubscribePacket(variable_header=v_header, payload=payload)