Coverage for amqtt/plugins/logging_amqtt.py: 96%
33 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from collections.abc import Callable, Coroutine
2from functools import partial
3import logging
4from typing import Any, TypeAlias
6from amqtt.contexts import BaseContext
7from amqtt.events import BrokerEvents
8from amqtt.mqtt import MQTTPacket
9from amqtt.mqtt.packet import MQTTFixedHeader, MQTTPayload, MQTTVariableHeader
10from amqtt.plugins.base import BasePlugin
11from amqtt.session import Session
# Fully parameterised MQTT packet type used to annotate the packet-logging hooks below.
PACKET: TypeAlias = MQTTPacket[
    MQTTVariableHeader,
    MQTTPayload[MQTTVariableHeader],
    MQTTFixedHeader,
]
class EventLoggerPlugin(BasePlugin[BaseContext]):
    """A plugin to log events dynamically based on method names.

    No ``on_*`` handler is defined explicitly. Instead, ``__getattr__``
    resolves any otherwise-missing attribute whose name starts with ``on_``
    to ``log_event`` with that name pre-bound as ``event_name``.
    """

    async def log_event(self, *args: Any, **kwargs: Any) -> None:
        """Log the occurrence of an event.

        Client connected/disconnected events are logged at INFO level; every
        other event is logged at DEBUG level.

        Args:
            *args: Ignored.
            **kwargs: Must contain ``event_name`` (the ``on_...`` attribute
                name that was looked up). Any other keyword is ignored.

        Raises:
            KeyError: If ``event_name`` is not present in ``kwargs``.

        """
        # NOTE(review): every "old" substring is dropped from the name; the
        # reason is not visible in this module -- confirm before changing.
        event_name = kwargs["event_name"].replace("old", "")

        # Strip only the leading handler prefix. ``str.replace`` would also
        # remove "on_" occurring inside the event name itself (for example
        # in "..._connection_..."), corrupting the value being compared.
        bare_name = event_name.removeprefix("on_")
        if bare_name in (BrokerEvents.CLIENT_CONNECTED, BrokerEvents.CLIENT_DISCONNECTED):
            self.context.logger.info(f"### '{event_name}' EVENT FIRED ###")
        else:
            self.context.logger.debug(f"### '{event_name}' EVENT FIRED ###")

    def __getattr__(self, name: str) -> Callable[..., Coroutine[Any, Any, None]]:
        """Dynamically handle calls to methods starting with 'on_'.

        Python only calls ``__getattr__`` after normal attribute lookup has
        failed, so real attributes and methods are never shadowed.

        Args:
            name: The attribute name being looked up.

        Returns:
            ``log_event`` with ``event_name`` pre-bound to ``name``.

        Raises:
            AttributeError: If ``name`` does not start with ``on_``.

        """
        if name.startswith("on_"):
            return partial(self.log_event, event_name=name)
        msg = f"'EventLoggerPlugin' object has no attribute {name!r}"
        raise AttributeError(msg)
class PacketLoggerPlugin(BasePlugin[BaseContext]):
    """Plugin that writes every received and sent MQTT packet to the debug log."""

    async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Emit a debug record for an incoming MQTT packet.

        Nothing is formatted or logged unless the context logger is enabled
        for DEBUG. When a session is supplied, its client id prefixes the
        record.
        """
        log = self.context.logger
        if not log.isEnabledFor(logging.DEBUG):
            return
        if session is None:
            log.debug(f"<-in-- {packet!r}")
            return
        log.debug(f"{session.client_id} <-in-- {packet!r}")

    async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Emit a debug record for an outgoing MQTT packet.

        Nothing is formatted or logged unless the context logger is enabled
        for DEBUG. When a session is supplied, its client id prefixes the
        record.
        """
        log = self.context.logger
        if not log.isEnabledFor(logging.DEBUG):
            return
        if session is None:
            log.debug(f"-out-> {packet!r}")
            return
        log.debug(f"{session.client_id} -out-> {packet!r}")