Coverage for amqtt/mqtt/suback.py: 88%

55 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2from typing_extensions import Self 

3 

4from amqtt.adapters import ReaderAdapter 

5from amqtt.codecs_amqtt import bytes_to_int, int_to_bytes, read_or_raise 

6from amqtt.errors import AMQTTError, NoDataError 

7from amqtt.mqtt.packet import SUBACK, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader, PacketIdVariableHeader 

8 

9 

class SubackPayload(MQTTPayload[MQTTVariableHeader]):
    """Payload of a SUBACK packet: an ordered list of integer return codes.

    The payload is serialized as one byte per return code, in list order.
    """

    __slots__ = ("return_codes",)

    # NOTE(review): these appear to mirror the MQTT 3.1.1 SUBACK return codes
    # (0x00-0x02 = granted maximum QoS 0/1/2, 0x80 = failure) - confirm
    # against the specification before relying on the meaning.
    RETURN_CODE_00 = 0x00
    RETURN_CODE_01 = 0x01
    RETURN_CODE_02 = 0x02
    RETURN_CODE_80 = 0x80

    def __init__(self, return_codes: list[int] | None = None) -> None:
        """Create a payload holding ``return_codes``.

        A missing or empty ``return_codes`` results in a fresh empty list.
        """
        super().__init__()
        self.return_codes = return_codes or []

    def __repr__(self) -> str:
        """Return a string representation of the SubackPayload object."""
        return f"{type(self).__name__}(return_codes={self.return_codes!r})"

    def to_bytes(
        self,
        fixed_header: MQTTFixedHeader | None = None,
        variable_header: MQTTVariableHeader | None = None,
    ) -> bytes:
        """Serialize the return codes, one ``int_to_bytes(code, 1)`` chunk per code.

        ``fixed_header`` and ``variable_header`` are accepted for interface
        compatibility and are not used here.
        """
        # Join once instead of growing a bytes object with += inside a loop.
        return b"".join(int_to_bytes(return_code, 1) for return_code in self.return_codes)

    @classmethod
    async def from_stream(
        cls,
        reader: asyncio.StreamReader | ReaderAdapter,
        fixed_header: MQTTFixedHeader | None,
        variable_header: MQTTVariableHeader | None,
    ) -> Self:
        """Read a payload from ``reader``.

        The number of return codes to read is the fixed header's
        ``remaining_length`` minus the variable header's ``bytes_length``.
        A non-positive count yields an empty payload.

        Raises:
            AMQTTError: if either header is ``None``.

        """
        if fixed_header is None or variable_header is None:
            msg = "Fixed header or variable header cannot be None"
            raise AMQTTError(msg)

        return_codes: list[int] = []
        bytes_to_read = fixed_header.remaining_length - variable_header.bytes_length
        try:
            for _ in range(bytes_to_read):
                return_code_byte = await read_or_raise(reader, 1)
                return_codes.append(bytes_to_int(return_code_byte))
        except NoDataError:
            # Best effort: when the stream runs dry early, keep the return
            # codes that were read so far instead of failing the whole packet.
            pass
        return cls(return_codes)

57 

58 

59class SubackPacket(MQTTPacket[PacketIdVariableHeader, SubackPayload, MQTTFixedHeader]): 

60 VARIABLE_HEADER = PacketIdVariableHeader 

61 PAYLOAD = SubackPayload 

62 

63 def __init__( 

64 self, 

65 fixed: MQTTFixedHeader | None = None, 

66 variable_header: PacketIdVariableHeader | None = None, 

67 payload: SubackPayload | None = None, 

68 ) -> None: 

69 if fixed is None: 

70 header = MQTTFixedHeader(SUBACK, 0x00) 

71 else: 

72 if fixed.packet_type is not SUBACK: 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 msg = f"Invalid fixed packet type {fixed.packet_type} for SubackPacket init" 

74 raise AMQTTError(msg) 

75 header = fixed 

76 

77 super().__init__(header) 

78 self.variable_header = variable_header 

79 self.payload = payload 

80 

81 @classmethod 

82 def build(cls, packet_id: int, return_codes: list[int]) -> Self: 

83 variable_header = cls.VARIABLE_HEADER(packet_id) 

84 payload = cls.PAYLOAD(return_codes) 

85 return cls(variable_header=variable_header, payload=payload)