Coverage for amqtt/plugins/sys/broker.py: 89%
133 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2from collections import deque # pylint: disable=C0412
3from dataclasses import dataclass
4from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412
6import psutil
8from amqtt.plugins.base import BasePlugin
9from amqtt.session import Session
11try:
12 from collections.abc import Buffer
13except ImportError:
14 from typing import Protocol, runtime_checkable
16 @runtime_checkable
17 class Buffer(Protocol): # type: ignore[no-redef]
18 def __buffer__(self, flags: int = ...) -> memoryview:
19 """Mimic the behavior of `collections.abc.Buffer` for python 3.10-3.12."""
22try:
23 from datetime import UTC, datetime
24except ImportError:
25 from datetime import datetime, timezone
27 UTC = timezone.utc
30import amqtt
31from amqtt.broker import BrokerContext
32from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str
33from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader
35DOLLAR_SYS_ROOT = "$SYS/broker/"
36STAT_BYTES_SENT = "bytes_sent"
37STAT_BYTES_RECEIVED = "bytes_received"
38STAT_MSG_SENT = "messages_sent"
39STAT_MSG_RECEIVED = "messages_received"
40STAT_PUBLISH_SENT = "publish_sent"
41STAT_PUBLISH_RECEIVED = "publish_received"
42STAT_START_TIME = "start_time"
43STAT_CLIENTS_MAXIMUM = "clients_maximum"
44STAT_CLIENTS_CONNECTED = "clients_connected"
45STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
46MEMORY_USAGE_MAXIMUM = "memory_maximum"
47CPU_USAGE_MAXIMUM = "cpu_usage_maximum"
48CPU_USAGE_LAST = "cpu_usage_last"
51PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader]
54def val_to_bytes_str(value: Any) -> bytes:
55 """Convert an int, float or string to byte string."""
56 match value:
57 case int():
58 return int_to_bytes_str(value)
59 case float():
60 return float_to_bytes_str(value)
61 case str(): 61 ↛ 63line 61 didn't jump to line 63 because the pattern on line 61 always matched
62 return value.encode("utf-8")
63 case _:
64 msg = f"Unsupported type {type(value)}"
65 raise NotImplementedError(msg)
68class BrokerSysPlugin(BasePlugin[BrokerContext]):
69 def __init__(self, context: BrokerContext) -> None:
70 super().__init__(context)
71 # Broker statistics initialization
72 self._stats: dict[str, int] = {}
73 self._sys_handle: asyncio.Handle | None = None
75 self._sys_interval: int = 0
76 self._current_process = psutil.Process()
78 def _clear_stats(self) -> None:
79 """Initialize broker statistics data structures."""
80 for stat in (
81 STAT_BYTES_RECEIVED,
82 STAT_BYTES_SENT,
83 STAT_MSG_RECEIVED,
84 STAT_MSG_SENT,
85 STAT_CLIENTS_MAXIMUM,
86 STAT_CLIENTS_CONNECTED,
87 STAT_CLIENTS_DISCONNECTED,
88 STAT_PUBLISH_RECEIVED,
89 STAT_PUBLISH_SENT,
90 MEMORY_USAGE_MAXIMUM,
91 CPU_USAGE_MAXIMUM
92 ):
93 self._stats[stat] = 0
95 async def _broadcast_sys_topic(self, topic_basename: str, data: bytes) -> None:
96 """Broadcast a system topic."""
97 await self.context.broadcast_message(topic_basename, data)
99 def schedule_broadcast_sys_topic(self, topic_basename: str, data: bytes) -> asyncio.Task[None]:
100 """Schedule broadcasting of system topics."""
101 return asyncio.ensure_future(
102 self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
103 loop=self.context.loop,
104 )
106 async def on_broker_pre_start(self) -> None:
107 """Clear statistics before broker start."""
108 self._clear_stats()
110 async def on_broker_post_start(self) -> None:
111 """Initialize statistics and start $SYS broadcasting."""
112 self._stats[STAT_START_TIME] = int(datetime.now(tz=UTC).timestamp())
113 version = f"aMQTT version {amqtt.__version__}"
114 await self.context.retain_message(DOLLAR_SYS_ROOT + "version", version.encode())
116 # Start $SYS topics management
117 try:
118 self._sys_interval = self._get_config_option("sys_interval", None)
119 if isinstance(self._sys_interval, str | Buffer | SupportsInt | SupportsIndex):
120 self._sys_interval = int(self._sys_interval)
122 if self._sys_interval > 0:
123 self.context.logger.debug(f"Setup $SYS broadcasting every {self._sys_interval} seconds")
124 self._sys_handle = (
125 self.context.loop.call_later(self._sys_interval, self.broadcast_dollar_sys_topics)
126 if self.context.loop is not None
127 else None
128 )
129 else:
130 self.context.logger.debug("$SYS disabled")
131 except KeyError:
132 self.context.logger.debug("could not find 'sys_interval' key: {e!r}")
133 # 'sys_interval' config parameter not found
135 async def on_broker_pre_shutdown(self) -> None:
136 """Stop $SYS topics broadcasting."""
137 if self._sys_handle:
138 self._sys_handle.cancel()
140 def broadcast_dollar_sys_topics(self) -> None:
141 """Broadcast dynamic $SYS topics updates and reschedule next execution."""
142 # Update stats
143 uptime = int(datetime.now(tz=UTC).timestamp()) - self._stats[STAT_START_TIME]
144 client_connected = self._stats[STAT_CLIENTS_CONNECTED]
145 client_disconnected = self._stats[STAT_CLIENTS_DISCONNECTED]
146 inflight_in = 0
147 inflight_out = 0
148 messages_stored = 0
149 for session in self.context.sessions:
150 inflight_in += session.inflight_in_count
151 inflight_out += session.inflight_out_count
152 messages_stored += session.retained_messages_count
153 messages_stored += len(self.context.retained_messages)
154 subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values())
155 self._stats[STAT_CLIENTS_MAXIMUM] = client_connected
157 cpu_usage = self._current_process.cpu_percent(interval=0)
158 self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage)
160 mem_info_usage = self._current_process.memory_full_info()
161 mem_size = mem_info_usage.rss / (1024 ** 2)
162 self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size)
164 # Broadcast updates
165 tasks: deque[asyncio.Task[None]] = deque()
166 stats: dict[str, int | str] = {
167 "load/bytes/received": self._stats[STAT_BYTES_RECEIVED],
168 "load/bytes/sent": self._stats[STAT_BYTES_SENT],
169 "messages/received": self._stats[STAT_MSG_RECEIVED],
170 "messages/sent": self._stats[STAT_MSG_SENT],
171 "time": int(datetime.now(tz=UTC).timestamp()),
172 "uptime": str(uptime),
173 "uptime/formatted": str(datetime.fromtimestamp(self._stats[STAT_START_TIME], UTC)),
174 "clients/connected": client_connected,
175 "clients/disconnected": client_disconnected,
176 "clients/maximum": self._stats[STAT_CLIENTS_MAXIMUM],
177 "clients/total": client_connected + client_disconnected,
178 "messages/inflight": inflight_in + inflight_out,
179 "messages/inflight/in": inflight_in,
180 "messages/inflight/out": inflight_out,
181 "messages/inflight/stored": messages_stored,
182 "messages/publish/received": self._stats[STAT_PUBLISH_RECEIVED],
183 "messages/publish/sent": self._stats[STAT_PUBLISH_SENT],
184 "messages/retained/count": len(self.context.retained_messages),
185 "messages/subscriptions/count": subscriptions_count,
186 "heap/size": mem_size,
187 "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM],
188 "cpu/percent": cpu_usage,
189 "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM],
190 }
191 for stat_name, stat_value in stats.items():
192 data: bytes = val_to_bytes_str(stat_value)
193 tasks.append(self.schedule_broadcast_sys_topic(stat_name, data))
195 # Wait until broadcasting tasks end
196 while tasks and tasks[0].done(): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true
197 tasks.popleft()
199 # Reschedule
200 self.context.logger.debug(f"Broadcast $SYS topics again in {self._sys_interval} seconds.")
201 self._sys_handle = (
202 self.context.loop.call_later(self._sys_interval, self.broadcast_dollar_sys_topics)
203 if self.context.loop is not None
204 else None
205 )
207 async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
208 """Handle incoming MQTT packets."""
209 if packet: 209 ↛ exitline 209 didn't return from function 'on_mqtt_packet_received' because the condition on line 209 was always true
210 packet_size = packet.bytes_length
211 self._stats[STAT_BYTES_RECEIVED] += packet_size
212 self._stats[STAT_MSG_RECEIVED] += 1
213 if packet.fixed_header.packet_type == PUBLISH:
214 self._stats[STAT_PUBLISH_RECEIVED] += 1
216 async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
217 """Handle sent MQTT packets."""
218 if packet: 218 ↛ exitline 218 didn't return from function 'on_mqtt_packet_sent' because the condition on line 218 was always true
219 packet_size = packet.bytes_length
220 self._stats[STAT_BYTES_SENT] += packet_size
221 self._stats[STAT_MSG_SENT] += 1
222 if packet.fixed_header.packet_type == PUBLISH:
223 self._stats[STAT_PUBLISH_SENT] += 1
225 async def on_broker_client_connected(self, client_id: str, client_session: Session) -> None:
226 """Handle broker client connection."""
227 self._stats[STAT_CLIENTS_CONNECTED] += 1
228 self._stats[STAT_CLIENTS_MAXIMUM] = max(
229 self._stats[STAT_CLIENTS_MAXIMUM],
230 self._stats[STAT_CLIENTS_CONNECTED],
231 )
233 async def on_broker_client_disconnected(self, client_id: str, client_session: Session) -> None:
234 """Handle broker client disconnection."""
235 self._stats[STAT_CLIENTS_CONNECTED] -= 1
236 self._stats[STAT_CLIENTS_DISCONNECTED] += 1
238 @dataclass
239 class Config:
240 """Configuration struct for plugin."""
242 sys_interval: int = 20