Skip to main content

FastAPI-like MQTT/MQTTS server for Python, compatible with libcurl clients

Project description

MQTTD - FastAPI-like MQTT/MQTTS Server

A high-performance Python package for creating MQTT and MQTTS servers with a FastAPI-like decorator-based API. Fully compatible with libcurl clients and designed for production use.

Now supports MQTT 5.0 with full backward compatibility for MQTT 3.1.1.


Supported Features (Code-Verified)

The following features are implemented and used in the codebase (ref-code/mqttd). This list is derived from line-by-line analysis of mqttd/app.py, mqttd/session.py, mqttd/thread_safe.py, and related modules.

Core Features

  • FastAPI-like API: Decorators @app.subscribe(topic) and @app.publish_handler(topic) for topic subscriptions and PUBLISH handlers
  • MQTT 5.0 Protocol: Full support with automatic protocol detection (MQTT 3.1.1 vs 5.0)
  • MQTT 3.1.1 Compatibility: Full backward compatibility
  • MQTTS Support: TLS/SSL via ssl_context (e.g. port 8883)
  • QUIC Transport: Optional MQTT over QUIC (ngtcp2, pure Python, or aioquic)
  • Async/Await: Built on asyncio; one task per client connection
  • Configuration File: config_file with options (version, PUBLISH-before-SUBACK, short-PUBLISH, error-CONNACK, excessive-remaining, Testnum)

Multiple Concurrent Clients

  • Per-connection tasks: Each TCP or QUIC connection is handled by a dedicated asyncio task (_handle_client)
  • Connection state: _clients dict maps socket to (MQTTClient, StreamWriter); connection limits via max_connections and max_connections_per_ip
  • Session management: Per ClientID sessions (SessionManager); session takeover and concurrent same-ClientID handling per MQTT 5.0 (Clean Start, Session Present, Session Expiry Interval)

MQTT 5.0 Features

  • Reason codes: In CONNACK, SUBACK, UNSUBACK, PUBACK, etc.
  • Properties: Full encode/decode for property types including User Properties, Message Expiry Interval, Topic Aliases, Response Topic, Correlation Data, Content Type, Subscription Identifier, Receive Maximum, etc.
  • Session: Session Expiry Interval, Clean Start, Session Present, session takeover, expired session cleanup
  • Flow control: Receive Maximum negotiation; in-flight QoS tracking per client
  • Will message: Last Will and Testament with MQTT 5.0 properties; Will Delay Interval supported (delayed send after disconnect)
  • Subscription options: No Local, Retain As Published, Retain Handling (0/1/2) per subscription
  • Topic aliases: Server-side alias mapping per session
  • Message expiry: Message Expiry Interval checked before forwarding

Routing Modes

  • Direct routing (default): In-memory routing; topic trie + shared subscription trie for O(m) lookup
  • Redis Pub/Sub (optional): Publish to Redis channel by topic; subscribe to Redis when MQTT clients subscribe; _redis_message_listener forwards Redis messages to MQTT clients

Redis and MCP Request/Response

  • Redis connection: Optional redis_host/redis_port/redis_url; _connect_redis(), _disconnect_redis(), health check reports redis_connected
  • Redis Pub/Sub: Publish on PUBLISH; subscribe per topic; forward Redis messages to subscribed MQTT clients
  • Store until MCP response, then reply to client: When a client sends a PUBLISH with MQTT 5.0 Response Topic (and optionally Correlation Data), the server:
    • Stores request context in Redis at mqttd:request:{correlation_id} with TTL (e.g. 300s)
    • Publishes to channel mqttd:mcp:requests for MCP workers (payload: correlation_id, topic, payload_b64, response_topic)
    • Subscribes to mqttd:mcp:responses; when a response message arrives (JSON: correlation_id, payload_b64), looks up the stored request, forwards the reply to response_topic, and deletes the request key

Retained Messages

  • Store/delete: Retained PUBLISH stored in _retained_messages; empty payload with retain clears the topic
  • Delivery on subscribe: _deliver_retained_messages with MQTT 5.0 retain_handling and retain_as_published

Shared Subscriptions (MQTT 5.0)

  • Syntax: $share/group/topic; round-robin delivery per group via _shared_trie and _shared_group_index

Keepalive and Timeouts

  • Keepalive tracking: _client_keepalive stores last_activity, keepalive_seconds, and optional ping task per socket
  • Activity reset: On any received message (including PINGREQ and PUBLISH), last_activity is updated so timeout is effectively reset when the client is active
  • Keepalive monitor: Background task disconnects if no activity for 1.5× keepalive interval
  • Read timeout: reader.read() uses keepalive-based timeout (1.5× keepalive) so idle connections are closed

Rate Limiting

  • Per-client: _rate_limits tracks message count and subscription count per socket
  • Options: max_messages_per_second, max_subscriptions_per_minute; _check_rate_limit() used on PUBLISH and SUBSCRIBE

Observability and Admin

  • Metrics: get_metrics() returns total_connections, current_connections, total_messages_published/received, total_subscriptions/unsubscriptions, retained_messages_count, active_subscriptions_count, connections_per_ip
  • Health: health_check() returns status (healthy/degraded), running, connections, max_connections, redis_connected, errors list
  • Graceful shutdown: shutdown(timeout) sets _running, closes server, waits for connections to drain (with timeout)

Programmatic Publish

  • To all: Normal PUBLISH routing and optional app.publish(topic, payload, qos, retain) when using Redis
  • To one client: publish_to_client(client, topic, payload, qos, retain) sends a PUBLISH to a specific client by client_id

Thread-Safety and No-GIL

  • Thread-safe structures (mqttd/thread_safe.py): ThreadSafeDict, ThreadSafeSet, ThreadSafeTopicTrie, ThreadSafeConnectionPool (RLock-based) for use with Python 3.13+ no-GIL or 3.14t
  • Topic lookup: _topic_trie and _shared_trie are ThreadSafeTopicTrie for O(m) subscription matching

Transport

  • TCP: asyncio.start_server(_handle_client, host, port, ssl=...); can be disabled with enable_tcp=False
  • QUIC: ngtcp2 (preferred), pure Python, or aioquic; enable_quic, quic_port, quic_certfile, quic_keyfile; QUIC-only mode when TCP disabled

Installation

Basic

pip install -e .

Requirements

  • Python: 3.7+ (3.13+ recommended for no-GIL; 3.14t for free-threaded)
  • Redis: Optional — only for Redis pub/sub and MCP request/response (pip install redis>=5.0.0 or pip install -e ".[redis]")

QUIC (ngtcp2 + WolfSSL)

./scripts/build-server.sh   # then pip install -e .

See docs/BUILD_SERVER.md for details.


Quick Start

Basic server (direct routing, no Redis)

from mqttd import MQTTApp, MQTTMessage, MQTTClient

app = MQTTApp(port=1883)

@app.subscribe("sensors/temperature")
async def on_subscribe(topic: str, client: MQTTClient):
    print(f"Client {client.client_id} subscribed to {topic}")

@app.publish_handler("sensors/+")
async def on_publish(message: MQTTMessage, client: MQTTClient):
    print(f"Received {message.topic}: {message.payload_str}")

if __name__ == "__main__":
    app.run()

Multiple clients and connection limits

app = MQTTApp(
    port=1883,
    max_connections=1000,
    max_connections_per_ip=50
)
# Each connection gets its own task; sessions are per ClientID
app.run()

Redis Pub/Sub (multi-server)

app = MQTTApp(
    port=1883,
    redis_host="localhost",
    redis_port=6379
)

@app.subscribe("sensors/#")
async def on_sub(topic: str, client: MQTTClient):
    print(f"{client.client_id} subscribed to {topic}")

@app.publish_handler("sensors/+")
async def on_pub(message: MQTTMessage, client: MQTTClient):
    print(f"PUBLISH {message.topic} (published to Redis)")

app.run()

Redis + MCP request/response (store until agent replies)

Client sends PUBLISH with MQTT 5.0 Response Topic and optional Correlation Data. Server stores the request in Redis, publishes to mqttd:mcp:requests; when an MCP worker publishes a response to mqttd:mcp:responses, the server forwards the reply to the client’s response topic.

Server (with Redis):

app = MQTTApp(port=1883, redis_host="localhost", redis_port=6379)

@app.subscribe("devices/+/request")
async def on_request_sub(topic: str, client: MQTTClient):
    print(f"Client {client.client_id} subscribed to {topic}")

@app.publish_handler("devices/+/request")
async def on_request_pub(message: MQTTMessage, client: MQTTClient):
    # Request is auto-stored in Redis (when response_topic is set) and
    # published to mqttd:mcp:requests; MCP workers consume and reply
    # to mqttd:mcp:responses; server then forwards to response_topic
    print(f"Request on {message.topic} from {client.client_id}")

app.run()

MCP worker contract (Redis):

  • Subscribe to Redis channel mqttd:mcp:requests. Each message is JSON: correlation_id, topic, payload_b64, response_topic.
  • After calling your MCP agent, publish to Redis channel mqttd:mcp:responses a JSON message: {"correlation_id": "<id>", "payload_b64": "<base64 reply>"}.

Client (MQTT 5.0): Publish with Response Topic and optional Correlation Data so the server stores the request and later delivers the reply on that topic.

Metrics and health (e.g. for admin API)

app = MQTTApp(port=1883)

# In another thread or admin endpoint:
metrics = app.get_metrics()
# total_connections, current_connections, total_messages_published,
# retained_messages_count, active_subscriptions_count, connections_per_ip, ...

health = app.health_check()
# status, running, connections, max_connections, redis_connected, errors

MQTTS (TLS)

import ssl
from mqttd import MQTTApp, MQTTClient

ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_ctx.load_cert_chain("server.crt", "server.key")

app = MQTTApp(port=8883, ssl_context=ssl_ctx)

@app.subscribe("secure/topic")
async def on_secure(topic: str, client: MQTTClient):
    print(f"Secure client: {client.client_id} -> {topic}")

app.run()

MQTT over QUIC (QUIC-only)

app = MQTTApp(
    enable_tcp=False,
    enable_quic=True,
    quic_port=1884,
    quic_certfile="cert.pem",
    quic_keyfile="key.pem",
)

@app.subscribe("sensors/#")
async def on_sensor(topic: str, client: MQTTClient):
    print(f"[{client.client_id}] Subscribed to {topic}")

app.run()

Shared subscriptions (MQTT 5.0)

Clients subscribe with $share/groupname/topic. Server delivers each message to one member of the group (round-robin).

app = MQTTApp(port=1883)

@app.subscribe("$share/workers/commands")
async def on_shared(topic: str, client: MQTTClient):
    print(f"Shared sub: {client.client_id} -> {topic}")

app.run()

Configuration file

Create mqttd.config:

version 5
app = MQTTApp(port=1883, config_file="mqttd.config")
app.run()

Configuration Options

MQTTApp(
    host="0.0.0.0",
    port=1883,
    ssl_context=None,
    config_file=None,
    redis_host=None,
    redis_port=6379,
    redis_db=0,
    redis_password=None,
    redis_url=None,
    use_redis=False,
    enable_tcp=True,
    enable_quic=False,
    quic_port=1884,
    quic_certfile=None,
    quic_keyfile=None,
    max_connections=None,
    max_connections_per_ip=None,
    max_messages_per_second=None,
    max_subscriptions_per_minute=None,
)

API Reference (Summary)

  • @app.subscribe(topic, qos=0) — Subscription handler; optional return bytes to send to subscriber.
  • @app.publish_handler(topic=None) — PUBLISH handler; topic filter or all if None.
  • app.run(host=None, port=None, ssl_context=None) — Blocking run.
  • app.get_metrics() — Dict of server metrics.
  • app.health_check() — Dict with status, running, connections, redis_connected, errors.
  • app.shutdown(timeout=30.0) — Graceful shutdown (async).
  • app.publish(topic, payload, qos=0, retain=False) — Programmatic publish (async; when Redis used).
  • app.publish_to_client(client, topic, payload, qos=0, retain=False) — Send PUBLISH to one client (async).

Types: MQTTMessage (topic, payload, qos, retain, packet_id, payload_str, payload_json), MQTTClient (client_id, username, password, keepalive, clean_session, address).


Architecture (Summary)

  • Multiple clients: One asyncio task per connection; _clients dict; SessionManager per ClientID.
  • Routing: Direct (in-memory trie) or Redis pub/sub; optional MCP flow via Redis keys/channels.
  • Thread-safety: ThreadSafeTopicTrie (and related structures in thread_safe.py) for no-GIL readiness.
  • Protocols: CONNECT/CONNACK, PUBLISH, PUBACK/PUBREC/PUBREL/PUBCOMP, SUBSCRIBE/SUBACK, UNSUBSCRIBE/UNSUBACK, PINGREQ/PINGRESP, DISCONNECT.

Examples

See examples/:

  • basic_server.py — Basic MQTT server
  • mqtt5_server.py — MQTT 5.0
  • secure_server.py — MQTTS
  • redis_server.py — Redis pub/sub
  • direct_routing_server.py — Direct routing
  • mqtt_quic_server.py / mqtt_quic_only_server.py — QUIC
  • config_server.py — Config file

Testing

python tests/test_new_features.py
# or
pytest tests/ -v

License

MIT License.

Links

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mqttd-0.5.0.tar.gz (100.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mqttd-0.5.0-py3-none-any.whl (93.4 kB view details)

Uploaded Python 3

File details

Details for the file mqttd-0.5.0.tar.gz.

File metadata

  • Download URL: mqttd-0.5.0.tar.gz
  • Upload date:
  • Size: 100.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for mqttd-0.5.0.tar.gz
Algorithm Hash digest
SHA256 72aa1d045df3533d9e77c2bbe8f6cb3eae070f7d5c2868b53aefd3e4bcb90449
MD5 a039f144122477bd1739ec0b192f2215
BLAKE2b-256 8a326e7491b36e3c3f47b6590e5d397f4344a05edfe5f0715d8254fe38b20996

See more details on using hashes here.

File details

Details for the file mqttd-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: mqttd-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 93.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for mqttd-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 62b2b4a552c97e7af520d1c1b494d180c2bcc67d5b87473196a1a62db5e2e28e
MD5 6271bf72876f7316d8461b3e176157cc
BLAKE2b-256 e21cfba636dfb2938975e5f012e7d2368529623f71848173f0820143bf2153e3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page