MQTT Connector

A robust MQTT connector for asynchronous MQTT communication.

Features

  • Asynchronous API using Python's asyncio
  • Sync and async message callbacks: handlers can be plain functions or coroutines
  • Automatic reconnection handling
  • Message throttling to avoid flooding the broker
  • Supports both string and JSON message formats
  • Thread-safe async callback scheduling from MQTT background threads
  • Customizable logging via callback function
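
To make the throttling feature concrete, here is a minimal sketch of how a minimum gap between publishes could be enforced with asyncio. It is not the connector's actual implementation; the class name PublishThrottle and its wait_turn method are hypothetical and exist only for illustration.

```python
import asyncio
import time
from typing import Optional


class PublishThrottle:
    """Hypothetical helper: enforces a minimum gap between publishes."""

    def __init__(self, min_interval: float) -> None:
        self._min_interval = min_interval
        self._last_sent: Optional[float] = None  # monotonic time of the previous publish
        self._lock = asyncio.Lock()  # serializes concurrent publishers

    async def wait_turn(self) -> None:
        """Sleep until at least min_interval has passed since the previous call."""
        async with self._lock:
            if self._last_sent is not None:
                remaining = self._min_interval - (time.monotonic() - self._last_sent)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_sent = time.monotonic()


async def demo() -> float:
    """Run three throttled 'publishes' and return the elapsed time in seconds."""
    throttle = PublishThrottle(min_interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await throttle.wait_turn()  # a real publish call would follow here
    return time.monotonic() - start
```

In this sketch the first call goes through immediately and each later call waits out the remainder of the interval, so a burst of publishes is spread out rather than dropped.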

Installation

pip install muxu-io-mqtt-connector

Basic Usage

import asyncio
from mqtt_connector import MqttConnector

async def main():
    # Create a connector instance
    connector = MqttConnector(
        mqtt_broker="mqtt.example.com",
        mqtt_port=1883,
        client_id="example_client"
    )

    # Set up message callback (supports both sync and async)
    async def message_handler(topic: str, message: str):
        print(f"Received: {topic} -> {message}")
        # Async operations are supported
        await asyncio.sleep(0.01)

    connector.set_message_callback(message_handler)

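    # Connect to the broker; stop here if the connection fails
    connected = await connector.connect()
    if not connected:
        print("Failed to connect to the broker")
        return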

    # Subscribe to a topic
    await connector.subscribe("example/incoming")

    # Publish a message
    await connector.publish(
        topic="example/outgoing",
        message={"status": "online", "timestamp": "2025-08-03T12:00:00Z"}
    )

    # Let messages process for a bit
    await asyncio.sleep(2)

    # Disconnect
    await connector.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
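
The example above publishes a dict while the callback receives a string. One plausible way to picture the string and JSON handling is the sketch below. The helpers encode_payload and decode_payload are hypothetical names, not part of the connector's API, and the connector's real conversion rules may differ.

```python
import json
from typing import Any


def encode_payload(message: Any) -> str:
    """Hypothetical helper: strings pass through, anything else is serialized as JSON."""
    if isinstance(message, str):
        return message
    return json.dumps(message)


def decode_payload(payload: str) -> Any:
    """Try to parse the payload as JSON and fall back to the raw string."""
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return payload
```

A handler that expects JSON could call decode_payload(message) on the string it receives. Note that with this fallback approach a bare payload such as "123" decodes to a number, so handlers that need raw strings should skip the decoding step.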

Message Callbacks

The connector supports both synchronous and asynchronous message callbacks:

Async Callback Example

async def async_message_handler(topic: str, message: str):
    """Async handler can perform I/O operations."""
    print(f"Processing: {topic}")

    # Async operations such as database writes or file I/O.
    # process_message_async and save_to_database stand in for your own coroutines.
    await process_message_async(message)
    await save_to_database(topic, message)

connector.set_message_callback(async_message_handler)

Sync Callback Example

def sync_message_handler(topic: str, message: str):
    """Sync handler for simple processing."""
    print(f"Received: {topic} -> {message}")

    # Synchronous operations only; process_message_sync stands in for your own function
    process_message_sync(message)

connector.set_message_callback(sync_message_handler)

Thread Safety

MQTT messages arrive on background threads, not on the asyncio event loop. The connector hands async callbacks over to the loop with call_soon_threadsafe(), so they run on the event loop thread and integrate properly with the rest of your asyncio code.
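
The pattern can be sketched with the standard library alone. The code below is an illustration of scheduling an async callback from another thread, not the connector's source; network_thread simulates the MQTT background thread, and handler stands in for a user callback.

```python
import asyncio
import threading

received = []


async def handler(topic: str, message: str) -> None:
    """Stand-in for a user-supplied async message callback."""
    await asyncio.sleep(0)
    received.append((topic, message))


def network_thread(loop: asyncio.AbstractEventLoop, done: asyncio.Event) -> None:
    """Simulates a background thread delivering one message."""

    def schedule() -> None:
        # Runs on the event loop thread, so creating a task here is safe.
        task = loop.create_task(handler("example/incoming", "hello"))
        task.add_done_callback(lambda _: done.set())

    # Hand the work to the loop thread; calling create_task directly
    # from this thread would not be thread-safe.
    loop.call_soon_threadsafe(schedule)


async def main() -> None:
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    worker = threading.Thread(target=network_thread, args=(loop, done))
    worker.start()
    await asyncio.wait_for(done.wait(), timeout=2)  # wait for the handler to finish
    worker.join()
```

Running asyncio.run(main()) leaves one entry in received. The standard library also offers asyncio.run_coroutine_threadsafe(), which wraps the same idea and returns a future the calling thread can wait on.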

Advanced Usage

For more advanced usage examples, check the examples directory in the repository.

API Reference

MqttConnector

connector = MqttConnector(
    mqtt_broker="mqtt.example.com",  # Broker address
    mqtt_port=1883,                  # Broker port
    client_id=None,                  # Client ID (auto-generated if None)
    reconnect_interval=5,            # Seconds between reconnection attempts
    max_reconnect_attempts=-1,       # Maximum reconnection attempts (-1 = infinite)
    throttle_interval=0.1            # Minimum seconds between publishes
)
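
To illustrate how reconnect_interval and max_reconnect_attempts might interact, here is a minimal retry loop. It is a sketch under assumptions, not the connector's code: connect_with_retry and connect_once are hypothetical names, and counting the first try as an attempt is a choice made for this example.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def connect_with_retry(
    connect_once: Callable[[], Awaitable[bool]],
    reconnect_interval: float = 5,
    max_reconnect_attempts: int = -1,
) -> bool:
    """Hypothetical helper: retry connect_once until it succeeds or attempts run out."""
    attempts = 0
    while True:
        if await connect_once():
            return True
        attempts += 1
        # A negative limit means retry forever, mirroring the documented -1 default.
        if max_reconnect_attempts >= 0 and attempts >= max_reconnect_attempts:
            return False
        await asyncio.sleep(reconnect_interval)


async def demo() -> Tuple[bool, int]:
    """Simulate a broker that accepts the third connection attempt."""
    calls = 0

    async def flaky_connect() -> bool:
        nonlocal calls
        calls += 1
        return calls >= 3

    ok = await connect_with_retry(
        flaky_connect, reconnect_interval=0.01, max_reconnect_attempts=5
    )
    return ok, calls
```

A fixed interval keeps the behavior predictable; a production loop might add backoff or jitter so that many clients do not reconnect at the same instant.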

Methods

  • await connector.connect(force_reconnect=False) - Connect to the broker
  • await connector.disconnect() - Disconnect from the broker
  • await connector.publish(topic, message, qos=0, retain=False) - Publish a message
  • await connector.subscribe(topic, qos=0) - Subscribe to a topic
  • connector.is_connected() - Check connection status
  • connector.set_message_callback(callback) - Set message callback (sync or async)
  • connector.set_log_callback(callback) - Set logging callback function

Message Callback Signature

# Sync callback
def callback(topic: str, message: str) -> None:
    pass

# Async callback
async def callback(topic: str, message: str) -> None:
    pass
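
Accepting both signatures usually comes down to checking whether the callback returned an awaitable. The sketch below shows one common way to do that; dispatch is a hypothetical function for illustration, and the connector may detect async callbacks differently.

```python
import asyncio
import inspect
from typing import Any, Callable

seen = []


async def dispatch(callback: Callable[[str, str], Any], topic: str, message: str) -> None:
    """Hypothetical dispatcher: call the callback and await the result if needed."""
    result = callback(topic, message)
    if inspect.isawaitable(result):
        await result


def sync_cb(topic: str, message: str) -> None:
    seen.append(("sync", topic, message))


async def async_cb(topic: str, message: str) -> None:
    await asyncio.sleep(0)
    seen.append(("async", topic, message))


async def demo() -> None:
    await dispatch(sync_cb, "example/incoming", "1")
    await dispatch(async_cb, "example/incoming", "2")
```

Checking the return value rather than the function type also covers callables such as functools.partial objects that wrap a coroutine function.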

Contributing

We welcome contributions! Please see our Contributing Guide for detailed instructions on:

  • Development setup and workflow
  • Commit message conventions
  • Pre-submission validation checklist
  • CI/CD pipeline overview
  • Pull request process

License

This project is licensed under the MIT License - see the LICENSE file for details.
