MQTT Connector

A robust MQTT connector for asynchronous MQTT communication.

Features

  • Asynchronous API using Python's asyncio
  • Message callbacks can be either sync or async functions
  • Automatic reconnection handling
  • Message throttling to avoid flooding the broker
  • Supports both string and JSON message formats
  • Thread-safe async callback scheduling from MQTT background threads
  • Customizable logging via callback function
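
As an illustration of the string and JSON support listed above, the sketch below shows one way a payload could be encoded before publishing and decoded on receipt. The helper names encode_payload and decode_payload are hypothetical and are not part of the connector's API; the connector's own conversion rules may differ.

```python
import json
from typing import Any, Union


def encode_payload(message: Union[str, dict, list]) -> str:
    """Return a string payload: strings pass through, dicts and lists become JSON."""
    if isinstance(message, str):
        return message
    return json.dumps(message)


def decode_payload(payload: str) -> Any:
    """Try JSON first and fall back to the raw string if parsing fails."""
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return payload
```

Note that with this approach a plain string such as "123" decodes to the number 123, so a handler that needs the raw text should skip the decoding step.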

Installation

pip install muxu-io-mqtt-connector

Basic Usage

import asyncio
from mqtt_connector import MqttConnector

async def main():
    # Create a connector instance
    connector = MqttConnector(
        mqtt_broker="mqtt.example.com",
        mqtt_port=1883,
        client_id="example_client"
    )

    # Set up message callback (supports both sync and async)
    async def message_handler(topic: str, message: str):
        print(f"Received: {topic} -> {message}")
        # Async operations are supported
        await asyncio.sleep(0.01)

    connector.set_message_callback(message_handler)

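    # Connect to the broker (the return value is assumed to be truthy on success)
    connected = await connector.connect()
    if not connected:
        print("Could not connect to the broker")
        return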

    # Subscribe to a topic
    await connector.subscribe("example/incoming")

    # Publish a message
    await connector.publish(
        topic="example/outgoing",
        message={"status": "online", "timestamp": "2025-08-03T12:00:00Z"}
    )

    # Let messages process for a bit
    await asyncio.sleep(2)

    # Disconnect
    await connector.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Message Callbacks

The connector supports both synchronous and asynchronous message callbacks:

Async Callback Example

async def async_message_handler(topic: str, message: str):
    """Async handler can perform I/O operations."""
    print(f"Processing: {topic}")

    # Async operations like database writes, file I/O, etc.
    # process_message_async and save_to_database are placeholders
    # for your own coroutines.
    await process_message_async(message)
    await save_to_database(topic, message)

connector.set_message_callback(async_message_handler)

Sync Callback Example

def sync_message_handler(topic: str, message: str):
    """Sync handler for simple processing."""
    print(f"Received: {topic} -> {message}")

    # Synchronous operations only
    # process_message_sync is a placeholder for your own function.
    process_message_sync(message)

connector.set_message_callback(sync_message_handler)

Thread Safety

MQTT messages can arrive on a background thread rather than on the asyncio event loop thread. The connector schedules async callbacks onto the loop with the event loop's call_soon_threadsafe() method, so they run on the event loop without any extra work on your part.
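
The general technique can be sketched with the standard library alone. This is not the connector's actual code: handle and background_thread are hypothetical stand-ins for a user callback and the MQTT network thread.

```python
import asyncio
import threading


async def handle(topic: str, message: str, received: list) -> None:
    """Hypothetical async handler that records what it was given."""
    await asyncio.sleep(0)
    received.append((topic, message))


def background_thread(
    loop: asyncio.AbstractEventLoop, received: list, done: asyncio.Event
) -> None:
    """Stands in for the MQTT client's network thread."""

    def schedule() -> None:
        # Runs on the event loop thread, so creating a task is safe here.
        task = loop.create_task(handle("example/incoming", "hello", received))
        task.add_done_callback(lambda _task: done.set())

    # The only call made from the foreign thread is the thread-safe one.
    loop.call_soon_threadsafe(schedule)


async def demo() -> list:
    received: list = []
    done = asyncio.Event()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=background_thread, args=(loop, received, done))
    thread.start()
    await asyncio.wait_for(done.wait(), timeout=1)
    thread.join()
    return received
```

Calling loop.create_task() directly from the background thread would not be safe; routing it through call_soon_threadsafe() hands the work to the loop thread first.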

Advanced Usage

For more advanced usage examples, check the examples directory in the repository.

API Reference

MqttConnector

connector = MqttConnector(
    mqtt_broker="mqtt.example.com",  # Broker address
    mqtt_port=1883,                  # Broker port
    client_id=None,                  # Client ID (auto-generated if None)
    reconnect_interval=5,            # Seconds between reconnection attempts
    max_reconnect_attempts=-1,       # Maximum reconnection attempts (-1 = infinite)
    throttle_interval=0.1            # Minimum seconds between publishes
)
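
To make the throttle_interval parameter more concrete, here is a minimal sketch of how a minimum gap between publishes could be enforced. The Throttle class is hypothetical and only illustrates the idea; the connector's internal throttling may work differently.

```python
import asyncio
import time


class Throttle:
    """Enforce a minimum gap between successive calls to wait()."""

    def __init__(self, interval: float) -> None:
        self.interval = interval
        self._last = None  # monotonic time of the previous publish
        self._lock = asyncio.Lock()

    async def wait(self) -> None:
        # The lock keeps concurrent publishers in a single queue.
        async with self._lock:
            if self._last is not None:
                remaining = self.interval - (time.monotonic() - self._last)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last = time.monotonic()


async def demo() -> float:
    throttle = Throttle(interval=0.05)
    start = time.monotonic()
    for _ in range(3):
        await throttle.wait()  # a real publish would follow here
    return time.monotonic() - start
```

The first call passes immediately; each later call sleeps only for whatever part of the interval has not already elapsed.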

Methods

  • await connector.connect(force_reconnect=False) - Connect to the broker
  • await connector.disconnect() - Disconnect from the broker
  • await connector.publish(topic, message, qos=0, retain=False) - Publish a message
  • await connector.subscribe(topic, qos=0) - Subscribe to a topic
  • connector.is_connected() - Check connection status
  • connector.set_message_callback(callback) - Set message callback (sync or async)
  • connector.set_log_callback(callback) - Set logging callback function
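
The reconnect_interval and max_reconnect_attempts constructor parameters suggest a retry loop along the following lines. This is a sketch under assumptions, not the connector's implementation: connect_with_retry is a hypothetical helper, and it assumes the connect coroutine returns a truthy value on success.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[bool]],
    reconnect_interval: float = 5,
    max_reconnect_attempts: int = -1,
) -> bool:
    """Call connect() until it succeeds or the attempt budget runs out."""
    attempts = 0
    # A negative budget means retry forever, mirroring the -1 default.
    while max_reconnect_attempts < 0 or attempts < max_reconnect_attempts:
        attempts += 1
        if await connect():
            return True
        await asyncio.sleep(reconnect_interval)
    return False


async def never_connect() -> bool:
    """Hypothetical stand-in for an unreachable broker."""
    return False


async def demo() -> tuple:
    calls = {"n": 0}

    async def flaky_connect() -> bool:
        # Hypothetical stand-in for a broker that accepts the third attempt.
        calls["n"] += 1
        return calls["n"] >= 3

    ok = await connect_with_retry(flaky_connect, 0.01, 5)
    gave_up = await connect_with_retry(never_connect, 0.01, 2)
    return ok, calls["n"], gave_up
```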

Message Callback Signature

# Sync callback
def callback(topic: str, message: str) -> None:
    pass

# Async callback
async def callback(topic: str, message: str) -> None:
    pass
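
Because both signatures take the same arguments, a single dispatcher can accept either form. The sketch below shows one common way to do this with inspect.isawaitable; the dispatch function is hypothetical and is not taken from the connector's source.

```python
import asyncio
import inspect
from typing import Any, Callable


async def dispatch(callback: Callable[[str, str], Any], topic: str, message: str) -> None:
    """Invoke callback and await the result only if it is awaitable."""
    result = callback(topic, message)
    if inspect.isawaitable(result):
        await result


async def demo() -> list:
    seen = []

    def sync_cb(topic: str, message: str) -> None:
        seen.append(("sync", topic, message))

    async def async_cb(topic: str, message: str) -> None:
        await asyncio.sleep(0)
        seen.append(("async", topic, message))

    await dispatch(sync_cb, "t/1", "a")
    await dispatch(async_cb, "t/2", "b")
    return seen
```

Checking the return value rather than the function itself also covers callables such as functools.partial objects that wrap a coroutine function.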

Contributing

We welcome contributions! Please see our Contributing Guide for detailed instructions on:

  • Development setup and workflow
  • Commit message conventions
  • Pre-submission validation checklist
  • CI/CD pipeline overview
  • Pull request process

License

This project is licensed under the MIT License - see the LICENSE file for details.
