Coverage for amqtt/mqtt/protocol/handler.py: 69%
517 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
# InvalidStateError and QueueFull exist in asyncio on every supported Python version,
# so they are imported unconditionally. Putting them in the guarded import below would
# swap them for unrelated placeholder classes whenever QueueShutDown is missing, and
# ``except InvalidStateError`` / ``except QueueFull`` would then never match the
# exceptions asyncio actually raises.
from asyncio import InvalidStateError, QueueFull

try:
    from asyncio import QueueShutDown
except ImportError:
    # Fallback for Python < 3.13, where asyncio queues cannot be shut down.
    class QueueShutDown(Exception):  # type: ignore[no-redef] # noqa: N818
        """Placeholder that keeps ``except QueueShutDown`` valid; asyncio never raises it."""
17import collections
18import itertools
19import logging
20from typing import Generic, TypeVar, cast
22from amqtt.adapters import ReaderAdapter, WriterAdapter
23from amqtt.contexts import BaseContext
24from amqtt.errors import AMQTTError, MQTTError, NoDataError, ProtocolHandlerError
25from amqtt.events import MQTTEvents
26from amqtt.mqtt import packet_class
27from amqtt.mqtt.connack import ConnackPacket
28from amqtt.mqtt.connect import ConnectPacket
29from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
30from amqtt.mqtt.disconnect import DisconnectPacket
31from amqtt.mqtt.packet import (
32 CONNACK,
33 CONNECT,
34 DISCONNECT,
35 PINGREQ,
36 PINGRESP,
37 PUBACK,
38 PUBCOMP,
39 PUBLISH,
40 PUBREC,
41 PUBREL,
42 RESERVED_0,
43 RESERVED_15,
44 SUBACK,
45 SUBSCRIBE,
46 UNSUBACK,
47 UNSUBSCRIBE,
48 MQTTFixedHeader,
49)
50from amqtt.mqtt.pingreq import PingReqPacket
51from amqtt.mqtt.pingresp import PingRespPacket
52from amqtt.mqtt.puback import PubackPacket
53from amqtt.mqtt.pubcomp import PubcompPacket
54from amqtt.mqtt.publish import PublishPacket
55from amqtt.mqtt.pubrec import PubrecPacket
56from amqtt.mqtt.pubrel import PubrelPacket
57from amqtt.mqtt.suback import SubackPacket
58from amqtt.mqtt.subscribe import SubscribePacket
59from amqtt.mqtt.unsuback import UnsubackPacket
60from amqtt.mqtt.unsubscribe import UnsubscribePacket
61from amqtt.plugins.manager import PluginManager
62from amqtt.session import INCOMING, OUTGOING, ApplicationMessage, IncomingApplicationMessage, OutgoingApplicationMessage, Session
# Type variable for the context type handled by the plugin manager; bound to BaseContext.
C = TypeVar("C", bound=BaseContext)
67class ProtocolHandler(Generic[C]):
68 """Class implementing the MQTT communication protocol using asyncio features."""
def __init__(
    self,
    plugins_manager: PluginManager[C],
    session: Session | None = None,
    loop: asyncio.AbstractEventLoop | None = None,
) -> None:
    """Create a protocol handler, optionally bound to ``session`` right away.

    :param plugins_manager: plugin manager stored on the handler and used to fire packet events
    :param session: session to bind immediately; when omitted the handler stays detached until ``attach``
    :param loop: event loop stored for scheduling the keep-alive timer; defaults to the running loop,
        and when no loop is running a new one is created and installed as the current loop
    """
    self.logger: logging.Logger | logging.LoggerAdapter[logging.Logger] = logging.getLogger(__name__)
    # Give the keep-alive timeout a value even on a detached handler, so reading the
    # attribute before attach() yields None instead of raising AttributeError.
    self.keepalive_timeout = None
    if session is not None:
        self._init_session(session)
    else:
        self.session: Session | None = None
    self.reader: ReaderAdapter | None = None
    self.writer: WriterAdapter | None = None
    self.plugins_manager: PluginManager[C] = plugins_manager

    try:
        self._loop = loop if loop is not None else asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: create one and make it the current loop.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

    self._reader_task: asyncio.Task[None] | None = None
    self._keepalive_task: asyncio.TimerHandle | None = None
    self._reader_ready: asyncio.Event | None = None
    self._reader_stopped = asyncio.Event()
    # Futures resolved by the matching handle_pub* coroutine, keyed by packet id.
    self._puback_waiters: dict[int, asyncio.Future[PubackPacket]] = {}
    self._pubrec_waiters: dict[int, asyncio.Future[PubrecPacket]] = {}
    self._pubrel_waiters: dict[int, asyncio.Future[PubrelPacket]] = {}
    self._pubcomp_waiters: dict[int, asyncio.Future[PubcompPacket]] = {}
    # Serialises packet writes on the shared writer.
    self._write_lock = asyncio.Lock()
def _init_session(self, session: Session) -> None:
    """Bind ``session`` to the handler and derive the logger and keep-alive timeout from it.

    :param session: session to bind; must be truthy
    :raises AMQTTError: if ``session`` is falsy
    """
    if not session:
        msg = "Session cannot be None"
        raise AMQTTError(msg)
    self.session = session
    # Tag every log record of this handler with the client id.
    self.logger = logging.LoggerAdapter(logging.getLogger(__name__), {"client_id": session.client_id})
    # A missing or non-positive keep-alive disables the timer. None is checked explicitly
    # because comparing None with an int raises TypeError; the reader loop normalises the
    # session keep-alive the same way.
    keep_alive = session.keep_alive
    self.keepalive_timeout: int | None = keep_alive if keep_alive is not None and keep_alive > 0 else None
def attach(self, session: Session, reader: ReaderAdapter, writer: WriterAdapter) -> None:
    """Bind the handler to ``session`` and to the stream adapters used to talk to the peer.

    :param session: session to bind
    :param reader: adapter packets are read from
    :param writer: adapter packets are written to
    :raises ProtocolHandlerError: if a session is already bound
    """
    if not self.session:
        self._init_session(session)
        self.reader, self.writer = reader, writer
        return
    msg = "Handler is already attached to a session"
    raise ProtocolHandlerError(msg)
def detach(self) -> None:
    """Drop the references to the session and to both stream adapters."""
    # Chained assignment binds the targets left to right: session, reader, writer.
    self.session = self.reader = self.writer = None
def _is_attached(self) -> bool:
    """Return ``True`` when a (truthy) session is bound to the handler."""
    return True if self.session else False
async def start(self) -> None:
    """Start the reader task, arm the keep-alive timer and retry pending deliveries.

    Returns once the reader loop has signalled readiness and the delivery retries are done.

    :raises ProtocolHandlerError: if no session is attached
    """
    if not self._is_attached():
        msg = "Handler is not attached to a stream"
        raise ProtocolHandlerError(msg)

    ready = asyncio.Event()
    self._reader_ready = ready
    self._reader_stopped = asyncio.Event()
    self._reader_task = asyncio.create_task(self._reader_loop())
    # The reader loop sets this event at the top of its first iteration.
    await ready.wait()

    if not (self._loop is None or self.keepalive_timeout is None):
        self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
    self.logger.debug("Handler tasks started")

    await self._retry_deliveries()
    self.logger.debug("Handler ready")
async def stop(self) -> None:
    """Stop the handler.

    Cancels pending acknowledgement waiters and the keep-alive timer, cancels the reader
    task and waits for it to flag that it stopped, then closes the writer. Cancellation,
    timeout and I/O errors raised while closing the writer are logged at debug level and
    suppressed.
    """
    # Stop messages flow waiter
    self._stop_waiters()
    if self._keepalive_task:
        self._keepalive_task.cancel()
    self.logger.debug("Waiting for tasks to be stopped")
    reader_task = self._reader_task
    if reader_task and not reader_task.done():
        # The reader loop calls stop() itself after it has finished and set
        # _reader_stopped. Cancelling the current task in that situation would only
        # abort the writer shutdown below at its first suspension point, so skip it.
        invoked_by_finished_reader = reader_task is asyncio.current_task() and self._reader_stopped.is_set()
        if not invoked_by_finished_reader:
            reader_task.cancel()
            await self._reader_stopped.wait()
    self.logger.debug("Closing writer")
    try:
        if self.writer is not None:
            await self.writer.close()
    except asyncio.CancelledError:
        # canceling the task is the expected result
        self.logger.debug("Writer close was cancelled.")
    except asyncio.TimeoutError:
        self.logger.debug("Writer close operation timed out.", exc_info=True)
    except OSError:
        self.logger.debug("Writer close failed due to I/O error.", exc_info=True)
def _stop_waiters(self) -> None:
    """Cancel every pending PUBACK, PUBCOMP, PUBREC and PUBREL waiter.

    :raises AMQTTError: if a registered waiter is not an ``asyncio.Future``
    """
    waiter_groups = (
        ("puback", self._puback_waiters),
        ("pubcomp", self._pubcomp_waiters),
        ("pubrec", self._pubrec_waiters),
        ("pubrel", self._pubrel_waiters),
    )
    for label, waiters in waiter_groups:
        self.logger.debug(f"Stopping {len(waiters)} {label} waiters")
    for waiter in itertools.chain.from_iterable(waiters.values() for _, waiters in waiter_groups):
        if not isinstance(waiter, asyncio.Future):
            msg = "Waiter is not a asyncio.Future"
            raise AMQTTError(msg)
        waiter.cancel()
async def _retry_deliveries(self) -> None:
    """Handle [MQTT-4.4.0-1] by resending PUBLISH and PUBREL messages for pending out messages.

    Every in-flight message of the session (incoming and outgoing) is run through its
    message flow again, each with a 10 second limit. Failures are logged, never raised.

    :raises AMQTTError: if no session is attached
    """
    self.logger.debug("Begin messages delivery retries")
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)
    tasks = [
        asyncio.create_task(
            asyncio.wait_for(
                self._handle_message_flow(cast("IncomingApplicationMessage | OutgoingApplicationMessage", message)),
                10,
            ),
        )
        for message in itertools.chain(self.session.inflight_in.values(), self.session.inflight_out.values())
    ]
    if tasks:
        # asyncio.wait() without a timeout returns only when every task has finished, so
        # flows that timed out or failed are part of `done`; look at each outcome rather
        # than counting them as redelivered.
        done, _ = await asyncio.wait(tasks)
        failed = 0
        for task in done:
            # Reading the exception also marks it as retrieved, which avoids the
            # "Task exception was never retrieved" warning at garbage collection.
            if task.cancelled() or task.exception() is not None:
                failed += 1
        self.logger.debug(f"{len(done) - failed} messages redelivered")
        self.logger.debug(f"{failed} messages not redelivered due to timeout or error")
    self.logger.debug("End messages delivery retries")
async def mqtt_publish(
    self,
    topic: str,
    data: bytes | bytearray,
    qos: int | None,
    retain: bool,
    ack_timeout: int | None = None,
) -> OutgoingApplicationMessage:
    """Publish a message and drive its protocol flow to completion.

    The coroutine returns only after the flow for the requested QoS has finished.

    :param topic: MQTT topic to publish
    :param data: data to send on topic
    :param qos: quality of service to use for message flow. Can be QOS_0, QOS_1 or QOS_2
    :param retain: retain message flag
    :param ack_timeout: when set to a positive number of seconds, the flow is wrapped in
        ``asyncio.wait_for`` with that limit
    :return: the application message used during the in-flight operations
    :raises AMQTTError: if no session is attached, or the allocated packet id is already in flight
    """
    session = self.session
    if session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)

    # Only QOS_1 and QOS_2 messages carry a packet id.
    packet_id = None
    if qos in (QOS_1, QOS_2):
        packet_id = session.next_packet_id
        if packet_id in session.inflight_out:
            msg = f"A message with the same packet ID '{packet_id}' is already in flight"
            raise AMQTTError(msg)

    message: OutgoingApplicationMessage = OutgoingApplicationMessage(packet_id, topic, qos, data, retain)
    # Handle message flow
    flow = self._handle_message_flow(message)
    if ack_timeout is not None and ack_timeout > 0:
        flow = asyncio.wait_for(flow, ack_timeout)
    await flow
    return message
async def _handle_message_flow(self, app_message: IncomingApplicationMessage | OutgoingApplicationMessage) -> None:
    """Handle protocol flow for incoming and outgoing messages.

    Dispatches to the QoS specific flow, according to MQTT spec. paragraph 4.3-Quality of
    Service levels and protocol flows.

    :param app_message: PublishMessage to handle
    :raises AMQTTError: if the message QoS is not QOS_0, QOS_1 or QOS_2
    """
    flows = (
        (QOS_0, self._handle_qos0_message_flow),
        (QOS_1, self._handle_qos1_message_flow),
        (QOS_2, self._handle_qos2_message_flow),
    )
    for level, flow in flows:
        if app_message.qos == level:
            await flow(app_message)
            return
    # Single failure path: any QoS outside the three known levels ends up here.
    msg = f"Unexpected QOS value '{app_message.qos}' for message: {app_message}"
    raise AMQTTError(msg)
async def _handle_qos0_message_flow(self, app_message: IncomingApplicationMessage | OutgoingApplicationMessage) -> None:
    """Run the QOS_0 flow for ``app_message``.

    Outgoing messages are sent as a PUBLISH packet. Incoming messages are put on the
    session delivery queue without waiting; a message flagged DUP is ignored with a
    warning, and a full or shut down queue discards the message with a warning.

    :param app_message: Application message to handle
    :raises ValueError: if the message is not QOS_0
    :raises AMQTTError: if no session is attached
    """
    if app_message.qos != QOS_0:
        msg = f"Expected QOS_0 message, got QOS_{app_message.qos}"
        raise ValueError(msg)
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)

    if app_message.direction == OUTGOING:
        packet = app_message.build_publish_packet()
        # Send PUBLISH packet
        await self._send_packet(packet)
        app_message.publish_packet = packet
        return
    if app_message.direction != INCOMING:
        return

    if app_message.publish_packet is not None and app_message.publish_packet.dup_flag:
        self.logger.warning(
            "[MQTT-3.3.1-2] DUP flag must set to 0 for QOS 0 message. Message ignored: %r",
            app_message.publish_packet,
        )
        return
    try:
        self.session.delivered_message_queue.put_nowait(app_message)
        self.logger.debug(f"Message added to delivery queue: {app_message}")
    except QueueShutDown as e:
        self.logger.warning(f"Delivered messages queue is shut down. QOS_0 message discarded: {e}")
    except QueueFull as e:
        self.logger.warning(f"Delivered messages queue is full. QOS_0 message discarded: {e}")
async def _handle_qos1_message_flow(self, app_message: OutgoingApplicationMessage | IncomingApplicationMessage) -> None:
    """Handle QOS_1 application message acknowledgment.

    For incoming messages, this method stores the message and replies with PUBACK.
    For outgoing messages, this method sends PUBLISH and waits up to 5 seconds for the
    corresponding PUBACK.

    :param app_message: Application message to handle
    :raises ValueError: if the message is not QOS_1 or has no packet id
    :raises AMQTTError: if the message was already acknowledged or no session is attached
    :raises TimeoutError: if no PUBACK arrives in time for an outgoing message
    """
    if app_message.qos != QOS_1:
        msg = f"Expected QOS_1 message, got QOS_{app_message.qos}"
        raise ValueError(msg)
    if app_message.packet_id is None:
        msg = "Packet ID is not set"
        raise ValueError(msg)
    if app_message.puback_packet:
        msg = f"Message '{app_message.packet_id}' has already been acknowledged"
        raise AMQTTError(msg)
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)

    session = self.session
    packet_id = app_message.packet_id
    if app_message.direction == OUTGOING:
        if packet_id not in session.inflight_out and isinstance(app_message, OutgoingApplicationMessage):
            # Store message in session
            session.inflight_out[packet_id] = app_message
        if app_message.publish_packet is not None:
            # A Publish packet has already been sent, this is a retry
            publish_packet = app_message.build_publish_packet(dup=True)
        else:
            publish_packet = app_message.build_publish_packet()

        # Register the waiter before sending: sending awaits, so a PUBACK handled while
        # the send is still in progress must already find its waiter. Otherwise it is
        # reported as unknown and this flow runs into the timeout below.
        waiter: asyncio.Future[PubackPacket] = asyncio.Future()
        self._puback_waiters[packet_id] = waiter
        try:
            # Send PUBLISH packet
            await self._send_packet(publish_packet)
        except BaseException:
            # Nothing to wait for when the send failed; the in-flight entry is kept.
            if self._puback_waiters.get(packet_id) is waiter:
                del self._puback_waiters[packet_id]
            raise
        app_message.publish_packet = publish_packet

        # Wait for puback
        try:
            app_message.puback_packet = await asyncio.wait_for(waiter, timeout=5)
        except asyncio.TimeoutError:
            msg = f"Timeout waiting for PUBACK for packet ID {packet_id}"
            self.logger.warning(msg)
            raise TimeoutError(msg) from None
        finally:
            self._puback_waiters.pop(packet_id, None)
            # Discard inflight message
            session.inflight_out.pop(packet_id, None)
    elif app_message.direction == INCOMING:
        # Initiate delivery
        self.logger.debug("Add message to delivery")
        await session.delivered_message_queue.put(app_message)
        # Send PUBACK
        puback = PubackPacket.build(packet_id)
        await self._send_packet(puback)
        app_message.puback_packet = puback
async def _handle_qos2_message_flow(self, app_message: OutgoingApplicationMessage | IncomingApplicationMessage) -> None:
    """Handle QOS_2 application message acknowledgment.

    For incoming messages, this method stores the message, sends PUBREC, waits for PUBREL,
    initiates delivery and sends PUBCOMP.
    For outgoing messages, this method sends PUBLISH, waits for PUBREC, discards the
    message, sends PUBREL and waits for PUBCOMP.

    Every acknowledgement waiter is registered before the packet that triggers the
    acknowledgement is sent, because sending awaits and the answer may be handled while
    the send is still in progress.

    :param app_message: Application message to handle
    :raises ValueError: if the message is not QOS_2 or has no packet id
    :raises AMQTTError: if no session is attached, the outgoing message was already
        acknowledged, is unknown to the session on retry, cannot be stored, or a PUBREC
        waiter already exists for its packet id
    """
    if app_message.qos != QOS_2:
        msg = f"Expected QOS_2 message, got QOS_{app_message.qos}"
        raise ValueError(msg)
    if app_message.packet_id is None:
        msg = "Packet ID is not set"
        raise ValueError(msg)
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)

    session = self.session
    packet_id = app_message.packet_id
    if app_message.direction == OUTGOING:
        if app_message.pubrel_packet and app_message.pubcomp_packet:
            msg = f"Message '{packet_id}' has already been acknowledged"
            raise AMQTTError(msg)

        if not app_message.pubrel_packet:
            # Store message
            publish_packet: PublishPacket
            if app_message.publish_packet is not None:
                # This is a retry flow, no need to store just check the message exists in session
                if packet_id not in session.inflight_out:
                    msg = f"Unknown inflight message '{packet_id}' in session"
                    raise AMQTTError(msg)
                publish_packet = app_message.build_publish_packet(dup=True)
            elif isinstance(app_message, OutgoingApplicationMessage):
                # Store message in session
                session.inflight_out[packet_id] = app_message
                publish_packet = app_message.build_publish_packet()
            else:
                # No PUBLISH packet can be built on this path, so fail explicitly rather
                # than continue with an unbound packet.
                self.logger.debug("Message can not be stored, to be checked!")
                msg = f"Message '{packet_id}' can not be stored in session"
                raise AMQTTError(msg)

            # Refuse a second PUBREC waiter before anything is sent.
            if packet_id in self._pubrec_waiters:
                # PUBREC waiter already exists for this packet ID
                message = f"Can't add PUBREC waiter, a waiter already exists for message Id '{packet_id}'"
                self.logger.warning(message)
                raise AMQTTError(message)
            waiter_pub_rec: asyncio.Future[PubrecPacket] = asyncio.Future()
            self._pubrec_waiters[packet_id] = waiter_pub_rec
            try:
                # Send PUBLISH packet
                await self._send_packet(publish_packet)
            except BaseException:
                # Nothing to wait for when the send failed; the in-flight entry is kept.
                self._pubrec_waiters.pop(packet_id, None)
                raise
            app_message.publish_packet = publish_packet
            # Wait PUBREC
            try:
                app_message.pubrec_packet = await waiter_pub_rec
            finally:
                self._pubrec_waiters.pop(packet_id, None)
                session.inflight_out.pop(packet_id, None)

        if not app_message.pubcomp_packet:
            app_message.pubrel_packet = PubrelPacket.build(packet_id)
            waiter_pub_comp: asyncio.Future[PubcompPacket] = asyncio.Future()
            self._pubcomp_waiters[packet_id] = waiter_pub_comp
            try:
                # Send pubrel
                await self._send_packet(app_message.pubrel_packet)
            except BaseException:
                self._pubcomp_waiters.pop(packet_id, None)
                raise
            # Wait for PUBCOMP
            try:
                app_message.pubcomp_packet = await waiter_pub_comp
            finally:
                self._pubcomp_waiters.pop(packet_id, None)
                session.inflight_out.pop(packet_id, None)
    elif app_message.direction == INCOMING and isinstance(app_message, IncomingApplicationMessage):
        session.inflight_in[packet_id] = app_message
        previous = self._pubrel_waiters.get(packet_id)
        if previous is not None and not previous.done():
            # PUBREL waiter already exists for this packet ID
            message = f"A waiter already exists for message Id '{packet_id}', canceling it"
            self.logger.warning(message)
            previous.cancel()
        waiter_pub_rel: asyncio.Future[PubrelPacket] = asyncio.Future()
        self._pubrel_waiters[packet_id] = waiter_pub_rel
        # Send pubrec
        pubrec_packet = PubrecPacket.build(packet_id)
        try:
            await self._send_packet(pubrec_packet)
        except BaseException:
            if self._pubrel_waiters.get(packet_id) is waiter_pub_rel:
                del self._pubrel_waiters[packet_id]
            raise
        app_message.pubrec_packet = pubrec_packet
        # Wait PUBREL
        try:
            await waiter_pub_rel
            del self._pubrel_waiters[packet_id]
            app_message.pubrel_packet = waiter_pub_rel.result()
            # Initiate delivery and discard message
            await session.delivered_message_queue.put(app_message)
            del session.inflight_in[packet_id]
            # Send pubcomp
            pubcomp_packet = PubcompPacket.build(packet_id)
            await self._send_packet(pubcomp_packet)
            app_message.pubcomp_packet = pubcomp_packet
        except asyncio.CancelledError:
            self.logger.debug("Message flow cancelled")
    else:
        self.logger.debug("Unknown direction!")
async def _reader_loop(self) -> None:
    """Read packets from the stream and dispatch each one to its ``handle_*`` coroutine.

    Handlers run as separate tasks, except CONNECT which is awaited inline. The loop ends
    on EOF, on cancellation, when the reader is missing, or on an unexpected exception.
    On exit the handler tasks still tracked are cancelled, ``handle_connection_closed``
    is awaited, the stopped event is set and ``stop`` is awaited.

    :raises AMQTTError: if no session is attached
    :raises ProtocolHandlerError: if the ready event has not been created
    """
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)
    if not self._reader_ready:
        msg = "Reader ready is not initialized."
        raise ProtocolHandlerError(msg)

    self.logger.debug(f"{self.session.client_id} Starting reader coro")
    running_tasks: collections.deque[asyncio.Task[None]] = collections.deque()
    # A non-positive keep-alive means "no read timeout".
    keepalive_timeout: int | None = self.session.keep_alive
    if keepalive_timeout is not None and keepalive_timeout <= 0:
        keepalive_timeout = None
    while True:
        try:
            self._reader_ready.set()
            # Forget handler tasks that already finished (oldest first).
            while running_tasks and running_tasks[0].done():
                running_tasks.popleft()
            if len(running_tasks) > 1:
                self.logger.debug(f"Handler running tasks: {len(running_tasks)}")
            if self.reader is None:
                self.logger.warning("Reader is not initialized!")
                break
            fixed_header = await asyncio.wait_for(MQTTFixedHeader.from_stream(self.reader), timeout=keepalive_timeout)
            if not fixed_header:
                self.logger.debug(f"{self.session.client_id} No more data (EOF received), stopping reader coro")
                break
            if fixed_header.packet_type in (RESERVED_0, RESERVED_15):
                self.logger.warning(
                    f"{self.session.client_id} Received reserved packet, which is forbidden: closing connection",
                )
                await self.handle_connection_closed()
                continue

            cls = packet_class(fixed_header)
            packet = await cls.from_stream(self.reader, fixed_header=fixed_header)
            await self.plugins_manager.fire_event(MQTTEvents.PACKET_RECEIVED, packet=packet, session=self.session)

            header = packet.fixed_header
            if header is None:
                # Checked on its own: reading packet_type off a missing header would
                # raise AttributeError and end the loop through the generic handler.
                self.logger.warning(f"{self.session.client_id} Unhandled packet without fixed header: {packet!r}")
                continue
            packet_type = header.packet_type
            if packet_type not in (
                CONNACK,
                SUBSCRIBE,
                UNSUBSCRIBE,
                SUBACK,
                UNSUBACK,
                PUBACK,
                PUBREC,
                PUBREL,
                PUBCOMP,
                PINGREQ,
                PINGRESP,
                PUBLISH,
                DISCONNECT,
                CONNECT,
            ):
                self.logger.warning(f"{self.session.client_id} Unhandled packet type: {packet_type}")
                continue

            task: asyncio.Task[None] | None = None
            if packet_type == CONNACK and isinstance(packet, ConnackPacket):
                task = asyncio.create_task(self.handle_connack(packet))
            elif packet_type == SUBSCRIBE and isinstance(packet, SubscribePacket):
                task = asyncio.create_task(self.handle_subscribe(packet))
            elif packet_type == UNSUBSCRIBE and isinstance(packet, UnsubscribePacket):
                task = asyncio.create_task(self.handle_unsubscribe(packet))
            elif packet_type == SUBACK and isinstance(packet, SubackPacket):
                task = asyncio.create_task(self.handle_suback(packet))
            elif packet_type == UNSUBACK and isinstance(packet, UnsubackPacket):
                task = asyncio.create_task(self.handle_unsuback(packet))
            elif packet_type == PUBACK and isinstance(packet, PubackPacket):
                task = asyncio.create_task(self.handle_puback(packet))
            elif packet_type == PUBREC and isinstance(packet, PubrecPacket):
                task = asyncio.create_task(self.handle_pubrec(packet))
            elif packet_type == PUBREL and isinstance(packet, PubrelPacket):
                task = asyncio.create_task(self.handle_pubrel(packet))
            elif packet_type == PUBCOMP and isinstance(packet, PubcompPacket):
                task = asyncio.create_task(self.handle_pubcomp(packet))
            elif packet_type == PINGREQ and isinstance(packet, PingReqPacket):
                task = asyncio.create_task(self.handle_pingreq(packet))
            elif packet_type == PINGRESP and isinstance(packet, PingRespPacket):
                task = asyncio.create_task(self.handle_pingresp(packet))
            elif packet_type == PUBLISH and isinstance(packet, PublishPacket):
                task = asyncio.create_task(self.handle_publish(packet))
            elif packet_type == DISCONNECT and isinstance(packet, DisconnectPacket):
                task = asyncio.create_task(self.handle_disconnect(packet))
            elif packet_type == CONNECT and isinstance(packet, ConnectPacket):
                # q: why is this not like all other inside a create_task?
                # a: the connection needs to be established before any other packet tasks for this new session are scheduled
                await self.handle_connect(packet)
            if task:
                running_tasks.append(task)
        except MQTTError:
            self.logger.debug("Message discarded")
        except asyncio.CancelledError:
            self.logger.debug("Task cancelled, reader loop ending")
            break
        except asyncio.TimeoutError:
            self.logger.debug(f"{self.session.client_id} Input stream read timeout")
            self.handle_read_timeout()
        except NoDataError:
            self.logger.debug(f"{self.session.client_id} No data available")
        except Exception as e:  # noqa: BLE001, pylint: disable=W0718
            self.logger.warning(f"{type(self).__name__} Unhandled exception in reader coro: {e!r}")
            break
    # Cancel the handler tasks that are still tracked before reporting the closed connection.
    while running_tasks:
        running_tasks.popleft().cancel()
    await self.handle_connection_closed()
    self._reader_stopped.set()
    self.logger.debug("Reader coro stopped")
    await self.stop()
async def _send_packet(
    self,
    packet: PublishPacket
    | PubackPacket
    | ConnackPacket
    | SubackPacket
    | ConnectPacket
    | SubscribePacket
    | UnsubscribePacket
    | DisconnectPacket
    | PingReqPacket
    | PubrelPacket
    | PubrecPacket
    | PubcompPacket
    | PingRespPacket
    | UnsubackPacket,
) -> None:
    """Write ``packet`` to the stream, re-arm the keep-alive timer and fire the packet-sent event.

    A reset connection or broken pipe is reported through ``handle_connection_closed``
    instead of being raised.

    :param packet: packet to send
    :raises ProtocolHandlerError: if the operation is cancelled
    """
    try:
        if self.writer:
            # One packet at a time on the shared writer.
            async with self._write_lock:
                await packet.to_stream(self.writer)
        armed_timer = self._keepalive_task
        if armed_timer:
            # Restart the write timeout countdown.
            armed_timer.cancel()
            if self.keepalive_timeout is not None:
                self._keepalive_task = self._loop.call_later(self.keepalive_timeout, self.handle_write_timeout)
        await self.plugins_manager.fire_event(MQTTEvents.PACKET_SENT, packet=packet, session=self.session)
    except (ConnectionResetError, BrokenPipeError):
        await self.handle_connection_closed()
    except asyncio.CancelledError as e:
        msg = "Packet handling was cancelled"
        raise ProtocolHandlerError(msg) from e
    except Exception as e:
        self.logger.warning(f"Unhandled exception: {e}")
        raise
async def mqtt_deliver_next_message(self) -> ApplicationMessage | None:
    """Wait for the next message on the session delivery queue and return it.

    :return: the next message, or ``None`` when the handler is not attached or the wait
        ended with ``asyncio.CancelledError`` or ``RuntimeError``
    :raises AMQTTError: if no session is set
    """
    if self.session is None:
        msg = "Session is not initialized."
        raise AMQTTError(msg)

    if not self._is_attached():
        return None
    debug_enabled = self.logger.isEnabledFor(logging.DEBUG)
    if debug_enabled:
        self.logger.debug(f"{self.session.delivered_message_queue.qsize()} message(s) available for delivery")
    try:
        message: ApplicationMessage | None = await self.session.delivered_message_queue.get()
    except (asyncio.CancelledError, RuntimeError):
        message = None
    # Checked again rather than reusing debug_enabled: the level may change during the wait.
    if self.logger.isEnabledFor(logging.DEBUG):
        self.logger.debug(f"Delivering message {message}")
    return message
def handle_write_timeout(self) -> None:
    """Keep-alive timer callback; this default implementation only logs at debug level.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} write timeout unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
def handle_read_timeout(self) -> None:
    """Called by the reader loop on a read timeout; this default implementation only logs at debug level.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} read timeout unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_connack(self, connack: ConnackPacket) -> None:
    """Default CONNACK hook: only logs at debug level that the packet was not handled.

    :param connack: received CONNACK packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} CONNACK unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_connect(self, connect: ConnectPacket) -> None:
    """Default CONNECT hook: only logs at debug level that the packet was not handled.

    :param connect: received CONNECT packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} CONNECT unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_subscribe(self, subscribe: SubscribePacket) -> None:
    """Default SUBSCRIBE hook: only logs at debug level that the packet was not handled.

    :param subscribe: received SUBSCRIBE packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} SUBSCRIBE unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_unsubscribe(self, unsubscribe: UnsubscribePacket) -> None:
    """Default UNSUBSCRIBE hook: only logs at debug level that the packet was not handled.

    :param unsubscribe: received UNSUBSCRIBE packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} UNSUBSCRIBE unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_suback(self, suback: SubackPacket) -> None:
    """Default SUBACK hook: only logs at debug level that the packet was not handled.

    :param suback: received SUBACK packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} SUBACK unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_unsuback(self, unsuback: UnsubackPacket) -> None:
    """Default UNSUBACK hook: only logs at debug level that the packet was not handled.

    :param unsuback: received UNSUBACK packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} UNSUBACK unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_pingresp(self, pingresp: PingRespPacket) -> None:
    """Default PINGRESP hook: only logs at debug level that the packet was not handled.

    :param pingresp: received PINGRESP packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} PINGRESP unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_pingreq(self, pingreq: PingReqPacket) -> None:
    """Default PINGREQ hook: only logs at debug level that the packet was not handled.

    :param pingreq: received PINGREQ packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} PINGREQ unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_disconnect(self, disconnect: DisconnectPacket) -> None:
    """Default DISCONNECT hook: only logs at debug level that the packet was not handled.

    :param disconnect: received DISCONNECT packet (not used here)
    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} DISCONNECT unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_connection_closed(self) -> None:
    """Default connection-closed hook: only logs at debug level that the event was not handled.

    :raises AMQTTError: if no session is attached
    """
    session = self.session
    if session is not None:
        self.logger.debug(f"{session.client_id} Connection closed unhandled")
        return
    msg = "Session is not initialized."
    raise AMQTTError(msg)
async def handle_puback(self, puback: PubackPacket) -> None:
    """Resolve the PUBACK waiter registered for the packet id carried by ``puback``.

    An unknown packet id or an already completed waiter is logged as a warning.

    :param puback: received PUBACK packet
    :raises ValueError: if the packet has no variable header
    """
    header = puback.variable_header
    if header is None:
        msg = "Variable header is not set"
        raise ValueError(msg)
    packet_id = header.packet_id
    try:
        waiter = self._puback_waiters[packet_id]
    except KeyError:
        self.logger.warning(f"Received PUBACK for unknown pending message Id: '{packet_id}'")
        return
    try:
        waiter.set_result(puback)
    except InvalidStateError:
        self.logger.warning(f"PUBACK waiter with Id '{packet_id}' already done")
async def handle_pubrec(self, pubrec: PubrecPacket) -> None:
    """Resolve the PUBREC waiter registered for the packet id of ``pubrec``.

    An unknown packet id or an already completed waiter is logged as a warning.

    :param pubrec: received PUBREC packet
    """
    packet_id = pubrec.packet_id
    try:
        waiter = self._pubrec_waiters[packet_id]
    except KeyError:
        self.logger.warning(f"Received PUBREC for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubrec)
    except InvalidStateError:
        self.logger.warning(f"PUBREC waiter with Id '{packet_id}' already done")
async def handle_pubcomp(self, pubcomp: PubcompPacket) -> None:
    """Resolve the PUBCOMP waiter registered for the packet id of ``pubcomp``.

    An unknown packet id or an already completed waiter is logged as a warning.

    :param pubcomp: received PUBCOMP packet
    """
    packet_id = pubcomp.packet_id
    try:
        waiter = self._pubcomp_waiters[packet_id]
    except KeyError:
        self.logger.warning(f"Received PUBCOMP for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubcomp)
    except InvalidStateError:
        self.logger.warning(f"PUBCOMP waiter with Id '{packet_id}' already done")
async def handle_pubrel(self, pubrel: PubrelPacket) -> None:
    """Resolve the PUBREL waiter registered for the packet id of ``pubrel``.

    An unknown packet id or an already completed waiter is logged as a warning.

    :param pubrel: received PUBREL packet
    """
    packet_id = pubrel.packet_id
    try:
        waiter = self._pubrel_waiters[packet_id]
    except KeyError:
        self.logger.warning(f"Received PUBREL for unknown pending message with Id: {packet_id}")
        return
    try:
        waiter.set_result(pubrel)
    except InvalidStateError:
        self.logger.warning(f"PUBREL waiter with Id '{packet_id}' already done")
async def handle_publish(self, publish_packet: PublishPacket) -> None:
    """Wrap a received PUBLISH packet in an incoming application message and run its QoS flow.

    Packets without a topic name or without data are ignored.

    :param publish_packet: received PUBLISH packet
    """
    variable_header = publish_packet.variable_header
    packet_id = variable_header.packet_id if variable_header else None
    qos = publish_packet.qos
    if publish_packet.topic_name is None or publish_packet.data is None:
        return

    incoming_message = IncomingApplicationMessage(
        packet_id,
        publish_packet.topic_name,
        qos,
        publish_packet.data,
        publish_packet.retain_flag,
    )
    incoming_message.publish_packet = publish_packet
    await self._handle_message_flow(incoming_message)

    queue_size = self.session.delivered_message_queue.qsize() if self.session else None
    self.logger.debug(f"Message queue size: {queue_size}")