Skip to main content

This is my custom aiomqtt client

Project description

DM-aiomqtt

Urls

Usage

Example

from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.start()

    # publish
    mqtt_client.publish("test", payload="Hello World!", sent_logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())

You can also start with a block thread

await mqtt_client.start_forever()

TLS connection

  • NOT required client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt"
    )
    
  • REQUIRED client certificate

    mqtt_client = DMAioMqttClient(
        host="localhost",
        port=8883,
        ca_crt="ssl/ca.crt",
        client_crt="ssl/client.crt",
        client_key="ssl/client.key"
    )
    

RESEND_NOT_SUCCESS_MESSAGES

Set this parameter resend_not_success_messages=True when create mqtt client

mqtt_client = DMAioMqttClient(
    host="localhost",
    port=1883,
    resend_not_success_messages=True
)

Now, in case of loss of connection, all messages that were sent during this period will be re-sent as soon as the connection appears again.

Set custom logger

If you want set up custom logger parameters

from dm_aiomqtt import DMAioMqttClient
from dm_logger import FormatterConfig


# set up custom logger for all clients
DMAioMqttClient.set_logger_params(
   {
      "name": "my_name",
      "formatter_config": FormatterConfig(
         show_datetime=False,
      )
   }
)

See more about DMLogger here

Publish method parameters

Parameter Type Default Value Description
topic str (required) Topic name
payload str "DEBUG" Content to send
qos 0 | 1 | 2 True MQTT QoS
payload_to_json bool | "auto" True Whether to convert content to JSON
sent_logging bool False Whether to print the sending notification
error_logging bool False Whether to print a send error warning

Run in Windows

If you run async code in Windows, set correct selector

import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dm_aiomqtt-0.4.17.tar.gz (5.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dm_aiomqtt-0.4.17-py3-none-any.whl (6.1 kB view details)

Uploaded Python 3

File details

Details for the file dm_aiomqtt-0.4.17.tar.gz.

File metadata

  • Download URL: dm_aiomqtt-0.4.17.tar.gz
  • Upload date:
  • Size: 5.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for dm_aiomqtt-0.4.17.tar.gz
Algorithm Hash digest
SHA256 b3fdf39bfc3620fe906714b8af9caaabca450d7542cf1797a24f19b20f9a766a
MD5 b21872f5f8454bab705bba0827b5fade
BLAKE2b-256 d2d1b86c748c6567f80d6259b726e06a43fd48cedd92aa57985685f6f102509e

See more details on using hashes here.

File details

Details for the file dm_aiomqtt-0.4.17-py3-none-any.whl.

File metadata

  • Download URL: dm_aiomqtt-0.4.17-py3-none-any.whl
  • Upload date:
  • Size: 6.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for dm_aiomqtt-0.4.17-py3-none-any.whl
Algorithm Hash digest
SHA256 1879b16825bbee2a7040d29f810f3f61874c9121f639e7b1337234c1b86301cf
MD5 12f89f8538fceb194717e511392d37b1
BLAKE2b-256 5480a7993cab3b242c2b87bffe11387b2d4e67570fa3355783b51c0d57327a27

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page