Coverage for amqtt/plugins/logging_amqtt.py: 96%

33 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from collections.abc import Callable, Coroutine 

2from functools import partial 

3import logging 

4from typing import Any, TypeAlias 

5 

6from amqtt.contexts import BaseContext 

7from amqtt.events import BrokerEvents 

8from amqtt.mqtt import MQTTPacket 

9from amqtt.mqtt.packet import MQTTFixedHeader, MQTTPayload, MQTTVariableHeader 

10from amqtt.plugins.base import BasePlugin 

11from amqtt.session import Session 

12 

13PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] 

14 

15 

class EventLoggerPlugin(BasePlugin[BaseContext]):
    """Log every ``on_*`` event the plugin is asked to handle.

    No handler is defined explicitly. Looking up any attribute whose name
    starts with ``on_`` yields a coroutine function bound to that event name
    (see ``__getattr__``), and awaiting it writes one line to the context
    logger.
    """

    async def log_event(self, *args: Any, **kwargs: Any) -> None:
        """Log that an event fired.

        Client connected/disconnected events are logged at INFO level; every
        other event is logged at DEBUG level.

        Args:
            *args: Ignored.
            **kwargs: Must contain ``event_name`` (the handler name, e.g.
                ``on_<event>``); any other keyword is ignored.

        Raises:
            KeyError: If ``event_name`` is not supplied.

        """
        # Every occurrence of "old" is removed from the name before use.
        # NOTE(review): presumably this normalizes legacy event names — confirm.
        event_name = kwargs["event_name"].replace("old", "")

        # Strip only the leading handler prefix. A blanket replace("on_", "")
        # would also eat the tail of words such as "connection_" and corrupt
        # the name being compared against the broker events.
        bare_event = event_name.removeprefix("on_")

        if bare_event in (BrokerEvents.CLIENT_CONNECTED, BrokerEvents.CLIENT_DISCONNECTED):
            self.context.logger.info(f"### '{event_name}' EVENT FIRED ###")
        else:
            self.context.logger.debug(f"### '{event_name}' EVENT FIRED ###")

    def __getattr__(self, name: str) -> Callable[..., Coroutine[Any, Any, None]]:
        """Resolve ``on_*`` attribute lookups to an event-logging coroutine.

        Python calls this only when normal attribute lookup has failed.

        Args:
            name: The attribute being looked up.

        Returns:
            ``log_event`` with ``event_name`` pre-bound to ``name``.

        Raises:
            AttributeError: If ``name`` does not start with ``on_``.

        """
        if name.startswith("on_"):
            return partial(self.log_event, event_name=name)
        msg = f"'EventLoggerPlugin' object has no attribute {name!r}"
        raise AttributeError(msg)

33 

34 

class PacketLoggerPlugin(BasePlugin[BaseContext]):
    """Write received and sent MQTT packets to the context logger at DEBUG level."""

    async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Log an incoming packet, prefixed with the client id when a session is given.

        Nothing is formatted or logged unless DEBUG is enabled on the logger.
        """
        log = self.context.logger
        if not log.isEnabledFor(logging.DEBUG):
            return

        if session is None:
            log.debug(f"<-in-- {packet!r}")
            return

        log.debug(f"{session.client_id} <-in-- {packet!r}")

    async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Log an outgoing packet, prefixed with the client id when a session is given.

        Nothing is formatted or logged unless DEBUG is enabled on the logger.
        """
        log = self.context.logger
        if not log.isEnabledFor(logging.DEBUG):
            return

        if session is None:
            log.debug(f"-out-> {packet!r}")
            return

        log.debug(f"{session.client_id} -out-> {packet!r}")