
Asynchronous MQTT client library for Python with optional Django integration


django-mqtt-client

Unified inter-service event bus utility for Django services (MQTT + RabbitMQ).

Objectives

  • Standardize inter-service messaging using one RabbitMQ envelope schema.
  • Support request/response/event patterns with topic conventions.
  • Ensure reliable publish/consume with retries, backoff, and worker heartbeats.
  • Persist envelope metadata in DB (message_type, message_status, source_service, reply_to, parent_request_id, chain_depth).
  • Guarantee message identity: every envelope carries a message_id, which is auto-generated or falls back to OutboundEvent.id when it is missing on an outbound message.
  • Keep MQTT inbound compatible for EdgeX events while enforcing schema for RabbitMQ.

What it does

  • Outbound flow: DB OutboundEvent -> broker publish
  • Inbound flow: broker consume -> DB InboundEvent
  • Shared RabbitMQ envelope validation via RabbitMqSchemaValidator
  • Retry + backoff for outbound publish failures
  • Worker heartbeat/status updates via runtime config table

Install

pip install django-mqtt-client

Django setup

INSTALLED_APPS = [
    # ...
    "django_mqtt_client",
]

Run migrations:

python manage.py migrate

Key modules

  • django_mqtt_client/event_bus.py
    • async worker for MQTT and RabbitMQ
  • django_mqtt_client/rabbitmq_schema.py
    • envelope normalize/validate for RabbitMQ messages
  • django_mqtt_client/exceptions.py
    • broker-neutral exceptions

RabbitMQ envelope

Schema: interservice.envelope.v1

{
  "schema": "interservice.envelope.v1",
  "message_id": "uuid",
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "source_service": "ebmr",
  "request_id": "req-100",
  "parent_request_id": "req-99",
  "chain_depth": 1,
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Message type rules

  • request: requires request_id, reply_to
  • response: requires request_id, status (success or error)
  • event: requires event_type
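
The rules above can be sketched as a small check. This is an illustrative sketch only, not the library's RabbitMqSchemaValidator; the function name check_message_type_rules and the wording of the returned problems are hypothetical.

```python
# Hypothetical helper: mirrors the per-type rules listed above.
REQUIRED_BY_TYPE = {
    "request": ("request_id", "reply_to"),
    "response": ("request_id", "status"),
    "event": ("event_type",),
}
ALLOWED_RESPONSE_STATUS = {"success", "error"}


def check_message_type_rules(envelope: dict) -> list:
    """Return a list of problems; an empty list means the rules pass."""
    message_type = envelope.get("message_type")
    if message_type not in REQUIRED_BY_TYPE:
        return [f"unknown message_type: {message_type!r}"]

    # Every field required for this message type must be present and non-empty.
    problems = [
        f"{message_type} requires {field}"
        for field in REQUIRED_BY_TYPE[message_type]
        if not envelope.get(field)
    ]

    # A response status, when given, must be one of the two allowed values.
    status = envelope.get("status")
    if message_type == "response" and status and status not in ALLOWED_RESPONSE_STATUS:
        problems.append("response status must be success or error")
    return problems
```

A caller could treat a non-empty result as grounds for marking an outbound row invalid or dropping an inbound message.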

message_id behavior

  • The envelope includes message_id.
  • Outbound RabbitMQ: if message_id is missing, the utility auto-generates a UUID.
  • Outbound fallback: if the DB row has no envelope message_id, OutboundEvent.id is used before validation.
  • Inbound RabbitMQ: message_id must be present after normalization; invalid messages are dropped.
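
One way to picture the outbound resolution is the sketch below. The function name resolve_message_id is hypothetical, and the precedence (envelope value, then OutboundEvent.id, then a fresh UUID) is an assumption about how the two outbound rules combine; the library's own order may differ.

```python
import uuid


def resolve_message_id(envelope: dict, outbound_event_id=None) -> str:
    """Pick a message_id for an outbound envelope (assumed precedence)."""
    existing = envelope.get("message_id")
    if existing:
        # The envelope already carries an identity; keep it.
        return str(existing)
    if outbound_event_id is not None:
        # Fall back to the DB row's primary key (OutboundEvent.id).
        return str(outbound_event_id)
    # Nothing available: generate a random UUID.
    return str(uuid.uuid4())
```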

Topic rules

  • Request: <target_service>.request.<entity>.<action>[.<subaction>...]
  • Response: <requesting_service>.response.<entity>.<action>[.<subaction>...]
  • Event: event.<domain>.<entity>.<action>[.<subaction>...]

Examples:

  • edgenexus.request.machine.read
  • ebmr.response.machine.read
  • event.operational.machine.status.changed
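
The topic conventions can be checked with regular expressions, as in this sketch. The helper topic_matches is hypothetical, and the allowed segment characters (lowercase letters, digits, underscore, hyphen) are an assumption; the README does not state them.

```python
import re

# Assumed character set for a single dot-separated topic segment.
_SEG = r"[a-z0-9_-]+"

TOPIC_PATTERNS = {
    # <target_service>.request.<entity>.<action>[.<subaction>...]
    "request": re.compile(rf"{_SEG}\.request\.{_SEG}\.{_SEG}(\.{_SEG})*"),
    # <requesting_service>.response.<entity>.<action>[.<subaction>...]
    "response": re.compile(rf"{_SEG}\.response\.{_SEG}\.{_SEG}(\.{_SEG})*"),
    # event.<domain>.<entity>.<action>[.<subaction>...]
    "event": re.compile(rf"event\.{_SEG}\.{_SEG}\.{_SEG}(\.{_SEG})*"),
}


def topic_matches(message_type: str, topic: str) -> bool:
    """True if the topic follows the convention for the given message type."""
    pattern = TOPIC_PATTERNS.get(message_type)
    return bool(pattern and pattern.fullmatch(topic))
```

With these patterns, all three example topics above match their message type, while a request topic without an action segment (for instance edgenexus.request.machine) does not.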

Message samples

Request:

{
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "request_id": "req-100",
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Response:

{
  "message_type": "response",
  "topic": "ebmr.response.machine.read",
  "request_id": "req-100",
  "status": "success",
  "payload": {
    "machine_id": "M1",
    "state": "running"
  }
}

Event:

{
  "message_type": "event",
  "topic": "event.operational.machine.status.changed",
  "event_type": "machine.status.changed",
  "payload": {
    "machine_id": "M1",
    "status": "running"
  }
}

Chaining sample:

{
  "message_type": "request",
  "topic": "edgenexus.request.batch.start",
  "request_id": "req-200",
  "parent_request_id": "req-100",
  "chain_depth": 1,
  "reply_to": "ebmr.response.batch.start",
  "payload": {
    "batch_id": "B-001"
  }
}
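
A chained request like the one above could be derived from its parent as sketched here. The helper build_child_request is hypothetical, and the rule that chain_depth is the parent's depth plus one (with a missing depth treated as 0) is an assumption inferred from the samples.

```python
def build_child_request(parent: dict, *, request_id: str, topic: str,
                        reply_to: str, payload: dict) -> dict:
    """Build a follow-up request that records which request triggered it."""
    return {
        "message_type": "request",
        "topic": topic,
        "request_id": request_id,
        # Link back to the request that caused this one.
        "parent_request_id": parent["request_id"],
        # Assumed rule: one level deeper than the parent (root requests are 0).
        "chain_depth": int(parent.get("chain_depth", 0)) + 1,
        "reply_to": reply_to,
        "payload": payload,
    }


parent = {"message_type": "request", "request_id": "req-100"}
child = build_child_request(
    parent,
    request_id="req-200",
    topic="edgenexus.request.batch.start",
    reply_to="ebmr.response.batch.start",
    payload={"batch_id": "B-001"},
)
```

Under these assumptions, child has the same fields and values as the chaining sample.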

Runtime DB config

The worker reads the active row from EventBusRuntimeConfig (config_key, is_active).

Broker-specific child settings:

  • EventBusMqttSettings
  • EventBusRabbitMqSettings

Shared connection fields are in runtime config (host, port, username, password, connect_timeout, retry_interval_seconds).

Event table fields

OutboundEvent

  • transport fields:
    • status (pending/published/invalid/error) for publish lifecycle
    • published, published_at, retry_count, next_retry_at, last_error
  • envelope-related fields:
    • topic
    • request_id
    • event_type
    • message_type
    • message_status (mapped to envelope status)
    • source_service
    • parent_request_id
    • chain_depth
    • reply_to
    • payload
    • id is used as fallback message_id if payload/message row does not provide one

InboundEvent

  • topic
  • request_id
  • message_type
  • message_status (envelope/business status if present)
  • reply_to
  • source_service
  • parent_request_id
  • chain_depth
  • payload
  • processing fields:
    • status (pending/processed/error) for local processing lifecycle
    • processed, processed_at, last_error

EVENT_BUS_CONFIG example

Worker

Start:

python manage.py event_bus_worker --config-key default

Optional override:

python manage.py event_bus_worker --config-key default --broker-type rabbitmq

Status:

python manage.py event_bus_status

Behavior notes

  • On broker connection failure, the worker retries at the configured retry interval.
  • An outbound event with an invalid payload or envelope is marked status=invalid.
  • Outbound publish errors are retried with exponential backoff until max_retry_count is reached.
  • If InboundEvent.message_id exists, duplicate inbound message IDs are ignored.
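
The outbound retry schedule might look like the sketch below. The function name next_retry_at matches the OutboundEvent field of the same name, but the function itself is hypothetical; the base delay, the cap, the default max_retry_count, and returning None once retries are exhausted are all assumptions, since the README does not give these values.

```python
from datetime import datetime, timedelta, timezone


def next_retry_at(retry_count: int, *, now: datetime, base_seconds: int = 5,
                  cap_seconds: int = 300, max_retry_count: int = 5):
    """Return when to retry a failed publish, or None when retries are used up."""
    if retry_count >= max_retry_count:
        return None
    # The delay doubles with every failed attempt, up to the cap.
    delay = min(base_seconds * 2 ** retry_count, cap_seconds)
    return now + timedelta(seconds=delay)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
first_retry = next_retry_at(0, now=now)   # 5 seconds after now
fourth_retry = next_retry_at(3, now=now)  # 40 seconds after now
```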

Tests

Run from repo root:

pytest -q

If your environment does not include coverage plugins from pyproject.toml, run:

pytest -q -o addopts=''
