Skip to main content

A comprehensive asynchronous MQTT application library

Project description

MQTT Application

A comprehensive asynchronous MQTT client library for Python, designed for building robust IoT applications and message processing systems.

Features

  • Asynchronous Architecture: Built with asyncio for high-performance concurrent operations
  • Automatic Reconnection: Robust connection handling with configurable retry logic
  • Message Processing: Worker-based system for concurrent message handling
  • Status Publishing: Periodic device status reporting
  • Command Handling: Built-in command processing with acknowledgment system
  • Configuration Management: YAML-based configuration with environment variable support
  • Logging Integration: Seamless integration with mqtt-logger for distributed logging

Installation

pip install muxu-io-mqtt-application

Installation from Source

If you're working with the source code from this repository, you'll need to install the dependencies in the correct order:

cd ~/projects/icsia/dummy-icsia

# Create and activate virtual environment (recommended)
python3 -m venv .venv
source .venv/bin/activate  # On Linux/macOS
# or
.venv\Scripts\activate     # On Windows

# Install the mqtt-logger package
pushd ../mqtt-logger && pip install -e "." && popd

# Install the mqtt-application package with dev dependencies
pushd ../mqtt-application && pip install -e "." && popd

Dependencies

This package requires the following external modules:

  • mqtt-logger: For MQTT-enabled logging capabilities
  • muxu-io-mqtt-connector: For low-level MQTT connection management (PyPI package)

Quick Start

Simplified Usage (Recommended)

The easiest way to use the library is with the MqttApplication class that handles everything automatically:

import asyncio
from mqtt_application import MqttApplication

async def main():
    # Everything configured from config.yaml
    async with MqttApplication() as app:
        await app.run()

if __name__ == "__main__":
    asyncio.run(main())

Or even simpler for standalone usage:

from mqtt_application import MqttApplication

# One-liner to run the application
if __name__ == "__main__":
    MqttApplication.run_from_config()

Custom Command Handlers

To add custom business logic, register command handlers:

import asyncio
from mqtt_application import MqttApplication

async def my_custom_command(data):
    """Handle custom command."""
    # Your business logic here
    print(f"Processing custom command: {data}")
    return {"status": "completed", "result": "success"}

async def main():
    async with MqttApplication() as app:
        # Register custom commands
        app.register_command("my_command", my_custom_command)
        await app.run()

if __name__ == "__main__":
    asyncio.run(main())

Message Subscriptions

Subscribe to MQTT messages using config-based subscriptions or programmatic registration:

Config-Based Subscriptions (Recommended)

# config.yaml
subscriptions:
  status_messages:
    topic_pattern: "icsia/+/status/current"
    callback_method: "_on_status_message"
  ack_messages:
    topic_pattern: "icsia/+/status/ack"
    callback_method: "_on_ack_message"
class MyApplication:
    def __init__(self):
        # Pass self as callback_context so config can find your methods
        self.app = MqttApplication(callback_context=self)

    async def _on_status_message(self, topic: str, payload: str, properties):
        """Handle status messages from any device."""
        print(f"Status from {topic}: {payload}")

    async def _on_ack_message(self, topic: str, payload: str, properties):
        """Handle acknowledgment messages."""
        print(f"ACK from {topic}: {payload}")

    async def run(self):
        async with self.app:
            await self.app.run()

Programmatic Registration

async def my_handler(topic: str, payload: str, properties):
    print(f"Message on {topic}: {payload}")

async def main():
    async with MqttApplication() as app:
        # Register handler for config-based subscriptions
        app.register_callback_handler("my_handler", my_handler)
        await app.run()

Configuration

Create a config.yaml file in your project root:

---
# MQTT Broker settings
mqtt:
  broker: "localhost"
  port: 1883
  reconnect_interval: 5
  max_reconnect_attempts: -1  # -1 means infinite attempts
  throttle_interval: 0.1

# Device configuration
device:
  device_id: "my_device_01"
  namespace: "icsia"  # Configurable namespace for topic patterns
  status_publish_interval: 30.0

# Auto-generated topic patterns from namespace + device_id:
# {namespace}/+/cmd/#
# {namespace}/{device_id}/logs
# {namespace}/{device_id}/status/ack
# {namespace}/{device_id}/status/completion
# {namespace}/{device_id}/status/current

# Logger settings
logger:
  log_file: "{device_id}.log"
  log_level: "INFO"

# Worker configuration
workers:
  count: 3

API Reference

AsyncMqttClient

The main MQTT client class for connecting to brokers and handling messages.

AsyncMqttClient(
    broker_address: str,
    port: int,
    topics: list[str],
    message_queue: asyncio.Queue,
    logger: MqttLogger,
    reconnect_interval: int = 5,
    max_reconnect_attempts: int = -1
)

AsyncCommandHandler

Handles command processing and acknowledgments.

AsyncCommandHandler(
    logger: MqttLogger,
    mqtt_broker: Optional[str] = None,
    mqtt_port: Optional[int] = None,
    ack_topic_pattern: str = "devices/{device_id}/status/ack",
    completion_topic_pattern: str = "devices/{device_id}/status/completion"
)

PeriodicStatusPublisher

Publishes device status at regular intervals.

PeriodicStatusPublisher(
    device_id: str,
    logger: MqttLogger,
    mqtt_broker: str,
    mqtt_port: int,
    publish_interval: float = 30.0,
    status_topic_pattern: str = "devices/{device_id}/status/current"
)

Config

Configuration management with YAML support.

from mqtt_application import config

# Get configuration values
mqtt_config = config.get_mqtt_config()
device_id = config.get("device.device_id", "default")
log_level = config.get_log_level()

Command Line Usage

You can also run the library as a standalone application:

mqtt-application  # Uses config.yaml in current directory

Development Setup

Virtual Environment Setup

It's recommended to use a virtual environment for development:

# Create virtual environment
python3 -m venv .venv

# Activate virtual environment
source .venv/bin/activate  # On Linux/macOS
# or
.venv\Scripts\activate     # On Windows

# Install development dependencies
pip install -e ".[dev]"

Running Integration Tests

This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.

Prerequisites: Ensure your virtual environment is activated and development dependencies are installed:

# Activate virtual environment
source .venv/bin/activate  # On Linux/macOS
.venv\Scripts\activate     # On Windows

# Install development dependencies
pip install -e ".[dev]"

Note: If you encounter issues with python command not finding pytest, use the virtual environment directly:

# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest

# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest

Running Tests

This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.

Prerequisites

Make sure your virtual environment is activated and development dependencies are installed:

# Activate virtual environment
source .venv/bin/activate  # On Linux/macOS
.venv\Scripts\activate     # On Windows

# Install development dependencies
pip install -e ".[dev]"

Note: If you encounter No module named pytest errors, make sure your virtual environment is activated:

# Activate virtual environment
source .venv/bin/activate  # On Linux/macOS
.venv\Scripts\activate     # On Windows

# Install development dependencies
pip install -e ".[dev]"

Alternative: Use the virtual environment directly without activation:

# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest

# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest

Basic Test Commands

# Run all tests (integration and unit tests)
python -m pytest

# Run only integration tests
python -m pytest -m integration

# Run only unit tests (non-integration)
python -m pytest -m "not integration"

# Run with verbose output
python -m pytest -v

# Run specific test file
python -m pytest tests/test_integration.py

# Run specific test class
python -m pytest tests/test_integration.py::TestMqttIntegration

# Run specific test method
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection

# Run tests matching a pattern
python -m pytest -k "mqtt_logger"

Advanced Testing Options

# Skip slow tests during development
python -m pytest -m "integration and not slow"

# Run tests with coverage reporting (requires pytest-cov)
python -m pytest --cov=src/mqtt_application --cov-report=html --cov-report=term

# Generate XML coverage report for CI/CD
python -m pytest --cov=src/mqtt_application --cov-report=xml

# Run tests in parallel (requires pytest-xdist)
python -m pytest -n auto

# Stop on first failure (useful during development)
python -m pytest -x

# Run last failed tests only
python -m pytest --lf

# Show local variables in tracebacks for debugging
python -m pytest -l

# Run in quiet mode (less verbose output)
python -m pytest -q

# Show test durations (identify slow tests)
python -m pytest --durations=10

Common Development Workflows

# Quick feedback during development (unit tests only)
python -m pytest -m "not integration" -x

# Fast integration tests only (skip slow network tests)
python -m pytest -m "integration and not slow"

# Full test suite before committing
python -m pytest -v

# Debug a specific failing test with maximum verbosity
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection -vvv -s

# Test specific functionality you're working on
python -m pytest -k "command_handler" -v

# Continuous testing during development (requires pytest-watch)
ptw -- -m "not integration"

Test Organization

This project uses pytest markers to categorize tests:

  • @pytest.mark.integration: Tests requiring network access and real MQTT connections
  • @pytest.mark.slow: Tests that take longer to run (network resilience, retry mechanisms)

View all available markers:

python -m pytest --markers

CI/CD Integration

# Full test suite with coverage for CI
python -m pytest -v --cov=src/mqtt_application --cov-report=xml --cov-report=term

# Integration tests only for production validation
python -m pytest -v -m integration

# Quick validation (unit tests + fast integration tests)
python -m pytest -m "not slow" -v

Note: Integration tests use real MQTT connections to test.mosquitto.org and test actual component interactions. This provides more reliable validation than mock-based unit tests, but requires network access.

Troubleshooting

Virtual Environment Issues:

# If 'python' command doesn't work, use the virtual environment directly
.venv/bin/python -m pytest        # Linux/macOS
.venv\Scripts\python -m pytest    # Windows

# Check you're using the right Python version (3.8+)
python --version
which python  # Should point to .venv/bin/python (Linux/macOS)

Common Issues and Solutions:

  1. SyntaxError with pytest: Ensure you're using Python 3.8+ from your virtual environment, not system Python 2.7
  2. ModuleNotFoundError: Install development dependencies with pip install -e ".[dev]"
  3. Network timeouts: Integration tests require internet access to test.mosquitto.org. Use -m "not integration" to skip them
  4. Permission denied: Use the full path to the virtual environment's Python executable
  5. Tests hang: Some integration tests make real network connections. Use Ctrl+C to interrupt and check your network connection

Getting Help:

# Show pytest help
python -m pytest --help

# Show available fixtures
python -m pytest --fixtures

# Show available markers
python -m pytest --markers

# Collect tests without running them
python -m pytest --collect-only
python -m pytest --markers | grep @pytest.mark

The integration test suite covers:

  • Real MQTT broker connections and message publishing/subscribing
  • End-to-end command processing workflows
  • Status publishing and periodic operations
  • Network resilience and error handling
  • Malformed message handling
  • Connection retry mechanisms

Code Formatting

black .
ruff check .

Architecture

The library is designed with a modular architecture:

  • AsyncMqttClient: Core MQTT connectivity and message routing
  • AsyncCommandHandler: Command processing with built-in acknowledgment system
  • PeriodicStatusPublisher: Regular status reporting functionality
  • Workers: Concurrent message processing system
  • Config: Centralized configuration management

Dependencies

  • External modules (must be installed separately):
    • mqtt-logger: MQTT-enabled logging
    • muxu-io-mqtt-connector: Low-level MQTT operations
  • Standard dependencies:
    • aiomqtt: Async MQTT client
    • pyyaml: YAML configuration parsing

License

MIT License - see LICENSE file for details.

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Set up development environment:
    python3 -m venv .venv
    source .venv/bin/activate  # On Linux/macOS
    pip install -e ".[dev]"
    
  4. Make your changes
  5. Add tests for new functionality
  6. Run the test suite:
    # Run all tests
    python -m pytest
    
    # Or run quick tests during development
    python -m pytest -m "not integration" -x
    
  7. Submit a pull request

Support

For issues and questions:

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

muxu_io_mqtt_application-1.1.0.tar.gz (58.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

muxu_io_mqtt_application-1.1.0-py3-none-any.whl (33.5 kB view details)

Uploaded Python 3

File details

Details for the file muxu_io_mqtt_application-1.1.0.tar.gz.

File metadata

  • Download URL: muxu_io_mqtt_application-1.1.0.tar.gz
  • Upload date:
  • Size: 58.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for muxu_io_mqtt_application-1.1.0.tar.gz
Algorithm Hash digest
SHA256 941164eb92077172b0026d6e7b13512889573413ff545b0c7e3dd678b2472ee2
MD5 81a680572524f041e9b29917391c0339
BLAKE2b-256 0f599b2edcae7ee90af3b2365b127cdec45807e9a3c1ca6f42c5b6ae03626f9f

See more details on using hashes here.

Provenance

The following attestation bundles were made for muxu_io_mqtt_application-1.1.0.tar.gz:

Publisher: pypi-publish.yml on muxu-io/mqtt-application

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file muxu_io_mqtt_application-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for muxu_io_mqtt_application-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a36730be6f4a07018a57f73f2a206900623cf7ac482661057382e06c2fd80ac4
MD5 4bf099ab070a1b142cc2edf5e7b585d5
BLAKE2b-256 107071d9aa53cb012da0a4b8f15c78030b52920b819e9bca23eb62014bc80d54

See more details on using hashes here.

Provenance

The following attestation bundles were made for muxu_io_mqtt_application-1.1.0-py3-none-any.whl:

Publisher: pypi-publish.yml on muxu-io/mqtt-application

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page