Skip to main content

This is my custom aiomqtt client

Project description

DM-aiomqtt

Urls

Usage

Run in Windows

If you run async code in Windows, set correct selector

import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

One connection for all actions

from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    await mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.connect()

    # publish
    mqtt_client.publish("test", payload="Hello World!", logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

Temporary connection for actions

The apublish method is used here to wait for messages to be sent

from dm_aiomqtt import DMAioMqttClient
import asyncio


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # create callback
    async def callback(apublish: DMAioMqttClient.apublish):
        await apublish("test/1", payload="Hello World!1", qos=2, logging=True)
        # other code
        await apublish("test/2", payload="Hello World!2", qos=2, logging=True)

    # execute callback in temporary connection
    await mqtt_client.temp_connect(callback)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

Set custom logger

If you want set up custom logger

from dm_aiomqtt import DMAioMqttClient


# create custom logger
class MyLogger:
    def info(self, message):
       pass

    def warning(self, message):
       print(message)

    def error(self, message):
       print(message)


# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dm-aiomqtt-0.2.1.tar.gz (3.9 kB view hashes)

Uploaded Source

Built Distribution

dm_aiomqtt-0.2.1-py3-none-any.whl (4.2 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page