Coverage for amqtt/mqtt/protocol/handler.py: 69%

517 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2 

# InvalidStateError and QueueFull are always provided by asyncio. They are imported
# unconditionally so that the ``except`` clauses in this module match the exceptions
# asyncio really raises, whatever the Python version.
from asyncio import InvalidStateError, QueueFull

try:
    from asyncio import QueueShutDown
except ImportError:
    # Fallback for Python < 3.13, where asyncio.QueueShutDown does not exist.
    # Only this name is replaced by a placeholder; it is never raised by asyncio there.
    class QueueShutDown(Exception):  # type: ignore[no-redef] # noqa : N818
        pass

15 

16 

17import collections 

18import itertools 

19import logging 

20from typing import Generic, TypeVar, cast 

21 

22from amqtt.adapters import ReaderAdapter, WriterAdapter 

23from amqtt.contexts import BaseContext 

24from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError 

25from amqtt.events import MQTTEvents 

26from amqtt.mqtt import packet_class 

27from amqtt.mqtt.connack import ConnackPacket 

28from amqtt.mqtt.connect import ConnectPacket 

29from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 

30from amqtt.mqtt.disconnect import DisconnectPacket 

31from amqtt.mqtt.packet import ( 

32 CONNACK, 

33 CONNECT, 

34 DISCONNECT, 

35 PINGREQ, 

36 PINGRESP, 

37 PUBACK, 

38 PUBCOMP, 

39 PUBLISH, 

40 PUBREC, 

41 PUBREL, 

42 RESERVED_0, 

43 RESERVED_15, 

44 SUBACK, 

45 SUBSCRIBE, 

46 UNSUBACK, 

47 UNSUBSCRIBE, 

48 MQTTFixedHeader, 

49) 

50from amqtt.mqtt.pingreq import PingReqPacket 

51from amqtt.mqtt.pingresp import PingRespPacket 

52from amqtt.mqtt.puback import PubackPacket 

53from amqtt.mqtt.pubcomp import PubcompPacket 

54from amqtt.mqtt.publish import PublishPacket 

55from amqtt.mqtt.pubrec import PubrecPacket 

56from amqtt.mqtt.pubrel import PubrelPacket 

57from amqtt.mqtt.suback import SubackPacket 

58from amqtt.mqtt.subscribe import SubscribePacket 

59from amqtt.mqtt.unsuback import UnsubackPacket 

60from amqtt.mqtt.unsubscribe import UnsubscribePacket 

61from amqtt.plugins.manager import PluginManager 

62from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session 

63 

64C = TypeVar("C", bound=BaseContext) 

65 

66 

67class ProtocolHandler(Generic[C]): 

68 """Class implementing the MQTT communication protocol using asyncio features.""" 

69 

def __init__(
    self,
    plugins_manager: PluginManager[C],
    session: Session | None = None,
    loop: asyncio.AbstractEventLoop | None = None,
) -> None:
    """Create a protocol handler, optionally bound to ``session`` right away.

    :param plugins_manager: plugin manager stored on the handler
    :param session: session to bind immediately; when omitted the handler starts without a session
    :param loop: event loop to use; defaults to the running loop, or to a newly created loop
        (installed as the current loop) when none is running
    """
    self.logger: logging.Logger | logging.LoggerAdapter[logging.Logger] = logging.getLogger(__name__)
    if session is None:
        self.session: Session | None = None
    else:
        # Replaces self.logger with a client-aware adapter and sets keepalive_timeout.
        self._init_session(session)
    self.reader: ReaderAdapter | None = None
    self.writer: WriterAdapter | None = None
    self.plugins_manager: PluginManager[C] = plugins_manager

    if loop is not None:
        self._loop = loop
    else:
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running here: create one and make it the current loop.
            self._loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self._loop)

    # Reader task and keep-alive timer handle; both are created later.
    self._reader_task: asyncio.Task[None] | None = None
    self._keepalive_task: asyncio.TimerHandle | None = None
    self._reader_ready: asyncio.Event | None = None
    self._reader_stopped = asyncio.Event()
    # Futures awaited by the QoS flows, keyed by packet id.
    self._puback_waiters: dict[int, asyncio.Future[PubackPacket]] = {}
    self._pubrec_waiters: dict[int, asyncio.Future[PubrecPacket]] = {}
    self._pubrel_waiters: dict[int, asyncio.Future[PubrelPacket]] = {}
    self._pubcomp_waiters: dict[int, asyncio.Future[PubcompPacket]] = {}
    self._write_lock = asyncio.Lock()

100 

def _init_session(self, session: Session) -> None:
    """Bind ``session`` to the handler and derive the per-session settings.

    Stores the session, replaces ``self.logger`` with an adapter carrying the client id, and
    sets ``self.keepalive_timeout`` from the session keep-alive value.

    :param session: session to bind
    :raises AMQTTError: if ``session`` is falsy
    """
    if not session:
        msg = "Session cannot be None"
        raise AMQTTError(msg)
    log = logging.getLogger(__name__)
    self.session = session
    self.logger = logging.LoggerAdapter(log, {"client_id": self.session.client_id})
    keep_alive = self.session.keep_alive
    # A missing (None) or non-positive keep-alive means "no timeout". The None check
    # mirrors the one in _reader_loop and avoids a TypeError on `None <= 0`.
    self.keepalive_timeout: int | None = keep_alive if keep_alive is not None and keep_alive > 0 else None

111 

def attach(self, session: Session, reader: ReaderAdapter, writer: WriterAdapter) -> None:
    """Bind a session and its stream adapters to a handler that holds no session yet.

    :param session: session to bind; handed to ``_init_session``
    :param reader: adapter stored as ``self.reader``
    :param writer: adapter stored as ``self.writer``
    :raises ProtocolHandlerError: if the handler already holds a session
    """
    if self.session:
        raise ProtocolHandlerError("Handler is already attached to a session")
    self._init_session(session)
    self.reader, self.writer = reader, writer

119 

def detach(self) -> None:
    """Drop the references to the session and to both stream adapters."""
    self.session = self.reader = self.writer = None

124 

def _is_attached(self) -> bool:
    """Return True when the handler currently holds a (truthy) session."""
    return True if self.session else False

127 

async def start(self) -> None:
    """Start the reader task, arm the keep-alive timer and retry pending deliveries.

    :raises ProtocolHandlerError: if the handler is not attached to a session
    """
    if not self._is_attached():
        raise ProtocolHandlerError("Handler is not attached to a stream")
    ready = asyncio.Event()
    self._reader_ready = ready
    self._reader_stopped = asyncio.Event()
    self._reader_task = asyncio.create_task(self._reader_loop())
    # Do not continue before the reader loop has signalled that it is running.
    await ready.wait()
    wants_keepalive = self._loop is not None and self.keepalive_timeout is not None
    if wants_keepalive:
        self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
    self.logger.debug("Handler tasks started")
    await self._retry_deliveries()
    self.logger.debug("Handler ready")

141 

async def stop(self) -> None:
    """Cancel pending waiters, the keep-alive timer and the reader task, then close the writer.

    Cancellation, timeout and I/O errors raised while closing the writer are logged at debug
    level and not propagated.
    """
    # Stop messages flow waiter
    self._stop_waiters()
    keepalive = self._keepalive_task
    if keepalive:
        keepalive.cancel()
    self.logger.debug("Waiting for tasks to be stopped")
    reader_task = self._reader_task
    if reader_task and not reader_task.done():
        reader_task.cancel()
        # The reader loop sets this event when it has finished.
        await self._reader_stopped.wait()
    self.logger.debug("Closing writer")
    writer = self.writer
    if writer is None:
        return
    try:
        await writer.close()
    except asyncio.CancelledError:
        # canceling the task is the expected result
        self.logger.debug("Writer close was cancelled.")
    except asyncio.TimeoutError:
        self.logger.debug("Writer close operation timed out.", exc_info=True)
    except OSError:
        self.logger.debug("Writer close failed due to I/O error.", exc_info=True)

162 

def _stop_waiters(self) -> None:
    """Cancel every pending PUBACK, PUBCOMP, PUBREC and PUBREL waiter future.

    The number of waiters of each kind is logged at debug level first.

    :raises AMQTTError: if a registered waiter is not an ``asyncio.Future``
    """
    waiter_groups = (
        ("puback", self._puback_waiters),
        ("pubcomp", self._pubcomp_waiters),
        ("pubrec", self._pubrec_waiters),
        ("pubrel", self._pubrel_waiters),
    )
    for kind, waiters in waiter_groups:
        self.logger.debug(f"Stopping {len(waiters)} {kind} waiters")
    for waiter in itertools.chain.from_iterable(waiters.values() for _, waiters in waiter_groups):
        if not isinstance(waiter, asyncio.Future):
            msg = "Waiter is not a asyncio.Future"
            raise AMQTTError(msg)
        waiter.cancel()

178 

async def _retry_deliveries(self) -> None:
    """Handle [MQTT-4.4.0-1] by resending PUBLISH and PUBREL messages for pending out messages.

    Every message in the session's ``inflight_in`` and ``inflight_out`` maps is run through
    ``_handle_message_flow`` in its own task, each limited to 10 seconds.

    :raises AMQTTError: if no session is attached
    """
    self.logger.debug("Begin messages delivery retries")
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)
    tasks = [
        asyncio.create_task(
            asyncio.wait_for(
                self._handle_message_flow(cast("IncomingApplicationMessage | OutgoingApplicationMessage", message)),
                10,
            ),
        )
        for message in itertools.chain(self.session.inflight_in.values(), self.session.inflight_out.values())
    ]
    if tasks:
        # asyncio.wait() without a timeout returns only once every task is done, so the
        # "pending" set is always empty. A retry that timed out or failed shows up as a
        # done task holding an exception. Inspecting the tasks here gives the real count
        # and also marks their exceptions as retrieved.
        done, _ = await asyncio.wait(tasks)
        failed = sum(1 for task in done if task.cancelled() or task.exception() is not None)
        self.logger.debug(f"{len(done) - failed} messages redelivered")
        self.logger.debug(f"{failed} messages not redelivered due to timeout")
    self.logger.debug("End messages delivery retries")

199 

async def mqtt_publish(
    self,
    topic: str,
    data: bytes | bytearray,
    qos: int | None,
    retain: bool,
    ack_timeout: int | None = None,
) -> OutgoingApplicationMessage:
    """Publish a message and run its complete QoS flow before returning.

    :param topic: MQTT topic to publish
    :param data: data to send on topic
    :param qos: quality of service to use for message flow. Can be QOS_0, QOS_1 or QOS_2
    :param retain: retain message flag
    :param ack_timeout: when set to a positive value, the flow is wrapped in ``asyncio.wait_for``
        with this many seconds
    :return: the application message used during the inflight operations
    :raises AMQTTError: if no session is attached, or if the packet id taken from the session
        is already in flight
    """
    if self.session is None:
        raise AMQTTError("Session is not initialized.")

    # Only QoS 1 and QoS 2 messages carry a packet id.
    packet_id = None
    if qos in (QOS_1, QOS_2):
        packet_id = self.session.next_packet_id
        if packet_id in self.session.inflight_out:
            raise AMQTTError(f"A message with the same packet ID '{packet_id}' is already in flight")

    message: OutgoingApplicationMessage = OutgoingApplicationMessage(packet_id, topic, qos, data, retain)
    # Handle message flow
    bounded = ack_timeout is not None and ack_timeout > 0
    if bounded:
        await asyncio.wait_for(self._handle_message_flow(message), ack_timeout)
    else:
        await self._handle_message_flow(message)
    return message

236 

async def _handle_message_flow(self, app_message: IncomingApplicationMessage | OutgoingApplicationMessage) -> None:
    """Run the protocol flow that matches the message's QoS level.

    See MQTT spec. paragraph 4.3 - Quality of Service levels and protocol flows.

    :param app_message: application message to handle
    :raises AMQTTError: if the message's QoS is not QOS_0, QOS_1 or QOS_2
    """
    if app_message.qos not in (QOS_0, QOS_1, QOS_2):
        raise AMQTTError(f"Unexpected QOS value '{app_message.qos}' for message: {app_message}")

    if app_message.qos == QOS_0:
        await self._handle_qos0_message_flow(app_message)
        return
    if app_message.qos == QOS_1:
        await self._handle_qos1_message_flow(app_message)
        return
    # The membership check above leaves QOS_2 as the only remaining level.
    await self._handle_qos2_message_flow(app_message)

255 

async def _handle_qos0_message_flow(self, app_message: IncomingApplicationMessage | OutgoingApplicationMessage) -> None:
    """Handle a QOS_0 application message, which needs no acknowledgment.

    Outgoing messages are sent as PUBLISH. Incoming messages are put on the session's
    delivered-message queue without waiting; a message flagged DUP, or one that cannot be
    queued, is dropped with a warning.

    :param app_message: application message to handle
    :raises ValueError: if the message is not QOS_0
    :raises AMQTTError: if no session is attached
    """
    if app_message.qos != QOS_0:
        raise ValueError(f"Expected QOS_0 message, got QOS_{app_message.qos}")
    if self.session is None:
        raise AMQTTError("Session is not initialized.")

    if app_message.direction == OUTGOING:
        outgoing = app_message.build_publish_packet()
        # Send PUBLISH packet
        await self._send_packet(outgoing)
        app_message.publish_packet = outgoing
        return

    if app_message.direction != INCOMING:
        return

    if app_message.publish_packet is not None and app_message.publish_packet.dup_flag:
        self.logger.warning(
            "[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %r",
            app_message.publish_packet,
        )
        return

    try:
        self.session.delivered_message_queue.put_nowait(app_message)
        self.logger.debug(f"Message added to delivery queue: {app_message}")
    except QueueShutDown as shutdown_error:
        self.logger.warning(f"Delivered messages queue is shut down. QOS_0 message discarded: {shutdown_error}")
    except QueueFull as full_error:
        self.logger.warning(f"Delivered messages queue is full. QOS_0 message discarded: {full_error}")

288 

# QoS 1 (PUBLISH / PUBACK) flow for a single application message, in either direction.
# NOTE(review): this listing does not preserve indentation; the comments below describe the
# statements in order and flag the one place where the nesting cannot be determined.
289 async def _handle_qos1_message_flow(self, app_message: OutgoingApplicationMessage | IncomingApplicationMessage) -> None: 

290 """Handle QOS_1 application message acknowledgment. 

291 

292 For incoming messages, this method stores the message and reply with PUBACK. 

293 For outgoing messages, this methods sends PUBLISH and waits for the corresponding PUBACK. 

294 :param app_message: Application message to handle 

295 """ 

# Preconditions: QoS level is 1, a packet id is present, no PUBACK recorded yet, session attached.
296 if app_message.qos != QOS_1: 296 ↛ 297line 296 didn't jump to line 297 because the condition on line 296 was never true

297 msg = f"Expected QOS_1 message, got QOS_{app_message.qos}" 

298 raise ValueError(msg) 

299 if app_message.packet_id is None: 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true

300 msg = "Packet ID is not set" 

301 raise ValueError(msg) 

302 if app_message.puback_packet: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 msg = f"Message '{app_message.packet_id}' has already been acknowledged" 

304 raise AMQTTError(msg) 

305 if self.session is None: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true

306 msg = "Session is not initialized." 

307 raise AMQTTError(msg) 

308 

309 if app_message.direction == OUTGOING: 

# The message is recorded as in flight only when its packet id is not tracked already.
310 if app_message.packet_id not in self.session.inflight_out and isinstance(app_message, OutgoingApplicationMessage): 

311 # Store message in session 

312 self.session.inflight_out[app_message.packet_id] = app_message 

# An existing publish_packet means a PUBLISH was built before, so the retry is built with dup=True.
313 if app_message.publish_packet is not None: 

314 # A Publish packet has already been sent, this is a retry 

315 publish_packet = app_message.build_publish_packet(dup=True) 

316 else: 

317 publish_packet = app_message.build_publish_packet() 

318 # Send PUBLISH packet 

319 await self._send_packet(publish_packet) 

320 app_message.publish_packet = publish_packet 

321 # Wait for puback 

# The future is registered under the packet id; handle_puback() looks it up there and resolves it.
322 waiter: asyncio.Future[PubackPacket] = asyncio.Future() 

323 self._puback_waiters[app_message.packet_id] = waiter 

324 try: 

# The wait for the PUBACK is limited to 5 seconds.
325 app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5) 

326 except asyncio.TimeoutError: 

327 msg = f"Timeout waiting for PUBACK for packet ID {app_message.packet_id}" 

328 self.logger.warning(msg) 

# Re-raised as the built-in TimeoutError; `from None` drops the asyncio exception context.
329 raise TimeoutError(msg) from None 

330 finally: 

# The waiter entry is removed on every outcome.
331 self._puback_waiters.pop(app_message.packet_id, None) 

332 # Discard inflight message 

# NOTE(review): it cannot be determined from this listing whether the pop below belongs to the
# `finally` block (runs on every outcome) or follows the try statement (runs only after a
# successful wait) -- verify against the source file.
333 self.session.inflight_out.pop(app_message.packet_id, None) 

334 elif app_message.direction == INCOMING: 334 ↛ exitline 334 didn't return from function '_handle_qos1_message_flow' because the condition on line 334 was always true

335 # Initiate delivery 

336 self.logger.debug("Add message to delivery") 

# The message is queued for delivery before the PUBACK is sent.
337 await self.session.delivered_message_queue.put(app_message) 

338 # Send PUBACK 

339 puback = PubackPacket.build(app_message.packet_id) 

340 await self._send_packet(puback) 

341 app_message.puback_packet = puback 

342 

# QoS 2 (PUBLISH / PUBREC / PUBREL / PUBCOMP) flow for a single application message.
# NOTE(review): this listing does not preserve indentation; the comments below describe the
# statements in order and flag the places where the nesting cannot be determined.
343 async def _handle_qos2_message_flow(self, app_message: OutgoingApplicationMessage | IncomingApplicationMessage) -> None: 

344 """Handle QOS_2 application message acknowledgment. 

345 

346 For incoming messages, this method stores the message, sends PUBREC, waits for PUBREL, initiate delivery 

347 and send PUBCOMP. 

348 For outgoing messages, this methods sends PUBLISH, waits for PUBREC, discards messages and wait for PUBCOMP. 

349 :param app_message: Application message to handle 

350 """ 

# Preconditions: QoS level is 2, a packet id is present, session attached.
351 if app_message.qos != QOS_2: 351 ↛ 352line 351 didn't jump to line 352 because the condition on line 351 was never true

352 msg = f"Expected QOS_2 message, got QOS_{app_message.qos}" 

353 raise ValueError(msg) 

354 if app_message.packet_id is None: 354 ↛ 355line 354 didn't jump to line 355 because the condition on line 354 was never true

355 msg = "Packet ID is not set" 

356 raise ValueError(msg) 

357 if self.session is None: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true

358 msg = "Session is not initialized." 

359 raise AMQTTError(msg) 

360 

361 if app_message.direction == OUTGOING: 

# A message holding both a PUBREL and a PUBCOMP has completed the whole flow already.
362 if app_message.pubrel_packet and app_message.pubcomp_packet: 362 ↛ 363line 362 didn't jump to line 363 because the condition on line 362 was never true

363 msg = f"Message '{app_message.packet_id}' has already been acknowledged" 

364 raise AMQTTError(msg) 

365 

# First stage (skipped once a PUBREL exists): send PUBLISH and wait for PUBREC.
366 if not app_message.pubrel_packet: 366 ↛ 398line 366 didn't jump to line 398 because the condition on line 366 was always true

367 # Store message 

368 publish_packet: PublishPacket 

369 if app_message.publish_packet is not None: 

370 # This is a retry flow, no need to store just check the message exists in session 

371 if app_message.packet_id not in self.session.inflight_out: 371 ↛ 372line 371 didn't jump to line 372 because the condition on line 371 was never true

372 msg = f"Unknown inflight message '{app_message.packet_id}' in session" 

373 raise AMQTTError(msg) 

374 publish_packet = app_message.build_publish_packet(dup=True) 

375 elif isinstance(app_message, OutgoingApplicationMessage): 375 ↛ 380line 375 didn't jump to line 380 because the condition on line 375 was always true

376 # Store message in session 

377 self.session.inflight_out[app_message.packet_id] = app_message 

378 publish_packet = app_message.build_publish_packet() 

379 else: 

# NOTE(review): this branch assigns nothing to publish_packet, so the _send_packet(publish_packet)
# call below would fail with UnboundLocalError if this branch were ever taken.
380 self.logger.debug("Message can not be stored, to be checked!") 

381 # Send PUBLISH packet 

382 await self._send_packet(publish_packet) 

383 app_message.publish_packet = publish_packet 

384 # Wait PUBREC 

# Unlike the incoming PUBREL case further down, an existing PUBREC waiter is treated as an error.
385 if app_message.packet_id in self._pubrec_waiters: 385 ↛ 387line 385 didn't jump to line 387 because the condition on line 385 was never true

386 # PUBREC waiter already exists for this packet ID 

387 message = f"Can't add PUBREC waiter, a waiter already exists for message Id '{app_message.packet_id}'" 

388 self.logger.warning(message) 

389 raise AMQTTError(message) 

# The future is registered under the packet id; handle_pubrec() resolves it. No timeout is applied here.
390 waiter_pub_rec: asyncio.Future[PubrecPacket] = asyncio.Future() 

391 self._pubrec_waiters[app_message.packet_id] = waiter_pub_rec 

392 try: 

393 app_message.pubrec_packet = await waiter_pub_rec 

394 finally: 

395 self._pubrec_waiters.pop(app_message.packet_id, None) 

# NOTE(review): it cannot be determined from this listing whether the pop below belongs to the
# `finally` block or follows the try statement -- verify against the source file.
396 self.session.inflight_out.pop(app_message.packet_id, None) 

397 

# Second stage: send PUBREL and wait for PUBCOMP.
398 if not app_message.pubcomp_packet: 398 ↛ exitline 398 didn't return from function '_handle_qos2_message_flow' because the condition on line 398 was always true

399 # Send pubrel 

400 app_message.pubrel_packet = PubrelPacket.build(app_message.packet_id) 

401 await self._send_packet(app_message.pubrel_packet) 

402 # Wait for PUBCOMP 

# The future is registered under the packet id; handle_pubcomp() resolves it. No timeout is applied here.
403 waiter_pub_comp: asyncio.Future[PubcompPacket] = asyncio.Future() 

404 self._pubcomp_waiters[app_message.packet_id] = waiter_pub_comp 

405 try: 

406 app_message.pubcomp_packet = await waiter_pub_comp 

407 finally: 

408 self._pubcomp_waiters.pop(app_message.packet_id, None) 

# NOTE(review): same uncertainty as above -- inside the `finally` or after the try statement.
409 self.session.inflight_out.pop(app_message.packet_id, None) 

410 elif app_message.direction == INCOMING and isinstance(app_message, IncomingApplicationMessage): 410 ↛ 438line 410 didn't jump to line 438 because the condition on line 410 was always true

# Incoming: record the message as in flight, acknowledge with PUBREC, then wait for PUBREL.
411 self.session.inflight_in[app_message.packet_id] = app_message 

412 # Send pubrec 

413 pubrec_packet = PubrecPacket.build(app_message.packet_id) 

414 await self._send_packet(pubrec_packet) 

415 app_message.pubrec_packet = pubrec_packet 

416 # Wait PUBREL 

# A still-pending PUBREL waiter for the same packet id is cancelled and replaced, not treated as an error.
417 if app_message.packet_id in self._pubrel_waiters and not self._pubrel_waiters[app_message.packet_id].done(): 

418 # PUBREL waiter already exists for this packet ID 

419 message = f"A waiter already exists for message Id '{app_message.packet_id}', canceling it" 

420 self.logger.warning(message) 

421 self._pubrel_waiters[app_message.packet_id].cancel() 

422 try: 

423 waiter_pub_rel: asyncio.Future[PubrelPacket] = asyncio.Future() 

424 self._pubrel_waiters[app_message.packet_id] = waiter_pub_rel 

425 await waiter_pub_rel 

426 del self._pubrel_waiters[app_message.packet_id] 

427 app_message.pubrel_packet = waiter_pub_rel.result() 

428 # Initiate delivery and discard message 

429 await self.session.delivered_message_queue.put(app_message) 

430 del self.session.inflight_in[app_message.packet_id] 

431 # Send pubcomp 

432 pubcomp_packet = PubcompPacket.build(app_message.packet_id) 

433 await self._send_packet(pubcomp_packet) 

434 app_message.pubcomp_packet = pubcomp_packet 

# CancelledError is swallowed here. The statements after the interrupted await are skipped, so
# the corresponding cleanup (`del` of the waiter / inflight_in entry) is not performed by this method.
435 except asyncio.CancelledError: 

436 self.logger.debug("Message flow cancelled") 

437 else: 

438 self.logger.debug("Unknown direction!") 

439 

async def _reader_loop(self) -> None:
    """Read packets from the stream until EOF, cancellation or an unexpected error.

    Every decoded packet is announced through the ``PACKET_RECEIVED`` plugin event and handed
    to the matching ``handle_*`` coroutine, scheduled as its own task (CONNECT is awaited
    inline). When the loop ends, the handler tasks still tracked are cancelled,
    ``handle_connection_closed`` is awaited, ``_reader_stopped`` is set and ``stop()`` is awaited.

    :raises AMQTTError: if no session is attached
    :raises ProtocolHandlerError: if the reader-ready event has not been created
    """
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)
    if not self._reader_ready:
        msg = "Reader ready is not initialized."
        raise ProtocolHandlerError(msg)

    self.logger.debug(f"{self.session.client_id} Starting reader coro")
    running_tasks: collections.deque[asyncio.Task[None]] = collections.deque()
    # A missing or non-positive keep-alive disables the read timeout.
    keepalive_timeout: int | None = self.session.keep_alive
    if keepalive_timeout is not None and keepalive_timeout <= 0:
        keepalive_timeout = None
    while True:
        try:
            self._reader_ready.set()
            # Forget handler tasks that have finished (oldest first).
            while running_tasks and running_tasks[0].done():
                running_tasks.popleft()
            if len(running_tasks) > 1:
                self.logger.debug(f"Handler running tasks: {len(running_tasks)}")
            if self.reader is None:
                self.logger.warning("Reader is not initialized!")
                break
            fixed_header = await asyncio.wait_for(MQTTFixedHeader.from_stream(self.reader), timeout=keepalive_timeout)
            if not fixed_header:
                self.logger.debug(f"{self.session.client_id} No more data (EOF received), stopping reader coro")
                break
            if fixed_header.packet_type in (RESERVED_0, RESERVED_15):
                self.logger.warning(
                    f"{self.session.client_id} Received reserved packet, which is forbidden: closing connection",
                )
                await self.handle_connection_closed()
                continue

            cls = packet_class(fixed_header)
            packet = await cls.from_stream(self.reader, fixed_header=fixed_header)
            await self.plugins_manager.fire_event(MQTTEvents.PACKET_RECEIVED, packet=packet, session=self.session)
            # Resolve the type once. A packet without a fixed header is reported as unhandled
            # rather than failing on `None.packet_type` while building the log message.
            packet_type = packet.fixed_header.packet_type if packet.fixed_header is not None else None
            if packet_type not in (
                CONNACK,
                SUBSCRIBE,
                UNSUBSCRIBE,
                SUBACK,
                UNSUBACK,
                PUBACK,
                PUBREC,
                PUBREL,
                PUBCOMP,
                PINGREQ,
                PINGRESP,
                PUBLISH,
                DISCONNECT,
                CONNECT,
            ):
                self.logger.warning(f"{self.session.client_id} Unhandled packet type: {packet_type}")
                continue

            task: asyncio.Task[None] | None = None
            if packet_type == CONNACK and isinstance(packet, ConnackPacket):
                task = asyncio.create_task(self.handle_connack(packet))
            elif packet_type == SUBSCRIBE and isinstance(packet, SubscribePacket):
                task = asyncio.create_task(self.handle_subscribe(packet))
            elif packet_type == UNSUBSCRIBE and isinstance(packet, UnsubscribePacket):
                task = asyncio.create_task(self.handle_unsubscribe(packet))
            elif packet_type == SUBACK and isinstance(packet, SubackPacket):
                task = asyncio.create_task(self.handle_suback(packet))
            elif packet_type == UNSUBACK and isinstance(packet, UnsubackPacket):
                task = asyncio.create_task(self.handle_unsuback(packet))
            elif packet_type == PUBACK and isinstance(packet, PubackPacket):
                task = asyncio.create_task(self.handle_puback(packet))
            elif packet_type == PUBREC and isinstance(packet, PubrecPacket):
                task = asyncio.create_task(self.handle_pubrec(packet))
            elif packet_type == PUBREL and isinstance(packet, PubrelPacket):
                task = asyncio.create_task(self.handle_pubrel(packet))
            elif packet_type == PUBCOMP and isinstance(packet, PubcompPacket):
                task = asyncio.create_task(self.handle_pubcomp(packet))
            elif packet_type == PINGREQ and isinstance(packet, PingReqPacket):
                task = asyncio.create_task(self.handle_pingreq(packet))
            elif packet_type == PINGRESP and isinstance(packet, PingRespPacket):
                task = asyncio.create_task(self.handle_pingresp(packet))
            elif packet_type == PUBLISH and isinstance(packet, PublishPacket):
                task = asyncio.create_task(self.handle_publish(packet))
            elif packet_type == DISCONNECT and isinstance(packet, DisconnectPacket):
                task = asyncio.create_task(self.handle_disconnect(packet))
            elif packet_type == CONNECT and isinstance(packet, ConnectPacket):
                # q: why is this not like all other inside a create_task?
                # a: the connection needs to be established before any other packet tasks for this new session are scheduled
                await self.handle_connect(packet)
            if task:
                running_tasks.append(task)
        except MQTTError:
            self.logger.debug("Message discarded")
        except asyncio.CancelledError:
            self.logger.debug("Task cancelled, reader loop ending")
            break
        except asyncio.TimeoutError:
            self.logger.debug(f"{self.session.client_id} Input stream read timeout")
            self.handle_read_timeout()
        except NoDataError:
            self.logger.debug(f"{self.session.client_id} No data available")
        except Exception as e:  # noqa: BLE001, pylint: disable=W0718
            self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
            break
    while running_tasks:
        running_tasks.popleft().cancel()
    try:
        await self.handle_connection_closed()
    finally:
        # stop() waits on this event; set it even when the close hook raises so that a
        # caller of stop() is never left waiting.
        self._reader_stopped.set()
    self.logger.debug("Reader coro stopped")
    await self.stop()

548 

# Write one packet to the attached writer, manage the keep-alive timer and fire the
# PACKET_SENT plugin event.
# NOTE(review): this listing does not preserve indentation; the comments below describe the
# statements in order and flag the place where the nesting cannot be determined.
549 async def _send_packet( 

550 self, 

551 packet: PublishPacket 

552 | PubackPacket 

553 | ConnackPacket 

554 | SubackPacket 

555 | ConnectPacket 

556 | SubscribePacket 

557 | UnsubscribePacket 

558 | DisconnectPacket 

559 | PingReqPacket 

560 | PubrelPacket 

561 | PubrecPacket 

562 | PubcompPacket 

563 | PingRespPacket 

564 | UnsubackPacket, 

565 ) -> None: 

566 try: 

# Nothing is written when no writer is attached; writes are serialized through _write_lock.
567 if self.writer: 567 ↛ 570line 567 didn't jump to line 570 because the condition on line 567 was always true

568 async with self._write_lock: 

569 await packet.to_stream(self.writer) 

# The pending keep-alive timer is cancelled and handle_write_timeout is scheduled again after
# keepalive_timeout seconds.
# NOTE(review): it cannot be determined from this listing whether the re-arm on lines 572-573
# is nested under the `if self._keepalive_task:` on line 570 -- verify against the source file.
570 if self._keepalive_task: 

571 self._keepalive_task.cancel() 

572 if self.keepalive_timeout is not None: 572 ↛ 574line 572 didn't jump to line 574 because the condition on line 572 was always true

573 self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout) 

574 await self.plugins_manager.fire_event(MQTTEvents.PACKET_SENT, packet=packet, session=self.session) 

# A reset or broken connection is reported through handle_connection_closed() and not re-raised.
575 except (ConnectionResetError, BrokenPipeError): 

576 await self.handle_connection_closed() 

# Cancellation is converted into ProtocolHandlerError (chained to the CancelledError).
577 except asyncio.CancelledError as e: 

578 msg = "Packet handling was cancelled" 

579 raise ProtocolHandlerError(msg) from e 

# Any other exception is logged and re-raised unchanged.
580 except Exception as e: 

581 self.logger.warning(f"Unhandled exception: {e}") 

582 raise 

583 

async def mqtt_deliver_next_message(self) -> ApplicationMessage | None:
    """Wait for the next message on the session's delivered-message queue and return it.

    :return: the next message, or None when the handler is not attached or when the wait is
        interrupted by ``asyncio.CancelledError`` or ``RuntimeError``
    :raises AMQTTError: if no session is attached
    """
    if self.session is None:
        raise AMQTTError("Session is not initialized.")
    if not self._is_attached():
        return None

    if self.logger.isEnabledFor(logging.DEBUG):
        self.logger.debug(f"{self.session.delivered_message_queue.qsize()} message(s) available for delivery")

    next_message: ApplicationMessage | None
    try:
        next_message = await self.session.delivered_message_queue.get()
    except (asyncio.CancelledError, RuntimeError):
        # An interrupted wait is reported to the caller as "no message".
        next_message = None

    if self.logger.isEnabledFor(logging.DEBUG):
        self.logger.debug(f"Delivering message {next_message}")
    return next_message

601 

def handle_write_timeout(self) -> None:
    """Log that the write (keep-alive) timeout is not handled by this implementation.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} write timeout unhandled")

607 

def handle_read_timeout(self) -> None:
    """Log that the read timeout is not handled by this implementation.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} read timeout unhandled")

613 

async def handle_connack(self, connack: ConnackPacket) -> None:
    """Log that the CONNACK packet is not handled by this implementation.

    :param connack: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} CONNACK unhandled")

619 

async def handle_connect(self, connect: ConnectPacket) -> None:
    """Log that the CONNECT packet is not handled by this implementation.

    :param connect: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} CONNECT unhandled")

625 

async def handle_subscribe(self, subscribe: SubscribePacket) -> None:
    """Log that the SUBSCRIBE packet is not handled by this implementation.

    :param subscribe: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} SUBSCRIBE unhandled")

631 

async def handle_unsubscribe(self, unsubscribe: UnsubscribePacket) -> None:
    """Log that the UNSUBSCRIBE packet is not handled by this implementation.

    :param unsubscribe: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} UNSUBSCRIBE unhandled")

637 

async def handle_suback(self, suback: SubackPacket) -> None:
    """Log that the SUBACK packet is not handled by this implementation.

    :param suback: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} SUBACK unhandled")

643 

async def handle_unsuback(self, unsuback: UnsubackPacket) -> None:
    """Log that the UNSUBACK packet is not handled by this implementation.

    :param unsuback: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} UNSUBACK unhandled")

649 

async def handle_pingresp(self, pingresp: PingRespPacket) -> None:
    """Log that the PINGRESP packet is not handled by this implementation.

    :param pingresp: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} PINGRESP unhandled")

655 

async def handle_pingreq(self, pingreq: PingReqPacket) -> None:
    """Log that the PINGREQ packet is not handled by this implementation.

    :param pingreq: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} PINGREQ unhandled")

661 

async def handle_disconnect(self, disconnect: DisconnectPacket) -> None:
    """Log that the DISCONNECT packet is not handled by this implementation.

    :param disconnect: received packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} DISCONNECT unhandled")

667 

async def handle_connection_closed(self) -> None:
    """Log that the closed connection is not handled by this implementation.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is None:
        raise AMQTTError("Session is not initialized.")
    self.logger.debug(f"{session.client_id} Connection closed unhandled")

673 

async def handle_puback(self, puback: PubackPacket) -> None:
    """Resolve the PUBACK waiter registered for the packet id carried by ``puback``.

    An unknown packet id or a waiter that is already done only produces a warning.

    :param puback: received PUBACK packet
    :raises ValueError: if the packet has no variable header
    """
    header = puback.variable_header
    if header is None:
        raise ValueError("Variable header is not set")
    packet_id = header.packet_id
    waiter = self._puback_waiters.get(packet_id)
    if waiter is None:
        self.logger.warning(f"Received PUBACK for unknown pending message Id: '{packet_id}'")
        return
    try:
        waiter.set_result(puback)
    except InvalidStateError:
        self.logger.warning(f"PUBACK waiter with Id '{packet_id}' already done")

686 

async def handle_pubrec(self, pubrec: PubrecPacket) -> None:
    """Resolve the PUBREC waiter registered for the packet id of ``pubrec``.

    An unknown packet id or a waiter that is already done only produces a warning.

    :param pubrec: received PUBREC packet
    """
    packet_id = pubrec.packet_id
    waiter = self._pubrec_waiters.get(packet_id)
    if waiter is None:
        self.logger.warning(f"Received PUBREC for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubrec)
    except InvalidStateError:
        self.logger.warning(f"PUBREC waiter with Id '{packet_id}' already done")

696 

async def handle_pubcomp(self, pubcomp: PubcompPacket) -> None:
    """Resolve the PUBCOMP waiter registered for the packet id of ``pubcomp``.

    An unknown packet id or a waiter that is already done only produces a warning.

    :param pubcomp: received PUBCOMP packet
    """
    packet_id = pubcomp.packet_id
    waiter = self._pubcomp_waiters.get(packet_id)
    if waiter is None:
        self.logger.warning(f"Received PUBCOMP for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubcomp)
    except InvalidStateError:
        self.logger.warning(f"PUBCOMP waiter with Id '{packet_id}' already done")

706 

async def handle_pubrel(self, pubrel: PubrelPacket) -> None:
    """Resolve the PUBREL waiter registered for the packet id of ``pubrel``.

    An unknown packet id or a waiter that is already done only produces a warning.

    :param pubrel: received PUBREL packet
    """
    packet_id = pubrel.packet_id
    waiter = self._pubrel_waiters.get(packet_id)
    if waiter is None:
        self.logger.warning(f"Received PUBREL for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubrel)
    except InvalidStateError:
        self.logger.warning(f"PUBREL waiter with Id '{packet_id}' already done")

716 

async def handle_publish(self, publish_packet: PublishPacket) -> None:
    """Wrap a received PUBLISH packet in an incoming application message and run its QoS flow.

    A packet without a topic name or without data is ignored.

    :param publish_packet: received PUBLISH packet
    """
    header = publish_packet.variable_header
    packet_id = header.packet_id if header else None
    qos = publish_packet.qos
    if publish_packet.topic_name is None or publish_packet.data is None:
        return

    incoming_message = IncomingApplicationMessage(
        packet_id,
        publish_packet.topic_name,
        qos,
        publish_packet.data,
        publish_packet.retain_flag,
    )
    incoming_message.publish_packet = publish_packet
    await self._handle_message_flow(incoming_message)

    queue_size = self.session.delivered_message_queue.qsize() if self.session else None
    self.logger.debug(f"Message queue size: {queue_size}")