This is my custom aiomqtt client
Project description
DM-aiomqtt
Urls
Usage
Run in Windows
If you run async code in Windows, set correct selector
import asyncio
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
One connection for all actions
from dm_aiomqtt import DMAioMqttClient
import asyncio
# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
print(f"Received message from {topic}: {payload}")
await publish("test/success", payload=True)
async def main():
# create client
mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")
# add handler for 'test' topic
await mqtt_client.add_topic_handler("test", test_topic_handler)
# start connection
await mqtt_client.connect()
# publish
await mqtt_client.publish("test", payload="Hello World!", logging=True)
# other code (for example, endless waiting)
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
Temporary connection for actions
from dm_aiomqtt import DMAioMqttClient
import asyncio
async def main():
# create client
mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")
# create callback
async def callback(publish: DMAioMqttClient.publish):
await publish("test/1", payload="Hello World!1", qos=2, logging=True)
# other code
await publish("test/2", payload="Hello World!2", qos=2, logging=True)
# execute callback in temporary connection
await mqtt_client.temp_connect(callback)
# other code (for example, endless waiting)
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
Set custom logger
If you want set up custom logger
from dm_aiomqtt import DMAioMqttClient
# create custom logger
class MyLogger:
def info(self, message):
pass
def warning(self, message):
print(message)
def error(self, message):
print(message)
# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dm-aiomqtt-0.2.2.tar.gz
(3.8 kB
view hashes)
Built Distribution
Close
Hashes for dm_aiomqtt-0.2.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3bd044f93fe0ecf6a4f972cc61e8a25d31f6ba76880985ab0535b4bb0ce69106 |
|
MD5 | 03e99847cd1502a296675428b2bdc67f |
|
BLAKE2b-256 | fd91ca409104d84e5b0057da54d7d735e3be0f36c7e21d058d06553d85232e41 |