Skip to main content

MQTT plugin for FlowerPower - trigger pipeline execution on MQTT messages

Project description

FlowerPower MQTT Plugin

A simple yet powerful MQTT plugin for FlowerPower that triggers pipeline execution when messages arrive on subscribed topics. Features configurable QoS levels, optional RQ job queue integration for asynchronous processing, and a beautiful CLI for easy management.

Features

Core Features

  • Simple API: Easy-to-use interface for connecting, subscribing, and listening
  • QoS Support: Full MQTT QoS support (0, 1, 2) for reliable message delivery
  • Async Processing: Optional RQ job queue integration for background pipeline execution
  • High Performance: Uses msgspec for fast serialization and deserialization
  • Flexible Payload Handling: Supports JSON, YAML, MessagePack, Pickle, Protobuf, and PyArrow IPC payload serialization/deserialization with auto-detection.
  • Multiple Execution Modes:
    • sync: Direct pipeline execution (blocking)
    • async: Background execution via RQ
    • mixed: QoS-based routing (QoS 2 → sync, QoS 0/1 → async)
  • Topic Wildcards: Support for MQTT topic patterns (+, #)
  • Graceful Shutdown: Ctrl+C handling and proper cleanup
  • Statistics & Monitoring: Built-in metrics and job tracking
  • Configuration Management: YAML-based configuration support with msgspec validation
  • Automatic Reconnection: Robust connection handling with configurable retry attempts and exponential backoff

CLI Features

  • Beautiful Interface: Rich, colorful CLI with tables and progress bars
  • Interactive Configuration: Step-by-step configuration setup
  • Real-time Monitoring: Live statistics and subscription monitoring
  • Configuration Management: Create, validate, edit, and show configurations
  • Job Queue Management: Monitor and manage RQ workers and jobs
  • Shell Completion: Auto-completion support for commands and options
  • JSON Output: Machine-readable output for scripting and automation

Installation

flowerpower-mqtt is available on PyPI and can be installed using uv pip (recommended) or pip.

Using uv pip (Recommended)

uv is a fast Python package installer and resolver. It's the recommended way to install flowerpower-mqtt.

uv pip install flowerpower-mqtt

Using pip

If you prefer using pip, you can install flowerpower-mqtt directly:

pip install flowerpower-mqtt

After installation, the flowerpower-mqtt CLI command will be available.

Quick Start

CLI Quick Start

The fastest way to get started is using the CLI:

# 1. Create configuration interactively
flowerpower-mqtt config create --interactive

# 2. Connect to MQTT broker
flowerpower-mqtt connect --config mqtt_config.yml

# 3. Subscribe to topics
flowerpower-mqtt subscribe "sensors/+/temperature" temperature_processor --qos 1

# 4. Start listening (press Ctrl+C to stop)
flowerpower-mqtt listen

For async processing with job queue:

# 1. Create config with job queue enabled
flowerpower-mqtt config create --interactive --job-queue

# 2. Start RQ worker (in separate terminal)
rq worker mqtt_pipelines --url redis://localhost:6379

# 3. Connect and subscribe with async mode
flowerpower-mqtt connect --config mqtt_config.yml
flowerpower-mqtt subscribe "data/+" data_processor --qos 1 --mode async

# 4. Start listening and monitor
flowerpower-mqtt listen --background &
flowerpower-mqtt monitor --interval 5

Programmatic Usage

1. Basic Synchronous Usage

import asyncio
from flowerpower_mqtt import MQTTPlugin

async def main():
    # Create plugin instance
    mqtt = MQTTPlugin(
        broker="localhost",
        base_dir="/path/to/flowerpower/project"
    )
    
    # Connect to broker
    await mqtt.connect()
    
    # Subscribe to topic
    await mqtt.subscribe("sensors/temperature", "temperature_pipeline", qos=1)
    
    # Start listening (blocks until Ctrl+C)
    await mqtt.start_listener()

# Run the plugin
asyncio.run(main())

2. Asynchronous with Job Queue

import asyncio
from flowerpower_mqtt import MQTTPlugin

async def main():
    # Create plugin with RQ job queue enabled
    mqtt = MQTTPlugin(
        broker="mqtt.example.com",
        base_dir="/path/to/flowerpower/project",
        use_job_queue=True,
        redis_url="redis://localhost:6379"
    )
    
    await mqtt.connect()
    
    # Subscribe with async execution
    await mqtt.subscribe("sensors/+/data", "sensor_processor", qos=1, execution_mode="async")
    await mqtt.subscribe("alerts/critical", "alert_handler", qos=2, execution_mode="sync")
    
    # Start listener in background
    await mqtt.start_listener(background=True)
    
    # Do other work...
    await asyncio.sleep(60)
    
    # Stop gracefully
    await mqtt.stop_listener()

asyncio.run(main())

3. Using Configuration File

Create mqtt_config.yml:

mqtt:
  broker: "mqtt.example.com"
  port: 1883
  keepalive: 60
  client_id: "flowerpower_mqtt_client"
  reconnect_retries: 5
  reconnect_delay: 5

subscriptions:
  - topic: "sensors/+/temperature"
    pipeline: "temperature_processor"
    qos: 1
    execution_mode: "async"
  - topic: "alerts/critical"
    pipeline: "alert_handler"
    qos: 2
    execution_mode: "sync"

job_queue:
  enabled: true
  type: "rq"
  redis_url: "redis://localhost:6379"
  queue_name: "mqtt_pipelines"
  worker_count: 4

base_dir: "/path/to/flowerpower/project"
log_level: "INFO"

Then use it:

import asyncio
from flowerpower_mqtt import MQTTPlugin

async def main():
    # Load from configuration
    mqtt = MQTTPlugin.from_config("mqtt_config.yml")
    
    await mqtt.connect()
    await mqtt.start_listener()

asyncio.run(main())

CLI Reference

The FlowerPower MQTT CLI provides a comprehensive interface for managing MQTT connections, subscriptions, and monitoring.

Configuration Management

# Create configuration interactively
flowerpower-mqtt config create --interactive

# Create configuration with job queue enabled
flowerpower-mqtt config create --job-queue --output my_config.yml

# Validate configuration file
flowerpower-mqtt config validate mqtt_config.yml

# Show current configuration
flowerpower-mqtt config show

# Edit configuration (opens in $EDITOR)
flowerpower-mqtt config edit

Connection Management

# Connect to MQTT broker
flowerpower-mqtt connect --broker localhost --port 1883

# Connect with job queue enabled
flowerpower-mqtt connect --broker mqtt.example.com --job-queue --redis-url redis://localhost:6379

# Connect using configuration file
flowerpower-mqtt connect --config mqtt_config.yml

# Disconnect from broker
flowerpower-mqtt disconnect

Subscription Management

# Subscribe to topic
flowerpower-mqtt subscribe "sensors/temperature" temp_processor --qos 1

# Subscribe with async execution
flowerpower-mqtt subscribe "data/+/events" event_processor --qos 1 --mode async

# Subscribe with mixed mode (QoS-based routing)
flowerpower-mqtt subscribe "mixed/data" mixed_processor --qos 2 --mode mixed

# List all subscriptions
flowerpower-mqtt list-subscriptions

# List only active subscriptions
flowerpower-mqtt list-subscriptions --active

# Unsubscribe from topic
flowerpower-mqtt unsubscribe "sensors/temperature"

Listening and Monitoring

# Start listening (blocks until Ctrl+C)
flowerpower-mqtt listen

# Start listening in background
flowerpower-mqtt listen --background

# Start with execution mode override
flowerpower-mqtt listen --override-mode async

# Start with timeout
flowerpower-mqtt listen --timeout 300

# Show current status
flowerpower-mqtt status

# Show status as JSON
flowerpower-mqtt status --json

# Real-time monitoring
flowerpower-mqtt monitor --interval 5

# Monitor for specific duration
flowerpower-mqtt monitor --interval 10 --duration 300

Job Queue Management

# Check job queue status
flowerpower-mqtt jobs status

# Start RQ worker (shows command to run)
flowerpower-mqtt jobs worker start --count 2

# Check worker status
flowerpower-mqtt jobs worker status

Common CLI Workflows

Development Workflow

# 1. Create and validate configuration
flowerpower-mqtt config create --interactive
flowerpower-mqtt config validate mqtt_config.yml

# 2. Quick testing
flowerpower-mqtt connect
flowerpower-mqtt subscribe "test/+" test_pipeline --qos 0
flowerpower-mqtt listen --timeout 60

# 3. Real-time monitoring
flowerpower-mqtt monitor --interval 5

Production Workflow

# 1. Load from version-controlled config
flowerpower-mqtt config validate production_config.yml
flowerpower-mqtt connect --config production_config.yml

# 2. Start with job queue
rq worker mqtt_pipelines --url redis://localhost:6379 &
flowerpower-mqtt listen --background --override-mode async

# 3. Monitor operations
flowerpower-mqtt monitor --json > monitoring.log &
flowerpower-mqtt jobs status

CLI Tips

  • Use --json flag for machine-readable output in scripts
  • Configuration files can be version controlled and shared
  • Use --save-config to persist CLI-created subscriptions
  • The CLI supports shell completion (enable with your shell)
  • Use flowerpower-mqtt --help to see all available commands
  • Each command has detailed help: flowerpower-mqtt connect --help

Documentation

For comprehensive and detailed information about FlowerPower MQTT, including advanced topics, API references, and in-depth usage guides, please refer to our Full Documentation.

Our documentation has been recently reorganized to provide a more user-friendly navigation structure, making it easier to find the information you need.

Usage Patterns

QoS Levels

Choose the appropriate QoS level based on your use case:

# QoS 0: Fire-and-forget (best for high-volume, non-critical data)
await mqtt.subscribe("logs/debug", "log_processor", qos=0)

# QoS 1: At-least-once delivery (good for important events)
await mqtt.subscribe("sensors/data", "data_processor", qos=1)

# QoS 2: Exactly-once delivery (critical business processes)
await mqtt.subscribe("payments/completed", "payment_processor", qos=2)

Execution Modes

# Synchronous: Direct execution (blocking)
await mqtt.subscribe("critical/alerts", "alert_handler", execution_mode="sync")

# Asynchronous: Background execution via RQ
await mqtt.subscribe("batch/data", "batch_processor", execution_mode="async")

# Mixed: QoS-based routing
await mqtt.subscribe("mixed/topic", "mixed_pipeline", execution_mode="mixed")

### Payload Deserialization

Specify how incoming MQTT message payloads should be deserialized:

```python
# JSON (default if not specified for text-based payloads)
await mqtt.subscribe("data/json", "json_processor", deserialization_format="json")

# MessagePack
await mqtt.subscribe("data/msgpack", "msgpack_processor", deserialization_format="msgpack")

# YAML
await mqtt.subscribe("data/yaml", "yaml_processor", deserialization_format="yaml")

# Pickle
await mqtt.subscribe("data/pickle", "pickle_processor", deserialization_format="pickle")

# Protobuf (requires a compiled Protobuf schema)
await mqtt.subscribe("data/protobuf", "protobuf_processor", deserialization_format="protobuf")

# PyArrow IPC (e.g., for Arrow Tables)
await mqtt.subscribe("data/arrow", "arrow_processor", deserialization_format="pyarrow")

# Auto-detection (attempts JSON, MessagePack, YAML, PyArrow, Pickle in order)
await mqtt.subscribe("data/auto", "auto_processor", deserialization_format="auto")

### Context Manager Usage

```python
async def main():
    async with MQTTPlugin("localhost", base_dir=".") as mqtt:
        await mqtt.subscribe("test/topic", "test_pipeline")
        await mqtt.start_listener()

Bulk Subscriptions

subscriptions = [
    {"topic": "sensors/+/temperature", "pipeline": "temp_monitor", "qos": 1},
    {"topic": "alerts/critical/#", "pipeline": "alert_handler", "qos": 2},
    {"topic": "data/batch/+", "pipeline": "batch_processor", "qos": 0}
]

await mqtt.subscribe_bulk(subscriptions)

Pipeline Integration

Your FlowerPower pipelines will receive MQTT message data as input:

# pipelines/sensor_processor.py
import pandas as pd
from hamilton.function_modifiers import parameterize

def process_mqtt_message(mqtt_message: dict, mqtt_topic: str, mqtt_qos: int) -> dict:
    """Process incoming MQTT message."""
    print(f"Received message from {mqtt_topic} (QoS {mqtt_qos})")
    
    # Access message payload
    sensor_data = mqtt_message.get("sensor_data", {})
    
    # Process the data
    result = {
        "processed_at": mqtt_message["execution_timestamp"],
        "topic": mqtt_topic,
        "temperature": sensor_data.get("temperature"),
        "status": "processed"
    }
    
    return result

def save_results(process_mqtt_message: dict) -> str:
    """Save processing results."""
    # Save to database, file, etc.
    return f"Saved results: {process_mqtt_message['status']}"

Monitoring and Statistics

# Get current subscriptions
subscriptions = mqtt.get_subscriptions()
for sub in subscriptions:
    print(f"Topic: {sub['topic']}, Messages: {sub['message_count']}")

# Get plugin statistics
stats = mqtt.get_statistics()
print(f"Connected: {stats['connected']}")
print(f"Message count: {stats['message_count']}")
print(f"Pipeline executions: {stats['pipeline_count']}")
print(f"Errors: {stats['error_count']}")

# Save current configuration
mqtt.save_config("current_config.yml")

Job Queue Integration

When using RQ for asynchronous processing:

Start RQ Worker

# In a separate terminal, start RQ worker
rq worker mqtt_pipelines --url redis://localhost:6379

Monitor Jobs

# Job status is automatically tracked
stats = mqtt.get_statistics()
if "job_queue_stats" in stats:
    print(f"Queue: {stats['job_queue_stats']['queue_name']}")

Error Handling

The plugin includes comprehensive error handling:

from flowerpower_mqtt import MQTTPlugin, ConnectionError, SubscriptionError

try:
    mqtt = MQTTPlugin("invalid.broker.com")
    await mqtt.connect()
except ConnectionError as e:
    print(f"Connection failed: {e}")

try:
    await mqtt.subscribe("test/topic", "nonexistent_pipeline", qos=3)
except SubscriptionError as e:
    print(f"Subscription failed: {e}")

Examples

Check the examples/ directory for more comprehensive examples:

Programmatic Examples

  • examples/basic_usage.py - Simple synchronous usage
  • examples/async_with_rq.py - Asynchronous processing with RQ
  • examples/config_based.py - Configuration file usage
  • examples/multiple_qos.py - Different QoS levels
  • examples/monitoring.py - Statistics and monitoring

CLI Integration Examples

  • examples/cli_usage.py - CLI and programmatic integration
  • examples/programmatic_vs_cli.py - Comparison of approaches
  • examples/example_pipeline.py - Sample FlowerPower pipeline

CLI vs Programmatic Usage

Choose the right approach for your use case:

Use CLI When:

  • Development & Testing: Quick iterations and prototyping
  • Operations: Monitoring, troubleshooting, and administration
  • Configuration Management: Creating, validating, and editing configurations
  • Simple Use Cases: Basic MQTT message processing
  • Learning: Exploring features with immediate feedback
  • Scripting: Automation with shell scripts

Use Programmatic API When:

  • Application Integration: Embedding in larger applications
  • Complex Logic: Conditional operations and business rules
  • Custom Monitoring: Integration with existing dashboards
  • Error Handling: Advanced error handling and recovery
  • Dynamic Behavior: Runtime decision making
  • Performance Critical: Fine-tuned control over operations

Hybrid Approach (Recommended):

Use CLI for setup and operations, programmatic API for application logic:

# Use CLI-generated config in Python code
from flowerpower_mqtt import MQTTPlugin

plugin = MQTTPlugin.from_config("cli_generated_config.yml")
# Add custom business logic here...

Requirements

  • Python >=3.11
  • FlowerPower
  • aiomqtt >=2.0.0
  • msgspec >=0.18.0 (high-performance serialization)
  • typer[all] >=0.9.0 (CLI framework)
  • rich >=13.0.0 (beautiful CLI output)
  • Redis (for job queue functionality)
  • RQ >=1.15.0 (for async processing)

Development

# Clone repository
git clone https://github.com/legout/flowerpower-mqtt.git
cd flowerpower-mqtt

# Install development dependencies
uv pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/

# Code formatting
black src/
ruff check src/

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Add tests
  5. Run the test suite
  6. Submit a pull request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Changelog

v0.2.0 (Current)

  • Breaking Change: Migrated from Pydantic to msgspec.Struct for 10-50x performance improvement
  • New: Comprehensive CLI with beautiful rich output
  • New: Interactive configuration creation and management
  • New: Real-time monitoring with rich tables and charts
  • New: Job queue management via CLI
  • New: Shell completion support
  • New: JSON output for scripting and automation
  • Enhanced: Configuration validation with detailed error messages
  • Enhanced: Better error handling throughout the codebase

v0.1.0

  • Initial release
  • Basic MQTT subscription and pipeline execution
  • QoS support (0, 1, 2)
  • RQ job queue integration
  • Configuration management
  • Graceful shutdown handling
  • Statistics and monitoring

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowerpower_mqtt-0.2.1.tar.gz (25.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

flowerpower_mqtt-0.2.1-py3-none-any.whl (29.7 kB view details)

Uploaded Python 3

File details

Details for the file flowerpower_mqtt-0.2.1.tar.gz.

File metadata

  • Download URL: flowerpower_mqtt-0.2.1.tar.gz
  • Upload date:
  • Size: 25.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.14

File hashes

Hashes for flowerpower_mqtt-0.2.1.tar.gz
Algorithm Hash digest
SHA256 8d0b90181580786cd588329a510e89921fdcdce67fc4b2146d8108b9faadecd2
MD5 6279ce4c9e47fb250a22d204f3694795
BLAKE2b-256 33380d8e7f1ba62f4445b825f14f92705015b929f7019dc4823a3a6f45d08021

See more details on using hashes here.

File details

Details for the file flowerpower_mqtt-0.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for flowerpower_mqtt-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 b010ac560582d6df97b53bdc8196a376f3d8254785cfc4b8b99f00a864f3224b
MD5 e4903c6e6bab818423c767429240b7e4
BLAKE2b-256 625a6d0c87e01796d42993a5cb2385de49a5ccda8cc66ca4c8385862f45eafe5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page