Skip to main content

This is my custom aiomqtt client

Project description

DM-aiomqtt

Urls

Usage

Example

from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.start()

    # publish
    mqtt_client.publish("test", payload="Hello World!", sent_logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

TLS connection

  • NOT required client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt"
    )
    
  • REQUIRED client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt",
        client_crt="ssl/client.crt",
        client_key="ssl/client.key"
    )
    

Set custom logger

If you want set up custom logger

from dm_aiomqtt import DMAioMqttClient


# create custom logger
class MyLogger:
    def debug(self, message):
        pass

    def info(self, message):
        pass

    def warning(self, message):
        print(message)

    def error(self, message):
        print(message)


# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())

Publish method parameters

Parameter Type Default Value Description
topic str (required) Topic name
payload str "DEBUG" Content to send
qos 0 | 1 | 2 True MQTT QoS
payload_to_json bool | "auto" True Whether to convert content to JSON
sent_logging bool False Whether to print the sending notification
error_logging bool False Whether to print a send error warning

Run in Windows

If you run async code in Windows, set correct selector

import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dm_aiomqtt-0.4.2.tar.gz (4.3 kB view hashes)

Uploaded Source

Built Distribution

dm_aiomqtt-0.4.2-py3-none-any.whl (4.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page