Coverage for amqtt/codecs_amqtt.py: 91%
63 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2from decimal import ROUND_HALF_UP, Decimal
3from struct import pack, unpack
5from amqtt.adapters import ReaderAdapter
6from amqtt.errors import NoDataError, ZeroLengthReadError
9def bytes_to_hex_str(data: bytes | bytearray) -> str:
10 """Convert a sequence of bytes into its displayable hex representation, ie: 0x??????.
12 :param data: byte sequence
13 :return: Hexadecimal displayable representation.
14 """
15 return "0x" + "".join(format(b, "02x") for b in data)
18def bytes_to_int(data: bytes | int) -> int:
19 """Convert a sequence of bytes to an integer using big endian byte ordering.
21 :param data: byte sequence
22 :return: integer value.
23 """
24 if isinstance(data, int):
25 return data
27 return int.from_bytes(data, byteorder="big")
30def int_to_bytes(int_value: int, length: int) -> bytes:
31 """Convert an integer to a sequence of bytes using big endian byte ordering.
33 :param int_value: integer value to convert
34 :param length: byte length (must be 1 or 2)
35 :return: byte sequence
36 :raises ValueError: if the length is unsupported
37 """
38 # Map length to the appropriate format string
39 fmt_mapping = {
40 1: "!B", # 1 byte, unsigned char
41 2: "!H", # 2 bytes, unsigned short
42 }
44 fmt = fmt_mapping.get(length)
45 if not fmt: 45 ↛ 46line 45 didn't jump to line 46 because the condition on line 45 was never true
46 msg = "Unsupported length for int to bytes conversion. Only lengths 1 or 2 are allowed."
47 raise ValueError(msg)
49 return pack(fmt, int_value)
52async def read_or_raise(reader: ReaderAdapter | asyncio.StreamReader, n: int = -1) -> bytes:
53 """Read a given byte number from Stream. NoDataException is raised if read gives no data.
55 :param reader: reader adapter
56 :param n: number of bytes to read
57 :return: bytes read.
58 """
59 try:
60 data = await reader.read(n)
61 except (asyncio.IncompleteReadError, ConnectionResetError, BrokenPipeError):
62 data = None
63 if data is None:
64 msg = "No more data"
65 raise NoDataError(msg)
66 return data
69async def decode_string(reader: ReaderAdapter | asyncio.StreamReader) -> str:
70 """Read a string from a reader and decode it according to MQTT string specification.
72 :param reader: Stream reader
73 :return: string read from stream.
74 """
75 length_bytes = await read_or_raise(reader, 2)
76 if len(length_bytes) < 1:
77 raise ZeroLengthReadError
78 str_length = unpack("!H", length_bytes)[0]
79 if str_length:
80 byte_str = await read_or_raise(reader, str_length)
81 try:
82 return byte_str.decode(encoding="utf-8")
83 except UnicodeDecodeError:
84 return str(byte_str)
85 else:
86 return ""
89async def decode_data_with_length(reader: ReaderAdapter | asyncio.StreamReader) -> bytes:
90 """Read data from a reader. Data is prefixed with 2 bytes length.
92 :param reader: Stream reader
93 :return: bytes read from stream (without length).
94 """
95 length_bytes = await read_or_raise(reader, 2)
96 if len(length_bytes) < 1: 96 ↛ 97line 96 didn't jump to line 97 because the condition on line 96 was never true
97 raise ZeroLengthReadError
98 bytes_length = unpack("!H", length_bytes)[0]
99 return await read_or_raise(reader, bytes_length)
102def encode_string(string: str) -> bytes:
103 """Encode a string with its length as prefix.
105 :param string: string to encode
106 :return: string with length prefix.
107 """
108 data = string.encode(encoding="utf-8")
109 data_length = len(data)
110 return int_to_bytes(data_length, 2) + data
113def encode_data_with_length(data: bytes | bytearray) -> bytes:
114 """Encode data with its length as prefix.
116 :param data: data to encode
117 :return: data with length prefix.
118 """
119 data_length = len(data)
120 return int_to_bytes(data_length, 2) + data
123async def decode_packet_id(reader: ReaderAdapter | asyncio.StreamReader) -> int:
124 """Read a packet ID as 2-bytes int from stream according to MQTT specification (2.3.1).
126 :param reader: Stream reader
127 :return: Packet ID.
128 """
129 packet_id_bytes = await read_or_raise(reader, 2)
130 packet_id = unpack("!H", packet_id_bytes)
131 packet: int = packet_id[0]
132 return packet
135def int_to_bytes_str(value: int) -> bytes:
136 """Convert an int value to a bytes array containing the numeric character.
138 Ex: 123 -> b'123'
139 :param value: int value to convert
140 :return: bytes array.
141 """
142 return str(value).encode("utf-8")
145def float_to_bytes_str(value: float, places: int = 3) -> bytes:
146 """Convert an float value to a bytes array containing the numeric character."""
147 quant = Decimal(f"0.{''.join(['0' for i in range(places - 1)])}1")
148 rounded = Decimal(value).quantize(quant, rounding=ROUND_HALF_UP)
149 return str(rounded).encode("utf-8")