This is my custom aiomqtt client
Project description
DM-aiomqtt
Urls
Usage
Run in Windows
If you run async code in Windows, set correct selector
import asyncio
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
One connection for all actions
from dm_aiomqtt import DMAioMqttClient
import asyncio
# create handler for 'test' topic
async def test_topic_handler(publish: DMAioMqttClient.publish, topic: str, payload: str) -> None:
print(f"Received message from {topic}: {payload}")
await publish("test/success", payload=True)
async def main():
# create client
mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")
# add handler for 'test' topic
await mqtt_client.add_topic_handler("test", test_topic_handler)
# start connection
await mqtt_client.connect()
# publish
await mqtt_client.publish("test", payload="Hello World!", logging=True)
# other code (for example, endless waiting)
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
Temporary connection for actions
from dm_aiomqtt import DMAioMqttClient
import asyncio
async def main():
# create client
mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")
# create callback
async def callback(publish: DMAioMqttClient.publish):
await publish("test/1", payload="Hello World!1", qos=2, logging=True)
# other code
await publish("test/2", payload="Hello World!2", qos=2, logging=True)
# execute callback in temporary connection
await mqtt_client.temp_connect(callback)
# other code (for example, endless waiting)
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())
Set custom logger
If you want set up custom logger
from dm_aiomqtt import DMAioMqttClient
# create custom logger
class MyLogger:
def info(self, message):
pass
def warning(self, message):
print(message)
def error(self, message):
print(message)
# set up custom logger for all clients
DMAioMqttClient.set_logger(MyLogger())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dm-aiomqtt-0.2.3.tar.gz
(3.9 kB
view hashes)
Built Distribution
Close
Hashes for dm_aiomqtt-0.2.3-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 012a49be484412cb43f523260bbaac1f272900be965800e8dbb9b4306244c4ac |
|
MD5 | 35cd3f804d4ca7641c242f1e5b3dc77d |
|
BLAKE2b-256 | b2438828b1eb830af77a6b952563006cc54fe608958c8ab6b1a9db27553de9aa |