Skip to main content

A django application to make it easier to use the transactional outbox pattern

Project description

Django outbox pattern

Build Status Maintainability Rating Coverage Code style: black Downloads Downloads Downloads PyPI version GitHub

A django application to make it easier to use the transactional outbox pattern

Installation

Install django-outbox-pattern with pip

pip install django-outbox-pattern

Add to settings

# settings.py

INSTALLED_APPS = [
    "django_outbox_pattern",
]

DJANGO_OUTBOX_PATTERN = {
    "DEFAULT_STOMP_HOST_AND_PORTS": [("127.0.0.1", 61613)],
    "DEFAULT_STOMP_USERNAME": "guest",
    "DEFAULT_STOMP_PASSCODE": "guest",
}

Rum migrations

python manage.py migrate

Usage/Examples

The publish decorator adds the outbox table to the model. publish accepts list of Config. The Config accepts four params the destination which is required, fields which the default are all the fields of the model, serializer which by default adds the id in the message to be sent and version which by default is empty.

Note: fields and serializer are mutually exclusive, serializer overwrites the fields.

The Config typing

from typing import List
from typing import NamedTuple
from typing import Optional


class Config(NamedTuple):
    destination: str
    fields: Optional[List[str]] = None
    serializer: Optional[str] = None
    version: Optional[str] = None

Only destination in config

from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish


@publish([Config(destination='/topic/my_route_key')])
class MyModel(models.Model):
    field_one = models.CharField(max_length=100)
    field_two = models.CharField(max_length=100)

This generates the following data to be sent.

Published(destination='/topic/my_route_key', body='{"id": 1, "field_one": "Field One", "field_two": "Field Two"}')

Change destination version in config

from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish


@publish([Config(destination='/topic/my_route_key', version="v1")])
class MyModel(models.Model):
    field_one = models.CharField(max_length=100)
    field_two = models.CharField(max_length=100)

This generates the following data to be sent.

Published(destination='/topic/my_route_key.v1', body='{"id": 1, "field_one": "One", "field_two": "Two"}', version="v1")

With destinations and fields

from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish


@publish([Config(destination='/topic/my_route_key', fields=["field_one"])])
class MyModel(models.Model):
    field_one = models.CharField(max_length=100)
    field_two = models.CharField(max_length=100)

This generates the following data to be sent.

Published(destination='/topic/my_route_key', body='{"id": 1, "field_one": "Field One"}')

With destinations and serializer

from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish


@publish([Config(destination='/topic/my_route_key', serializer='my_serializer')])
class MyModel(models.Model):
    field_one = models.CharField(max_length=100)
    field_two = models.CharField(max_length=100)

    def my_serializer(self):
        return {
            "id": self.id,
            "one": self.field_one,
            "two": self.field_two
        }

This generates the following data to be sent.

Published(destination='/topic/my_route_key', body='{"id": 1, "one": "Field One", "two": "Field Two"}')

With multi destinations and serializers

from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish


@publish([
    Config(destination='/topic/my_route_key_1', serializer="my_serializer_1"),
    Config(destination='/topic/my_route_key_2', serializer="my_serializer_2"),
])
class MyModel(models.Model):
    field_one = models.CharField(max_length=100)
    field_two = models.CharField(max_length=100)

    def my_serializer_1(self):
        return {
            "id": self.id,
            "one": self.field_one,
        }

    def my_serializer_2(self):
        return {
            "id": self.id,
            "two": self.field_two
        }

This generates the following data to be sent.

Published(destination='/topic/my_route_key_1', body='{"id": 1, "one": "Field One"}')
Published(destination='/topic/my_route_key_2', body='{"id": 1, "two": "Field Two"}')

Publish/Subscribe commands

Publish command

To send the messages added to the Published model it is necessary to start the producer with the following command.

python manage.py publish
Publish message via outbox

It is possible to use the outbox pattern with a custom logic before sending the message to the outbox table.

from django.db import transaction
from django_outbox_pattern.models import Published


def custom_business_logic() -> None:
    # run your custom business logic

    with transaction.atomic():
        YourBusinessModel.objects.create()
        Published.objects.create(destination="your_destination", body={"some": "data"})

With this you can ensure that the messages can be published in the same database transaction of your business logic.

Publish message directly

It is possible to send messages directly without using the outbox table

# send.py
from django_outbox_pattern.factories import factory_producer


def send_event(destination, body, headers):
    with factory_producer() as producer:
        producer.send_event(destination=destination, body=body, headers=headers)
Subscribe command

Consumers created through the library implement the idempotency pattern using the header attribute message-id. The library configures it as unique in the database. This ensures a given message is only processed once, no matter what. To correctly implement this, you must open a transaction with the database to persist the data from your logic and execute the save method of the payload object. Once the code is performed correctly, the library guarantees the message is removed from the broker.

If you need to discard the message due to a business rule, use the nack method of the Payload object. This call removes the message from the broker. This method performs no persistence in the database and can be called outside your database transaction. If it fails for any reason, the message is resent to the consumer.

Alert:

You need to use either save or nack to process of your message. The library cannot make the decision for the developer, and it is up to the developer to determine whether to use the save or nack method. However, in case of an exception during the operation, the nack method will be triggered automatically

The same service (code + database) cannot consume the same message even with different consumers.

Create a function that receives an instance of django_outbox_pattern.payloads.Payload

# callbacks.py
from django.db import transaction
from django_outbox_pattern.payloads import Payload


def callback(payload: Payload):
    if message_is_invalid(payload.body):
        payload.nack()
        return

    with transaction.atomic():
        persist_your_data()
        payload.save()

To start the consumer, after creating the callback, it is necessary to execute the following command.

python manage.py subscribe 'dotted.path.to.callback` 'destination' 'queue_name'

The command takes three parameters:

callback : the path to the callback function.

destination : the destination where messages will be consumed following one of the stomp patterns

queue_name(optional): the name of the queue that will be consumed. If not provided, the routing_key of the destination will be used.

Settings

DEFAULT_CONNECTION_CLASS

The stomp.py class responsible for connecting to the broker. Default: stomp.StompConnection12

DEFAULT_CONSUMER_LISTENER_CLASS

The consumer listener class. Default: django_outbox_pattern.listeners.ConsumerListener

DEFAULT_GENERATE_HEADERS

A function to add headers to the message. Default: django_outbox_pattern.headers.generate_headers

DEFAULT_MAXIMUM_BACKOFF:

Maximum wait time for connection attempts in seconds. Default: 3600 (1 hour)

DEFAULT_MAXIMUM_RETRY_ATTEMPTS

Maximum number of message resend attempts. Default: 50

DEFAULT_PAUSE_FOR_RETRY

Pausing for attempts to resend messages in seconds. Defualt: 240 (4 minutes)

DEFAULT_WAIT_RETRY

Time between attempts to send messages after the pause. Default: 60 (1 minute)

DEFAULT_PRODUCER_LISTENER_CLASS:

The producer listener class. Default: django_outbox_pattern.listeners.ProducerListener

DEFAULT_STOMP_HOST_AND_PORTS

List of host and port tuples to try to connect to the broker. Default [("127.0.0.1", 61613)]

DEFAULT_STOMP_QUEUE_HEADERS

Headers for queues. Default: {"durable": "true", "auto-delete": "false", "prefetch-count": "1"}

DEFAULT_STOMP_HEARTBEATS

Time tuples for input and output heartbeats. Default: (10000, 10000)

Optional overrides:

  • DEFAULT_STOMP_OUTGOING_HEARTBEAT: Overrides the outgoing heartbeat (ms) if set; otherwise falls back to DEFAULT_STOMP_HEARTBEATS and/or top-level STOMP_OUTGOING_HEARTBEAT.
  • DEFAULT_STOMP_INCOMING_HEARTBEAT: Overrides the incoming heartbeat (ms) if set; otherwise falls back to DEFAULT_STOMP_HEARTBEATS and/or top-level STOMP_INCOMING_HEARTBEAT.

Top-level Django settings supported (compat with django-stomp):

  • STOMP_OUTGOING_HEARTBEAT and STOMP_INCOMING_HEARTBEAT can be defined at settings.py root to control heartbeats without touching DJANGO_OUTBOX_PATTERN.

warning: The heartbeat only works at consumer connection. The publisher process open and close connection for each batch message to publish.

DEFAULT_STOMP_VHOST

Virtual host. Default: "/"

DEFAULT_STOMP_USERNAME

Username for connection. Default: "guest"

DEFAULT_STOMP_PASSCODE

Password for connection. Default: "guest"

DEFAULT_STOMP_USE_SSL

For ssl connections. Default: False

DEFAULT_STOMP_KEY_FILE

The path to a X509 key file. Default: None

DEFAULT_STOMP_CERT_FILE

The path to a X509 certificate. Default: None

DEFAULT_STOMP_CA_CERTS

The path to the a file containing CA certificates to validate the server against. If this is not set, server side certificate validation is not done. Default: None

DEFAULT_STOMP_CERT_VALIDATOR

Function which performs extra validation on the client certificate, for example checking the returned certificate has a commonName attribute equal to the hostname (to avoid man in the middle attacks). The signature is: (OK, err_msg) = validation_function(cert, hostname) where OK is a boolean, and cert is a certificate structure as returned by ssl.SSLSocket.getpeercert(). Default: None

DEFAULT_STOMP_SSL_VERSION

SSL protocol to use for the connection. This should be one of the PROTOCOL_x constants provided by the ssl module. The default is ssl.PROTOCOL_TLSv1.

DEFAULT_STOMP_SSL_PASSWORD

SSL password

DEFAULT_EXCLUSIVE_QUEUE

For exclusive queue feature. Default: False

DAYS_TO_KEEP_DATA

The total number of days that the system will keep a message in the database history. Default: 30

REMOVE_DATA_CACHE_TTL

This variable defines the time-to-live (TTL) value in seconds for the cache used by the _remove_old_messages method in the django_outbox_pattern application. The cache is used to prevent the method from deleting old data every time it is run, and the TTL value determines how long the cache entry should remain valid before being automatically deleted. It can be customized by setting the REMOVE_DATA_CACHE_TTL variable. Default: 86400 seconds (1 day)

OUTBOX_PATTERN_PUBLISHER_CACHE_KEY

The OUTBOX_PATTERN_PUBLISHER_CACHE_KEY variable controls the key name of the cache used to store the outbox pattern publisher. Default: remove_old_messages_django_outbox_pattern_publisher.

OUTBOX_PATTERN_CONSUMER_CACHE_KEY

The OUTBOX_PATTERN_CONSUMER_CACHE_KEY variable controls the key name of the cache used to store the outbox pattern publisher. Default: remove_old_messages_django_outbox_pattern_consumer.

DEFAULT_PUBLISHED_CHUNK_SIZE

The DEFAULT_PUBLISHED_CHUNK_SIZE variable controls chunk size for the publish command in get message to publish action. Default: 200

DEFAULT_CONSUMER_PROCESS_MSG_ON_BACKGROUND

Controls whether Consumer processes incoming messages on a background thread pool. When set to True, handle_incoming_message submits work to a ThreadPoolExecutor and returns immediately; The user callback (via message_handler) runs asynchronously. This helps keep the listener responsive (e.g., heartbeats) when callbacks are slow or blocking. When False (default), messages are processed synchronously and any exception raised by the callback will propagate to the caller.

Default: False.

Notes:

  • The worker pool is recreated automatically if it was previously shut down and a new message arrives.
  • Ensure your callback calls payload.save() (or payload.nack() when appropriate); otherwise a warning is logged and the message may not be acked/nacked automatically unless an exception occurs.

Example configuration:

# settings.py
DJANGO_OUTBOX_PATTERN = {
    # ... other options ...
    "DEFAULT_CONSUMER_PROCESS_MSG_ON_BACKGROUND": True
}

DEFAULT_PRODUCER_WAITING_TIME

The DEFAULT_PRODUCER_WAITING_TIME variable controls the waiting time in seconds for the producer to check for new messages to be sent. Default: 1 second

DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT

The DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT variable controls the maximum time in seconds that the consumer will wait for in-flight message processing to complete during graceful shutdown. This is critical for containerized environments (Docker, Kubernetes) to ensure messages are processed before the container is forcefully terminated.

When set to None, the consumer waits indefinitely for message processing to complete.

When set to a numeric value (seconds), the consumer will:

  1. Stop accepting new messages (unsubscribe from queue)
  2. Wait up to the specified timeout for the current message to finish processing
  3. If timeout is reached, log a warning and proceed with shutdown (message will be redelivered)
  4. Clean up connections and exit gracefully

Default: 90 seconds

Important for Kubernetes/Docker deployments:

To ensure graceful shutdown in containerized environments, configure this value to be less than your container's termination grace period, leaving a buffer for cleanup operations:

# settings.py
DJANGO_OUTBOX_PATTERN = {
    # ... other options ...
    "DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT": 115,  # seconds
}

Kubernetes deployment example:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 120  # Total time before SIGKILL
      containers:
      - name: consumer
        image: your-image
        # Use array form to ensure Python receives signals (runs as PID 1)
        command: ["python"]
        args: ["manage.py", "subscribe", "callback", "destination"]
        env:
        - name: DJANGO_SETTINGS_MODULE
          value: "your_project.settings"

Signal handling:

The consumer handles the following signals for graceful shutdown:

  • SIGTERM: Sent by Kubernetes/Docker on pod termination
  • SIGINT: Sent by Ctrl+C in terminal
  • SIGQUIT: Graceful shutdown signal

If a second signal is received during shutdown, the process will exit immediately (force quit).

Best practices:

  • Set DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT to accommodate your longest expected message processing time
  • Ensure DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT < terminationGracePeriodSeconds (leave 5-10 second buffer)
  • Use array-form command in container definitions to ensure Python runs as PID 1 and receives signals
  • Monitor logs for timeout warnings to identify messages that exceed processing expectations

Example timeline (with 90-second message processing):

T=0s:   SIGTERM received
T=0s:   Unsubscribe from queue (no new messages accepted)
T=0s:   Wait up to 115 seconds for message processing
T=90s:  Message processing completes
T=90s:  Shutdown thread pool executor
T=90s:  Disconnect from broker
T=90s:  Process exits cleanly ✓

If message processing exceeds the timeout, the process will exit and the message will be redelivered when the consumer restarts.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django_outbox_pattern-3.1.0.tar.gz (23.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

django_outbox_pattern-3.1.0-py3-none-any.whl (25.8 kB view details)

Uploaded Python 3

File details

Details for the file django_outbox_pattern-3.1.0.tar.gz.

File metadata

  • Download URL: django_outbox_pattern-3.1.0.tar.gz
  • Upload date:
  • Size: 23.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.5.1 CPython/3.10.19 Linux/6.11.0-1018-azure

File hashes

Hashes for django_outbox_pattern-3.1.0.tar.gz
Algorithm Hash digest
SHA256 bc22a6c34f18773cc1188ee6b3bf3a7b54ac018fa66d94c0023188c33ba0c6bc
MD5 4138749031f5b30c39ec59590124f4e9
BLAKE2b-256 7cbf7c06c25fe820226e16e46b58a97ce0cce9988e32dd6a712d23f57b92a4ea

See more details on using hashes here.

File details

Details for the file django_outbox_pattern-3.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for django_outbox_pattern-3.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a72a335e760d1a877e5e7cc2446d33af493fd5abf82be961f994eb579303b069
MD5 308b4c7e6362a6c759d6cf1fb3481e8b
BLAKE2b-256 793ee37bb06eb0e9995c161c020ce527c485784e559bcf68cf1609e84a1c99ff

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page