MQTTClient API¶
The MQTTClient
class implements the client part of MQTT protocol. It can be used to publish and/or subscribe MQTT message on a broker accessible on the network through TCP or websocket protocol, both secured or unsecured.
Usage examples¶
Subscriber¶
The example below shows how to write a simple MQTT client which subscribes a topic and prints every messages received from the broker :
import logging
import asyncio
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2
logger = logging.getLogger(__name__)
async def uptime_coro():
C = MQTTClient()
await C.connect('mqtt://test.mosquitto.org/')
# Subscribe to '$SYS/broker/uptime' with QOS=1
# Subscribe to '$SYS/broker/load/#' with QOS=2
await C.subscribe([
('$SYS/broker/uptime', QOS_1),
('$SYS/broker/load/#', QOS_2),
])
try:
for i in range(1, 100):
message = await C.deliver_message()
packet = message.publish_packet
print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data)))
await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
await C.disconnect()
except ClientException as ce:
logger.error("Client exception: %s" % ce)
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(uptime_coro())
When executed, this script gets the default event loop and asks it to run the uptime_coro
until it completes.
uptime_coro
starts by initializing a MQTTClient
instance.
The coroutine then call connect()
to connect to the broker, here test.mosquitto.org
.
Once connected, the coroutine subscribes to some topics, and then wait for 100 messages. Each message received is simply written to output.
Finally, the coroutine unsubscribes from topics and disconnects from the broker.
Publisher¶
The example below uses the MQTTClient
class to implement a publisher.
This test publish 3 messages asynchronously to the broker on a test topic.
For the purposes of the test, each message is published with a different Quality Of Service.
This example also shows to method for publishing message asynchronously.
import logging
import asyncio
from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
logger = logging.getLogger(__name__)
async def test_coro():
C = MQTTClient()
await C.connect('mqtt://test.mosquitto.org/')
tasks = [
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)),
]
await asyncio.wait(tasks)
logger.info("messages published")
await C.disconnect()
async def test_coro2():
try:
C = MQTTClient()
ret = await C.connect('mqtt://test.mosquitto.org:1883/')
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)
message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
#print(message)
logger.info("messages published")
await C.disconnect()
except ConnectException as ce:
logger.error("Connection failed: %s" % ce)
asyncio.get_event_loop().stop()
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
asyncio.get_event_loop().run_until_complete(test_coro())
asyncio.get_event_loop().run_until_complete(test_coro2())
As usual, the script runs the publish code through the async loop. test_coro()
and test_coro()
are ran in sequence.
Both do the same job. test_coro()
publish 3 messages in sequence. test_coro2()
publishes the same message asynchronously.
The difference appears the looking at the sequence of MQTT messages sent.
test_coro()
achieves:
amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.843901, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.844152, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
amqtt/YDYY;NNRpYQSy3?o <-in-- PubackPacket(ts=2015-11-11 21:54:48.979665, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
amqtt/YDYY;NNRpYQSy3?o -out-> PublishPacket(ts=2015-11-11 21:54:48.980886, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
amqtt/YDYY;NNRpYQSy3?o <-in-- PubrecPacket(ts=2015-11-11 21:54:49.029691, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/YDYY;NNRpYQSy3?o -out-> PubrelPacket(ts=2015-11-11 21:54:49.030823, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/YDYY;NNRpYQSy3?o <-in-- PubcompPacket(ts=2015-11-11 21:54:49.092514, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
while test_coro2()
runs:
amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466123, fixed=MQTTFixedHeader(length=28, flags=0x0), variable=PublishVariableHeader(topic=a/b, packet_id=None), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_0'"))
amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466432, fixed=MQTTFixedHeader(length=30, flags=0x2), variable=PublishVariableHeader(topic=a/b, packet_id=1), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_1'"))
amqtt/LYRf52W[56SOjW04 -out-> PublishPacket(ts=2015-11-11 21:54:48.466695, fixed=MQTTFixedHeader(length=30, flags=0x4), variable=PublishVariableHeader(topic=a/b, packet_id=2), payload=PublishPayload(data="b'TEST MESSAGE WITH QOS_2'"))
amqtt/LYRf52W[56SOjW04 <-in-- PubackPacket(ts=2015-11-11 21:54:48.613062, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=1), payload=None)
amqtt/LYRf52W[56SOjW04 <-in-- PubrecPacket(ts=2015-11-11 21:54:48.661073, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/LYRf52W[56SOjW04 -out-> PubrelPacket(ts=2015-11-11 21:54:48.661925, fixed=MQTTFixedHeader(length=2, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=None)
amqtt/LYRf52W[56SOjW04 <-in-- PubcompPacket(ts=2015-11-11 21:54:48.713107, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
Both coroutines have the same results except that test_coro2()
manages messages flow in parallel which may be more efficient.
Reference¶
MQTTClient API¶
MQTTClient configuration¶
The MQTTClient
__init__
method accepts a config
parameter which allow to setup some behaviour and defaults settings. This argument must be a Python dict object which may contain the following entries:
keep_alive
: keep alive (in seconds) to send when connecting to the broker (defaults to10
seconds).MQTTClient
will auto-ping the broker if not message is sent within the keep-alive interval. This avoids disconnection from the broker.ping_delay
: auto-ping delay before keep-alive times out (defaults to1
seconds).default_qos
: Default QoS (0
) used bypublish()
ifqos
argument is not given.default_retain
: Default retain (False
) used bypublish()
ifqos
argument is not given.,auto_reconnect
: enable or disable auto-reconnect feature (defaults toTrue
).reconnect_max_interval
: maximum interval (in seconds) to wait before two connection retries (defaults to10
).reconnect_retries
: maximum number of connect retries (defaults to2
). Negative value will cause client to reconnect infinietly.
Default QoS and default retain can also be overriden by adding a topics
with may contain QoS and retain values for specific topics. See the following example:
config = {
'keep_alive': 10,
'ping_delay': 1,
'default_qos': 0,
'default_retain': False,
'auto_reconnect': True,
'reconnect_max_interval': 5,
'reconnect_retries': 10,
'topics': {
'test': { 'qos': 1 },
'some_topic': { 'qos': 2, 'retain': True }
}
}
With this setting any message published will set with QOS_0 and retain flag unset except for :
messages sent to
test
topic : they will be sent with QOS_1messages sent to
some_topic
topic : they will be sent with QOS_2 and retain flag set
In any case, the qos
and retain
argument values passed to method publish()
will override these settings.