Skip to main content

A FastAPI-style MQTT framework with dependency injection, topic path parameters, and hierarchical routing

Project description

FasterMQTT

A FastAPI-style MQTT framework inspired by FastStream architecture.

FasterMQTT brings the elegant router pattern from FastAPI to MQTT, enabling clean subscription management with dependency injection, topic path parameters, and hierarchical routing.

Features

  • FastAPI Integration: Seamlessly integrates with FastAPI through lifespan management
  • Decorator-based Subscriptions: Define handlers with @router.subscribe("topic/{param}")
  • Topic Path Parameters: Automatic extraction like client/{client_id}/controlclient_id="abc123"
  • Dependency Injection: Full FastAPI Depends() support in MQTT handlers
  • Hierarchical Routing: Nested routers with prefix accumulation via include_router()
  • Shared Subscriptions: MQTT 5.0 $share/{group}/{topic} consumer groups
  • Middleware System: Onion model middleware for message interception
  • Pydantic/SQLModel Support: Automatic serialization/deserialization of message payloads
  • Type Safety: Full type hints throughout the codebase

Installation

pip install fastermqtt

Quick Start

Basic Usage

from fastapi import FastAPI
from fastermqtt import MqttRouter

# Create root router with MQTT connection config
mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    username="user",
    password="password",
)

# Subscribe to a topic
@mqtt_router.subscribe("sensors/temperature")
async def handle_temperature(payload: bytes):
    temperature = float(payload.decode())
    print(f"Temperature: {temperature}")

# Integrate with FastAPI
app = FastAPI()
app.include_router(mqtt_router)

Topic Path Parameters

Extract values from topic segments automatically:

@mqtt_router.subscribe("client/{client_id}/control")
async def handle_control(client_id: str, payload: bytes):
    print(f"Command for client {client_id}: {payload}")

Hierarchical Routing

Organize subscriptions with nested routers:

# Root router (manages MQTT connection)
mqtt_router = MqttRouter(host="localhost", port=1883)

# Sub-router (no connection config, shares parent's broker)
client_router = MqttRouter(prefix="client")

@client_router.subscribe("{client_id}/status")
async def handle_status(client_id: str, payload: bytes):
    # Subscribes to: client/{client_id}/status
    pass

# Include sub-router
mqtt_router.include_router(client_router)

Dependency Injection

Use FastAPI's dependency injection in MQTT handlers:

from fastapi import Depends
from sqlmodel.ext.asyncio.session import AsyncSession

async def get_session() -> AsyncSession:
    async with async_session_maker() as session:
        yield session

SessionDep = Annotated[AsyncSession, Depends(get_session)]

@mqtt_router.subscribe("events/{event_type}")
async def handle_event(
    event_type: str,
    payload: bytes,
    session: SessionDep,
):
    # Save event to database
    event = Event(type=event_type, data=payload.decode())
    session.add(event)
    await session.commit()

Publishing Messages

# Publish from router (uses router's prefix)
client_router = MqttRouter(prefix="client/{client_id}/response")

await client_router.publish(
    payload=b"OK",
    client_id="abc123",  # Replaces {client_id}
    qos=1,
)
# Publishes to: client/abc123/response

# Publish directly via broker
from fastermqtt import MQTTBroker

await MQTTBroker.publish(
    topic="notifications/alert",
    payload=b"System alert!",
    qos=2,
    retain=True,
)

Shared Subscriptions (Consumer Groups)

Distribute messages across multiple service instances:

# Global default consumer group
mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    default_consumer_group="workers",  # All subscriptions use this group
)

# Per-subscription consumer group
@mqtt_router.subscribe("tasks/heavy", group="heavy-workers")
async def handle_heavy_task(payload: bytes):
    # Only one instance in "heavy-workers" group receives each message
    pass

# Force no shared subscription (override default)
@mqtt_router.subscribe("broadcast/all", group="")
async def handle_broadcast(payload: bytes):
    # All instances receive every message
    pass

Pydantic Model Serialization

from pydantic import BaseModel
from fastermqtt import encode_payload, decode_payload

class SensorData(BaseModel):
    sensor_id: str
    value: float
    timestamp: int

# Encode for publishing
data = SensorData(sensor_id="temp-1", value=23.5, timestamp=1234567890)
payload = encode_payload(data)  # Returns JSON bytes

# Decode in handler
@mqtt_router.subscribe("sensors/data")
async def handle_sensor_data(payload: bytes):
    data = decode_payload(payload, SensorData)
    print(f"Sensor {data.sensor_id}: {data.value}")

Middleware

Add cross-cutting concerns like logging and error handling:

from fastermqtt import (
    BaseMQTTMiddleware,
    MiddlewareChain,
    LoggingMiddleware,
    ErrorHandlingMiddleware,
    MQTTMessage,
)

class MetricsMiddleware(BaseMQTTMiddleware):
    async def on_receive(self, message: MQTTMessage, call_next):
        start = time.time()
        result = await call_next(message)
        duration = time.time() - start
        metrics.record("mqtt_message_duration", duration)
        return result

# Build middleware chain
chain = MiddlewareChain()
chain.add(ErrorHandlingMiddleware())
chain.add(LoggingMiddleware(log_payload=True))
chain.add(MetricsMiddleware())

API Reference

MqttRouter

The main router class that inherits from FastAPI's APIRouter.

MqttRouter(
    host: str | None = None,          # MQTT broker address (root router only)
    port: int = 8883,                  # MQTT broker port
    username: str | None = None,      # Authentication username
    password: str | None = None,      # Authentication password
    client_id: str | None = None,     # Client ID (auto-generated if not provided)
    keepalive: int = 60,              # Heartbeat interval (seconds)
    ssl_ca_cert: str | None = None,   # SSL CA certificate path
    clean_session: bool = True,       # Whether to clean session on connect
    default_consumer_group: str | None = None,  # Default shared subscription group
    prefix: str = "",                 # Topic prefix
)

Methods

  • subscribe(topic, qos=0, group=None) - Decorator to register a subscription handler
  • publish(payload, qos=0, retain=False, **path_params) - Publish a message
  • include_router(router, prefix="", ...) - Include a sub-router

MQTTBroker

Singleton manager for the MQTT connection (pure classmethod pattern).

# Lifecycle (called automatically by MqttRouter)
await MQTTBroker.start(config)
await MQTTBroker.stop()

# Publishing
await MQTTBroker.publish(topic, payload, qos=0, retain=False)

# Status
MQTTBroker.is_connected()  # bool
MQTTBroker.is_initialized()  # bool

Dependency Functions

from fastermqtt import (
    get_mqtt_message,   # Get MQTTMessage object
    get_mqtt_topic,     # Get topic string
    get_mqtt_payload,   # Get raw payload bytes
    get_mqtt_qos,       # Get QoS level
    get_topic_param,    # Extract topic segment by index
)

# Type aliases for convenience
from fastermqtt import (
    MqttMessageDep,  # Annotated[MQTTMessage, Depends(get_mqtt_message)]
    MqttTopicDep,    # Annotated[str, Depends(get_mqtt_topic)]
    MqttPayloadDep,  # Annotated[bytes, Depends(get_mqtt_payload)]
    MqttQosDep,      # Annotated[int, Depends(get_mqtt_qos)]
)

Types

from fastermqtt import (
    MQTTMessage,       # Message container (topic, payload, qos, properties)
    SubscriptionInfo,  # Subscription metadata
    MQTTConfig,        # Connection configuration
)

Exceptions

from fastermqtt import (
    MQTTException,           # Base exception
    MQTTConnectionError,     # Connection failures
    MQTTSubscriptionError,   # Subscription failures
    MQTTPublishError,        # Publish failures
    MQTTSerializationError,  # Serialization/deserialization errors
    MQTTTopicError,          # Topic pattern errors
    MQTTMiddlewareError,     # Middleware errors
    MQTTRouterError,         # Router configuration errors
    MQTTNotInitializedError, # Broker not initialized
)

Architecture

FasterMQTT follows FastStream's architecture:

MqttRouter (inherits APIRouter)
    ├── Manages MQTTBroker lifecycle via lifespan
    ├── Supports include_router() for hierarchical routing
    ├── Prefix accumulation: sub-router topics prepend parent prefix
    └── Shares broker across all routers

MQTTBroker (Singleton, pure classmethod)
    ├── Manages gmqtt Client connection
    ├── Dispatches messages to subscribers
    ├── FastAPI-style dependency injection via solve_dependencies
    └── Topic parameter extraction via regex

Configuration

SSL/TLS

mqtt_router = MqttRouter(
    host="mqtt.example.com",
    port=8883,
    ssl_ca_cert="/path/to/ca.crt",
)

Clean Session

mqtt_router = MqttRouter(
    host="localhost",
    port=1883,
    clean_session=False,  # Persist subscriptions across reconnects
)

Requirements

  • Python 3.10+
  • FastAPI
  • gmqtt
  • pydantic
  • orjson (for JSON serialization)

License

MIT License

Acknowledgments

  • FastStream - Inspiration for the router pattern architecture
  • FastAPI - Dependency injection and router patterns
  • gmqtt - Underlying MQTT client

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastermqtt-0.1.0.tar.gz (55.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastermqtt-0.1.0-py3-none-any.whl (24.1 kB view details)

Uploaded Python 3

File details

Details for the file fastermqtt-0.1.0.tar.gz.

File metadata

  • Download URL: fastermqtt-0.1.0.tar.gz
  • Upload date:
  • Size: 55.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for fastermqtt-0.1.0.tar.gz
Algorithm Hash digest
SHA256 9c7e25c6077a564fb76ea6ecf315785844d7d067417a5ab3769039b57aa2ccf5
MD5 a093d7b3c78c3b861cb22d51ab7f0192
BLAKE2b-256 17d4548f003c30e366f3b9c4d53fd71dcfaab471db67e4d895c68693de706e6f

See more details on using hashes here.

File details

Details for the file fastermqtt-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: fastermqtt-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 24.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.4

File hashes

Hashes for fastermqtt-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e1f7e083a800eb2f339a48a8cfd5c4da77a1eed7d426acde9e44708d30897425
MD5 ae4d2c80c3694eae84a7ab9f82215421
BLAKE2b-256 169774783802028ac54e6fb8c48642079da070b8bb565b80b87074632a502d72

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page