Skip to main content

This is my custom aiomqtt client

Project description

DM-aiomqtt

Urls

Usage

Example

from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.start()

    # publish
    mqtt_client.publish("test", payload="Hello World!", sent_logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

You can also start with a block thread

await mqtt_client.start_forever()

TLS connection

  • NOT required client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt"
    )
    
  • REQUIRED client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt",
        client_crt="ssl/client.crt",
        client_key="ssl/client.key"
    )
    

RESEND_NOT_SUCCESS_MESSAGES

Set this parameter resend_not_success_messages=True when create mqtt client

mqtt_client = DMAioMqttClient(
    host="localhost",
    port=1883,
    resend_not_success_messages=True
)

Now, in case of loss of connection, all messages that were sent during this period will be re-sent as soon as the connection appears again.

Set custom logger

If you want set up custom logger

from dm_aiomqtt import DMAioMqttClient


# create custom logger
class MyLogger:
    def debug(self, message):
        pass

    def info(self, message):
        pass

    def warning(self, message):
        print(message)

    def error(self, message):
        print(message)


# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())

Publish method parameters

Parameter Type Default Value Description
topic str (required) Topic name
payload str "DEBUG" Content to send
qos 0 | 1 | 2 True MQTT QoS
payload_to_json bool | "auto" True Whether to convert content to JSON
sent_logging bool False Whether to print the sending notification
error_logging bool False Whether to print a send error warning

Run in Windows

If you run async code in Windows, set correct selector

import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dm_aiomqtt-0.4.10.tar.gz (5.4 kB view hashes)

Uploaded Source

Built Distribution

dm_aiomqtt-0.4.10-py3-none-any.whl (6.0 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page