Skip to main content

Asynchronous MQTT client library for Python with optional Django integration

Project description

django-mqtt-client

Unified inter-service event bus utility for Django services (MQTT + RabbitMQ).

Release Notes

1.0.0

  • Major internal refactor into a package-based layout under django_mqtt_client/event_bus/ for better maintainability.
  • Worker orchestration and broker logic split into focused modules:
    • event_bus/worker.py
    • event_bus/rabbitmq/*
    • event_bus/mqtt/*
    • event_bus/shared/*
    • event_bus/inbound.py, event_bus/outbound.py
  • RabbitMQ topology guard added at startup:
    • validates management API snapshot
    • creates missing exchanges/queues/bindings
    • retries indefinitely until topology is ready
  • RabbitMQ consumer flow simplified to queue-driven consumption (from DB config), with Celery-managed topology excluded.
  • Outbound RabbitMQ exchange resolution improved:
    • supports source-based exchange selection (<source_service>_exchange) with cache/fallback.
  • Test suite updated for the new module structure and async loop stability.
  • event_bus_worker now resolves broker type from runtime config_key; no --broker-type override.

Objectives

  • Standardize inter-service messaging using one RabbitMQ envelope schema.
  • Support request/response/event patterns with topic conventions.
  • Ensure reliable publish/consume with retries, backoff, and worker heartbeats.
  • Persist envelope metadata in DB (message_type, message_status, source_service, reply_to, parent_request_id, chain_depth).
  • Guarantee message identity with message_id (auto-generated/fallback from OutboundEvent.id when missing on outbound).
  • Keep MQTT inbound compatible for EdgeX events while enforcing schema for RabbitMQ.

What it does

  • Outbound flow: DB OutboundEvent -> broker publish
  • Inbound flow: broker consume -> DB InboundEvent
  • Shared RabbitMQ envelope validation via RabbitMqSchemaValidator
  • Retry + backoff for outbound publish failures
  • Worker heartbeat/status updates via runtime config table

Install

pip install django-mqtt-client

Django setup

INSTALLED_APPS = [
    # ...
    "django_mqtt_client",
]

Run migrations:

python manage.py migrate

Key modules

  • django_mqtt_client/event_bus/worker.py
    • async worker orchestrator
  • django_mqtt_client/event_bus/rabbitmq/schema.py
    • envelope normalize/validate for RabbitMQ messages
  • django_mqtt_client/exceptions.py
    • broker-neutral exceptions

RabbitMQ envelope

Schema: interservice.envelope.v1

{
  "schema": "interservice.envelope.v1",
  "message_id": "uuid",
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "source_service": "ebmr",
  "request_id": "req-100",
  "parent_request_id": "req-99",
  "chain_depth": 1,
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Message type rules

  • request: requires request_id, reply_to
  • response: requires request_id, status (success or error)
  • event: requires event_type

message_id behavior

  • Envelope includes message_id.
  • Outbound RabbitMQ: if message_id is missing, utility auto-generates a UUID.
  • Outbound fallback: if DB row has no envelope message_id, OutboundEvent.id is used before validation.
  • Inbound RabbitMQ: message_id must be present after normalization; invalid messages are dropped.

Topic rules

  • Request: <target_service>.request.<entity>.<action>[.<subaction>...]
  • Response: <requesting_service>.response.<entity>.<action>[.<subaction>...]
  • Event: event.<domain>.<entity>.<action>[.<subaction>...]

Examples:

  • edgenexus.request.machine.read
  • ebmr.response.machine.read
  • event.operational.machine.status.changed

Message samples

Request:

{
  "message_type": "request",
  "topic": "edgenexus.request.machine.read",
  "request_id": "req-100",
  "reply_to": "ebmr.response.machine.read",
  "payload": {
    "machine_id": "M1"
  }
}

Response:

{
  "message_type": "response",
  "topic": "ebmr.response.machine.read",
  "request_id": "req-100",
  "status": "success",
  "payload": {
    "machine_id": "M1",
    "state": "running"
  }
}

Event:

{
  "message_type": "event",
  "topic": "event.operational.machine.status.changed",
  "event_type": "machine.status.changed",
  "payload": {
    "machine_id": "M1",
    "status": "running"
  }
}

Chaining sample:

{
  "message_type": "request",
  "topic": "edgenexus.request.batch.start",
  "request_id": "req-200",
  "parent_request_id": "req-100",
  "chain_depth": 1,
  "reply_to": "ebmr.response.batch.start",
  "payload": {
    "batch_id": "B-001"
  }
}

Runtime DB config

Worker reads active row from EventBusRuntimeConfig (config_key, is_active).

Broker-specific child settings:

  • EventBusMqttSettings
  • EventBusRabbitMqSettings

Shared connection fields are in runtime config (host, port, username, password, connect_timeout, retry_interval_seconds).

Event table fields

OutboundEvent

  • transport fields:
    • status (pending/published/invalid/error) for publish lifecycle
    • published, published_at, retry_count, next_retry_at, last_error
  • envelope-related fields:
    • topic
    • request_id
    • event_type
    • message_type
    • message_status (mapped to envelope status)
    • source_service
    • parent_request_id
    • chain_depth
    • reply_to
    • payload
    • id is used as fallback message_id if payload/message row does not provide one

InboundEvent

  • topic
  • request_id
  • message_type
  • message_status (envelope/business status if present)
  • reply_to
  • source_service
  • parent_request_id
  • chain_depth
  • payload
  • processing fields:
    • status (pending/processed/error) for local processing lifecycle
    • processed, processed_at, last_error

EVENT_BUS_CONFIG example

EVENT_BUS_CONFIG = {
    "runtime_config_key": "default",
    "broker_type": "rabbitmq",
    "rabbitmq_url": "amqp://guest:guest@localhost/",
    "rabbitmq_exchange": "interservice",
    "rabbitmq_queue": "interservice_inbound",
    "rabbitmq_management_url": "http://localhost:15672",
    "rabbitmq_management_username": "guest",
    "rabbitmq_management_password": "guest",
    "rabbitmq_management_timeout_seconds": 5.0,
}

Worker

Start:

python manage.py event_bus_worker --config-key default

Optional override:

python manage.py event_bus_worker --config-key rabbitmq

Celery Integration

  • Optional inbound handoff to Celery is supported.
  • When celery_inbound_handler_task is set in runtime config, each saved InboundEvent triggers:
    • current_app.send_task(celery_inbound_handler_task, args=[event_id])
  • If celery_queue is configured, dispatch uses that queue.
  • If Celery dispatch fails, event persistence is retained and error is logged.

Status:

python manage.py event_bus_status

Real-time Mode (Performance)

By default, the worker polls the database every poll_interval (default 1s). For critical events, you can enable Real-time Mode to eliminate this latency.

1. Inbound (Broker → Workers)

To process incoming messages immediately without waiting for the database poll:

  • Configure celery_inbound_handler_task in EventBusRuntimeConfig.
  • (Optional) Set celery_queue for specific routing. When a message arrives, the worker saves it to the DB and immediately dispatches a Celery task to your main application.

2. Outbound (App → Broker)

To publish messages instantly when they are saved in your main application:

  • MQTT: Set control_topic in EventBusMqttSettings.
  • RabbitMQ: Set control_exchange (type fanout) and control_routing_key in EventBusRabbitMqSettings. The library uses a Django post_save signal to send a lightweight "poke" to the worker whenever an OutboundEvent is created.

Behavior notes

  • Broker connect failure retries every configured retry interval.
  • Outbound invalid payload/envelope is marked status=invalid.
  • Outbound publish errors use exponential backoff until max_retry_count.
  • If InboundEvent.message_id exists, duplicate inbound message IDs are ignored.
  • Pulse Reconciliation: Even if real-time triggers fail, the background polling loop ensures all events are eventually processed within the poll_interval.

Tests

Run from repo root:

pytest -q

If your environment does not include coverage plugins from pyproject.toml, run:

pytest -q -o addopts=''

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_mqtt_client-1.0.0.tar.gz (37.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_mqtt_client-1.0.0-py3-none-any.whl (46.0 kB view details)

Uploaded Python 3

File details

Details for the file django_mqtt_client-1.0.0.tar.gz.

File metadata

  • Download URL: django_mqtt_client-1.0.0.tar.gz
  • Upload date:
  • Size: 37.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for django_mqtt_client-1.0.0.tar.gz
Algorithm Hash digest
SHA256 c0e7c6c1b5ba6dc627adc941b87fd496617714b2b33019655f094e96f2ecb4ac
MD5 2341e217fc559427756f6e35cc8ffeae
BLAKE2b-256 8ec23273c78d84061fc0b4834c1677d59fa48cc34f94fda993df879a673ba749

See more details on using hashes here.

File details

Details for the file django_mqtt_client-1.0.0-py3-none-any.whl.

File metadata

File hashes

Hashes for django_mqtt_client-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 11ffb940281109fb3f63535c587982b704aeb057e0926f6d13222e469b3e704d
MD5 2444789664a7dbfd10a94fa76c0d7ca6
BLAKE2b-256 143b3f65827fa02af607af73c2ad97cef31dca4f3f9e514ea7dee68d9c3cec1c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page