Asynchronous MQTT client library for Python with optional Django integration

django-mqtt-client

Unified inter-service event bus utility for Django services (MQTT + RabbitMQ).

Objectives

  • Standardize inter-service messaging using one RabbitMQ envelope schema.
  • Support request/response/event patterns with topic conventions.
  • Ensure reliable publish/consume with retries, backoff, and worker heartbeats.
  • Persist envelope metadata in DB (message_type, message_status, source_service, reply_to, parent_request_id, chain_depth).
  • Guarantee message identity with message_id (on outbound messages that lack one, it is auto-generated or falls back to OutboundEvent.id).
  • Keep MQTT inbound compatible for EdgeX events while enforcing schema for RabbitMQ.

What it does

  • Outbound flow: DB OutboundEvent -> broker publish
  • Inbound flow: broker consume -> DB InboundEvent
  • Shared RabbitMQ envelope validation via RabbitMqSchemaValidator
  • Retry + backoff for outbound publish failures
  • Worker heartbeat/status updates via runtime config table

Install

pip install django-mqtt-client

Django setup

INSTALLED_APPS = [
    # ...
    "django_mqtt_client",
]

Run migrations:

python manage.py migrate

Key modules

  • django_mqtt_client/event_bus.py
    • async worker for MQTT and RabbitMQ
  • django_mqtt_client/rabbitmq_schema.py
    • envelope normalize/validate for RabbitMQ messages
  • django_mqtt_client/exceptions.py
    • broker-neutral exceptions

RabbitMQ envelope

Schema: interservice.envelope.v1

{
  "schema": "interservice.envelope.v1",
  "message_id": "uuid",
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "source_service": "ebmr",
  "request_id": "req-100",
  "parent_request_id": "req-99",
  "chain_depth": 1,
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Message type rules

  • request: requires request_id, reply_to
  • response: requires request_id, status (success or error)
  • event: requires event_type
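
The rules above can be sketched as a small required-field check. This is only an illustration: find_envelope_errors and its error strings are hypothetical names, not the API of RabbitMqSchemaValidator, and treating empty values as missing is an assumption.

```python
# Hypothetical sketch of the message type rules; not the library's validator.
REQUIRED_FIELDS = {
    "request": ("request_id", "reply_to"),
    "response": ("request_id", "status"),
    "event": ("event_type",),
}
ALLOWED_RESPONSE_STATUS = ("success", "error")


def find_envelope_errors(envelope: dict) -> list:
    """Return a list of rule violations; an empty list means the envelope passes."""
    message_type = envelope.get("message_type")
    if message_type not in REQUIRED_FIELDS:
        return [f"unknown message_type: {message_type!r}"]

    # Assumption: an empty string or None counts as a missing field.
    errors = [
        f"{message_type} requires {field}"
        for field in REQUIRED_FIELDS[message_type]
        if not envelope.get(field)
    ]

    status = envelope.get("status")
    if message_type == "response" and status and status not in ALLOWED_RESPONSE_STATUS:
        errors.append("response status must be success or error")
    return errors
```

Returning a list rather than raising on the first problem lets a caller record every violation at once, for example in a last_error column.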

message_id behavior

  • The envelope includes message_id.
  • Outbound RabbitMQ: if message_id is missing, the utility auto-generates a UUID.
  • Outbound fallback: if the DB row has no envelope message_id, OutboundEvent.id is used as the message_id before validation.
  • Inbound RabbitMQ: message_id must be present after normalization; invalid messages are dropped.
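
A minimal sketch of the outbound resolution described above. The function name resolve_message_id is hypothetical, and the precedence shown (existing envelope value, then the row id, then a random UUID) is an assumption, since the notes above list both fallbacks without stating their order.

```python
import uuid
from typing import Optional


def resolve_message_id(envelope: dict, fallback_id: Optional[object] = None) -> dict:
    """Return a copy of the envelope that is guaranteed to carry a message_id."""
    resolved = dict(envelope)  # copy so the caller's dict is left untouched
    if not resolved.get("message_id"):
        if fallback_id is not None:
            # Assumed first fallback: the OutboundEvent row id, as a string.
            resolved["message_id"] = str(fallback_id)
        else:
            # Assumed last resort: a freshly generated UUID4.
            resolved["message_id"] = str(uuid.uuid4())
    return resolved
```

Running the resolution before validation keeps the validator simple: by the time it runs, message_id is either present or the envelope is genuinely invalid.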

Topic rules

  • Request: <target_service>.request.<entity>.<action>[.<subaction>...]
  • Response: <requesting_service>.response.<entity>.<action>[.<subaction>...]
  • Event: event.<domain>.<entity>.<action>[.<subaction>...]

Examples:

  • edgenexus.request.machine.read
  • ebmr.response.machine.read
  • event.operational.machine.status.changed
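
The topic conventions can be checked with regular expressions, as in the sketch below. The classify_topic helper is hypothetical, and the allowed segment characters (lowercase letters, digits, underscore, hyphen) are an assumption; the rules above do not specify a character set.

```python
import re
from typing import Optional

# Assumption: a topic segment is lowercase alphanumerics, "_" or "-".
SEG = r"[a-z0-9_-]+"

TOPIC_PATTERNS = (
    # event.<domain>.<entity>.<action>[.<subaction>...]
    ("event", re.compile(rf"event(\.{SEG}){{3,}}")),
    # <target_service>.request.<entity>.<action>[.<subaction>...]
    ("request", re.compile(rf"{SEG}\.request(\.{SEG}){{2,}}")),
    # <requesting_service>.response.<entity>.<action>[.<subaction>...]
    ("response", re.compile(rf"{SEG}\.response(\.{SEG}){{2,}}")),
)


def classify_topic(topic: str) -> Optional[str]:
    """Return "event", "request" or "response", or None if no rule matches."""
    for kind, pattern in TOPIC_PATTERNS:
        if pattern.fullmatch(topic):
            return kind
    return None
```

The event pattern is tried first so that a topic beginning with the literal segment event is never mistaken for a request addressed to a service named event.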

Message samples

Request:

{
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "request_id": "req-100",
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Response:

{
  "message_type": "response",
  "topic": "ebmr.response.machine.read",
  "request_id": "req-100",
  "status": "success",
  "payload": {
    "machine_id": "M1",
    "state": "running"
  }
}

Event:

{
  "message_type": "event",
  "topic": "event.operational.machine.status.changed",
  "event_type": "machine.status.changed",
  "payload": {
    "machine_id": "M1",
    "status": "running"
  }
}

Chaining sample:

{
  "message_type": "request",
  "topic": "edgenexus.request.batch.start",
  "request_id": "req-200",
  "parent_request_id": "req-100",
  "chain_depth": 1,
  "reply_to": "ebmr.response.batch.start",
  "payload": {
    "batch_id": "B-001"
  }
}
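
As an illustration of chaining, the sketch below derives a follow-up request from a parent request. build_chained_request is a hypothetical helper, and the rule that chain_depth is the parent's depth plus one (with a missing depth treated as 0) is an assumption inferred from the samples above.

```python
def build_chained_request(parent: dict, topic: str, request_id: str,
                          reply_to: str, payload: dict) -> dict:
    """Build a request envelope that is linked to the request that caused it."""
    return {
        "message_type": "request",
        "topic": topic,
        "request_id": request_id,
        # Link back to the originating request.
        "parent_request_id": parent["request_id"],
        # Assumption: depth grows by one per hop; a root request has depth 0.
        "chain_depth": int(parent.get("chain_depth", 0)) + 1,
        "reply_to": reply_to,
        "payload": payload,
    }
```

Called with the request sample above as parent, this produces the chaining sample: parent_request_id becomes req-100 and chain_depth becomes 1.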

Runtime DB config

The worker reads the active row from EventBusRuntimeConfig (matched on config_key and is_active).

Broker-specific child settings:

  • EventBusMqttSettings
  • EventBusRabbitMqSettings

Shared connection fields live in the runtime config row (host, port, username, password, connect_timeout, retry_interval_seconds).

Event table fields

OutboundEvent

  • transport fields:
    • status (pending/published/invalid/error) for publish lifecycle
    • published, published_at, retry_count, next_retry_at, last_error
  • envelope-related fields:
    • topic
    • request_id
    • event_type
    • message_type
    • message_status (mapped to envelope status)
    • source_service
    • parent_request_id
    • chain_depth
    • reply_to
    • payload
    • id (used as the fallback message_id if the payload or message row does not provide one)

InboundEvent

  • topic
  • request_id
  • message_type
  • message_status (envelope/business status if present)
  • reply_to
  • source_service
  • parent_request_id
  • chain_depth
  • payload
  • processing fields:
    • status (pending/processed/error) for local processing lifecycle
    • processed, processed_at, last_error

EVENT_BUS_CONFIG example

Worker

Start:

python manage.py event_bus_worker --config-key default

Optional override:

python manage.py event_bus_worker --config-key default --broker-type rabbitmq

Status:

python manage.py event_bus_status

Behavior notes

  • On a broker connection failure, the worker retries at the configured retry interval.
  • An outbound event with an invalid payload or envelope is marked status=invalid.
  • Outbound publish errors are retried with exponential backoff until max_retry_count is reached.
  • If InboundEvent.message_id exists, inbound messages with a duplicate message ID are ignored.
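
The exponential backoff for outbound publish errors might be scheduled as in the sketch below. The helper name compute_next_retry_at, the 5-second base delay, and the 300-second cap are all assumptions for illustration; only retry_count, next_retry_at, and max_retry_count come from the descriptions above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_next_retry_at(retry_count: int, max_retry_count: int, now: datetime,
                          base_seconds: float = 5.0,
                          cap_seconds: float = 300.0) -> Optional[datetime]:
    """Return when to retry next, or None once the retry budget is exhausted.

    retry_count is the number of failed attempts already recorded on the row.
    """
    if retry_count >= max_retry_count:
        return None
    # Delay doubles per attempt (base, 2*base, 4*base, ...) up to the cap.
    delay = min(cap_seconds, base_seconds * (2 ** retry_count))
    return now + timedelta(seconds=delay)


if __name__ == "__main__":
    start = datetime.now(timezone.utc)
    for attempt in range(4):
        print(attempt, compute_next_retry_at(attempt, max_retry_count=3, now=start))
```

Passing now in explicitly, rather than reading the clock inside the function, keeps the schedule deterministic and easy to test.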

Tests

Run from repo root:

pytest -q

If your environment does not include the coverage plugins referenced in pyproject.toml, run:

pytest -q -o addopts=''
