Custom Plugins¤

With the aMQTT plugins framework, one can add functionality to the client or broker without having to rewrite any of the core logic. Plugins can receive broker or client events, authenticate clients, and control topic access.

Overview¤

To create a custom plugin, subclass BasePlugin (client or broker), BaseAuthPlugin (broker only) or BaseTopicPlugin (broker only). Each custom plugin may define its own settings by creating a nested (i.e. inner) dataclass named Config which declares each option and, if applicable, a default value. A plugin's configuration dataclass is type-checked and made available through the self.config instance variable.

from dataclasses import dataclass, field
from amqtt.plugins.base import BasePlugin
from amqtt.contexts import BaseContext


class OneClassName(BasePlugin[BaseContext]):
    """This is a plugin with no functionality"""


class TwoClassName(BasePlugin[BaseContext]):
    """This is a plugin with configuration options."""
    def __init__(self, context: BaseContext):
        super().__init__(context)
        self.my_option_one: int = self.config.option1

    async def on_broker_pre_start(self) -> None:
        print(f"On broker pre-start, my option1 is: {self.my_option_one}")

    @dataclass
    class Config:
        option1: int
        option3: str = field(default="my_default_value")

These plugin classes should then be added to the configuration file of the broker or client (or to the config dictionary passed to the Broker or MQTTClient), such as myBroker.yaml:

---
listeners:
  default:
    type: tcp
    bind: 0.0.0.0:1883
plugins:
  module.submodule.file.OneClassName:
  module.submodule.file.TwoClassName:
    option1: 123

and then run via amqtt -c myBroker.yaml.

Example: custom plugin within broker script

The example samples/broker_custom_plugin.py demonstrates how to load a custom plugin by passing a config dictionary when instantiating a Broker. While this example is functional, samples is not a valid Python package (it does not have an __init__.py); it is recommended that custom plugins be placed in a Python package.

import asyncio
from dataclasses import dataclass
import logging

from amqtt.broker import Broker
from amqtt.plugins.base import BasePlugin
from amqtt.session import Session

"""
This sample shows how to run a broker without stacktraces on keyboard interrupt
"""

logger = logging.getLogger(__name__)


class RemoteInfoPlugin(BasePlugin):

    async def on_broker_client_connected(self, *, client_id:str, client_session:Session) -> None:
        display_port_str = f"on port '{client_session.remote_port}'" if self.config.display_port else ""

        logger.info(f"client '{client_id}' connected from"
                    f" '{client_session.remote_address}' {display_port_str}")

    @dataclass
    class Config:
        display_port: bool = False

config = {
    "listeners": {
        "default": {
            "type": "tcp",
            "bind": "0.0.0.0:1883",
        },
        "ws-mqtt": {
            "bind": "127.0.0.1:8080",
            "type": "ws",
            "max_connections": 10,
        },
    },
    "plugins": {
        "amqtt.plugins.authentication.AnonymousAuthPlugin": { "allow_anonymous": True},
        "samples.broker_custom_plugin.RemoteInfoPlugin": { "display_port": True },
    }
}

async def main_loop():
    broker = Broker(config)
    try:
        await broker.start()
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        await broker.shutdown()

async def main():
    t = asyncio.create_task(main_loop())
    try:
        await t
    except asyncio.CancelledError:
        pass

def __main__():

    formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    task = loop.create_task(main())

    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        logger.info("KeyboardInterrupt received. Stopping server...")
        task.cancel()
        loop.run_until_complete(task)  # Ensure task finishes cleanup
    finally:
        logger.info("Server stopped.")
        loop.close()

if __name__ == "__main__":
    __main__()

Deprecated: activating plugins using EntryPoints

To define a custom list of plugins to be loaded, add this section to your pyproject.toml:

[project.entry-points."mypackage.mymodule.plugins"]
plugin_alias = "module.submodule.file:ClassName"

Each plugin has access to the full configuration file through the provided BaseContext and can define its own variables to configure its behavior.

BasePlugin ¤

BasePlugin(context: C)

Bases: Generic[C]

The base from which all plugins should inherit.

Type Parameters¤

C: A BaseContext: either BrokerContext or ClientContext, depending on plugin usage

Attributes¤

context (C): Information about the environment in which this plugin is executed. Modifying the broker or client state should happen through methods available here.

config (self.Config): An instance of the Config dataclass defined by the plugin (or an empty dataclass, if not defined). If using entrypoint- or mixed-style configuration, use _get_config_option() to access the variable.

Classes:

  • Config

    Override to define the configuration and defaults for plugin.

Methods:

  • close

    Override if plugin needs to clean up resources upon shutdown.

Config dataclass ¤

Config()

Override to define the configuration and defaults for plugin.

close async ¤

close() -> None

Override if plugin needs to clean up resources upon shutdown.

Events¤

All plugins are notified of events if the BasePlugin subclass implements one or more of these methods:

Client¤

  • async def on_mqtt_packet_sent(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
  • async def on_mqtt_packet_received(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None

Broker¤

  • async def on_broker_pre_start(self) -> None
  • async def on_broker_post_start(self) -> None
  • async def on_broker_pre_shutdown(self) -> None
  • async def on_broker_post_shutdown(self) -> None

  • async def on_broker_client_connected(self, *, client_id:str, client_session:Session) -> None

  • async def on_broker_client_disconnected(self, *, client_id:str, client_session:Session) -> None

  • async def on_broker_retained_message(self, *, client_id: str | None, retained_message: RetainedApplicationMessage) -> None

  • async def on_broker_client_subscribed(self, *, client_id: str, topic: str, qos: int) -> None

  • async def on_broker_client_unsubscribed(self, *, client_id: str, topic: str) -> None

  • async def on_broker_message_received(self, *, client_id: str, message: ApplicationMessage) -> None

  • async def on_broker_message_broadcast(self, *, client_id: str, message: ApplicationMessage) -> None
  • async def on_mqtt_packet_sent(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None
  • async def on_mqtt_packet_received(self, *, packet: MQTTPacket[MQTTVariableHeader, MQTTPayload[MQTTVariableHeader], MQTTFixedHeader], session: Session | None = None) -> None

Note

For on_broker_retained_message:

  • if client_id is None, the message is being retained for a topic
  • if retained_message.data is None or empty (''), the retained message for the topic is being cleared

Authentication Plugins¤

In addition to receiving any of the event callbacks, a plugin which subclasses from BaseAuthPlugin is used by the aMQTT Broker to determine if a connection from a client is allowed by implementing the authenticate method and returning:

  • True if the session is allowed
  • False if not allowed
  • None if the plugin can't determine authentication

If there are multiple authentication plugins:

  • at least one plugin must return True to allow access
  • False from any plugin will deny access (i.e. every plugin that makes a determination must return True)
  • None is ignored in the determination

BaseAuthPlugin ¤

BaseAuthPlugin(context: BaseContext)

Bases: BasePlugin[BaseContext]

Base class for authentication plugins.

Classes:

  • Config

    Override to define the configuration and defaults for plugin.

Methods:

  • authenticate

    Logic for session authentication.

  • close

    Override if plugin needs to clean up resources upon shutdown.

Config dataclass ¤

Config()

Override to define the configuration and defaults for plugin.

authenticate async ¤

authenticate(*, session: Session) -> bool | None

Logic for session authentication.

Parameters:

  • session ¤
    (Session) –

    amqtt.session.Session

Returns:

  • bool | None
    • True if user authentication succeeds, False if user authentication fails
  • bool | None
    • None if authentication can't be determined (the plugin's result is then ignored)

close async ¤

close() -> None

Override if plugin needs to clean up resources upon shutdown.

Topic Filter Plugins¤

In addition to receiving any of the event callbacks, a plugin which subclasses from BaseTopicPlugin is used by the aMQTT Broker to determine if a connected client can publish to (PUBLISH), receive from (RECEIVE) and/or subscribe to (SUBSCRIBE) a particular topic by implementing the topic_filtering method and returning:

  • True if the topic is allowed
  • False if not allowed
  • None, which will be ignored

If there are multiple topic plugins:

  • at least one plugin must return True to allow access
  • False from any plugin will deny access (i.e. every plugin that makes a determination must return True)
  • None will be ignored

BaseTopicPlugin ¤

BaseTopicPlugin(context: BaseContext)

Bases: BasePlugin[BaseContext]

Base class for topic plugins.

Classes:

  • Config

    Override to define the configuration and defaults for plugin.

Methods:

  • close

    Override if plugin needs to clean up resources upon shutdown.

  • topic_filtering

    Logic for filtering out topics.

Config dataclass ¤

Config()

Override to define the configuration and defaults for plugin.

close async ¤

close() -> None

Override if plugin needs to clean up resources upon shutdown.

topic_filtering async ¤

topic_filtering(
    *,
    session: Session | None = None,
    topic: str | None = None,
    action: Action | None = None,
) -> bool | None

Logic for filtering out topics.

Parameters:

  • session ¤
    (Session | None, default: None ) –

    amqtt.session.Session

  • topic ¤
    (str | None, default: None ) –

    str

  • action ¤
    (Action | None, default: None ) –

    amqtt.broker.Action

Returns:

  • bool ( bool | None ) –

    True if topic is allowed, False otherwise. None if it can't be determined

Note

A custom plugin class can subclass from both BaseAuthPlugin and BaseTopicPlugin as long as it defines both the authenticate and topic_filtering methods.