Coverage for amqtt/plugins/sys/broker.py: 89%

133 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2from collections import deque # pylint: disable=C0412 

3from dataclasses import dataclass 

4from typing import Any, SupportsIndex, SupportsInt, TypeAlias # pylint: disable=C0412 

5 

6import psutil 

7 

8from amqtt.plugins.base import BasePlugin 

9from amqtt.session import Session 

10 

11try: 

12 from collections.abc import Buffer 

13except ImportError: 

14 from typing import Protocol, runtime_checkable 

15 

16 @runtime_checkable 

17 class Buffer(Protocol): # type: ignore[no-redef] 

18 def __buffer__(self, flags: int = ...) -> memoryview: 

19 """Mimic the behavior of `collections.abc.Buffer` for python 3.10-3.12.""" 

20 

21 

22try: 

23 from datetime import UTC, datetime 

24except ImportError: 

25 from datetime import datetime, timezone 

26 

27 UTC = timezone.utc 

28 

29 

30import amqtt 

31from amqtt.broker import BrokerContext 

32from amqtt.codecs_amqtt import float_to_bytes_str, int_to_bytes_str 

33from amqtt.mqtt.packet import PUBLISH, MQTTFixedHeader, MQTTPacket, MQTTPayload, MQTTVariableHeader 

34 

# Prefix prepended to every statistic topic broadcast by the plugin.
DOLLAR_SYS_ROOT = "$SYS/broker/"

# Keys of the statistics dictionary: traffic counters.
STAT_BYTES_RECEIVED = "bytes_received"
STAT_BYTES_SENT = "bytes_sent"
STAT_MSG_RECEIVED = "messages_received"
STAT_MSG_SENT = "messages_sent"
STAT_PUBLISH_RECEIVED = "publish_received"
STAT_PUBLISH_SENT = "publish_sent"

# Keys of the statistics dictionary: broker lifetime and client counters.
STAT_START_TIME = "start_time"
STAT_CLIENTS_CONNECTED = "clients_connected"
STAT_CLIENTS_DISCONNECTED = "clients_disconnected"
STAT_CLIENTS_MAXIMUM = "clients_maximum"

# Keys of the statistics dictionary: process resource usage.
MEMORY_USAGE_MAXIMUM = "memory_maximum"
CPU_USAGE_MAXIMUM = "cpu_usage_maximum"
CPU_USAGE_LAST = "cpu_usage_last"

49 

50 

51PACKET: TypeAlias = MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader] 

52 

53 

def val_to_bytes_str(value: Any) -> bytes:
    """Encode an int, float or str value as a byte string.

    Raises:
        NotImplementedError: if ``value`` is not an ``int``, ``float`` or ``str``.
    """
    # The checks run in the same order as before: int (which also accepts bool,
    # an int subclass), then float, then str.
    if isinstance(value, int):
        return int_to_bytes_str(value)
    if isinstance(value, float):
        return float_to_bytes_str(value)
    if isinstance(value, str):
        return value.encode("utf-8")

    msg = f"Unsupported type {type(value)}"
    raise NotImplementedError(msg)

66 

67 

class BrokerSysPlugin(BasePlugin[BrokerContext]):
    """Collect broker statistics and broadcast them periodically under ``$SYS/broker/``.

    Counters are maintained by the broker/MQTT event hooks of this class. Once the
    broker has started, ``broadcast_dollar_sys_topics`` publishes one topic per
    statistic and re-schedules itself every ``sys_interval`` seconds.
    """

    def __init__(self, context: BrokerContext) -> None:
        """Create the plugin with empty statistics and no broadcast scheduled."""
        super().__init__(context)
        # Broker statistics, keyed by the STAT_* / *_USAGE_* constants.
        # The CPU and memory entries hold floats, the others integers.
        self._stats: dict[str, int | float] = {}
        # Timer handle of the next scheduled broadcast, None when nothing is scheduled.
        self._sys_handle: asyncio.Handle | None = None
        # Strong references to in-flight broadcast tasks: asyncio only keeps weak
        # references to tasks, so an otherwise unreferenced task may be collected
        # before it completes.
        self._broadcast_tasks: set[asyncio.Task[None]] = set()

        self._sys_interval: int = 0
        self._current_process = psutil.Process()

    def _clear_stats(self) -> None:
        """Reset every counter to zero (the start time is left untouched)."""
        self._stats.update(
            dict.fromkeys(
                (
                    STAT_BYTES_RECEIVED,
                    STAT_BYTES_SENT,
                    STAT_MSG_RECEIVED,
                    STAT_MSG_SENT,
                    STAT_CLIENTS_MAXIMUM,
                    STAT_CLIENTS_CONNECTED,
                    STAT_CLIENTS_DISCONNECTED,
                    STAT_PUBLISH_RECEIVED,
                    STAT_PUBLISH_SENT,
                    MEMORY_USAGE_MAXIMUM,
                    CPU_USAGE_MAXIMUM,
                ),
                0,
            ),
        )

    async def _broadcast_sys_topic(self, topic_basename: str, data: bytes) -> None:
        """Broadcast ``data`` on the given (already fully qualified) topic."""
        await self.context.broadcast_message(topic_basename, data)

    def schedule_broadcast_sys_topic(self, topic_basename: str, data: bytes) -> asyncio.Task[None]:
        """Schedule the broadcast of ``data`` on ``$SYS/broker/<topic_basename>``.

        Returns:
            The task performing the broadcast. The plugin keeps a reference to it
            until it completes.
        """
        task = asyncio.ensure_future(
            self._broadcast_sys_topic(DOLLAR_SYS_ROOT + topic_basename, data),
            loop=self.context.loop,
        )
        self._broadcast_tasks.add(task)
        # Drop our reference as soon as the task finishes.
        task.add_done_callback(self._broadcast_tasks.discard)
        return task

    def _schedule_next_broadcast(self) -> None:
        """Arm the timer for the next broadcast; no handle is kept when the context has no loop."""
        loop = self.context.loop
        self._sys_handle = (
            loop.call_later(self._sys_interval, self.broadcast_dollar_sys_topics)
            if loop is not None
            else None
        )

    async def on_broker_pre_start(self) -> None:
        """Clear statistics before broker start."""
        self._clear_stats()

    async def on_broker_post_start(self) -> None:
        """Record the start time, retain the version topic and start $SYS broadcasting.

        Broadcasting is enabled only when the ``sys_interval`` option is a
        positive number; a missing option or a value <= 0 disables it.
        """
        self._stats[STAT_START_TIME] = int(datetime.now(tz=UTC).timestamp())
        version = f"aMQTT version {amqtt.__version__}"
        await self.context.retain_message(DOLLAR_SYS_ROOT + "version", version.encode())

        # Start $SYS topics management
        try:
            sys_interval = self._get_config_option("sys_interval", None)
        except KeyError as e:
            # 'sys_interval' config parameter not found
            self.context.logger.debug(f"could not find 'sys_interval' key: {e!r}")
            return

        if sys_interval is None:
            # The lookup above uses None as its default; treat "not configured"
            # as disabled so the comparison below cannot fail on None.
            sys_interval = 0
        elif isinstance(sys_interval, str | Buffer | SupportsInt | SupportsIndex):
            sys_interval = int(sys_interval)
        self._sys_interval = sys_interval

        if self._sys_interval > 0:
            self.context.logger.debug(f"Setup $SYS broadcasting every {self._sys_interval} seconds")
            self._schedule_next_broadcast()
        else:
            self.context.logger.debug("$SYS disabled")

    async def on_broker_pre_shutdown(self) -> None:
        """Stop $SYS topics broadcasting."""
        if self._sys_handle:
            self._sys_handle.cancel()
            self._sys_handle = None

    def broadcast_dollar_sys_topics(self) -> None:
        """Broadcast dynamic $SYS topics updates and reschedule next execution."""
        # Update stats
        now = int(datetime.now(tz=UTC).timestamp())
        uptime = now - self._stats[STAT_START_TIME]
        client_connected = self._stats[STAT_CLIENTS_CONNECTED]
        client_disconnected = self._stats[STAT_CLIENTS_DISCONNECTED]

        inflight_in = 0
        inflight_out = 0
        messages_stored = 0
        for session in self.context.sessions:
            inflight_in += session.inflight_in_count
            inflight_out += session.inflight_out_count
            messages_stored += session.retained_messages_count
        retained_count = len(self.context.retained_messages)
        messages_stored += retained_count
        subscriptions_count = sum(len(sub) for sub in self.context.subscriptions.values())

        # Keep the high-water mark instead of overwriting it with the current count.
        self._stats[STAT_CLIENTS_MAXIMUM] = max(self._stats[STAT_CLIENTS_MAXIMUM], client_connected)

        cpu_usage = self._current_process.cpu_percent(interval=0)
        self._stats[CPU_USAGE_MAXIMUM] = max(self._stats[CPU_USAGE_MAXIMUM], cpu_usage)

        # NOTE(review): only the `rss` field is read here; confirm whether the
        # cheaper `memory_info()` would be sufficient.
        mem_info_usage = self._current_process.memory_full_info()
        mem_size = mem_info_usage.rss / (1024 ** 2)
        self._stats[MEMORY_USAGE_MAXIMUM] = max(self._stats[MEMORY_USAGE_MAXIMUM], mem_size)

        # Broadcast updates
        stats: dict[str, int | float | str] = {
            "load/bytes/received": self._stats[STAT_BYTES_RECEIVED],
            "load/bytes/sent": self._stats[STAT_BYTES_SENT],
            "messages/received": self._stats[STAT_MSG_RECEIVED],
            "messages/sent": self._stats[STAT_MSG_SENT],
            "time": now,
            "uptime": str(uptime),
            # NOTE(review): this publishes the formatted start time, not a duration.
            "uptime/formatted": str(datetime.fromtimestamp(self._stats[STAT_START_TIME], UTC)),
            "clients/connected": client_connected,
            "clients/disconnected": client_disconnected,
            "clients/maximum": self._stats[STAT_CLIENTS_MAXIMUM],
            "clients/total": client_connected + client_disconnected,
            "messages/inflight": inflight_in + inflight_out,
            "messages/inflight/in": inflight_in,
            "messages/inflight/out": inflight_out,
            "messages/inflight/stored": messages_stored,
            "messages/publish/received": self._stats[STAT_PUBLISH_RECEIVED],
            "messages/publish/sent": self._stats[STAT_PUBLISH_SENT],
            "messages/retained/count": retained_count,
            "messages/subscriptions/count": subscriptions_count,
            "heap/size": mem_size,
            "heap/maximum": self._stats[MEMORY_USAGE_MAXIMUM],
            "cpu/percent": cpu_usage,
            "cpu/maximum": self._stats[CPU_USAGE_MAXIMUM],
        }
        # The broadcasts run as background tasks; this synchronous callback does
        # not wait for them. schedule_broadcast_sys_topic keeps them referenced.
        for stat_name, stat_value in stats.items():
            self.schedule_broadcast_sys_topic(stat_name, val_to_bytes_str(stat_value))

        # Reschedule
        self.context.logger.debug(f"Broadcast $SYS topics again in {self._sys_interval} seconds.")
        self._schedule_next_broadcast()

    def _record_packet(self, packet: PACKET, bytes_stat: str, msg_stat: str, publish_stat: str) -> None:
        """Add one packet to the byte/message counters, and to the publish counter for PUBLISH packets."""
        if not packet:
            return
        self._stats[bytes_stat] += packet.bytes_length
        self._stats[msg_stat] += 1
        if packet.fixed_header.packet_type == PUBLISH:
            self._stats[publish_stat] += 1

    async def on_mqtt_packet_received(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Handle incoming MQTT packets."""
        self._record_packet(packet, STAT_BYTES_RECEIVED, STAT_MSG_RECEIVED, STAT_PUBLISH_RECEIVED)

    async def on_mqtt_packet_sent(self, *, packet: PACKET, session: Session | None = None) -> None:
        """Handle sent MQTT packets."""
        self._record_packet(packet, STAT_BYTES_SENT, STAT_MSG_SENT, STAT_PUBLISH_SENT)

    async def on_broker_client_connected(self, client_id: str, client_session: Session) -> None:
        """Handle broker client connection."""
        self._stats[STAT_CLIENTS_CONNECTED] += 1
        self._stats[STAT_CLIENTS_MAXIMUM] = max(
            self._stats[STAT_CLIENTS_MAXIMUM],
            self._stats[STAT_CLIENTS_CONNECTED],
        )

    async def on_broker_client_disconnected(self, client_id: str, client_session: Session) -> None:
        """Handle broker client disconnection."""
        self._stats[STAT_CLIENTS_CONNECTED] -= 1
        self._stats[STAT_CLIENTS_DISCONNECTED] += 1

    @dataclass
    class Config:
        """Configuration struct for plugin."""

        # Delay between two $SYS broadcasts, in seconds (passed to loop.call_later).
        sys_interval: int = 20