This is my custom aiomqtt client
DM-aiomqtt
Usage
Example
from dm_aiomqtt import DMAioMqttClient
import asyncio


# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


async def main():
    # create client
    mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")

    # add handler for 'test' topic
    mqtt_client.add_topic_handler("test", test_topic_handler)

    # start connection
    await mqtt_client.start()

    # publish
    mqtt_client.publish("test", payload="Hello World!", sent_logging=True)

    # other code (for example, endless waiting)
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())
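Because the handler receives `publish` as its first argument, it can be exercised without a broker. The sketch below is only an illustration: `on_test_message` mirrors the handler from the example, while `fake_publish` and `run_handler_with_fake_publish` are hypothetical names that are not part of dm_aiomqtt.

```python
import asyncio


async def on_test_message(publish, topic: str, payload: str) -> None:
    # Same shape as the 'test' topic handler in the example
    print(f"Received message from {topic}: {payload}")
    publish("test/success", payload=True)


def run_handler_with_fake_publish(topic: str, payload: str) -> list:
    """Run the handler once and return the publish calls it made."""
    calls = []

    def fake_publish(topic: str, payload=None, **kwargs) -> None:
        # Hypothetical stand-in for the client's publish method
        calls.append((topic, payload))

    asyncio.run(on_test_message(fake_publish, topic, payload))
    return calls


if __name__ == "__main__":
    print(run_handler_with_fake_publish("test", "Hello World!"))
```

Recording the calls in a list keeps the check independent of any network connection.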
You can also start the client with a call that blocks the current coroutine
await mqtt_client.start_forever()
TLS connection
- Client certificate NOT required
mqtt_client = DMAioMqttClient(
    host="localhost",
    port=8883,
    ca_crt="ssl/ca.crt"
)
- Client certificate REQUIRED
mqtt_client = DMAioMqttClient(
    host="localhost",
    port=8883,
    ca_crt="ssl/ca.crt",
    client_crt="ssl/client.crt",
    client_key="ssl/client.key"
)
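For orientation, the three file parameters correspond to the usual pieces of a TLS setup: a CA certificate to verify the broker, plus an optional client certificate and key. The sketch below shows how such files map onto a standard-library `ssl.SSLContext`. Whether dm_aiomqtt builds its context this way is an assumption, and `build_tls_context` is a hypothetical helper, not part of the package.

```python
import ssl
from typing import Optional


def build_tls_context(
    ca_crt: Optional[str] = None,
    client_crt: Optional[str] = None,
    client_key: Optional[str] = None,
) -> ssl.SSLContext:
    """Build a client-side TLS context from certificate file paths."""
    # Verify the broker against the given CA file (system CAs if None)
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_crt)
    if client_crt is not None:
        # Present a client certificate to the broker (mutual TLS)
        context.load_cert_chain(certfile=client_crt, keyfile=client_key)
    return context


if __name__ == "__main__":
    ctx = build_tls_context()
    print(ctx.verify_mode, ctx.check_hostname)
```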
RESEND_NOT_SUCCESS_MESSAGES
Set resend_not_success_messages=True when creating the MQTT client
mqtt_client = DMAioMqttClient(
    host="localhost",
    port=1883,
    resend_not_success_messages=True
)
If the connection is lost, every message published while it was down is re-sent as soon as the connection is restored.
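The idea behind this option can be sketched as a small buffer that holds messages while the client is offline and flushes them in order on reconnect. This is only an illustration of the technique, not the package's implementation; `ResendBuffer` and its methods are hypothetical names.

```python
from collections import deque
from typing import Callable, Deque, Tuple


class ResendBuffer:
    """Hypothetical helper: keeps messages that could not be sent."""

    def __init__(self, send: Callable[[str, str], None]) -> None:
        self._send = send
        self._pending: Deque[Tuple[str, str]] = deque()
        self.connected = False

    def publish(self, topic: str, payload: str) -> None:
        if self.connected:
            self._send(topic, payload)
        else:
            # Offline: remember the message instead of dropping it
            self._pending.append((topic, payload))

    def on_reconnect(self) -> None:
        self.connected = True
        # Flush in the original order
        while self._pending:
            self._send(*self._pending.popleft())


if __name__ == "__main__":
    sent = []
    buffer = ResendBuffer(lambda topic, payload: sent.append((topic, payload)))
    buffer.publish("test", "queued while offline")
    buffer.on_reconnect()
    print(sent)
```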
Set custom logger
If you want to set up a custom logger
from dm_aiomqtt import DMAioMqttClient


# create custom logger
class MyLogger:
    def debug(self, message):
        pass

    def info(self, message):
        pass

    def warning(self, message):
        print(message)

    def error(self, message):
        print(message)


# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())
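The example logger exposes four methods: debug, info, warning and error. A standard-library `logging.Logger` has the same four, so it may work as a drop-in. That is an assumption, since this README does not say whether the client calls anything else on the logger. `make_stdlib_logger` and `has_logger_interface` below are hypothetical helpers.

```python
import logging

REQUIRED_METHODS = ("debug", "info", "warning", "error")


def make_stdlib_logger(name: str = "mqtt") -> logging.Logger:
    """Create a stdlib logger that only emits warnings and errors."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.WARNING)
    if not logger.handlers:
        logger.addHandler(logging.StreamHandler())
    return logger


def has_logger_interface(obj: object) -> bool:
    """Check for the four methods used by the example logger."""
    return all(callable(getattr(obj, m, None)) for m in REQUIRED_METHODS)


logger = make_stdlib_logger()
# With the package installed, the logger could then be passed on (assumption):
# DMAioMqttClient.set_logger(logger)
```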
Publish method parameters
Parameter | Type | Default Value | Description |
---|---|---|---|
topic | str | (required) | Topic name |
payload | str | "DEBUG" | Content to send |
qos | 0 \| 1 \| 2 | True | MQTT QoS |
payload_to_json | bool \| "auto" | True | Whether to convert content to JSON |
sent_logging | bool | False | Whether to print the sending notification |
error_logging | bool | False | Whether to print a send error warning |
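The table does not spell out what the "auto" value of payload_to_json does. One plausible reading, stated here purely as an assumption, is that strings are sent unchanged and everything else is JSON-encoded. The hypothetical helper `encode_payload` below sketches that reading; it is not taken from the package.

```python
import json
from typing import Any, Union


def encode_payload(payload: Any, payload_to_json: Union[bool, str] = True) -> str:
    """Sketch of one possible payload conversion (assumed behaviour)."""
    if payload_to_json == "auto":
        # Assumption: leave strings alone, JSON-encode anything else
        return payload if isinstance(payload, str) else json.dumps(payload)
    if payload_to_json:
        return json.dumps(payload)
    return str(payload)


if __name__ == "__main__":
    print(encode_payload({"temperature": 21.5}, "auto"))
    print(encode_payload("Hello World!", "auto"))
```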
Run in Windows
If you run async code on Windows, set the selector event loop policy before starting the event loop
import asyncio
import sys

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())