Coverage for amqtt/codecs_amqtt.py: 91%

63 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2from decimal import ROUND_HALF_UP, Decimal 

3from struct import pack, unpack 

4 

5from amqtt.adapters import ReaderAdapter 

6from amqtt.errors import NoDataError, ZeroLengthReadError 

7 

8 

9def bytes_to_hex_str(data: bytes | bytearray) -> str: 

10 """Convert a sequence of bytes into its displayable hex representation, ie: 0x??????. 

11 

12 :param data: byte sequence 

13 :return: Hexadecimal displayable representation. 

14 """ 

15 return "0x" + "".join(format(b, "02x") for b in data) 

16 

17 

18def bytes_to_int(data: bytes | int) -> int: 

19 """Convert a sequence of bytes to an integer using big endian byte ordering. 

20 

21 :param data: byte sequence 

22 :return: integer value. 

23 """ 

24 if isinstance(data, int): 

25 return data 

26 

27 return int.from_bytes(data, byteorder="big") 

28 

29 

def int_to_bytes(int_value: int, length: int) -> bytes:
    """Pack an integer into a big-endian (network order) byte sequence.

    :param int_value: integer value to convert
    :param length: number of bytes to produce (only 1 or 2 are supported)
    :return: byte sequence
    :raises ValueError: if the length is unsupported
    """
    # struct format per supported width: unsigned char / unsigned short.
    struct_formats = {1: "!B", 2: "!H"}

    if length not in struct_formats:
        msg = "Unsupported length for int to bytes conversion. Only lengths 1 or 2 are allowed."
        raise ValueError(msg)

    return pack(struct_formats[length], int_value)

50 

51 

async def read_or_raise(reader: ReaderAdapter | asyncio.StreamReader, n: int = -1) -> bytes:
    """Read ``n`` bytes from the reader, raising ``NoDataError`` when nothing can be read.

    An incomplete read, a connection reset or a broken pipe is treated the
    same as the reader returning ``None``. An empty ``bytes`` result is
    returned to the caller as-is.

    :param reader: reader adapter
    :param n: number of bytes to read
    :return: bytes read.
    :raises NoDataError: if the read failed or returned ``None``
    """
    try:
        chunk = await reader.read(n)
    except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
        chunk = None

    if chunk is not None:
        return chunk

    msg = "No more data"
    raise NoDataError(msg)

67 

68 

async def decode_string(reader: ReaderAdapter | asyncio.StreamReader) -> str:
    """Read a string from a reader and decode it according to MQTT string specification.

    The string is read as a 2-byte big-endian length followed by that many
    bytes, which are decoded as UTF-8.

    :param reader: Stream reader
    :return: string read from stream; ``""`` when the length prefix is zero. If the
        payload is not valid UTF-8, the ``str()`` form of the raw bytes is returned.
    :raises ZeroLengthReadError: if the read of the length prefix returned no bytes
    :raises NoDataError: if the reader has no data, or only part of the length prefix was read
    """
    length_bytes = await read_or_raise(reader, 2)
    if not length_bytes:
        raise ZeroLengthReadError
    if len(length_bytes) < 2:
        # A stream read may return fewer bytes than requested; unpacking a lone
        # byte would otherwise escape as struct.error.
        msg = "Incomplete length prefix"
        raise NoDataError(msg)

    str_length = unpack("!H", length_bytes)[0]
    if not str_length:
        return ""

    byte_str = await read_or_raise(reader, str_length)
    try:
        return byte_str.decode(encoding="utf-8")
    except UnicodeDecodeError:
        # Best-effort fallback kept from the original: expose the raw bytes' repr.
        return str(byte_str)

87 

88 

async def decode_data_with_length(reader: ReaderAdapter | asyncio.StreamReader) -> bytes:
    """Read data from a reader. Data is prefixed with 2 bytes length.

    The prefix is interpreted as a big-endian unsigned 16-bit integer giving
    the number of payload bytes to read next.

    :param reader: Stream reader
    :return: bytes read from stream (without length).
    :raises ZeroLengthReadError: if the read of the length prefix returned no bytes
    :raises NoDataError: if the reader has no data, or only part of the length prefix was read
    """
    length_bytes = await read_or_raise(reader, 2)
    if not length_bytes:
        raise ZeroLengthReadError
    if len(length_bytes) < 2:
        # A stream read may return fewer bytes than requested; unpacking a lone
        # byte would otherwise escape as struct.error.
        msg = "Incomplete length prefix"
        raise NoDataError(msg)

    bytes_length = unpack("!H", length_bytes)[0]
    return await read_or_raise(reader, bytes_length)

100 

101 

def encode_string(string: str) -> bytes:
    """Serialize a string as UTF-8 bytes preceded by a 2-byte big-endian length.

    :param string: string to encode
    :return: string with length prefix.
    """
    payload = string.encode(encoding="utf-8")
    length_prefix = pack("!H", len(payload))
    return length_prefix + payload

111 

112 

113def encode_data_with_length(data: bytes | bytearray) -> bytes: 

114 """Encode data with its length as prefix. 

115 

116 :param data: data to encode 

117 :return: data with length prefix. 

118 """ 

119 data_length = len(data) 

120 return int_to_bytes(data_length, 2) + data 

121 

122 

async def decode_packet_id(reader: ReaderAdapter | asyncio.StreamReader) -> int:
    """Read a packet ID as 2-bytes int from stream according to MQTT specification (2.3.1).

    The two bytes are interpreted as a big-endian unsigned 16-bit integer.

    :param reader: Stream reader
    :return: Packet ID.
    :raises NoDataError: if the reader has no data, or fewer than 2 bytes were read
    """
    packet_id_bytes = await read_or_raise(reader, 2)
    if len(packet_id_bytes) < 2:
        # A stream read may return fewer bytes than requested (or none at all);
        # unpacking that would otherwise escape as struct.error.
        msg = "Incomplete packet ID"
        raise NoDataError(msg)

    packet_id: int = unpack("!H", packet_id_bytes)[0]
    return packet_id

133 

134 

def int_to_bytes_str(value: int) -> bytes:
    """Return the decimal text form of an integer, encoded as UTF-8 bytes.

    Ex: 123 -> b'123'
    :param value: int value to convert
    :return: bytes array.
    """
    decimal_text = str(value)
    return bytes(decimal_text, "utf-8")

143 

144 

def float_to_bytes_str(value: float, places: int = 3) -> bytes:
    """Convert a float value to a bytes array containing its decimal text.

    The value is rounded half-up to ``places`` digits after the decimal point.
    The float is converted to ``Decimal`` exactly, so rounding operates on the
    binary value the float actually holds.

    Ex: 3.14159 -> b'3.142'
    :param value: float value to convert
    :param places: number of digits after the decimal point (0 or more)
    :return: bytes array.
    :raises ValueError: if places is negative
    """
    if places < 0:
        msg = "places must be non-negative"
        raise ValueError(msg)

    # Quantum 10**-places built from (sign, digits, exponent); also correct for
    # places == 0, where a string-built quantum would wrongly come out as "0.1".
    quant = Decimal((0, (1,), -places))
    rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP)
    return str(rounded).encode("utf-8")