Broker API reference

The amqtt.broker.Broker class provides a complete MQTT 3.1.1 broker implementation. It allows Python developers to embed an MQTT broker in their own applications.

Usage example

The following example shows how to start a broker using the default configuration:

import asyncio
from asyncio import CancelledError
import logging

from amqtt.broker import Broker

# This sample shows how to run a broker.

formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)


async def run_server() -> None:
    broker = Broker()
    try:
        await broker.start()
        while True:
            await asyncio.sleep(1)
    except CancelledError:
        await broker.shutdown()

def main() -> None:
    try:
        asyncio.run(run_server())
    except KeyboardInterrupt:
        print("Server exiting...")

if __name__ == "__main__":
    main()

This starts the broker and keeps it running until it is shut down with Ctrl+C.

Reference

Broker API

The Broker class in the amqtt.broker module provides the following key methods:

  • start(): Starts the broker and begins serving clients
  • shutdown(): Gracefully shuts down the broker

Broker

Broker(
    config: BrokerConfig | dict[str, Any] | None = None,
    loop: AbstractEventLoop | None = None,
    plugin_namespace: str | None = None,
)

MQTT 3.1.1 compliant broker implementation.

Parameters:

  • config

    (BrokerConfig | dict[str, Any] | None, default: None ) –

    A BrokerConfig instance or a dictionary with an equivalent structure (see broker configuration).

  • loop

    (AbstractEventLoop | None, default: None ) –

    The asyncio event loop to use. Defaults to asyncio.new_event_loop().

  • plugin_namespace

    (str | None, default: None ) –

    The plugin namespace to use when loading plugin entry_points. Defaults to amqtt.broker.plugins.

Raises:

  • BrokerError

    if there is a problem with the broker configuration

  • PluginImportError

    if importing a plugin from configuration fails

  • PluginInitError

    if plugin initialization fails
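
As a rough illustration of the constructor parameters and exceptions listed above, the sketch below builds a configuration dictionary and passes it to Broker. The helper names build_config and create_broker are hypothetical. The listeners/default/type/bind key layout and the amqtt.errors import path are assumptions that should be checked against the broker configuration reference and the installed amqtt version.

```python
from typing import Any


def build_config(host: str = "127.0.0.1", port: int = 1883) -> dict[str, Any]:
    """Return a minimal configuration with one TCP listener (assumed key layout)."""
    return {
        "listeners": {
            "default": {
                "type": "tcp",
                "bind": f"{host}:{port}",
            },
        },
    }


def create_broker(config: dict[str, Any]):
    """Build a Broker from `config`; return None if construction fails."""
    # Imported lazily so build_config() stays usable without amqtt installed.
    from amqtt.broker import Broker
    # Assumption: the documented exceptions live in amqtt.errors.
    from amqtt.errors import BrokerError, PluginImportError, PluginInitError

    try:
        return Broker(config)
    except (BrokerError, PluginImportError, PluginInitError) as exc:
        print(f"Could not create broker: {exc}")
        return None
```

As in the usage example, create_broker(build_config()) would typically be called from inside the coroutine that later awaits broker.start().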

Methods:

  • external_connected

    Engage the broker in handling the data stream to/from an established connection.

  • shutdown

    Stop broker instance.

  • start

    Start the broker to serve with the given configuration.

external_connected async

external_connected(
    reader: ReaderAdapter,
    writer: WriterAdapter,
    listener_name: str,
) -> None

Engage the broker in handling the data stream to/from an established connection.

shutdown async

shutdown() -> None

Stop broker instance.

start async

start() -> None

Start the broker to serve with the given configuration.

The start method opens network sockets and starts listening for incoming connections.
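
Because start() opens sockets that shutdown() must later release, one way to pair the two calls is a small async context manager. This is only a sketch and not part of amqtt: the running helper and the FakeBroker stand-in are hypothetical, and FakeBroker exists solely so the example runs without opening real sockets. A real Broker instance could be passed in its place, since only the start() and shutdown() coroutines are used.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def running(broker):
    """Start `broker` on entry and always shut it down on exit."""
    await broker.start()
    try:
        yield broker
    finally:
        # Runs on normal exit, on exceptions, and on task cancellation.
        await broker.shutdown()


class FakeBroker:
    """Hypothetical stand-in exposing the same start()/shutdown() coroutines."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def start(self) -> None:
        self.events.append("start")

    async def shutdown(self) -> None:
        self.events.append("shutdown")


async def demo() -> list[str]:
    broker = FakeBroker()
    async with running(broker):
        broker.events.append("serving")
    return broker.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If start() itself raises, the body is never entered and shutdown() is not called, so any cleanup for a failed start would need to be handled separately.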