MQTT Connector

A robust MQTT connector for asynchronous MQTT communication.

Features

  • Asynchronous API using Python's asyncio
  • Native support for both sync and async message callbacks
  • Automatic reconnection handling
  • Message throttling to avoid flooding the broker
  • Supports both string and JSON message formats
  • Thread-safe async callback scheduling from MQTT background threads
  • Customizable logging via callback function

Installation

pip install muxu-io-mqtt-connector

Basic Usage

import asyncio
from mqtt_connector import MqttConnector

async def main():
    # Create a connector instance
    connector = MqttConnector(
        mqtt_broker="mqtt.example.com",
        mqtt_port=1883,
        client_id="example_client"
    )

    # Set up message callback (supports both sync and async)
    async def message_handler(topic: str, message: str):
        print(f"Received: {topic} -> {message}")
        # Async operations are supported
        await asyncio.sleep(0.01)

    connector.set_message_callback(message_handler)

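    # Connect to the broker; stop here if the connection fails
    # (assumes connect() returns a truthy value on success)
    connected = await connector.connect()
    if not connected:
        print("Could not connect to the broker")
        return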

    # Subscribe to a topic
    await connector.subscribe("example/incoming")

    # Publish a message
    await connector.publish(
        topic="example/outgoing",
        message={"status": "online", "timestamp": "2025-08-03T12:00:00Z"}
    )

    # Let messages process for a bit
    await asyncio.sleep(2)

    # Disconnect
    await connector.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Message Callbacks

The connector supports both synchronous and asynchronous message callbacks:

Async Callback Example

async def async_message_handler(topic: str, message: str):
    """Async handler can perform I/O operations."""
    print(f"Processing: {topic}")

    # Async operations like database writes, file I/O, etc.
    await process_message_async(message)
    await save_to_database(topic, message)

connector.set_message_callback(async_message_handler)

Sync Callback Example

def sync_message_handler(topic: str, message: str):
    """Sync handler for simple processing."""
    print(f"Received: {topic} -> {message}")

    # Synchronous operations only
    process_message_sync(message)

connector.set_message_callback(sync_message_handler)
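
Because both callback forms receive the message as a str, a handler that expects JSON has to decode it itself. The following is a minimal sketch under that assumption; decode_payload and json_message_handler are hypothetical helper names, not part of the connector's API, and the fallback to the raw string is a design choice for illustration only.

```python
import json
from typing import Any


def decode_payload(message: str) -> Any:
    """Return the parsed JSON value, or the raw string if it is not valid JSON."""
    try:
        return json.loads(message)
    except (json.JSONDecodeError, TypeError):
        return message


def json_message_handler(topic: str, message: str) -> None:
    """Sync handler that accepts both JSON and plain-string payloads."""
    payload = decode_payload(message)
    if isinstance(payload, dict):
        print(f"{topic}: status={payload.get('status')}")
    else:
        print(f"{topic}: {payload}")
```

Such a handler could then be registered with connector.set_message_callback(json_message_handler), exactly like the sync example above.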

Thread Safety

The connector schedules async callbacks on the asyncio event loop with call_soon_threadsafe(), so they run safely on the loop even when MQTT messages arrive on background threads.
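
The general pattern can be sketched with the standard library alone. This is an illustration of handing a coroutine from a plain thread to a running event loop, not the connector's actual implementation; schedule_async_callback, thread_safety_demo, and the background-task set are hypothetical names introduced for the example.

```python
import asyncio
import threading
from typing import Awaitable, Callable, List, Set, Tuple

# Keep strong references so scheduled tasks are not garbage-collected early.
_background_tasks: Set[asyncio.Task] = set()


def schedule_async_callback(
    loop: asyncio.AbstractEventLoop,
    callback: Callable[[str, str], Awaitable[None]],
    topic: str,
    message: str,
) -> None:
    """Safe to call from any thread: the task is created on the loop's own thread."""

    def _start() -> None:
        task = loop.create_task(callback(topic, message))
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)

    loop.call_soon_threadsafe(_start)


async def thread_safety_demo() -> List[Tuple[str, str]]:
    received: List[Tuple[str, str]] = []
    done = asyncio.Event()

    async def handler(topic: str, message: str) -> None:
        received.append((topic, message))
        done.set()

    loop = asyncio.get_running_loop()
    # Simulate a message arriving on a background (non-asyncio) thread.
    worker = threading.Thread(
        target=schedule_async_callback,
        args=(loop, handler, "example/incoming", "hello"),
    )
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=1)
    worker.join()
    return received
```

Calling loop.create_task() directly from the worker thread would not be safe; routing the call through call_soon_threadsafe() makes the task creation happen on the loop's thread.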

Advanced Usage

For more advanced usage examples, check the examples directory in the repository.

API Reference

MqttConnector

connector = MqttConnector(
    mqtt_broker="mqtt.example.com",  # Broker address
    mqtt_port=1883,                  # Broker port
    client_id=None,                  # Client ID (auto-generated if None)
    reconnect_interval=5,            # Seconds between reconnection attempts
    max_reconnect_attempts=-1,       # Maximum reconnection attempts (-1 = infinite)
    throttle_interval=0.1            # Minimum seconds between publishes
)
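
To make the throttle_interval parameter concrete, here is a minimal sketch of how a minimum spacing between publishes could be enforced with asyncio. It only illustrates the idea; PublishThrottle and throttle_demo are hypothetical names, and the connector's internal throttling may work differently.

```python
import asyncio
import time
from typing import Optional


class PublishThrottle:
    """Ensures at least `interval` seconds pass between successive wait() returns."""

    def __init__(self, interval: float) -> None:
        self._interval = interval
        self._lock = asyncio.Lock()
        self._last: Optional[float] = None

    async def wait(self) -> None:
        # The lock serializes callers so concurrent publishers queue up in order.
        async with self._lock:
            if self._last is not None:
                remaining = self._interval - (time.monotonic() - self._last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last = time.monotonic()


async def throttle_demo(count: int = 3, interval: float = 0.05) -> float:
    """Return the time taken for `count` throttled operations."""
    throttle = PublishThrottle(interval)
    start = time.monotonic()
    for _ in range(count):
        await throttle.wait()  # a real publish call would follow here
    return time.monotonic() - start
```

With an interval of 0.1, as in the constructor defaults shown above, this scheme would cap a single client at roughly ten publishes per second.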

Methods

  • await connector.connect(force_reconnect=False) - Connect to the broker
  • await connector.disconnect() - Disconnect from the broker
  • await connector.publish(topic, message, qos=0, retain=False) - Publish a message
  • await connector.subscribe(topic, qos=0) - Subscribe to a topic
  • connector.is_connected() - Check connection status
  • connector.set_message_callback(callback) - Set message callback (sync or async)
  • connector.set_log_callback(callback) - Set logging callback function

Message Callback Signature

# Sync callback
def callback(topic: str, message: str) -> None:
    pass

# Async callback
async def callback(topic: str, message: str) -> None:
    pass
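
One common way to accept both signatures through a single entry point is to call the callback and await the result only if it is awaitable. The sketch below shows that technique in isolation; dispatch and dispatch_demo are hypothetical names, and this is not necessarily how the connector distinguishes the two cases.

```python
import asyncio
import inspect
from typing import Any, Callable, List, Tuple


async def dispatch(callback: Callable[[str, str], Any], topic: str, message: str) -> None:
    """Invoke a sync or async callback uniformly."""
    result = callback(topic, message)
    if inspect.isawaitable(result):
        # Async callbacks return a coroutine that still has to be awaited.
        await result


async def dispatch_demo() -> List[Tuple[str, str, str]]:
    seen: List[Tuple[str, str, str]] = []

    def sync_cb(topic: str, message: str) -> None:
        seen.append(("sync", topic, message))

    async def async_cb(topic: str, message: str) -> None:
        await asyncio.sleep(0)
        seen.append(("async", topic, message))

    await dispatch(sync_cb, "example/incoming", "one")
    await dispatch(async_cb, "example/incoming", "two")
    return seen
```

Checking the return value rather than the function object also covers callables such as functools.partial wrappers around async functions.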

Contributing

We welcome contributions! Please see our Contributing Guide for detailed instructions on:

  • Development setup and workflow
  • Commit message conventions
  • Pre-submission validation checklist
  • CI/CD pipeline overview
  • Pull request process

License

This project is licensed under the MIT License - see the LICENSE file for details.
