A flexible MQTT routing framework with middleware support
RouteMQ
Laravel-style MQTT routing for Python: controllers, middleware, jobs, and shared-subscription scaling without callback spaghetti.
Status: Beta. Use RouteMQ in test/staging while we harden the v1 surface. The framework is fully tested (581 unit tests) and supply-chain hardened, but APIs may shift before 1.0.
Documentation: docs/ · Source: github.com/ardzz/RouteMQ · PyPI: routemq
RouteMQ is a Python 3.12+ MQTT application framework that turns topics into async controller methods through middleware chains, with optional background jobs and shared-subscription workers.
RouteMQ gives you:
- Route topics like web routes. Declare `devices/{id}/status` once; receive `id` as a typed handler argument.
- Controllers and middleware. Keep handlers in `app/controllers`; layer auth, logging, rate limiting, and validation as reusable middleware.
- Async by default. Use async Redis, MySQL (SQLAlchemy), ClickHouse, and job dispatch naturally inside handlers. RouteMQ bridges `paho-mqtt`'s sync callbacks for you.
- Shared-subscription scaling. Flip `shared=True` on a high-volume route; RouteMQ spawns worker processes against `$share/<group>/<topic>` without you wiring multiple clients.
- Background jobs. Laravel-style `Job` classes with retries, delays, timeouts, and Redis or MySQL queue backends.
- Built-in observability. Optional `/health`, `/ready`, and `/metrics` HTTP endpoints, lifecycle counters, latency histograms, and OpenTelemetry-shaped spans. No mandatory vendor backend.
- Optional integrations. Redis, MySQL, ClickHouse for time-series telemetry, and a Prometheus client adapter. All opt-in extras.
- Supply-chain hardened. OpenSSF Scorecard, SLSA L3 provenance, signed CycloneDX SBOMs, Bandit, pip-audit, and Dependabot on every release.
Quick Start
Install the mode you need:
| Install | Use it for |
|---|---|
| `routemq` | Runtime engine: routing, middleware, jobs, MySQL queue, app boot. |
| `routemq[cli]` | Runtime plus the `routemq new` scaffolder. Start here for a new project. |
| `routemq[redis]` | Runtime plus Redis support for queues, cache, rate limits, and shared state. |
| `routemq[all]` | CLI, Redis, Prometheus, and ClickHouse extras in one install. |
```bash
uv add "routemq[cli]"        # add to an existing uv project
pip install "routemq[cli]"   # install into the active Python environment
```
Create a project and one route:
```bash
uvx --from "routemq[cli]" routemq new sensor-demo
# or, after pip install "routemq[cli]": routemq new sensor-demo
cd sensor-demo
```

```python
# app/controllers/device_controller.py
from routemq.controller import Controller


class DeviceController(Controller):
    @staticmethod
    async def status(device_id, payload, client):
        print(f"device {device_id}: {payload}")
        return {"ok": True}
```

```python
# app/routers/devices.py
from routemq.router import Router
from app.controllers.device_controller import DeviceController

router = Router()
router.on("devices/{device_id}/status", DeviceController.status, qos=1)
```
Point .env at a broker and run the app:
```env
MQTT_BROKER=test.mosquitto.org
MQTT_PORT=1883
```

```bash
uv run routemq run
mosquitto_pub -h test.mosquitto.org -t devices/42/status -m '{"temp":21}'
```
RouteMQ imports app.routers.*, subscribes to devices/+/status, and calls DeviceController.status(device_id="42", payload={"temp": 21}, ...).
When should I use RouteMQ?
| If you need... | Use |
|---|---|
| Low-level MQTT protocol control, custom session/QoS handling | paho-mqtt |
| A web-framework-style structure for an MQTT-first app | RouteMQ |
| Multi-broker streaming across Kafka, RabbitMQ, NATS, Redis, MQTT | FastStream |
| General distributed task queues independent of a broker protocol | Celery |
RouteMQ sits on top of paho-mqtt. You keep proven protocol behavior and add structure, async handlers, and scaling.
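For readers curious what bridging a synchronous client callback into async handlers can look like, the following is a generic sketch using only the standard library. It is not RouteMQ's actual bridge: the callback signature is simplified (paho-mqtt's real `on_message` receives `client, userdata, msg`), and a plain thread stands in for the client's network thread.

```python
import asyncio
import threading


async def handle(topic: str, payload: bytes) -> str:
    """Stand-in for an async controller method."""
    await asyncio.sleep(0)
    return f"{topic}:{payload.decode()}"


def make_on_message(loop: asyncio.AbstractEventLoop, results: list):
    """Build a sync callback that schedules the coroutine on `loop`."""

    def on_message(topic: str, payload: bytes) -> None:
        # Runs on a non-asyncio thread, so hand the coroutine to the loop safely.
        future = asyncio.run_coroutine_threadsafe(handle(topic, payload), loop)
        results.append(future.result(timeout=5))

    return on_message


async def main() -> list:
    results: list = []
    on_message = make_on_message(asyncio.get_running_loop(), results)
    # Simulate the network thread delivering one message.
    thread = threading.Thread(target=on_message, args=("devices/42/status", b"ok"))
    thread.start()
    # Join in a worker thread so the event loop stays free to run the handler.
    await asyncio.to_thread(thread.join)
    return results


print(asyncio.run(main()))  # ['devices/42/status:ok']
```

`asyncio.run_coroutine_threadsafe` is the standard way to submit a coroutine to a loop from another thread; a production bridge would usually not block the network thread on the result.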
Routes, middleware, and scaling in one snippet
The minimal example above scales up cleanly:
```python
from routemq.router import Router
from app.middleware.rate_limit import RateLimit
from app.controllers.device_controller import DeviceController

router = Router()

with router.group(prefix="devices", middleware=[RateLimit(60)]) as devices:
    devices.on(
        "{device_id}/status",
        DeviceController.handle_status,
        qos=1,
        shared=True,
        worker_count=3,
    )
```
- The `{device_id}` token compiles to a regex with a named group and to a `+` wildcard for the MQTT subscription.
- `shared=True` switches the subscription to `$share/<group>/devices/+/status` and spawns three worker processes.
- `RateLimit(60)` runs as middleware before the handler. Auth, logging, and validation work the same way.
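As a rough illustration of middleware running before a handler, the sketch below builds a chain by hand. Everything in it is an assumption made for the example: the `(context, next_handler)` middleware signature, the in-memory fixed-window limiter, and the `build_chain` helper are not RouteMQ's API, and the real `RateLimit` middleware may use a different strategy and backend.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[Any]]
Middleware = Callable[[dict[str, Any], Handler], Awaitable[Any]]


def rate_limit(max_calls: int, per_seconds: float = 60.0) -> Middleware:
    """Fixed-window limiter kept in process memory (illustrative only)."""
    window_start = time.monotonic()
    calls = 0

    async def middleware(context, next_handler):
        nonlocal window_start, calls
        now = time.monotonic()
        if now - window_start >= per_seconds:
            window_start, calls = now, 0  # start a new window
        calls += 1
        if calls > max_calls:
            return {"error": "rate_limited"}  # short-circuit: handler never runs
        return await next_handler(context)

    return middleware


def build_chain(middleware: list[Middleware], handler: Handler) -> Handler:
    """Wrap the handler so that middleware[0] runs first."""
    chain = handler
    for mw in reversed(middleware):
        def make(mw=mw, nxt=chain):
            async def step(context):
                return await mw(context, nxt)
            return step
        chain = make()
    return chain


async def status(context):
    return {"ok": True, "device_id": context["device_id"]}


async def main():
    chain = build_chain([rate_limit(2)], status)
    return [await chain({"device_id": "42"}) for _ in range(3)]


print(asyncio.run(main()))
# [{'ok': True, 'device_id': '42'}, {'ok': True, 'device_id': '42'}, {'error': 'rate_limited'}]
```

Because each middleware decides whether to call `next_handler`, the same shape covers auth, logging, and validation.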
Background jobs
Push slow work out of the MQTT handler. Register concrete jobs so workers can deserialize them safely:
```python
# app/jobs/send_alert_job.py
from routemq.job import Job


@Job.register
class SendAlertJob(Job):
    queue = "alerts"
    max_tries = 3
    retry_after = 10

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        print(f"alert for {self.device_id}: {self.payload}")
```
Dispatch the job from a controller or handler:
```python
from routemq.queue import dispatch
from app.jobs.send_alert_job import SendAlertJob


async def handler(device_id, payload, client):
    if payload.get("status") == "critical":
        job = SendAlertJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
    return {"ok": True}
```
Run a worker:
```bash
routemq queue-work --queue alerts --connection redis
```
Queue backends: Redis with routemq[redis], or MySQL with base routemq when ENABLE_MYSQL=true.
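Why register jobs at all? One plausible reason, sketched below, is that a worker can then rebuild only job types it already knows by name instead of importing whatever a queue payload asks for. This is an illustrative stand-in, not RouteMQ's code: the `register` decorator, the JSON wire format, and the `serialize`/`deserialize` helpers are assumptions.

```python
import json

# Hypothetical registry: maps a job's class name to the class itself.
_REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    """Class decorator: allow workers to rebuild this job type by name."""
    _REGISTRY[cls.__name__] = cls
    return cls


class Job:
    queue = "default"
    max_tries = 1

    def serialize(self) -> str:
        # Store the class name plus the instance attributes.
        return json.dumps({"class": type(self).__name__, "data": vars(self)})


def deserialize(raw: str) -> Job:
    record = json.loads(raw)
    cls = _REGISTRY.get(record["class"])
    if cls is None:
        # Refuse unknown names rather than importing arbitrary code.
        raise ValueError(f"unregistered job: {record['class']}")
    job = cls()
    for key, value in record["data"].items():
        setattr(job, key, value)
    return job


@register
class SendAlertJob(Job):
    queue = "alerts"
    max_tries = 3

    def __init__(self):
        self.device_id = None
        self.payload = {}


job = SendAlertJob()
job.device_id = "42"
restored = deserialize(job.serialize())
print(type(restored).__name__, restored.device_id)  # SendAlertJob 42
```

The sketch also shows why job state is kept in plain attributes set after construction: a no-argument `__init__` lets the worker create the instance first and restore fields afterwards.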
Real-world sensor telemetry
A sensor pipeline usually has three parts: MQTT routing, queued processing, and a local stack with a broker plus Redis.
```python
# app/routers/sensors.py
from routemq.router import Router
from app.controllers.sensor_controller import SensorController

router = Router()
router.on("sensors/{device_id}/telemetry", SensorController.ingest, qos=1)
```

```python
# app/controllers/sensor_controller.py
from routemq.controller import Controller
from routemq.queue import dispatch
from app.jobs.store_telemetry_job import StoreTelemetryJob


class SensorController(Controller):
    @staticmethod
    async def ingest(device_id, payload, client):
        job = StoreTelemetryJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
        return {"accepted": True, "device_id": device_id}
```

```python
# app/jobs/store_telemetry_job.py
from routemq.job import Job


@Job.register
class StoreTelemetryJob(Job):
    queue = "telemetry"
    max_tries = 5

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        temperature = self.payload.get("temperature")
        print(f"store {self.device_id}: temperature={temperature}")
```
Run it against a local broker and Redis queue:
```yaml
# docker-compose.yml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports: ["1883:1883"]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
```

```env
MQTT_BROKER=localhost
MQTT_PORT=1883
ENABLE_REDIS=true
QUEUE_CONNECTION=redis
```

```bash
docker compose up -d
uv run routemq run
uv run routemq queue-work --queue telemetry --connection redis
mosquitto_pub -h localhost -t sensors/pump-7/telemetry -m '{"temperature":31.2}'
```
Observability
RouteMQ ships health, readiness, and OpenMetrics endpoints, off by default. Set METRICS_HTTP_ENABLED=true to expose them:
```bash
curl http://localhost:8080/health    # liveness
curl http://localhost:8080/ready     # MQTT readiness
curl http://localhost:8080/metrics   # OpenMetrics / Prometheus text
```
Built-in metric families include mqtt_messages_*, router_dispatch_*, queue_job_*, queue-depth gauges, tsdb_write_*, and latency histograms for each. Spans follow OpenTelemetry-shaped semantics (db.system, db.operation, messaging.system, kind=client|consumer|producer|internal).
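To check which families a running instance actually exposes, one option is to save the `/metrics` output and group its lines by prefix. The sketch below does that on a hand-written sample; the concrete metric names and labels in `SAMPLE` are made up for illustration and are not copied from RouteMQ's real output.

```python
from collections import defaultdict

# Hypothetical sample; real metric names and labels may differ.
SAMPLE = """\
# TYPE mqtt_messages_received counter
mqtt_messages_received_total 12
router_dispatch_total{route="devices/+/status"} 9
queue_job_failed_total 1
# EOF
"""


def group_by_family(text: str, prefixes: tuple[str, ...]) -> dict[str, list[str]]:
    """Bucket sample lines by the first family prefix they start with."""
    families: dict[str, list[str]] = defaultdict(list)
    for line in text.splitlines():
        if not line or line.startswith("#"):
            continue  # skip blank lines, TYPE/HELP metadata, and the EOF marker
        for prefix in prefixes:
            if line.startswith(prefix):
                families[prefix].append(line)
                break
    return dict(families)


found = group_by_family(SAMPLE, ("mqtt_messages_", "router_dispatch_", "queue_job_"))
print(sorted(found))  # ['mqtt_messages_', 'queue_job_', 'router_dispatch_']
```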
For details: Metrics · Health checks · Pool tuning evidence
Optional extras
```bash
uv add routemq                  # base runtime
uv add "routemq[cli]"           # scaffolder and rich terminal prompts
uv add "routemq[redis]"         # Redis queue + rate limiting backend
uv add "routemq[clickhouse]"    # ClickHouse time-series telemetry
uv add "routemq[prometheus]"    # multiprocess-safe Prometheus client adapter
uv add "routemq[all]"           # everything above plus CLI

# pip works too:
pip install routemq
pip install "routemq[cli]"
pip install "routemq[redis]"
pip install "routemq[all]"
```
Docker
The scaffolder can drop a docker-compose.yml with Redis, MySQL, the app, and queue workers:
```bash
uvx --from "routemq[cli]" routemq new my-app --with-docker --with-redis --with-mysql --with-queue
cd my-app
docker compose up -d
docker compose up -d --scale queue-worker-default=5
```
For a local MQTT broker, add Mosquitto to the same compose file:
```yaml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
```
Then set MQTT_BROKER=mosquitto for containers, or MQTT_BROKER=localhost when running RouteMQ on your host.
Documentation
- Getting Started, installation, first route, environment
- Architecture, message flow diagram and runtime components
- Configuration, every environment variable, with defaults
- Routing · Controllers · Middleware
- Queue System, jobs, workers, drivers
- Rate Limiting, strategies and Redis backend
- Redis · Database · TSDB / ClickHouse
- Monitoring, metrics, health, traces
- Docker Deployment · Testing
- Examples · API Reference · FAQ
- Release Conformance, SLSA, Scorecard, SBOM, SemVer
Project Health
- Security Policy, private vulnerability reporting and supported versions
- Contributing, issues, PRs, tests, coding standards
- Code of Conduct
- Changelog
- Issue Tracker
Hacking on the framework
git clone https://github.com/ardzz/RouteMQ.git
cd RouteMQ
uv sync --all-extras --dev
uv run python run_tests.py # 581 tests, ~3 seconds
See TEMPLATE.md if you want to fork the framework rather than depend on the published wheel.
License
MIT, see LICENSE.