MQTTClient API¤

The amqtt.client.MQTTClient class implements the client side of the MQTT protocol. It can be used to publish messages to, and subscribe to messages from, a broker reachable on the network over TCP or WebSocket, either secured or unsecured.

Usage examples¤

Subscriber¤

The example below shows how to write a simple MQTT client which subscribes to topics and prints every message received from the broker:

import logging
import asyncio

from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2

logger = logging.getLogger(__name__)

async def uptime_coro():
    C = MQTTClient()
    await C.connect('mqtt://test.mosquitto.org/')
    # Subscribe to '$SYS/broker/uptime' with QOS=1
    # Subscribe to '$SYS/broker/load/#' with QOS=2
    await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
         ])
    try:
        for i in range(1, 100):
            message = await C.deliver_message()
            packet = message.publish_packet
            print("%d:  %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
    except ClientException as ce:
        logger.error("Client exception: %s" % ce)

if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(uptime_coro())

When executed, this script gets the default event loop and asks it to run uptime_coro() until it completes. uptime_coro() starts by creating an MQTTClient instance, then calls connect() to connect to the broker, here test.mosquitto.org. Once connected, the coroutine subscribes to two topics and waits for 99 messages (the loop counter runs from 1 to 99). Each message received is written to standard output. Finally, the coroutine unsubscribes from the topics and disconnects from the broker.

Publisher¤

The example below uses the MQTTClient class to implement a publisher. It publishes 3 messages to the broker on a test topic. For the purposes of the test, each message is published with a different Quality of Service. The example also shows two ways of publishing messages: concurrently, and one after the other.

import logging
import asyncio

# ConnectException is caught in test_coro2() below, so it must be imported too
from amqtt.client import MQTTClient, ConnectException
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

logger = logging.getLogger(__name__)

async def test_coro():
    C = MQTTClient()
    await C.connect('mqtt://test.mosquitto.org/')
    tasks = [
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
        asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
    ]
    await asyncio.wait(tasks)
    logger.info("messages published")
    await C.disconnect()


async def test_coro2():
    try:
        C = MQTTClient()
        ret = await C.connect('mqtt://test.mosquitto.org:1883/')
        message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
        message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
        message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
        #print(message)
        logger.info("messages published")
        await C.disconnect()
    except ConnectException as ce:
        logger.error("Connection failed: %s" % ce)
        asyncio.get_event_loop().stop()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    asyncio.get_event_loop().run_until_complete(test_coro())
    asyncio.get_event_loop().run_until_complete(test_coro2())

As usual, the script runs the publishing code through the asyncio event loop. test_coro() and test_coro2() are run in sequence, and both do the same job. test_coro() schedules the three publish() calls with asyncio.ensure_future() and waits for all of them, so the messages are published concurrently. test_coro2() awaits each publish() call in turn, so the three messages are published one after the other. The difference appears when looking at the sequence of MQTT packets exchanged.

test_coro2() produces:

amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.843901, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.844152, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
amqtt/YDYY;NNRpYQSy3?o <-in-- PubackPacket(ts=2015-11-11 21:54:48.979665, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.980886, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
amqtt/YDYY;NNRpYQSy3?o <-in-- PubrecPacket(ts=2015-11-11 21:54:49.029691, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/YDYY;NNRpYQSy3?o -out-> PubrelPacket(ts=2015-11-11 21:54:49.030823, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/YDYY;NNRpYQSy3?o <-in-- PubcompPacket(ts=2015-11-11 21:54:49.092514, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)

while test_coro() produces:

amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466123, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466432, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466695, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
amqtt/LYRf52W[56SOjW04 <-in-- PubackPacket(ts=2015-11-11 21:54:48.613062, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
amqtt/LYRf52W[56SOjW04 <-in-- PubrecPacket(ts=2015-11-11 21:54:48.661073, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/LYRf52W[56SOjW04 -out-> PubrelPacket(ts=2015-11-11 21:54:48.661925, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)

Both coroutines produce the same result, except that test_coro() handles the three message flows in parallel, which may be more efficient.
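The two packet orderings shown above can be reproduced without a broker. The sketch below is only an illustration: fake_publish is a hypothetical stand-in for MQTTClient.publish() that records when a message is sent and when its acknowledgment would arrive. Awaiting each call in turn interleaves sends and acknowledgments, while scheduling all calls together sends everything before the first acknowledgment comes back.

```python
import asyncio


async def fake_publish(name: str, events: list) -> None:
    """Hypothetical stand-in for MQTTClient.publish(): send, then wait for an ack."""
    events.append(f"send {name}")
    await asyncio.sleep(0.01)  # simulated round trip to the broker
    events.append(f"ack {name}")


async def publish_sequentially(names: list) -> list:
    events: list = []
    for name in names:
        # Each publish completes (ack included) before the next one starts.
        await fake_publish(name, events)
    return events


async def publish_concurrently(names: list) -> list:
    events: list = []
    # All publishes are scheduled at once; their acks are awaited together.
    await asyncio.gather(*(fake_publish(name, events) for name in names))
    return events
```

With a real client, the same two shapes would apply to the publish() calls themselves; the event lists here only make the ordering visible.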

MQTTClient ¤

MQTTClient(
    client_id: str | None = None,
    config: ClientConfig | dict[str, Any] | None = None,
)

MQTT client implementation, providing an API for connecting to a broker and sending or receiving messages using the MQTT protocol.

Parameters:

  • client_id ¤

    (str | None, default: None ) –

    MQTT client ID to use when connecting to the broker. If None, an ID is generated randomly by amqtt.utils.gen_client_id.

  • config ¤

    (ClientConfig | dict[str, Any] | None, default: None ) –

    ClientConfig instance, or a dictionary with the equivalent structure (see client configuration).

Raises:

  • PluginImportError

    if importing a plugin from configuration fails

  • PluginInitError

    if plugin initialization fails
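As a rough illustration of the dictionary form of config, the sketch below builds a dictionary using only option names that appear on this page (uri, default_qos, default_retain, reconnect_max_interval, reconnect_retries). The values are hypothetical, and effective_publish_options is a helper invented here to mirror the documented fallback used by publish(): explicit argument first, then the config value, then QOS_0 or False.

```python
# Hypothetical values; only option names mentioned on this page are used.
client_config = {
    "uri": "mqtt://test.mosquitto.org/",
    "default_qos": 1,
    "default_retain": False,
    "reconnect_max_interval": 10,
    "reconnect_retries": 3,
}


def effective_publish_options(config: dict, qos=None, retain=None) -> tuple:
    """Illustrative helper (not part of amqtt): resolve qos/retain fallbacks."""
    resolved_qos = qos if qos is not None else config.get("default_qos", 0)
    resolved_retain = (
        retain if retain is not None else config.get("default_retain", False)
    )
    return resolved_qos, resolved_retain
```

Such a dictionary would be passed as MQTTClient(config=client_config); the helper only shows how the defaults are expected to combine with per-call arguments.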

Methods:

cancel_tasks async ¤

cancel_tasks() -> None

Cancel all pending tasks.

connect async ¤

connect(
    uri: str | None = None,
    cleansession: bool | None = None,
    cafile: str | None = None,
    capath: str | None = None,
    cadata: str | None = None,
    additional_headers: dict[str, Any]
    | HeadersLike
    | None = None,
) -> int

Connect to a remote broker.

First, a network connection is established with the server using the given protocol (mqtt, mqtts, ws or wss). Once the socket is connected, a CONNECT message is sent with the requested information.

Parameters:

  • uri ¤

    (str | None, default: None ) –

    Broker connection URI, conforming to the MQTT URI scheme. By default, it is taken from the uri config attribute.

  • cleansession ¤

    (bool | None, default: None ) –

    MQTT CONNECT clean session flag

  • cafile ¤

    (str | None, default: None ) –

    server certificate authority file (optional, used for secured connection)

  • capath ¤

    (str | None, default: None ) –

    server certificate authority path (optional, used for secured connection)

  • cadata ¤

    (str | None, default: None ) –

    server certificate authority data (optional, used for secured connection)

  • additional_headers ¤

    (dict[str, Any] | HeadersLike | None, default: None ) –

    a dictionary of additional HTTP headers to send on the initial connection (optional, used only with websocket connections)

Returns:

  • int

    CONNACK return code.

Raises:

  • ConnectError

    could not connect to broker
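To make the role of the URI scheme concrete, the sketch below splits a broker URI with the standard library and classifies it by the four schemes named above. describe_broker_uri is a hypothetical helper written for this page; it does not reproduce how amqtt parses URIs internally, and it reports the port as None when the URI omits it rather than guessing a default.

```python
from urllib.parse import urlparse

# Schemes listed in the connect() description.
KNOWN_SCHEMES = {"mqtt", "mqtts", "ws", "wss"}
SECURED_SCHEMES = {"mqtts", "wss"}
WEBSOCKET_SCHEMES = {"ws", "wss"}


def describe_broker_uri(uri: str) -> dict:
    """Illustrative helper (not part of amqtt): break a broker URI into parts."""
    parts = urlparse(uri)
    if parts.scheme not in KNOWN_SCHEMES:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,  # None when the URI does not specify one
        "secured": parts.scheme in SECURED_SCHEMES,
        "websocket": parts.scheme in WEBSOCKET_SCHEMES,
    }
```

For a secured scheme, the cafile, capath or cadata arguments of connect() are the ones that become relevant.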

deliver_message async ¤

deliver_message(
    timeout_duration: float | None = None,
) -> ApplicationMessage | None

Deliver the next received message.

Deliver the next message received from the broker. If no message is available, this method waits until the next message arrives or timeout_duration elapses.

Parameters:

  • timeout_duration ¤

    (float | None, default: None ) –

    maximum number of seconds to wait before returning. If not specified or None, there is no limit.

Returns:

  • ApplicationMessage | None

    instance of amqtt.session.ApplicationMessage containing the received message information.

Raises:

  • TimeoutError

    if timeout occurs before a message is delivered

  • ClientError

    if client is not connected
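A receive loop built on deliver_message() usually needs to decide what to do when the timeout expires. The sketch below is one possible shape, under stated assumptions: drain_messages and FakeClient are names invented here, FakeClient merely imitates a connected client for a dry run, and a None result is treated as "nothing more to read" because the return type allows it.

```python
import asyncio


async def drain_messages(client, max_messages: int, timeout: float) -> list:
    """Collect up to max_messages, stopping early if no message arrives in time."""
    received = []
    while len(received) < max_messages:
        try:
            message = await client.deliver_message(timeout_duration=timeout)
        except (TimeoutError, asyncio.TimeoutError):
            break  # broker stayed quiet for `timeout` seconds
        if message is None:
            break  # assumption: None means no message was delivered
        received.append(message)
    return received


class FakeClient:
    """Hypothetical stand-in for a connected MQTTClient."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def deliver_message(self, timeout_duration=None):
        if not self._messages:
            raise asyncio.TimeoutError
        return self._messages.pop(0)
```

With a real client, the same function would be called after connect() and subscribe(), for instance as await drain_messages(C, 10, 5.0).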

disconnect async ¤

disconnect() -> None

Disconnect from the connected broker.

This method sends a DISCONNECT message and closes the network socket.

handle_connection_close async ¤

handle_connection_close() -> None

Handle disconnection from the broker.

ping async ¤

ping() -> None

Ping the broker.

Send an MQTT PINGREQ message to the broker and wait for its response.

publish async ¤

publish(
    topic: str,
    message: bytes,
    qos: int | None = None,
    retain: bool | None = None,
    ack_timeout: int | None = None,
) -> OutgoingApplicationMessage

Publish a message to the broker.

Send an MQTT PUBLISH message and wait for the acknowledgment required by the requested Quality of Service.
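The flags values visible in the packet traces earlier on this page (0x0, 0x2 and 0x4) follow from the MQTT 3.1.1 fixed header layout, where bit 3 is DUP, bits 2 and 1 carry the QoS, and bit 0 is RETAIN. The sketch below recomputes them and lists the broker replies each QoS level waits for. publish_flags and BROKER_REPLIES are illustrative names, and the numeric QoS values are assumed to match amqtt.mqtt.constants.

```python
# Assumed to match the values of QOS_0, QOS_1, QOS_2 in amqtt.mqtt.constants.
QOS_0, QOS_1, QOS_2 = 0, 1, 2


def publish_flags(qos: int, retain: bool = False, dup: bool = False) -> int:
    """Low 4 bits of a PUBLISH fixed header: DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0)."""
    if qos not in (QOS_0, QOS_1, QOS_2):
        raise ValueError(f"invalid QoS: {qos}")
    return (int(dup) << 3) | (qos << 1) | int(retain)


# Packets the broker sends back to the publisher for each QoS level.
# For QOS_2 the client sends PUBREL between PUBREC and PUBCOMP.
BROKER_REPLIES = {
    QOS_0: [],
    QOS_1: ["PUBACK"],
    QOS_2: ["PUBREC", "PUBCOMP"],
}
```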

Parameters:

  • topic ¤

    (str) –

    topic name to which message data is published

  • message ¤

    (bytes) –

    payload message (as bytes) to send.

  • qos ¤

    (int | None, default: None ) –

    requested publish quality of service : QOS_0, QOS_1 or QOS_2. Defaults to default_qos config parameter or QOS_0.

  • retain ¤

    (bool | None, default: None ) –

    retain flag. Defaults to default_retain config parameter or False.

  • ack_timeout ¤

    (int | None, default: None ) –

    duration to wait for the acknowledgment from the broker.

Returns:

  • OutgoingApplicationMessage

    the outgoing message that was published.

reconnect async ¤

reconnect(cleansession: bool | None = None) -> int

Reconnect to a previously connected broker.

Reconnection tries to establish a network connection and send a CONNECT message. The retry interval and the number of attempts can be controlled with the reconnect_max_interval and reconnect_retries configuration parameters.

Parameters:

  • cleansession ¤

    (bool | None, default: None ) –

    clean session flag used in MQTT CONNECT messages sent for reconnections.

Returns:

  • int

    CONNACK return code.

Raises:

  • ConnectException

    if re-connection fails after max retries.
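To show how the two reconnection parameters interact, the sketch below computes a retry schedule. The exponential backoff used here is an assumption made for illustration: this page only states that the interval and the number of attempts are configurable, not how the delay grows. reconnect_delays is a hypothetical helper.

```python
def reconnect_delays(reconnect_retries: int, reconnect_max_interval: float) -> list:
    """Delay before each retry: doubles every attempt, capped at the max interval.

    Illustrative policy only; not taken from amqtt's implementation.
    """
    return [
        min(2 ** attempt, reconnect_max_interval)
        for attempt in range(1, reconnect_retries + 1)
    ]
```

With reconnect_retries=5 and reconnect_max_interval=10, this schedule waits 2, 4 and 8 seconds, then 10 seconds for each of the last two attempts.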

subscribe async ¤

subscribe(topics: list[tuple[str, int]]) -> list[int]

Subscribe to topics.

Send a MQTT SUBSCRIBE message and wait for broker acknowledgment.

Parameters:

  • topics ¤

    (list[tuple[str, int]]) –

    list of tuples, each containing a topic pattern and the QOS (from amqtt.mqtt.constants) to subscribe with. For example:

    [
        ("$SYS/broker/uptime", QOS_1),
        ("$SYS/broker/load/#", QOS_2),
    ]
    

Returns:

  • list[int]

    SUBACK return codes, one per requested topic.
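In MQTT 3.1.1, each SUBACK return code is either the granted QoS (0x00, 0x01 or 0x02) or 0x80 for a refused subscription. The sketch below pairs the requested topics with such codes. describe_suback is a hypothetical helper, and it assumes the list returned by subscribe() holds the raw codes in the same order as the requested topics.

```python
SUBACK_FAILURE = 0x80  # MQTT 3.1.1 return code for a refused subscription


def describe_suback(topics: list, return_codes: list) -> dict:
    """Illustrative helper (not part of amqtt): explain each SUBACK return code."""
    if len(topics) != len(return_codes):
        raise ValueError("expected one return code per requested topic")
    results = {}
    for (topic_filter, requested_qos), code in zip(topics, return_codes):
        if code == SUBACK_FAILURE:
            results[topic_filter] = "refused"
        elif code in (0, 1, 2):
            note = " (lower than requested)" if code < requested_qos else ""
            results[topic_filter] = f"granted QoS {code}{note}"
        else:
            raise ValueError(f"unexpected SUBACK return code: {code:#x}")
    return results
```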

unsubscribe async ¤

unsubscribe(topics: list[str]) -> None

Unsubscribe from topics.

Send a MQTT UNSUBSCRIBE message and wait for broker UNSUBACK message.

Parameters:

  • topics ¤

    (list[str]) –

    list of topics to unsubscribe from. For example:

    ["$SYS/broker/uptime", "$SYS/broker/load/#"]
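The topic patterns passed to subscribe() and unsubscribe(), such as "$SYS/broker/load/#", use the MQTT wildcards: + matches exactly one level and # matches all remaining levels. The sketch below implements that matching as described in the MQTT 3.1.1 specification. topic_matches is a helper written for this page, not an amqtt function; the matching itself is performed by the broker.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (sketch, not amqtt code)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # A filter starting with a wildcard does not match topics beginning with '$'.
    if topic.startswith("$") and filter_levels[0] in ("#", "+"):
        return False
    for index, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below,
            # including the parent level itself.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)
```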