Asynchronous MQTT client library for Python with optional Django integration
Project description
django-mqtt-client
Unified inter-service event bus utility for Django services (MQTT + RabbitMQ).
Objectives
- Standardize inter-service messaging using one RabbitMQ envelope schema.
- Support request/response/event patterns with topic conventions.
- Ensure reliable publish/consume with retries, backoff, and worker heartbeats.
- Persist envelope metadata in DB (
message_type,message_status,source_service,reply_to,parent_request_id,chain_depth). - Guarantee message identity with
message_id(auto-generated/fallback fromOutboundEvent.idwhen missing on outbound). - Keep MQTT inbound compatible for EdgeX events while enforcing schema for RabbitMQ.
What it does
- Outbound flow: DB
OutboundEvent-> broker publish - Inbound flow: broker consume -> DB
InboundEvent - Shared RabbitMQ envelope validation via
RabbitMqSchemaValidator - Retry + backoff for outbound publish failures
- Worker heartbeat/status updates via runtime config table
Install
pip install django-mqtt-client
Django setup
INSTALLED_APPS = [
# ...
"django_mqtt_client",
]
Run migrations:
python manage.py migrate
Key modules
django_mqtt_client/event_bus.py- async worker for MQTT and RabbitMQ
django_mqtt_client/rabbitmq_schema.py- envelope normalize/validate for RabbitMQ messages
django_mqtt_client/exceptions.py- broker-neutral exceptions
RabbitMQ envelope
Schema: interservice.envelope.v1
{
"schema": "interservice.envelope.v1",
"message_id": "uuid",
"message_type": "request",
"topic": "edgenexus.request.machine.read",
"source_service": "ebmr",
"request_id": "req-100",
"parent_request_id": "req-99",
"chain_depth": 1,
"reply_to": "ebmr.response.machine.read",
"payload": {
"machine_id": "M1"
}
}
Message type rules
request: requiresrequest_id,reply_toresponse: requiresrequest_id,status(successorerror)event: requiresevent_type
message_id behavior
- Envelope includes
message_id. - Outbound RabbitMQ: if
message_idis missing, utility auto-generates a UUID. - Outbound fallback: if DB row has no envelope
message_id,OutboundEvent.idis used before validation. - Inbound RabbitMQ:
message_idmust be present after normalization; invalid messages are dropped.
Topic rules
- Request:
<target_service>.request.<entity>.<action>[.<subaction>...] - Response:
<requesting_service>.response.<entity>.<action>[.<subaction>...] - Event:
event.<domain>.<entity>.<action>[.<subaction>...]
Examples:
edgenexus.request.machine.readebmr.response.machine.readevent.operational.machine.status.changed
Message samples
Request:
{
"message_type": "request",
"topic": "edgenexus.request.machine.read",
"request_id": "req-100",
"reply_to": "ebmr.response.machine.read",
"payload": {
"machine_id": "M1"
}
}
Response:
{
"message_type": "response",
"topic": "ebmr.response.machine.read",
"request_id": "req-100",
"status": "success",
"payload": {
"machine_id": "M1",
"state": "running"
}
}
Event:
{
"message_type": "event",
"topic": "event.operational.machine.status.changed",
"event_type": "machine.status.changed",
"payload": {
"machine_id": "M1",
"status": "running"
}
}
Chaining sample:
{
"message_type": "request",
"topic": "edgenexus.request.batch.start",
"request_id": "req-200",
"parent_request_id": "req-100",
"chain_depth": 1,
"reply_to": "ebmr.response.batch.start",
"payload": {
"batch_id": "B-001"
}
}
Runtime DB config
Worker reads active row from EventBusRuntimeConfig (config_key, is_active).
Broker-specific child settings:
EventBusMqttSettingsEventBusRabbitMqSettings
Shared connection fields are in runtime config (host, port, username, password, connect_timeout, retry_interval_seconds).
Event table fields
OutboundEvent
- transport fields:
status(pending/published/invalid/error) for publish lifecyclepublished,published_at,retry_count,next_retry_at,last_error
- envelope-related fields:
topicrequest_idevent_typemessage_typemessage_status(mapped to envelopestatus)source_serviceparent_request_idchain_depthreply_topayloadidis used as fallbackmessage_idif payload/message row does not provide one
InboundEvent
topicrequest_idmessage_typemessage_status(envelope/business status if present)reply_tosource_serviceparent_request_idchain_depthpayload- processing fields:
status(pending/processed/error) for local processing lifecycleprocessed,processed_at,last_error
EVENT_BUS_CONFIG example
Worker
Start:
python manage.py event_bus_worker --config-key default
Optional override:
python manage.py event_bus_worker --config-key default --broker-type rabbitmq
Status:
python manage.py event_bus_status
Behavior notes
- Broker connect failure retries every configured retry interval.
- Outbound invalid payload/envelope is marked
status=invalid. - Outbound publish errors use exponential backoff until
max_retry_count. - If
InboundEvent.message_idexists, duplicate inbound message IDs are ignored.
Tests
Run from repo root:
pytest -q
If your environment does not include coverage plugins from pyproject.toml, run:
pytest -q -o addopts=''
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_mqtt_client-0.1.0.tar.gz.
File metadata
- Download URL: django_mqtt_client-0.1.0.tar.gz
- Upload date:
- Size: 24.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9a2385d19e97d48d8b6599487bfa39a5752e557baaee1c08302824d6a95ac66f
|
|
| MD5 |
b7eb35e832b90c5def9b1a8b8c14df04
|
|
| BLAKE2b-256 |
0ac04446a53e181400269b0f8f3011dd2643105dba8f0d0398ee254b9cc4ad53
|
File details
Details for the file django_mqtt_client-0.1.0-py3-none-any.whl.
File metadata
- Download URL: django_mqtt_client-0.1.0-py3-none-any.whl
- Upload date:
- Size: 25.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.15
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
421e1989e9135be6fcb6594d069325c754fda52e0dccabacd9c6bf9125bc460b
|
|
| MD5 |
92206ce427d367b01c63e43cdb030655
|
|
| BLAKE2b-256 |
fe727809305636eefdf75e259654f5847e3487e048719dc778ebc435e24d74d7
|