Custom Plugins
With the aMQTT plugins framework, one can add functionality to the client or broker without having to rewrite any of the core logic. Plugins can receive broker or client events, and can also be used for client authentication and for controlling topic access.
Overview
To create a custom plugin, subclass BasePlugin (client or broker), BaseAuthPlugin (broker only)
or BaseTopicPlugin (broker only). Each custom plugin may define settings specific to itself by creating
a nested (i.e. inner) dataclass named Config which declares each option and a default value (if applicable). A
plugin's configuration dataclass will be type-checked and made available as the self.config instance variable.
from dataclasses import dataclass, field

from amqtt.contexts import BaseContext
from amqtt.plugins.base import BasePlugin


class OneClassName(BasePlugin[BaseContext]):
    """This is a plugin with no functionality"""


class TwoClassName(BasePlugin[BaseContext]):
    """This is a plugin with configuration options."""

    def __init__(self, context: BaseContext):
        super().__init__(context)
        # option1 is declared as an int in Config below
        self.my_option_one: int = self.config.option1

    async def on_broker_pre_start(self) -> None:
        print(f"On broker pre-start, my option1 is: {self.my_option_one}")

    @dataclass
    class Config:
        option1: int
        option3: str = field(default="my_default_value")
This plugin class then should be added to the configuration file of the broker or client (or to the config
dictionary passed to the Broker or MQTTClient), such as myBroker.yaml:
---
listeners:
  default:
    type: tcp
    bind: 0.0.0.0:1883
plugins:
  module.submodule.file.OneClassName:
  module.submodule.file.TwoClassName:
    option1: 123
and then run via amqtt -c myBroker.yaml.
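The way a plugin's options map onto its Config dataclass can be sketched with the standard library alone. This is only an illustration of the idea, not aMQTT's actual loading code: build_config is a hypothetical helper, and the isinstance check is an assumed, simplified form of the type checking mentioned above.

```python
from dataclasses import dataclass, field, fields


@dataclass
class Config:
    """Same fields as TwoClassName.Config in the example above."""

    option1: int
    option3: str = field(default="my_default_value")


def build_config(options: dict) -> Config:
    """Hypothetical helper: turn a plugin's option mapping into a Config instance."""
    # Unknown keys, or a missing option1, raise TypeError here.
    config = Config(**options)
    # Simplified type check: compare each value against the declared field type.
    for f in fields(config):
        value = getattr(config, f.name)
        if not isinstance(value, f.type):
            raise TypeError(
                f"{f.name} should be {f.type.__name__}, got {type(value).__name__}"
            )
    return config


# The options listed under module.submodule.file.TwoClassName in myBroker.yaml
config = build_config({"option1": 123})
print(config.option1, config.option3)
```

Options omitted from the configuration file (option3 here) fall back to the default declared in the dataclass, while an option without a default (option1) has to be supplied.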
Example: custom plugin within broker script
The example samples/broker_custom_plugin.py demonstrates how to load a custom plugin
by passing a config dictionary when instantiating a Broker. While this example is functional,
samples is not a valid Python package (it does not have an __init__.py); it is recommended
that custom plugins be placed in a proper Python package.
import asyncio
from dataclasses import dataclass
import logging

from amqtt.broker import Broker
from amqtt.plugins.base import BasePlugin
from amqtt.session import Session

"""
This sample shows how to run a broker without stacktraces on keyboard interrupt
"""

logger = logging.getLogger(__name__)


class RemoteInfoPlugin(BasePlugin):

    async def on_broker_client_connected(self, *, client_id:str, client_session:Session) -> None:
        display_port_str = f"on port '{client_session.remote_port}'" if self.config.display_port else ""

        logger.info(f"client '{client_id}' connected from"
                    f" '{client_session.remote_address}' {display_port_str}")

    @dataclass
    class Config:
        display_port: bool = False


config = {
    "listeners": {
        "default": {
            "type": "tcp",
            "bind": "0.0.0.0:1883",
        },
        "ws-mqtt": {
            "bind": "127.0.0.1:8080",
            "type": "ws",
            "max_connections": 10,
        },
    },
    "plugins": {
        "amqtt.plugins.authentication.AnonymousAuthPlugin": { "allow_anonymous": True},
        "samples.broker_custom_plugin.RemoteInfoPlugin": { "display_port": True },
    }
}


async def main_loop():
    broker = Broker(config)
    try:
        await broker.start()
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        await broker.shutdown()


async def main():
    t = asyncio.create_task(main_loop())
    try:
        await t
    except asyncio.CancelledError:
        pass


def __main__():
    formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    task = loop.create_task(main())

    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received. Stopping server...")
        task.cancel()
        loop.run_until_complete(task)  # Ensure task finishes cleanup
    finally:
        logger.info("Server stopped.")
        loop.close()


if __name__ == "__main__":
    __main__()
Deprecated: activating plugins using EntryPoints
To define a custom list of plugins to be loaded, add this section
to your pyproject.toml:
[project.entry-points."mypackage.mymodule.plugins"]
plugin_alias = "module.submodule.file:ClassName"
Each plugin has access to the full configuration file through the provided BaseContext and can define its own
variables to configure its behavior.
BasePlugin
BasePlugin(context: C)
Bases: Generic[C]
The base from which all plugins should inherit.
Type Parameters
- C: A BaseContext: either BrokerContext or ClientContext, depending on plugin usage
Attributes
- context (C): Information about the environment in which this plugin is executed. Modifying the broker or client state should happen through methods available here.
- config (self.Config): An instance of the Config dataclass defined by the plugin (or an empty dataclass, if not defined). If using entrypoint- or mixed-style configuration, use _get_config_option() to access the variable.
Classes:
- Config: Override to define the configuration and defaults for plugin.
Methods:
- close: Override if plugin needs to clean up resources upon shutdown.
Events
A plugin is notified of an event if its BasePlugin subclass implements the corresponding method from the lists below:
Client
- async def on_mqtt_packet_sent(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
- async def on_mqtt_packet_received(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
Broker
- async def on_broker_pre_start(self) -> None
- async def on_broker_post_start(self) -> None
- async def on_broker_pre_shutdown(self) -> None
- async def on_broker_post_shutdown(self) -> None
- async def on_broker_client_connected(self, *, client_id:str, client_session:Session) -> None
- async def on_broker_client_disconnected(self, *, client_id:str, client_session:Session) -> None
- async def on_broker_client_connected(self, *, client_id:str) -> None
- async def on_broker_client_disconnected(self, *, client_id:str) -> None
- async def on_broker_retained_message(self, *, client_id: str | None, retained_message: RetainedApplicationMessage) -> None
- async def on_broker_client_subscribed(self, *, client_id: str, topic: str, qos: int) -> None
- async def on_broker_client_unsubscribed(self, *, client_id: str, topic: str) -> None
- async def on_broker_message_received(self, *, client_id: str, message: ApplicationMessage) -> None
- async def on_broker_message_broadcast(self, *, client_id: str, message: ApplicationMessage) -> None
- async def on_mqtt_packet_sent(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
- async def on_mqtt_packet_received(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
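The "implement the method to receive the event" convention can be pictured with a small stand-alone sketch. It assumes nothing about aMQTT's internals: LoggingPlugin is a stand-in class rather than a real BasePlugin subclass, and fire_event is a hypothetical dispatcher written only to show that plugins which do not define a given on_ method are simply skipped.

```python
import asyncio


class LoggingPlugin:
    """Stand-in for a BasePlugin subclass; only two event hooks are implemented."""

    def __init__(self) -> None:
        self.seen: list[str] = []

    async def on_broker_pre_start(self) -> None:
        self.seen.append("pre_start")

    async def on_broker_client_connected(self, *, client_id: str) -> None:
        self.seen.append(f"connected:{client_id}")


async def fire_event(plugins: list[object], event_name: str, **kwargs) -> None:
    """Hypothetical dispatcher: call on_<event_name> on every plugin that defines it."""
    for plugin in plugins:
        handler = getattr(plugin, f"on_{event_name}", None)
        if handler is not None:
            await handler(**kwargs)


async def demo() -> list[str]:
    plugin = LoggingPlugin()
    await fire_event([plugin], "broker_pre_start")
    await fire_event([plugin], "broker_post_start")  # not implemented, so skipped
    await fire_event([plugin], "broker_client_connected", client_id="client-1")
    return plugin.seen


seen = asyncio.run(demo())
print(seen)
```

Note that every event argument is keyword-only in the signatures above, which is why the sketch forwards them as keyword arguments.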
Authentication Plugins
In addition to receiving any of the event callbacks, a plugin which subclasses BaseAuthPlugin
is used by the aMQTT Broker to determine whether a connection from a client is allowed. It does so by
implementing the authenticate method and returning:
- True if the session is allowed
- False if not allowed
- None if the plugin can't determine authentication
If there are multiple authentication plugins:
- at least one plugin must return True to allow access
- False from any plugin will deny access (i.e. every plugin that returns a result must return True to allow access)
- None is ignored in the determination
BaseAuthPlugin
BaseAuthPlugin(context: BaseContext)
Bases: BasePlugin[BaseContext]
Base class for authentication plugins.
Classes:
- Config: Override to define the configuration and defaults for plugin.
Methods:
- authenticate: Logic for session authentication.
- close: Override if plugin needs to clean up resources upon shutdown.
authenticate (async)
authenticate(*, session: Session) -> bool | None
Logic for session authentication.
Parameters:
- session (Session): amqtt.session.Session
Returns:
- bool | None: True if user authentication succeeds, False if user authentication fails
- bool | None: None if authentication can't be determined (the plugin's result is then ignored)
Topic Filter Plugins
In addition to receiving any of the event callbacks, a plugin which subclasses BaseTopicPlugin
is used by the aMQTT Broker to determine whether a connected client can publish to (PUBLISH), receive from (RECEIVE)
and/or subscribe to (SUBSCRIBE) a particular topic. It does so by implementing the topic_filtering method and returning:
- True if topic is allowed
- False if not allowed
- None will be ignored
If there are multiple topic plugins:
- at least one plugin must return True to allow access
- False from any plugin will deny access (i.e. every plugin that returns a result must return True to allow access)
- None will be ignored
BaseTopicPlugin
BaseTopicPlugin(context: BaseContext)
Bases: BasePlugin[BaseContext]
Base class for topic plugins.
Classes:
- Config: Override to define the configuration and defaults for plugin.
Methods:
- close: Override if plugin needs to clean up resources upon shutdown.
- topic_filtering: Logic for filtering out topics.
topic_filtering (async)
topic_filtering(
    *,
    session: Session | None = None,
    topic: str | None = None,
    action: Action | None = None,
) -> bool | None
Logic for filtering out topics.
Parameters:
- session (Session | None, default: None): amqtt.session.Session
- topic (str | None, default: None): str
- action (Action | None, default: None): amqtt.broker.Action
Returns:
- bool | None: True if topic is allowed, False otherwise. None if it can't be determined
Note
A custom plugin class can subclass both BaseAuthPlugin and BaseTopicPlugin as long as it defines
both the authenticate and topic_filtering methods.