A FastAPI-style MQTT framework with dependency injection, topic path parameters, and hierarchical routing
Project description
FasterMQTT
A FastAPI-style MQTT framework inspired by FastStream architecture.
FasterMQTT brings the elegant router pattern from FastAPI to MQTT, enabling clean subscription management with dependency injection, topic path parameters, and hierarchical routing.
Features
- FastAPI Integration: Seamlessly integrates with FastAPI through lifespan management
- Decorator-based Subscriptions: Define handlers with
@router.subscribe("topic/{param}") - Topic Path Parameters: Automatic extraction like
client/{client_id}/control→client_id="abc123" - Dependency Injection: Full FastAPI
Depends()support in MQTT handlers - Hierarchical Routing: Nested routers with prefix accumulation via
include_router() - Shared Subscriptions: MQTT 5.0
$share/{group}/{topic}consumer groups - Middleware System: Onion model middleware for message interception
- Pydantic/SQLModel Support: Automatic serialization/deserialization of message payloads
- Type Safety: Full type hints throughout the codebase
Installation
pip install fastermqtt
Quick Start
Basic Usage
from fastapi import FastAPI
from fastermqtt import MqttRouter
# Create root router with MQTT connection config
mqtt_router = MqttRouter(
host="localhost",
port=1883,
username="user",
password="password",
)
# Subscribe to a topic
@mqtt_router.subscribe("sensors/temperature")
async def handle_temperature(payload: bytes):
temperature = float(payload.decode())
print(f"Temperature: {temperature}")
# Integrate with FastAPI
app = FastAPI()
app.include_router(mqtt_router)
Topic Path Parameters
Extract values from topic segments automatically:
@mqtt_router.subscribe("client/{client_id}/control")
async def handle_control(client_id: str, payload: bytes):
print(f"Command for client {client_id}: {payload}")
Hierarchical Routing
Organize subscriptions with nested routers:
# Root router (manages MQTT connection)
mqtt_router = MqttRouter(host="localhost", port=1883)
# Sub-router (no connection config, shares parent's broker)
client_router = MqttRouter(prefix="client")
@client_router.subscribe("{client_id}/status")
async def handle_status(client_id: str, payload: bytes):
# Subscribes to: client/{client_id}/status
pass
# Include sub-router
mqtt_router.include_router(client_router)
Dependency Injection
Use FastAPI's dependency injection in MQTT handlers:
from fastapi import Depends
from sqlmodel.ext.asyncio.session import AsyncSession
async def get_session() -> AsyncSession:
async with async_session_maker() as session:
yield session
SessionDep = Annotated[AsyncSession, Depends(get_session)]
@mqtt_router.subscribe("events/{event_type}")
async def handle_event(
event_type: str,
payload: bytes,
session: SessionDep,
):
# Save event to database
event = Event(type=event_type, data=payload.decode())
session.add(event)
await session.commit()
Publishing Messages
# Publish from router (uses router's prefix)
client_router = MqttRouter(prefix="client/{client_id}/response")
await client_router.publish(
payload=b"OK",
client_id="abc123", # Replaces {client_id}
qos=1,
)
# Publishes to: client/abc123/response
# Publish directly via broker
from fastermqtt import MQTTBroker
await MQTTBroker.publish(
topic="notifications/alert",
payload=b"System alert!",
qos=2,
retain=True,
)
Shared Subscriptions (Consumer Groups)
Distribute messages across multiple service instances:
# Global default consumer group
mqtt_router = MqttRouter(
host="localhost",
port=1883,
default_consumer_group="workers", # All subscriptions use this group
)
# Per-subscription consumer group
@mqtt_router.subscribe("tasks/heavy", group="heavy-workers")
async def handle_heavy_task(payload: bytes):
# Only one instance in "heavy-workers" group receives each message
pass
# Force no shared subscription (override default)
@mqtt_router.subscribe("broadcast/all", group="")
async def handle_broadcast(payload: bytes):
# All instances receive every message
pass
Pydantic Model Serialization
from pydantic import BaseModel
from fastermqtt import encode_payload, decode_payload
class SensorData(BaseModel):
sensor_id: str
value: float
timestamp: int
# Encode for publishing
data = SensorData(sensor_id="temp-1", value=23.5, timestamp=1234567890)
payload = encode_payload(data) # Returns JSON bytes
# Decode in handler
@mqtt_router.subscribe("sensors/data")
async def handle_sensor_data(payload: bytes):
data = decode_payload(payload, SensorData)
print(f"Sensor {data.sensor_id}: {data.value}")
Middleware
Add cross-cutting concerns like logging and error handling:
from fastermqtt import (
BaseMQTTMiddleware,
MiddlewareChain,
LoggingMiddleware,
ErrorHandlingMiddleware,
MQTTMessage,
)
class MetricsMiddleware(BaseMQTTMiddleware):
async def on_receive(self, message: MQTTMessage, call_next):
start = time.time()
result = await call_next(message)
duration = time.time() - start
metrics.record("mqtt_message_duration", duration)
return result
# Build middleware chain
chain = MiddlewareChain()
chain.add(ErrorHandlingMiddleware())
chain.add(LoggingMiddleware(log_payload=True))
chain.add(MetricsMiddleware())
API Reference
MqttRouter
The main router class that inherits from FastAPI's APIRouter.
MqttRouter(
host: str | None = None, # MQTT broker address (root router only)
port: int = 8883, # MQTT broker port
username: str | None = None, # Authentication username
password: str | None = None, # Authentication password
client_id: str | None = None, # Client ID (auto-generated if not provided)
keepalive: int = 60, # Heartbeat interval (seconds)
ssl_ca_cert: str | None = None, # SSL CA certificate path
clean_session: bool = True, # Whether to clean session on connect
default_consumer_group: str | None = None, # Default shared subscription group
prefix: str = "", # Topic prefix
)
Methods
subscribe(topic, qos=0, group=None)- Decorator to register a subscription handlerpublish(payload, qos=0, retain=False, **path_params)- Publish a messageinclude_router(router, prefix="", ...)- Include a sub-router
MQTTBroker
Singleton manager for the MQTT connection (pure classmethod pattern).
# Lifecycle (called automatically by MqttRouter)
await MQTTBroker.start(config)
await MQTTBroker.stop()
# Publishing
await MQTTBroker.publish(topic, payload, qos=0, retain=False)
# Status
MQTTBroker.is_connected() # bool
MQTTBroker.is_initialized() # bool
Dependency Functions
from fastermqtt import (
get_mqtt_message, # Get MQTTMessage object
get_mqtt_topic, # Get topic string
get_mqtt_payload, # Get raw payload bytes
get_mqtt_qos, # Get QoS level
get_topic_param, # Extract topic segment by index
)
# Type aliases for convenience
from fastermqtt import (
MqttMessageDep, # Annotated[MQTTMessage, Depends(get_mqtt_message)]
MqttTopicDep, # Annotated[str, Depends(get_mqtt_topic)]
MqttPayloadDep, # Annotated[bytes, Depends(get_mqtt_payload)]
MqttQosDep, # Annotated[int, Depends(get_mqtt_qos)]
)
Types
from fastermqtt import (
MQTTMessage, # Message container (topic, payload, qos, properties)
SubscriptionInfo, # Subscription metadata
MQTTConfig, # Connection configuration
)
Exceptions
from fastermqtt import (
MQTTException, # Base exception
MQTTConnectionError, # Connection failures
MQTTSubscriptionError, # Subscription failures
MQTTPublishError, # Publish failures
MQTTSerializationError, # Serialization/deserialization errors
MQTTTopicError, # Topic pattern errors
MQTTMiddlewareError, # Middleware errors
MQTTRouterError, # Router configuration errors
MQTTNotInitializedError, # Broker not initialized
)
Architecture
FasterMQTT follows FastStream's architecture:
MqttRouter (inherits APIRouter)
├── Manages MQTTBroker lifecycle via lifespan
├── Supports include_router() for hierarchical routing
├── Prefix accumulation: sub-router topics prepend parent prefix
└── Shares broker across all routers
MQTTBroker (Singleton, pure classmethod)
├── Manages gmqtt Client connection
├── Dispatches messages to subscribers
├── FastAPI-style dependency injection via solve_dependencies
└── Topic parameter extraction via regex
Configuration
SSL/TLS
mqtt_router = MqttRouter(
host="mqtt.example.com",
port=8883,
ssl_ca_cert="/path/to/ca.crt",
)
Clean Session
mqtt_router = MqttRouter(
host="localhost",
port=1883,
clean_session=False, # Persist subscriptions across reconnects
)
Requirements
- Python 3.10+
- FastAPI
- gmqtt
- pydantic
- orjson (for JSON serialization)
License
MIT License
Acknowledgments
- FastStream - Inspiration for the router pattern architecture
- FastAPI - Dependency injection and router patterns
- gmqtt - Underlying MQTT client
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastermqtt-0.1.0.tar.gz.
File metadata
- Download URL: fastermqtt-0.1.0.tar.gz
- Upload date:
- Size: 55.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9c7e25c6077a564fb76ea6ecf315785844d7d067417a5ab3769039b57aa2ccf5
|
|
| MD5 |
a093d7b3c78c3b861cb22d51ab7f0192
|
|
| BLAKE2b-256 |
17d4548f003c30e366f3b9c4d53fd71dcfaab471db67e4d895c68693de706e6f
|
File details
Details for the file fastermqtt-0.1.0-py3-none-any.whl.
File metadata
- Download URL: fastermqtt-0.1.0-py3-none-any.whl
- Upload date:
- Size: 24.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.4
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e1f7e083a800eb2f339a48a8cfd5c4da77a1eed7d426acde9e44708d30897425
|
|
| MD5 |
ae4d2c80c3694eae84a7ab9f82215421
|
|
| BLAKE2b-256 |
169774783802028ac54e6fb8c48642079da070b8bb565b80b87074632a502d72
|