A comprehensive asynchronous MQTT application library
Project description
MQTT Application
A comprehensive asynchronous MQTT client library for Python, designed for building robust IoT applications and message processing systems.
Features
- Asynchronous Architecture: Built with
asynciofor high-performance concurrent operations - Automatic Reconnection: Robust connection handling with configurable retry logic
- Message Processing: Worker-based system for concurrent message handling
- Status Publishing: Periodic device status reporting
- Command Handling: Built-in command processing with acknowledgment system
- Configuration Management: YAML-based configuration with environment variable support
- Logging Integration: Seamless integration with mqtt-logger for distributed logging
Installation
pip install muxu-io-mqtt-application
Installation from Source
If you're working with the source code from this repository, you'll need to install the dependencies in the correct order:
cd ~/projects/icsia/dummy-icsia
# Create and activate virtual environment (recommended)
python3 -m venv .venv
source .venv/bin/activate # On Linux/macOS
# or
.venv\Scripts\activate # On Windows
# Install the mqtt-logger package
pushd ../mqtt-logger && pip install -e "." && popd
# Install the mqtt-application package with dev dependencies
pushd ../mqtt-application && pip install -e "." && popd
Dependencies
This package requires the following external modules:
mqtt-logger: For MQTT-enabled logging capabilitiesmuxu-io-mqtt-connector: For low-level MQTT connection management (PyPI package)
Quick Start
Simplified Usage (Recommended)
The easiest way to use the library is with the MqttApplication class that handles everything automatically:
import asyncio
from mqtt_application import MqttApplication
async def main():
# Everything configured from config.yaml
async with MqttApplication() as app:
await app.run()
if __name__ == "__main__":
asyncio.run(main())
Or even simpler for standalone usage:
from mqtt_application import MqttApplication
# One-liner to run the application
if __name__ == "__main__":
MqttApplication.run_from_config()
Custom Command Handlers
To add custom business logic, register command handlers:
import asyncio
from mqtt_application import MqttApplication
async def my_custom_command(data):
"""Handle custom command."""
# Your business logic here
print(f"Processing custom command: {data}")
return {"status": "completed", "result": "success"}
async def main():
async with MqttApplication() as app:
# Register custom commands
app.register_command("my_command", my_custom_command)
await app.run()
if __name__ == "__main__":
asyncio.run(main())
Message Subscriptions
Subscribe to MQTT messages using config-based subscriptions or programmatic registration:
Config-Based Subscriptions (Recommended)
# config.yaml
subscriptions:
status_messages:
topic_pattern: "icsia/+/status/current"
callback_method: "_on_status_message"
ack_messages:
topic_pattern: "icsia/+/status/ack"
callback_method: "_on_ack_message"
class MyApplication:
def __init__(self):
# Pass self as callback_context so config can find your methods
self.app = MqttApplication(callback_context=self)
async def _on_status_message(self, topic: str, payload: str, properties):
"""Handle status messages from any device."""
print(f"Status from {topic}: {payload}")
async def _on_ack_message(self, topic: str, payload: str, properties):
"""Handle acknowledgment messages."""
print(f"ACK from {topic}: {payload}")
async def run(self):
async with self.app:
await self.app.run()
Programmatic Registration
async def my_handler(topic: str, payload: str, properties):
print(f"Message on {topic}: {payload}")
async def main():
async with MqttApplication() as app:
# Register handler for config-based subscriptions
app.register_callback_handler("my_handler", my_handler)
await app.run()
Configuration
Create a config.yaml file in your project root:
---
# MQTT Broker settings
mqtt:
broker: "localhost"
port: 1883
reconnect_interval: 5
max_reconnect_attempts: -1 # -1 means infinite attempts
throttle_interval: 0.1
# Device configuration
device:
device_id: "my_device_01"
namespace: "icsia" # Configurable namespace for topic patterns
status_publish_interval: 30.0
# Auto-generated topic patterns from namespace + device_id:
# {namespace}/+/cmd/#
# {namespace}/{device_id}/logs
# {namespace}/{device_id}/status/ack
# {namespace}/{device_id}/status/completion
# {namespace}/{device_id}/status/current
# Logger settings
logger:
log_file: "{device_id}.log"
log_level: "INFO"
# Worker configuration
workers:
count: 3
API Reference
AsyncMqttClient
The main MQTT client class for connecting to brokers and handling messages.
AsyncMqttClient(
broker_address: str,
port: int,
topics: list[str],
message_queue: asyncio.Queue,
logger: MqttLogger,
reconnect_interval: int = 5,
max_reconnect_attempts: int = -1
)
AsyncCommandHandler
Handles command processing and acknowledgments.
AsyncCommandHandler(
logger: MqttLogger,
mqtt_broker: Optional[str] = None,
mqtt_port: Optional[int] = None,
ack_topic_pattern: str = "devices/{device_id}/status/ack",
completion_topic_pattern: str = "devices/{device_id}/status/completion"
)
PeriodicStatusPublisher
Publishes device status at regular intervals.
PeriodicStatusPublisher(
device_id: str,
logger: MqttLogger,
mqtt_broker: str,
mqtt_port: int,
publish_interval: float = 30.0,
status_topic_pattern: str = "devices/{device_id}/status/current"
)
Config
Configuration management with YAML support.
from mqtt_application import config
# Get configuration values
mqtt_config = config.get_mqtt_config()
device_id = config.get("device.device_id", "default")
log_level = config.get_log_level()
Command Line Usage
You can also run the library as a standalone application:
mqtt-application # Uses config.yaml in current directory
Development Setup
Virtual Environment Setup
It's recommended to use a virtual environment for development:
# Create virtual environment
python3 -m venv .venv
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
# or
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
Running Integration Tests
This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.
Prerequisites: Ensure your virtual environment is activated and development dependencies are installed:
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
Note: If you encounter issues with python command not finding pytest, use the virtual environment directly:
# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest
# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest
Running Tests
This project uses comprehensive integration tests that validate real-world functionality with actual MQTT brokers and network connections.
Prerequisites
Make sure your virtual environment is activated and development dependencies are installed:
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
Note: If you encounter No module named pytest errors, make sure your virtual environment is activated:
# Activate virtual environment
source .venv/bin/activate # On Linux/macOS
.venv\Scripts\activate # On Windows
# Install development dependencies
pip install -e ".[dev]"
Alternative: Use the virtual environment directly without activation:
# Direct virtual environment usage (Linux/macOS)
.venv/bin/python -m pytest
# Direct virtual environment usage (Windows)
.venv\Scripts\python -m pytest
Basic Test Commands
# Run all tests (integration and unit tests)
python -m pytest
# Run only integration tests
python -m pytest -m integration
# Run only unit tests (non-integration)
python -m pytest -m "not integration"
# Run with verbose output
python -m pytest -v
# Run specific test file
python -m pytest tests/test_integration.py
# Run specific test class
python -m pytest tests/test_integration.py::TestMqttIntegration
# Run specific test method
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection
# Run tests matching a pattern
python -m pytest -k "mqtt_logger"
Advanced Testing Options
# Skip slow tests during development
python -m pytest -m "integration and not slow"
# Run tests with coverage reporting (requires pytest-cov)
python -m pytest --cov=src/mqtt_application --cov-report=html --cov-report=term
# Generate XML coverage report for CI/CD
python -m pytest --cov=src/mqtt_application --cov-report=xml
# Run tests in parallel (requires pytest-xdist)
python -m pytest -n auto
# Stop on first failure (useful during development)
python -m pytest -x
# Run last failed tests only
python -m pytest --lf
# Show local variables in tracebacks for debugging
python -m pytest -l
# Run in quiet mode (less verbose output)
python -m pytest -q
# Show test durations (identify slow tests)
python -m pytest --durations=10
Common Development Workflows
# Quick feedback during development (unit tests only)
python -m pytest -m "not integration" -x
# Fast integration tests only (skip slow network tests)
python -m pytest -m "integration and not slow"
# Full test suite before committing
python -m pytest -v
# Debug a specific failing test with maximum verbosity
python -m pytest tests/test_integration.py::TestMqttIntegration::test_mqtt_logger_connection -vvv -s
# Test specific functionality you're working on
python -m pytest -k "command_handler" -v
# Continuous testing during development (requires pytest-watch)
ptw -- -m "not integration"
Test Organization
This project uses pytest markers to categorize tests:
@pytest.mark.integration: Tests requiring network access and real MQTT connections@pytest.mark.slow: Tests that take longer to run (network resilience, retry mechanisms)
View all available markers:
python -m pytest --markers
CI/CD Integration
# Full test suite with coverage for CI
python -m pytest -v --cov=src/mqtt_application --cov-report=xml --cov-report=term
# Integration tests only for production validation
python -m pytest -v -m integration
# Quick validation (unit tests + fast integration tests)
python -m pytest -m "not slow" -v
Note: Integration tests use real MQTT connections to test.mosquitto.org and test actual component interactions. This provides more reliable validation than mock-based unit tests, but requires network access.
Troubleshooting
Virtual Environment Issues:
# If 'python' command doesn't work, use the virtual environment directly
.venv/bin/python -m pytest # Linux/macOS
.venv\Scripts\python -m pytest # Windows
# Check you're using the right Python version (3.8+)
python --version
which python # Should point to .venv/bin/python (Linux/macOS)
Common Issues and Solutions:
- SyntaxError with pytest: Ensure you're using Python 3.8+ from your virtual environment, not system Python 2.7
- ModuleNotFoundError: Install development dependencies with
pip install -e ".[dev]" - Network timeouts: Integration tests require internet access to
test.mosquitto.org. Use-m "not integration"to skip them - Permission denied: Use the full path to the virtual environment's Python executable
- Tests hang: Some integration tests make real network connections. Use
Ctrl+Cto interrupt and check your network connection
Getting Help:
# Show pytest help
python -m pytest --help
# Show available fixtures
python -m pytest --fixtures
# Show available markers
python -m pytest --markers
# Collect tests without running them
python -m pytest --collect-only
python -m pytest --markers | grep @pytest.mark
The integration test suite covers:
- Real MQTT broker connections and message publishing/subscribing
- End-to-end command processing workflows
- Status publishing and periodic operations
- Network resilience and error handling
- Malformed message handling
- Connection retry mechanisms
Code Formatting
black .
ruff check .
Architecture
The library is designed with a modular architecture:
- AsyncMqttClient: Core MQTT connectivity and message routing
- AsyncCommandHandler: Command processing with built-in acknowledgment system
- PeriodicStatusPublisher: Regular status reporting functionality
- Workers: Concurrent message processing system
- Config: Centralized configuration management
Dependencies
- External modules (must be installed separately):
mqtt-logger: MQTT-enabled loggingmuxu-io-mqtt-connector: Low-level MQTT operations
- Standard dependencies:
aiomqtt: Async MQTT clientpyyaml: YAML configuration parsing
License
MIT License - see LICENSE file for details.
Contributing
- Fork the repository
- Create a feature branch
- Set up development environment:
python3 -m venv .venv source .venv/bin/activate # On Linux/macOS pip install -e ".[dev]"
- Make your changes
- Add tests for new functionality
- Run the test suite:
# Run all tests python -m pytest # Or run quick tests during development python -m pytest -m "not integration" -x
- Submit a pull request
Support
For issues and questions:
- GitHub Issues: Project Issues
- Email: alex@muxu.io
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file muxu_io_mqtt_application-1.0.1.tar.gz.
File metadata
- Download URL: muxu_io_mqtt_application-1.0.1.tar.gz
- Upload date:
- Size: 58.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f3275e71e6e1bc95527d08c979b76710a1cf58cde595281624014284a513fe16
|
|
| MD5 |
1db6ba314046eaa6ee1e4efb7df8eb56
|
|
| BLAKE2b-256 |
b11cad5faf0e075b68480542d5b90eff33c84517e7c5d3497252ef2bd7b5012e
|
Provenance
The following attestation bundles were made for muxu_io_mqtt_application-1.0.1.tar.gz:
Publisher:
publish.yml on muxu-io/mqtt-application
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
muxu_io_mqtt_application-1.0.1.tar.gz -
Subject digest:
f3275e71e6e1bc95527d08c979b76710a1cf58cde595281624014284a513fe16 - Sigstore transparency entry: 402817752
- Sigstore integration time:
-
Permalink:
muxu-io/mqtt-application@655e1d6148e46ff558258b82f52b0380881402da -
Branch / Tag:
refs/tags/v1.0.1 - Owner: https://github.com/muxu-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@655e1d6148e46ff558258b82f52b0380881402da -
Trigger Event:
push
-
Statement type:
File details
Details for the file muxu_io_mqtt_application-1.0.1-py3-none-any.whl.
File metadata
- Download URL: muxu_io_mqtt_application-1.0.1-py3-none-any.whl
- Upload date:
- Size: 33.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7f144fe0fc72002a9d3748778b7c631360d7c82013e86206a35e6e399f1eda77
|
|
| MD5 |
42e054dbb8b02c7e034548578088e28a
|
|
| BLAKE2b-256 |
54b371593b37df7e633082c76f01a60327bdd8888820d8b7aef47fe670763b3c
|
Provenance
The following attestation bundles were made for muxu_io_mqtt_application-1.0.1-py3-none-any.whl:
Publisher:
publish.yml on muxu-io/mqtt-application
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
muxu_io_mqtt_application-1.0.1-py3-none-any.whl -
Subject digest:
7f144fe0fc72002a9d3748778b7c631360d7c82013e86206a35e6e399f1eda77 - Sigstore transparency entry: 402817759
- Sigstore integration time:
-
Permalink:
muxu-io/mqtt-application@655e1d6148e46ff558258b82f52b0380881402da -
Branch / Tag:
refs/tags/v1.0.1 - Owner: https://github.com/muxu-io
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@655e1d6148e46ff558258b82f52b0380881402da -
Trigger Event:
push
-
Statement type: