Asynchronous MQTT client library for Python with optional Django integration
django-mqtt-client
Unified inter-service event bus utility for Django services (MQTT + RabbitMQ).
Release Notes
1.0.0
- Major internal refactor into a package-based layout under django_mqtt_client/event_bus/ for better maintainability.
- Worker orchestration and broker logic split into focused modules:
  - event_bus/worker.py
  - event_bus/rabbitmq/*
  - event_bus/mqtt/*
  - event_bus/shared/*
  - event_bus/inbound.py, event_bus/outbound.py
- RabbitMQ topology guard added at startup:
  - validates management API snapshot
  - creates missing exchanges/queues/bindings
  - retries indefinitely until topology is ready
- RabbitMQ consumer flow simplified to queue-driven consumption (from DB config), with Celery-managed topology excluded.
- Outbound RabbitMQ exchange resolution improved:
  - supports source-based exchange selection (<source_service>_exchange) with cache/fallback.
- Test suite updated for the new module structure and async loop stability.
- event_bus_worker now resolves broker type from runtime config_key; no --broker-type override.
Objectives
- Standardize inter-service messaging using one RabbitMQ envelope schema.
- Support request/response/event patterns with topic conventions.
- Ensure reliable publish/consume with retries, backoff, and worker heartbeats.
- Persist envelope metadata in DB (message_type, message_status, source_service, reply_to, parent_request_id, chain_depth).
- Guarantee message identity with message_id (auto-generated, or falling back to OutboundEvent.id when missing on outbound).
- Keep MQTT inbound compatible for EdgeX events while enforcing schema for RabbitMQ.
What it does
- Outbound flow: DB OutboundEvent -> broker publish
- Inbound flow: broker consume -> DB InboundEvent
- Shared RabbitMQ envelope validation via RabbitMqSchemaValidator
- Retry + backoff for outbound publish failures
- Worker heartbeat/status updates via runtime config table
Install
pip install django-mqtt-client
Django setup
INSTALLED_APPS = [
# ...
"django_mqtt_client",
]
Run migrations:
python manage.py migrate
Key modules
- django_mqtt_client/event_bus/worker.py - async worker orchestrator
- django_mqtt_client/event_bus/rabbitmq/schema.py - envelope normalize/validate for RabbitMQ messages
- django_mqtt_client/exceptions.py - broker-neutral exceptions
RabbitMQ envelope
Schema: interservice.envelope.v1
{
"schema": "interservice.envelope.v1",
"message_id": "uuid",
"message_type": "request",
"topic": "edgenexus.request.machine.read",
"source_service": "ebmr",
"request_id": "req-100",
"parent_request_id": "req-99",
"chain_depth": 1,
"reply_to": "ebmr.response.machine.read",
"payload": {
"machine_id": "M1"
}
}
Message type rules
- request: requires request_id, reply_to
- response: requires request_id, status (success or error)
- event: requires event_type
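The per-type rules above can be sketched as a small check. This is a minimal illustration, not the library's RabbitMqSchemaValidator: the function name `validate_envelope` and the error strings are hypothetical, and only the required-field rules listed above are covered.

```python
from typing import Any

# Required fields per message type, taken from the rules above.
REQUIRED_FIELDS = {
    "request": ("request_id", "reply_to"),
    "response": ("request_id", "status"),
    "event": ("event_type",),
}
RESPONSE_STATUSES = ("success", "error")


def validate_envelope(envelope: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the envelope passed.

    Hypothetical helper for illustration only.
    """
    message_type = envelope.get("message_type")
    if message_type not in REQUIRED_FIELDS:
        return [f"unknown message_type: {message_type!r}"]

    problems: list[str] = []
    for field in REQUIRED_FIELDS[message_type]:
        # Treat a missing key and an empty value the same way (an assumption).
        if not envelope.get(field):
            problems.append(f"{message_type} requires {field}")

    # A present but unrecognised status is reported separately.
    status = envelope.get("status")
    if message_type == "response" and status and status not in RESPONSE_STATUSES:
        problems.append("response status must be 'success' or 'error'")
    return problems
```

Returning a list of problems rather than raising on the first one makes it easier to log every reason a message was rejected.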
message_id behavior
- Envelope includes message_id.
- Outbound RabbitMQ: if message_id is missing, utility auto-generates a UUID.
- Outbound fallback: if DB row has no envelope message_id, OutboundEvent.id is used before validation.
- Inbound RabbitMQ: message_id must be present after normalization; invalid messages are dropped.
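One way to read the outbound rules is as a fallback chain. The sketch below assumes the order envelope value, then OutboundEvent.id, then a fresh UUID; the function name `resolve_message_id` is hypothetical and the library's actual precedence may differ.

```python
import uuid
from typing import Any, Optional


def resolve_message_id(envelope: dict[str, Any], outbound_event_id: Optional[Any] = None) -> str:
    """Pick a message_id for an outbound envelope (illustrative only)."""
    existing = envelope.get("message_id")
    if existing:
        # The envelope already carries an identity; keep it.
        return str(existing)
    if outbound_event_id is not None:
        # Assumed fallback: reuse the DB row's primary key.
        return str(outbound_event_id)
    # Last resort: generate a random UUID.
    return str(uuid.uuid4())
```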
Topic rules
- Request: <target_service>.request.<entity>.<action>[.<subaction>...]
- Response: <requesting_service>.response.<entity>.<action>[.<subaction>...]
- Event: event.<domain>.<entity>.<action>[.<subaction>...]
Examples:
- edgenexus.request.machine.read
- ebmr.response.machine.read
- event.operational.machine.status.changed
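The topic conventions can be checked with regular expressions. This is a rough sketch, not the library's code: the allowed segment characters (lowercase letters, digits, underscore) and the name `classify_topic` are assumptions made for the example.

```python
import re
from typing import Optional

# Assumed character set for one dot-separated segment.
_SEGMENT = r"[a-z0-9_]+"
# <entity>.<action>[.<subaction>...]
_TAIL = rf"{_SEGMENT}\.{_SEGMENT}(?:\.{_SEGMENT})*"

TOPIC_PATTERNS = {
    "request": re.compile(rf"^{_SEGMENT}\.request\.{_TAIL}$"),
    "response": re.compile(rf"^{_SEGMENT}\.response\.{_TAIL}$"),
    "event": re.compile(rf"^event\.{_SEGMENT}\.{_TAIL}$"),
}


def classify_topic(topic: str) -> Optional[str]:
    """Return 'request', 'response', or 'event', or None if no rule matches."""
    for message_type, pattern in TOPIC_PATTERNS.items():
        if pattern.match(topic):
            return message_type
    return None
```

A caller could compare the result with the envelope's message_type to catch a request published on a response topic.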
Message samples
Request:
{
"message_type": "request",
"topic": "edgenexus.request.machine.read",
"request_id": "req-100",
"reply_to": "ebmr.response.machine.read",
"payload": {
"machine_id": "M1"
}
}
Response:
{
"message_type": "response",
"topic": "ebmr.response.machine.read",
"request_id": "req-100",
"status": "success",
"payload": {
"machine_id": "M1",
"state": "running"
}
}
Event:
{
"message_type": "event",
"topic": "event.operational.machine.status.changed",
"event_type": "machine.status.changed",
"payload": {
"machine_id": "M1",
"status": "running"
}
}
Chaining sample:
{
"message_type": "request",
"topic": "edgenexus.request.batch.start",
"request_id": "req-200",
"parent_request_id": "req-100",
"chain_depth": 1,
"reply_to": "ebmr.response.batch.start",
"payload": {
"batch_id": "B-001"
}
}
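A follow-up request like the chaining sample could be derived from its parent as sketched below. The helper name `build_chained_request` is hypothetical, and the rule that chain_depth grows by one per hop (with a missing depth treated as 0) is an assumption inferred from the samples, not documented behavior.

```python
from typing import Any


def build_chained_request(
    parent: dict[str, Any],
    topic: str,
    request_id: str,
    reply_to: str,
    payload: dict[str, Any],
) -> dict[str, Any]:
    """Build a request envelope that records which request caused it."""
    return {
        "message_type": "request",
        "topic": topic,
        "request_id": request_id,
        # Link back to the request that triggered this one.
        "parent_request_id": parent["request_id"],
        # Assumed convention: one level deeper than the parent.
        "chain_depth": parent.get("chain_depth", 0) + 1,
        "reply_to": reply_to,
        "payload": payload,
    }
```

Applied to the Request sample (req-100, no chain_depth), this yields the same parent_request_id and chain_depth as the chaining sample.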
Runtime DB config
Worker reads active row from EventBusRuntimeConfig (config_key, is_active).
Broker-specific child settings:
- EventBusMqttSettings
- EventBusRabbitMqSettings
Shared connection fields are in runtime config (host, port, username, password, connect_timeout, retry_interval_seconds).
Event table fields
OutboundEvent
- transport fields:
  - status (pending/published/invalid/error) for publish lifecycle
  - published, published_at, retry_count, next_retry_at, last_error
- envelope-related fields:
  - topic
  - request_id
  - event_type
  - message_type
  - message_status (mapped to envelope status)
  - source_service
  - parent_request_id
  - chain_depth
  - reply_to
  - payload
  - id is used as fallback message_id if payload/message row does not provide one
InboundEvent
- topic
- request_id
- message_type
- message_status (envelope/business status if present)
- reply_to
- source_service
- parent_request_id
- chain_depth
- payload
- processing fields:
  - status (pending/processed/error) for local processing lifecycle
  - processed, processed_at, last_error
EVENT_BUS_CONFIG example
EVENT_BUS_CONFIG = {
"runtime_config_key": "default",
"broker_type": "rabbitmq",
"rabbitmq_url": "amqp://guest:guest@localhost/",
"rabbitmq_exchange": "interservice",
"rabbitmq_queue": "interservice_inbound",
"rabbitmq_management_url": "http://localhost:15672",
"rabbitmq_management_username": "guest",
"rabbitmq_management_password": "guest",
"rabbitmq_management_timeout_seconds": 5.0,
}
Worker
Start:
python manage.py event_bus_worker --config-key default
Optional override:
python manage.py event_bus_worker --config-key rabbitmq
Celery Integration
- Optional inbound handoff to Celery is supported.
- When celery_inbound_handler_task is set in runtime config, each saved InboundEvent triggers:
  - current_app.send_task(celery_inbound_handler_task, args=[event_id])
- If celery_queue is configured, dispatch uses that queue.
- If Celery dispatch fails, event persistence is retained and error is logged.
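The handoff described above might look roughly like the following. This is a sketch under assumptions: `dispatch_inbound_event` is a hypothetical name, and `app` stands for any object with a Celery-style `send_task` method (such as `celery.current_app`), passed in so the example runs without Celery installed.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def dispatch_inbound_event(app: Any, task_name: str, event_id: int, queue: Optional[str] = None) -> bool:
    """Send the saved event's id to a Celery task; never raise on failure."""
    # Only pass a queue when one is configured, so default routing still applies.
    options = {"queue": queue} if queue else {}
    try:
        app.send_task(task_name, args=[event_id], **options)
    except Exception:
        # The InboundEvent row is already saved; log and carry on.
        logger.exception("Celery dispatch failed for InboundEvent %s", event_id)
        return False
    return True
```

Because the event is persisted before dispatch, a failed handoff loses nothing: the row stays in the database for later processing.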
Status:
python manage.py event_bus_status
Real-time Mode (Performance)
By default, the worker polls the database every poll_interval (default 1s). For critical events, you can enable Real-time Mode to eliminate this latency.
1. Inbound (Broker → Workers)
To process incoming messages immediately without waiting for the database poll:
- Configure celery_inbound_handler_task in EventBusRuntimeConfig.
- (Optional) Set celery_queue for specific routing.
When a message arrives, the worker saves it to the DB and immediately dispatches a Celery task to your main application.
2. Outbound (App → Broker)
To publish messages instantly when they are saved in your main application:
- MQTT: Set control_topic in EventBusMqttSettings.
- RabbitMQ: Set control_exchange (type fanout) and control_routing_key in EventBusRabbitMqSettings.
The library uses a Django post_save signal to send a lightweight "poke" to the worker whenever an OutboundEvent is created.
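On the worker side, a poke and the regular poll can share one wait. The sketch below is an assumption about how such a loop could be built with asyncio, not the library's implementation; `wait_for_poke_or_poll` is a hypothetical name.

```python
import asyncio


async def wait_for_poke_or_poll(poke: asyncio.Event, poll_interval: float = 1.0) -> str:
    """Block until a poke arrives or the poll interval elapses.

    Returns "poke" or "poll" so the caller can tell what woke it.
    """
    try:
        await asyncio.wait_for(poke.wait(), timeout=poll_interval)
    except asyncio.TimeoutError:
        # No poke arrived in time: fall back to the regular DB poll.
        return "poll"
    # Reset the flag so the next wait blocks again.
    poke.clear()
    return "poke"
```

Either way the caller scans for pending OutboundEvent rows next, so a lost poke only costs up to one poll_interval of latency.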
Behavior notes
- Broker connect failure retries every configured retry interval.
- Outbound invalid payload/envelope is marked status=invalid.
- Outbound publish errors use exponential backoff until max_retry_count.
- If InboundEvent.message_id exists, duplicate inbound message IDs are ignored.
- Pulse Reconciliation: Even if real-time triggers fail, the background polling loop ensures all events are eventually processed within the poll_interval.
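The exponential backoff noted above can be illustrated with a small delay calculation. The doubling factor, the 1-second base, and the 300-second cap are assumptions chosen for the example; only max_retry_count comes from the notes above.

```python
from typing import Optional


def next_retry_delay(
    retry_count: int,
    max_retry_count: int,
    base_seconds: float = 1.0,
    cap_seconds: float = 300.0,
) -> Optional[float]:
    """Return seconds to wait before the next publish attempt.

    Returns None once retry_count has reached max_retry_count.
    """
    if retry_count >= max_retry_count:
        return None
    # Double the delay on each failed attempt, but never exceed the cap.
    return min(base_seconds * (2 ** retry_count), cap_seconds)
```

A worker could add the returned delay to the current time and store the result in next_retry_at, leaving the row untouched when the function returns None.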
Tests
Run from repo root:
pytest -q
If your environment does not include coverage plugins from pyproject.toml, run:
pytest -q -o addopts=''