Skip to main content

This is my custom aiomqtt client

Project description

DM-aiomqtt

Urls

Usage

Warning

For correct operation of the client, readwrite access to ping/# topic is REQUIRED. Improved system for responding to loss of connection, will use this topic to send ping messages.

(Format of ping messages - topic: ping/[uuid4], payload: 1)

Run in Windows

If you run async code in Windows, set correct selector

import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Example

from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    await publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.start()

    # publish
    await mqtt_client.publish("test", payload="Hello World!", sent_logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

Set custom logger

If you want set up custom logger

from dm_aiomqtt import DMAioMqttClient


# create custom logger
class MyLogger:
    def debug(self, message):
        pass

    def info(self, message):
        pass

    def warning(self, message):
        print(message)

    def error(self, message):
        print(message)


# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())

Publish method parameters

Parameter Type Default Value Description
topic str (required) Topic name
payload str "DEBUG" Content to send
qos 0 | 1 | 2 True MQTT QoS
payload_to_json bool | "auto" True Whether to convert content to JSON
sent_logging bool False Whether to print the sending notification
warn_logging bool False Whether to print a send error warning

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dm-aiomqtt-0.3.3.tar.gz (4.3 kB view hashes)

Uploaded Source

Built Distribution

dm_aiomqtt-0.3.3-py3-none-any.whl (4.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page