Skip to main content

A flexible MQTT routing framework with middleware support

Project description

RouteMQ

RouteMQ

Laravel-style MQTT routing for Python: controllers, middleware, jobs, and shared-subscription scaling without callback spaghetti.

PyPI Python License: MIT OpenSSF Scorecard Status: Beta

Status: Beta. Use RouteMQ in test/staging while we harden the v1 surface. The framework is fully tested (581 unit tests) and supply-chain hardened, but APIs may shift before 1.0.


Documentation: docs/ · Source: github.com/ardzz/RouteMQ · PyPI: routemq

RouteMQ is a Python 3.12+ MQTT application framework that turns topics into async controller methods through middleware chains, with optional background jobs and shared-subscription workers.

RouteMQ gives you:

  • Route topics like web routes. Declare devices/{id}/status once; receive id as a typed handler argument.
  • Controllers and middleware. Keep handlers in app/controllers; layer auth, logging, rate limiting, and validation as reusable middleware.
  • Async by default. Use async Redis, MySQL (SQLAlchemy), ClickHouse, and job dispatch naturally inside handlers. RouteMQ bridges paho-mqtt's sync callbacks for you.
  • Shared-subscription scaling. Flip shared=True on a high-volume route; RouteMQ spawns worker processes against $share/<group>/<topic> without you wiring multiple clients.
  • Background jobs. Laravel-style Job classes with retries, delays, timeouts, and Redis or MySQL queue backends.
  • Built-in observability. Optional /health, /ready, and /metrics HTTP endpoints, lifecycle counters, latency histograms, and OpenTelemetry-shaped spans. No mandatory vendor backend.
  • Optional integrations. Redis, MySQL, ClickHouse for time-series telemetry, and a Prometheus client adapter. All opt-in extras.
  • Supply-chain hardened. OpenSSF Scorecard, SLSA L3 provenance, signed CycloneDX SBOMs, Bandit, pip-audit, and Dependabot on every release.

Quick Start

Install the mode you need:

Install Use it for
routemq Runtime engine: routing, middleware, jobs, MySQL queue, app boot.
routemq[cli] Runtime plus the routemq new scaffolder. Start here for a new project.
routemq[redis] Runtime plus Redis support for queues, cache, rate limits, and shared state.
routemq[all] CLI, Redis, Prometheus, and ClickHouse extras in one install.
uv add "routemq[cli]"          # add to an existing uv project
pip install "routemq[cli]"     # install into the active Python environment

Create a project and one route:

uvx --from "routemq[cli]" routemq new sensor-demo
# or, after pip install "routemq[cli]": routemq new sensor-demo
cd sensor-demo
# app/controllers/device_controller.py
from routemq.controller import Controller

class DeviceController(Controller):
    @staticmethod
    async def status(device_id, payload, client):
        print(f"device {device_id}: {payload}")
        return {"ok": True}

# app/routers/devices.py
from routemq.router import Router
from app.controllers.device_controller import DeviceController

router = Router()
router.on("devices/{device_id}/status", DeviceController.status, qos=1)

Point .env at a broker and run the app:

MQTT_BROKER=test.mosquitto.org
MQTT_PORT=1883
uv run routemq run
mosquitto_pub -h test.mosquitto.org -t devices/42/status -m '{"temp":21}'

RouteMQ imports app.routers.*, subscribes to devices/+/status, and calls DeviceController.status(device_id="42", payload={"temp": 21}, ...).

When should I use RouteMQ?

If you need... Use
Low-level MQTT protocol control, custom session/QoS handling paho-mqtt
A web-framework-style structure for an MQTT-first app RouteMQ
Multi-broker streaming across Kafka, RabbitMQ, NATS, Redis, MQTT FastStream
General distributed task queues independent of a broker protocol Celery

RouteMQ sits on top of paho-mqtt. You keep proven protocol behavior and add structure, async handlers, and scaling.

Routes, middleware, and scaling in one snippet

The minimal example above scales up cleanly:

from routemq.router import Router
from app.middleware.rate_limit import RateLimit
from app.controllers.device_controller import DeviceController

router = Router()

with router.group(prefix="devices", middleware=[RateLimit(60)]) as devices:
    devices.on(
        "{device_id}/status",
        DeviceController.handle_status,
        qos=1,
        shared=True,
        worker_count=3,
    )
  • The {device_id} token compiles to a regex with a named group and to a + wildcard for the MQTT subscription.
  • shared=True switches the subscription to $share/<group>/devices/+/status and spawns three worker processes.
  • RateLimit(60) runs as middleware before the handler. Auth, logging, and validation work the same way.

Background jobs

Push slow work out of the MQTT handler. Register concrete jobs so workers can deserialize them safely:

# app/jobs/send_alert_job.py
from routemq.job import Job


@Job.register
class SendAlertJob(Job):
    queue = "alerts"
    max_tries = 3
    retry_after = 10

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        print(f"alert for {self.device_id}: {self.payload}")

Dispatch the job from a controller or handler:

from routemq.queue import dispatch
from app.jobs.send_alert_job import SendAlertJob


async def handler(device_id, payload, client):
    if payload.get("status") == "critical":
        job = SendAlertJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
    return {"ok": True}

Run a worker:

routemq queue-work --queue alerts --connection redis

Queue backends: Redis with routemq[redis], or MySQL with base routemq when ENABLE_MYSQL=true.

Real-world sensor telemetry

A sensor pipeline usually has three parts: MQTT routing, queued processing, and a local stack with a broker plus Redis.

# app/routers/sensors.py
from routemq.router import Router
from app.controllers.sensor_controller import SensorController

router = Router()
router.on("sensors/{device_id}/telemetry", SensorController.ingest, qos=1)
# app/controllers/sensor_controller.py
from routemq.controller import Controller
from routemq.queue import dispatch
from app.jobs.store_telemetry_job import StoreTelemetryJob


class SensorController(Controller):
    @staticmethod
    async def ingest(device_id, payload, client):
        job = StoreTelemetryJob()
        job.device_id = device_id
        job.payload = payload
        await dispatch(job)
        return {"accepted": True, "device_id": device_id}
# app/jobs/store_telemetry_job.py
from routemq.job import Job


@Job.register
class StoreTelemetryJob(Job):
    queue = "telemetry"
    max_tries = 5

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.payload = {}

    async def handle(self):
        temperature = self.payload.get("temperature")
        print(f"store {self.device_id}: temperature={temperature}")

Run it against a local broker and Redis queue:

# docker-compose.yml
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports: ["1883:1883"]
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
MQTT_BROKER=localhost
MQTT_PORT=1883
ENABLE_REDIS=true
QUEUE_CONNECTION=redis
docker compose up -d
uv run routemq run
uv run routemq queue-work --queue telemetry --connection redis
mosquitto_pub -h localhost -t sensors/pump-7/telemetry -m '{"temperature":31.2}'

Observability

RouteMQ ships health, readiness, and OpenMetrics endpoints, off by default. Set METRICS_HTTP_ENABLED=true to expose them:

curl http://localhost:8080/health     # liveness
curl http://localhost:8080/ready      # MQTT readiness
curl http://localhost:8080/metrics    # OpenMetrics / Prometheus text

Built-in metric families include mqtt_messages_*, router_dispatch_*, queue_job_*, queue-depth gauges, tsdb_write_*, and latency histograms for each. Spans follow OpenTelemetry-shaped semantics (db.system, db.operation, messaging.system, kind=client|consumer|producer|internal).

For details: Metrics · Health checks · Pool tuning evidence

Optional extras

uv add routemq                 # base runtime
uv add "routemq[cli]"          # scaffolder and rich terminal prompts
uv add "routemq[redis]"        # Redis queue + rate limiting backend
uv add "routemq[clickhouse]"   # ClickHouse time-series telemetry
uv add "routemq[prometheus]"   # multiprocess-safe Prometheus client adapter
uv add "routemq[all]"          # everything above plus CLI

# pip works too:
pip install routemq
pip install "routemq[cli]"
pip install "routemq[redis]"
pip install "routemq[all]"

Docker

The scaffolder can drop a docker-compose.yml with Redis, MySQL, the app, and queue workers:

uvx --from "routemq[cli]" routemq new my-app --with-docker --with-redis --with-mysql --with-queue
cd my-app
docker compose up -d
docker compose up -d --scale queue-worker-default=5

For a local MQTT broker, add Mosquitto to the same compose file:

services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"

Then set MQTT_BROKER=mosquitto for containers, or MQTT_BROKER=localhost when running RouteMQ on your host.

Documentation

Project Health

Hacking on the framework

git clone https://github.com/ardzz/RouteMQ.git
cd RouteMQ
uv sync --all-extras --dev
uv run python run_tests.py     # 581 tests, ~3 seconds

See TEMPLATE.md if you want to fork the framework rather than depend on the published wheel.

License

MIT, see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

routemq-0.23.0.tar.gz (739.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

routemq-0.23.0-py3-none-any.whl (99.3 kB view details)

Uploaded Python 3

File details

Details for the file routemq-0.23.0.tar.gz.

File metadata

  • Download URL: routemq-0.23.0.tar.gz
  • Upload date:
  • Size: 739.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for routemq-0.23.0.tar.gz
Algorithm Hash digest
SHA256 4d269e6262776030c3d77c41a131f9bf4b4871353e980fcb3282e60d14fcac31
MD5 0f7691b5d30a2bfe870547bc8f3e5033
BLAKE2b-256 be0186349f5fce19e481f898b7592ebd6c7dd79dcb6958d9719c4669c7b124e7

See more details on using hashes here.

Provenance

The following attestation bundles were made for routemq-0.23.0.tar.gz:

Publisher: publish.yml on ardzz/RouteMQ

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file routemq-0.23.0-py3-none-any.whl.

File metadata

  • Download URL: routemq-0.23.0-py3-none-any.whl
  • Upload date:
  • Size: 99.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for routemq-0.23.0-py3-none-any.whl
Algorithm Hash digest
SHA256 60689f51b739b55cbf7a945b0b5333f697522727206ef3c3779ff0d9f2112da2
MD5 9d3f50c83a7c452b67630f88d8025e48
BLAKE2b-256 bb46c437aa2f395a356ac20db0569581182ef62c3f7a5731058dfd07c69b5a4e

See more details on using hashes here.

Provenance

The following attestation bundles were made for routemq-0.23.0-py3-none-any.whl:

Publisher: publish.yml on ardzz/RouteMQ

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page