MQTT Connector

A robust MQTT connector for asynchronous MQTT communication.

Features

  • Asynchronous API using Python's asyncio
  • Message callbacks can be either sync or async functions
  • Automatic reconnection handling
  • Message throttling to avoid flooding the broker
  • Supports both string and JSON message formats
  • Thread-safe async callback scheduling from MQTT background threads
  • Customizable logging via callback function
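
As an illustration of the throttling feature, the sketch below enforces a minimum gap between successive operations. It is not the connector's own implementation: `Throttle`, `min_interval`, and `demo` are hypothetical names chosen for this example, and only the standard library is used.

```python
import asyncio
import time


class Throttle:
    """Enforce a minimum interval between operations (hypothetical helper)."""

    def __init__(self, min_interval: float) -> None:
        self._min_interval = min_interval
        self._last = None  # monotonic time of the previous operation
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # The lock serializes callers so that each one observes the
        # timestamp written by the caller before it.
        async with self._lock:
            if self._last is not None:
                remaining = self._min_interval - (time.monotonic() - self._last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last = time.monotonic()


async def demo() -> float:
    """Run three throttled operations and return the elapsed seconds."""
    throttle = Throttle(min_interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await throttle.wait()  # a real publish call would follow here
    return time.monotonic() - start
```

The first call passes straight through and each later call waits out the remainder of the interval, so three operations take roughly two intervals in total.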

Installation

pip install muxu-io-mqtt-connector

Basic Usage

import asyncio
from mqtt_connector import MqttConnector

async def main():
    # Create a connector instance
    connector = MqttConnector(
        mqtt_broker="mqtt.example.com",
        mqtt_port=1883,
        client_id="example_client"
    )

    # Set up message callback (supports both sync and async)
    async def message_handler(topic: str, message: str):
        print(f"Received: {topic} -> {message}")
        # Async operations are supported
        await asyncio.sleep(0.01)

    connector.set_message_callback(message_handler)

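    # Connect to the broker and stop early if the connection fails
    connected = await connector.connect()
    if not connected:
        print("Could not connect to the broker")
        return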

    # Subscribe to a topic
    await connector.subscribe("example/incoming")

    # Publish a message
    await connector.publish(
        topic="example/outgoing",
        message={"status": "online", "timestamp": "2025-08-03T12:00:00Z"}
    )

    # Let messages process for a bit
    await asyncio.sleep(2)

    # Disconnect
    await connector.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
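
The example above publishes a dict while the handler is typed to receive a str. How the connector converts between the two is not documented here, so the following is only a sketch of one plausible convention: strings pass through unchanged and other values are serialized as JSON. `encode_payload` and `decode_payload` are hypothetical helpers, not part of the connector's API.

```python
import json
from typing import Any, Union


def encode_payload(message: Union[str, dict, list]) -> str:
    """Pass strings through unchanged; serialize dicts and lists as JSON."""
    if isinstance(message, str):
        return message
    return json.dumps(message)


def decode_payload(raw: str) -> Any:
    """Return the parsed value when raw is valid JSON, otherwise raw itself."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw
```

A handler could call `decode_payload(message)` on each incoming message to get a dict back when the sender published JSON. Note that bare JSON scalars such as `"42"` are also parsed, which may or may not be what an application wants.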

Message Callbacks

The connector supports both synchronous and asynchronous message callbacks:

Async Callback Example

async def async_message_handler(topic: str, message: str):
    """Async handler can perform I/O operations."""
    print(f"Processing: {topic}")

    # Async operations like database writes, file I/O, etc.
    await process_message_async(message)
    await save_to_database(topic, message)

connector.set_message_callback(async_message_handler)

Sync Callback Example

def sync_message_handler(topic: str, message: str):
    """Sync handler for simple processing."""
    print(f"Received: {topic} -> {message}")

    # Synchronous operations only
    process_message_sync(message)

connector.set_message_callback(sync_message_handler)

Thread Safety

MQTT messages can arrive on background threads. The connector hands async callbacks over to the asyncio event loop with call_soon_threadsafe(), so they run on the loop rather than on the thread that received the message.
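
The general pattern can be sketched with the standard library alone. This is not the connector's source code: `network_thread` stands in for the MQTT client's background thread, and `handler`, `received`, and `background_tasks` are names made up for the example.

```python
import asyncio
import threading

received = []
background_tasks = set()  # keep references so tasks are not garbage collected


async def handler(topic: str, message: str) -> None:
    await asyncio.sleep(0)  # stand-in for real async work
    received.append((topic, message))


def network_thread(loop: asyncio.AbstractEventLoop) -> None:
    """Stand-in for the MQTT client's background thread."""

    def start_task() -> None:
        # Runs on the event loop's thread, where creating tasks is safe.
        task = loop.create_task(handler("example/incoming", "hello"))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # Hand task creation to the loop instead of touching it from this thread.
    loop.call_soon_threadsafe(start_task)


async def main() -> None:
    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=network_thread, args=(loop,))
    worker.start()
    worker.join()  # blocks the loop briefly; acceptable only in a demo
    await asyncio.sleep(0.05)  # give the scheduled task time to finish
```

Running `asyncio.run(main())` leaves one `(topic, message)` pair in `received`, appended by the handler on the event loop's thread.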

Advanced Usage

For more advanced usage examples, check the examples directory in the repository.

API Reference

MqttConnector

connector = MqttConnector(
    mqtt_broker="mqtt.example.com",  # Broker address
    mqtt_port=1883,                  # Broker port
    client_id=None,                  # Client ID (auto-generated if None)
    reconnect_interval=5,            # Seconds between reconnection attempts
    max_reconnect_attempts=-1,       # Maximum reconnection attempts (-1 = infinite)
    throttle_interval=0.1            # Minimum seconds between publishes
)
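
To make the meaning of reconnect_interval and max_reconnect_attempts concrete, here is a minimal retry loop with the same two knobs. It is a sketch under assumptions, not the connector's reconnection logic: `connect_with_retry` and `try_connect` are hypothetical, every attempt (including the first) is assumed to count toward the limit, and at least one attempt is always made.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    try_connect: Callable[[], Awaitable[bool]],
    reconnect_interval: float = 5,
    max_reconnect_attempts: int = -1,
) -> bool:
    """Call try_connect until it succeeds or the attempt limit is reached."""
    attempt = 0
    while True:
        attempt += 1
        if await try_connect():
            return True
        # A negative limit means "retry forever", mirroring -1 = infinite.
        if 0 <= max_reconnect_attempts <= attempt:
            return False
        await asyncio.sleep(reconnect_interval)
```

Passing any coroutine function that returns True on success is enough to exercise the loop, which makes the behavior easy to check with a fake that fails a fixed number of times.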

Methods

  • await connector.connect(force_reconnect=False) - Connect to the broker
  • await connector.disconnect() - Disconnect from the broker
  • await connector.publish(topic, message, qos=0, retain=False) - Publish a message
  • await connector.subscribe(topic, qos=0) - Subscribe to a topic
  • connector.is_connected() - Check connection status
  • connector.set_message_callback(callback) - Set message callback (sync or async)
  • connector.set_log_callback(callback) - Set logging callback function

Message Callback Signature

# Sync callback
def callback(topic: str, message: str) -> None:
    pass

# Async callback
async def callback(topic: str, message: str) -> None:
    pass
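
One common way to accept both signatures is to call the callback and await the result only when it is awaitable. The sketch below shows that idea; `dispatch` is a hypothetical helper and the connector may handle the two cases differently.

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch(
    callback: Callable[[str, str], Any], topic: str, message: str
) -> None:
    """Invoke callback, awaiting its result if it returned an awaitable."""
    result = callback(topic, message)
    if inspect.isawaitable(result):
        await result
```

Checking the return value rather than the function object means the same path also covers callables that wrap a coroutine, such as a `functools.partial` around an async function.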

Contributing

We welcome contributions! Please see our Contributing Guide for detailed instructions on:

  • Development setup and workflow
  • Commit message conventions
  • Pre-submission validation checklist
  • CI/CD pipeline overview
  • Pull request process

License

This project is licensed under the MIT License - see the LICENSE file for details.
