This is my custom aiomqtt client
Project description
DM-aiomqtt
Urls
Usage
Run in Windows
If you run async code in Windows, set correct selector
import asyncio
import sys
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
Listen
from dm_aiomqtt import DMAioMqttClient
import asyncio
# create handler for 'test' topic
async def test_handler(client: DMAioMqttClient, topic: str, payload: str) -> None:
print(f"Received message from {topic}: {payload}")
await client.publish("test/success", payload=True)
async def main():
# create client
mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password")
# add handler for 'test' topic
mqtt_client.add_handler("test", test_handler)
# start listening (this operation is block async thread)
await mqtt_client.listen()
if __name__ == "__main__":
asyncio.run(main())
Publish
-
Connection only when publishing
from dm_aiomqtt import DMAioMqttClient import asyncio async def main(): # create client mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password") # publish json message await mqtt_client.publish("test", payload={"key": "value"}) # other code # publish new json message await mqtt_client.publish("test2", payload={"key2": "value2"}) if __name__ == "__main__": asyncio.run(main())
-
One connection for all publishing
from dm_aiomqtt import DMAioMqttClient import asyncio async def main(mqtt_client: DMAioMqttClient): # publish json message await mqtt_client.publish("test", payload={"key": "value"}) # other code # publish new json message await mqtt_client.publish("test2", payload={"key2": "value2"}) async def start(): # create client mqtt_client = DMAioMqttClient("localhost", 1883, "username", "password") # create task for listening (this operation is block async thread) listen_task = asyncio.create_task(mqtt_client.listen(reconnect_interval=5)) # create task to execute other code with an active connection main_task = asyncio.create_task(main(mqtt_client)) # run async tasks in parallel await asyncio.gather(listen_task, main_task) if __name__ == "__main__": asyncio.run(start())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
dm-aiomqtt-0.1.1.tar.gz
(3.4 kB
view hashes)
Built Distribution
Close
Hashes for dm_aiomqtt-0.1.1-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 0388ec40762df56a0f485ee8e45d71ff1a75b5eb9e9bf6b7501b25be5d4ec992 |
|
MD5 | 0ab5b8b5392c49f286a8444f9b6f54a5 |
|
BLAKE2b-256 | 7efa91a82c23c5cbbf7379ac5fedd00791df6eca690dcd796acb29399fe567de |