Graceful routing of aiomqtt subscriptions

aiomqtt-router

An async router for MQTT topics with aiomqtt.

aiomqtt-router makes subscribing to multiple MQTT topics with aiomqtt simpler and tidier. Here's an example from the aiomqtt docs, written without aiomqtt-router:

import asyncio
import aiomqtt


async def temperature_consumer():
    while True:
        message = await temperature_queue.get()
        print(f"[temperature/#] {message.payload}")


async def humidity_consumer():
    while True:
        message = await humidity_queue.get()
        print(f"[humidity/#] {message.payload}")


temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()


async def distributor(client):
    # Sort messages into the appropriate queues
    async for message in client.messages:
        if message.topic.matches("temperature/#"):
            temperature_queue.put_nowait(message)
        elif message.topic.matches("humidity/#"):
            humidity_queue.put_nowait(message)


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        await client.subscribe("temperature/#")
        await client.subscribe("humidity/#")
        # Use a task group to manage and await all tasks
        async with asyncio.TaskGroup() as tg:
            tg.create_task(distributor(client))
            tg.create_task(temperature_consumer())
            tg.create_task(humidity_consumer())


if __name__ == "__main__":
    asyncio.run(main())

And here's the same example with aiomqtt-router:

import asyncio
import aiomqtt

from aiomqtt_router import AiomqttRouter

router = AiomqttRouter()


@router.subscribe("humidity/#")
def handle_humidity(message: aiomqtt.Message):
    print(f"[humidity/#] {message.payload}")


@router.subscribe("temperature/#")
async def handle_temperature(message: aiomqtt.Message):
    print(f"[temperature/#] {message.payload}")


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        await router(client)


if __name__ == "__main__":
    asyncio.run(main())
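
To illustrate what a router of this kind has to do, here is a minimal sketch of decorator-based registration and topic dispatch. It is not the aiomqtt-router source: `SketchRouter`, `topic_matches`, and `dispatch` are hypothetical names, and the sketch does not talk to a broker at all. It assumes that handlers may be either plain functions or coroutines, which the example above suggests but does not state, and it ignores the special rules for topics starting with `$`.

```python
import asyncio
import inspect
from typing import Any, Callable


def topic_matches(pattern: str, topic: str) -> bool:
    """Check an MQTT topic against a filter using the + and # wildcards."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and anything below it
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(pattern_levels) == len(topic_levels)


class SketchRouter:
    """Hypothetical stand-in for a topic router (an assumption, not the library)."""

    def __init__(self) -> None:
        self.routes: list[tuple[str, Callable[[Any], Any]]] = []

    def subscribe(self, pattern: str):
        # Decorator that records the handler and returns it unchanged
        def decorator(handler):
            self.routes.append((pattern, handler))
            return handler

        return decorator

    async def dispatch(self, topic: str, message: Any) -> int:
        """Run every handler whose pattern matches; return how many ran."""
        called = 0
        for pattern, handler in self.routes:
            if topic_matches(pattern, topic):
                result = handler(message)
                # Coroutine handlers return an awaitable; plain functions do not
                if inspect.isawaitable(result):
                    await result
                called += 1
        return called


async def demo() -> list:
    seen = []
    router = SketchRouter()

    @router.subscribe("humidity/#")
    def handle_humidity(message):
        seen.append(("humidity", message))

    @router.subscribe("temperature/#")
    async def handle_temperature(message):
        seen.append(("temperature", message))

    await router.dispatch("temperature/kitchen", b"21.5")
    await router.dispatch("humidity/cellar", b"60")
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a real client, the dispatch step would sit inside the `async for message in client.messages` loop shown in the first example, replacing the hand-written `if`/`elif` chain and the per-topic queues.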

Download files

Download the file for your platform.

Source Distribution

aiomqtt_router-0.1.0.tar.gz (2.5 kB, Source)

Built Distribution

aiomqtt_router-0.1.0-py3-none-any.whl (2.9 kB, Python 3)
