Skip to main content

Idomatic asyncio wrapper around paho-mqtt.

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

license semver

MQTT client with idiomatic asyncio interface 🙌

Write code like this:

async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.payload.decode())

asyncio-mqtt combines the stability of the time-proven paho-mqtt library with a modern, asyncio-based interface.

  • No more callbacks! 👍
  • No more return codes (welcome to the MqttError)
  • Graceful disconnection (forget about on_unsubscribe, on_disconnect, etc.)
  • Compatible with async code
  • Did we mention no more callbacks?

The whole thing is less than 400 lines of code.

Installation 📚

pip install asyncio-mqtt

Advanced use ⚡

Let's make the example from before more interesting:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError


async def advanced_example():
    # We 💛 context managers. Let's create a stack to help
    # us manage them.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client("test.mosquitto.org")
        await stack.enter_async_context(client)

        # You can create any number of topic filters
        topic_filters = (
            "floors/+/humidity",
            "floors/rooftop/#"
            # 👉 Try to add more filters!
        )
        for topic_filter in topic_filters:
            # Log all messages that matches the filter
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            template = f'[topic_filter="{topic_filter}"] {{}}'
            task = asyncio.create_task(log_messages(messages, template))
            tasks.add(task)

        # Messages that doesn't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
        tasks.add(task)

        # Subscribe to topic(s)
        # 🤔 Note that we subscribe *after* starting the message
        # loggers. Otherwise, we may miss retained messages.
        await client.subscribe("floors/#")

        # Publish a random value to each of these topics
        topics = (
            "floors/basement/humidity",
            "floors/rooftop/humidity",
            "floors/rooftop/illuminance",
            # 👉 Try to add more topics!
        )
        task = asyncio.create_task(post_to_topics(client, topics))
        tasks.add(task)

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
            message = randrange(100)
            print(f'[topic="{topic}"] Publishing message={message}')
            await client.publish(topic, message, qos=1)
            await asyncio.sleep(2)

async def log_messages(messages, template):
    async for message in messages:
        # 🤔 Note that we assume that the message paylod is an
        # UTF8-encoded string (hence the `bytes.decode` call).
        print(template.format(message.payload.decode()))

async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    # Run the advanced_example indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 3  # [seconds]
    while True:
        try:
            await advanced_example()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)


asyncio.run(main())

Alternative asyncio-based MQTT clients

Is asyncio-mqtt not what you are looking for? Try another client:

  • hbmqtt - Own protocol implementation. Includes a broker.
    GitHub stars license
  • gmqtt - Own protocol implementation. No dependencies.
    GitHub stars license
  • aiomqtt - Wrapper around paho-mqtt.
    GitHub stars license
  • mqttools - Own protocol implementation. No dependencies.
    GitHub stars license
  • aio-mqtt - Own protocol implementation. No dependencies.
    GitHub stars license

This is not an exhaustive list.

Honorable mentions

Requirements

Python 3.7 or later.

There is only a single dependency:

Note for Windows Users

Since Python 3.8, the default asyncio event loop is the ProactorEventLoop. Said loop doesn't support the add_reader method that is required by asyncio-mqtt. To use asyncio-mqtt, please switch to an event loop that supports the add_reader method such as the built-in SelectorEventLoop. E.g:

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Changelog

Please refer to the CHANGELOG document. It adheres to the principles of Keep a Changelog.

Versioning

semver

This project adheres to Semantic Versioning.

Expect API changes until we reach version 1.0.0. After 1.0.0, breaking changes will only occur in major release (e.g., 2.0.0, 3.0.0, etc.).

License

license

Note that the underlying paho-mqtt library is dual-licensed. One of the licenses is the so-called Eclipse Distribution License v1.0. It is almost word-for-word identical to the BSD 3-clause License. The only differences are:

  • One use of "COPYRIGHT OWNER" (EDL) instead of "COPYRIGHT HOLDER" (BSD)
  • One use of "Eclipse Foundation, Inc." (EDL) instead of "copyright holder" (BSD)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

asyncio_mqtt-0.7.0.tar.gz (11.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

asyncio_mqtt-0.7.0-py3-none-any.whl (10.3 kB view details)

Uploaded Python 3

File details

Details for the file asyncio_mqtt-0.7.0.tar.gz.

File metadata

  • Download URL: asyncio_mqtt-0.7.0.tar.gz
  • Upload date:
  • Size: 11.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.8.2

File hashes

Hashes for asyncio_mqtt-0.7.0.tar.gz
Algorithm Hash digest
SHA256 d945c37e549ada03d826705cf868617e97a69cf91ef09fea3381a27ffe31f691
MD5 bea553889f2109bc8a509d1003c28a7d
BLAKE2b-256 c436acd6e575f7aa4f4d8ccf4af0e72d95e7d158e7b446ee183ab65ec29a0ed5

See more details on using hashes here.

File details

Details for the file asyncio_mqtt-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: asyncio_mqtt-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 10.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/45.2.0 requests-toolbelt/0.9.1 tqdm/4.46.1 CPython/3.8.2

File hashes

Hashes for asyncio_mqtt-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 eed08ad8f8f3c6b83987e43629b81c4a491c082b3eebb40c14061d72024376a9
MD5 91779212f1e531c9bbe6312f0204ac35
BLAKE2b-256 2c7100b5d2d7ed713904fc8a03ef5dfa78bee8fc4b139ba6c62f489278b8c62d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page