MQTT Connector
A robust MQTT connector for asynchronous MQTT communication.
Features
- Asynchronous API using Python's asyncio
- Async and sync message callbacks: native support for both kinds of message handler
- Automatic reconnection handling
- Message throttling to avoid flooding the broker
- Supports both string and JSON message formats
- Thread-safe async callback scheduling from MQTT background threads
- Customizable logging via callback function
Installation
pip install muxu-io-mqtt-connector
Basic Usage
import asyncio

from mqtt_connector import MqttConnector


async def main():
    # Create a connector instance
    connector = MqttConnector(
        mqtt_broker="mqtt.example.com",
        mqtt_port=1883,
        client_id="example_client"
    )

    # Set up message callback (supports both sync and async)
    async def message_handler(topic: str, message: str):
        print(f"Received: {topic} -> {message}")
        # Async operations are supported
        await asyncio.sleep(0.01)

    connector.set_message_callback(message_handler)

    # Connect to the broker
    connected = await connector.connect()

    # Subscribe to a topic
    await connector.subscribe("example/incoming")

    # Publish a message
    await connector.publish(
        topic="example/outgoing",
        message={"status": "online", "timestamp": "2025-08-03T12:00:00Z"}
    )

    # Let messages process for a bit
    await asyncio.sleep(2)

    # Disconnect
    await connector.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
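The publish call above passes a dict, and the feature list mentions both string and JSON formats. How the connector serializes payloads internally is not documented here, so the following is only a minimal sketch of one plausible convention. `encode_payload` and `decode_payload` are hypothetical helper names, not part of the library.

```python
import json
from typing import Any, Union


def encode_payload(message: Union[str, dict, list]) -> str:
    """Return a string payload: strings pass through, dicts and lists become JSON."""
    if isinstance(message, str):
        return message
    return json.dumps(message)


def decode_payload(payload: str) -> Any:
    """Parse the payload as JSON if possible, otherwise return the raw string."""
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return payload
```

A handler that receives `message: str` could call a helper like `decode_payload` to recover structured data. Note that under this convention a payload such as `"42"` decodes to the integer 42, which may or may not be what a given application wants.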
Message Callbacks
The connector supports both synchronous and asynchronous message callbacks:
Async Callback Example
async def async_message_handler(topic: str, message: str):
    """Async handler can perform I/O operations."""
    print(f"Processing: {topic}")
    # Async operations like database writes, file I/O, etc.
    await process_message_async(message)
    await save_to_database(topic, message)

connector.set_message_callback(async_message_handler)
Sync Callback Example
def sync_message_handler(topic: str, message: str):
    """Sync handler for simple processing."""
    print(f"Received: {topic} -> {message}")
    # Synchronous operations only
    process_message_sync(message)

connector.set_message_callback(sync_message_handler)
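To illustrate how a single callback slot can accept either kind of handler, here is a small standalone sketch. It is not the connector's actual code; `dispatch` is a hypothetical name, and the approach shown (call the handler, then await the result only if it is awaitable) is just one common way to do it.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

MessageCallback = Callable[[str, str], Union[None, Awaitable[None]]]


async def dispatch(callback: MessageCallback, topic: str, message: str) -> None:
    """Call the handler and await its result only when it is awaitable."""
    result = callback(topic, message)
    if inspect.isawaitable(result):
        await result


received = []


def sync_handler(topic: str, message: str) -> None:
    received.append(("sync", topic, message))


async def async_handler(topic: str, message: str) -> None:
    await asyncio.sleep(0)  # stand-in for real async I/O
    received.append(("async", topic, message))


async def demo() -> None:
    await dispatch(sync_handler, "example/incoming", "hello")
    await dispatch(async_handler, "example/incoming", "world")


asyncio.run(demo())
```

One consequence of this pattern is that a slow sync handler blocks the event loop while it runs, so long-running work is usually better placed in an async handler.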
Thread Safety
The connector automatically handles thread-safe execution of async callbacks using call_soon_threadsafe(), ensuring proper integration with the asyncio event loop even when MQTT messages arrive on background threads.
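The general technique can be sketched without the library. In the example below a plain thread stands in for the MQTT client's background thread, and `handler`, `network_thread`, and `results` are illustrative names only. This is an assumption about the pattern rather than a copy of the connector's implementation.

```python
import asyncio
import threading

results = {}


async def handler(topic: str, message: str) -> None:
    # Runs on the event loop thread, so it can await freely.
    await asyncio.sleep(0)
    results["message"] = (topic, message)
    results["handler_thread"] = threading.get_ident()


def network_thread(loop: asyncio.AbstractEventLoop, done: asyncio.Event) -> None:
    """Stand-in for the MQTT client's background thread."""

    def schedule() -> None:
        # Executed by the loop itself, so creating the task here is safe.
        task = loop.create_task(handler("example/incoming", "hello"))
        task.add_done_callback(lambda _: done.set())

    # call_soon_threadsafe is the only loop method touched from this thread.
    loop.call_soon_threadsafe(schedule)


async def main() -> None:
    results["loop_thread"] = threading.get_ident()
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    worker = threading.Thread(target=network_thread, args=(loop, done))
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    worker.join()


asyncio.run(main())
```

The handler ends up running on the same thread as the event loop, even though the message originated on another thread. `asyncio.run_coroutine_threadsafe()` is a standard-library alternative when the calling thread also needs the coroutine's result.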
Advanced Usage
For more advanced usage examples, check the examples directory in the repository.
API Reference
MqttConnector
connector = MqttConnector(
    mqtt_broker="mqtt.example.com",  # Broker address
    mqtt_port=1883,                  # Broker port
    client_id=None,                  # Client ID (auto-generated if None)
    reconnect_interval=5,            # Seconds between reconnection attempts
    max_reconnect_attempts=-1,       # Maximum reconnection attempts (-1 = infinite)
    throttle_interval=0.1            # Minimum seconds between publishes
)
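The `throttle_interval` parameter is described as a minimum time between publishes. A minimal sketch of that idea follows, assuming a simple "wait until the interval has elapsed" policy. The `Throttle` class is hypothetical, and the connector may implement throttling differently (for example by queueing or dropping messages).

```python
import asyncio
import time


class Throttle:
    """Enforce a minimum interval between successive operations."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._last = None  # monotonic time of the previous operation
        self._lock = asyncio.Lock()  # serializes concurrent callers

    async def wait(self) -> None:
        async with self._lock:
            if self._last is not None:
                remaining = self._interval - (time.monotonic() - self._last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last = time.monotonic()


async def demo() -> list:
    throttle = Throttle(interval=0.1)
    stamps = []
    for _ in range(3):
        await throttle.wait()
        stamps.append(time.monotonic())  # a real client would publish here
    return stamps


stamps = asyncio.run(demo())
gaps = [later - earlier for earlier, later in zip(stamps, stamps[1:])]
```

With an interval of 0.1 seconds, each gap between consecutive operations comes out at roughly 0.1 seconds or more, subject to timer resolution on the host.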
Methods
- await connector.connect(force_reconnect=False): Connect to the broker
- await connector.disconnect(): Disconnect from the broker
- await connector.publish(topic, message, qos=0, retain=False): Publish a message
- await connector.subscribe(topic, qos=0): Subscribe to a topic
- connector.is_connected(): Check connection status
- connector.set_message_callback(callback): Set message callback (sync or async)
- connector.set_log_callback(callback): Set logging callback function
Message Callback Signature
# Sync callback
def callback(topic: str, message: str) -> None:
    pass

# Async callback
async def callback(topic: str, message: str) -> None:
    pass
Contributing
We welcome contributions! Please see our Contributing Guide for detailed instructions on:
- Development setup and workflow
- Commit message conventions
- Pre-submission validation checklist
- CI/CD pipeline overview
- Pull request process
License
This project is licensed under the MIT License - see the LICENSE file for details.