A django application to make it easier to use the transactional outbox pattern
Project description
Django outbox pattern
A django application to make it easier to use the transactional outbox pattern
Installation
Install django-outbox-pattern with pip
pip install django-outbox-pattern
Add to settings
# settings.py
INSTALLED_APPS = [
"django_outbox_pattern",
]
DJANGO_OUTBOX_PATTERN = {
"DEFAULT_STOMP_HOST_AND_PORTS": [("127.0.0.1", 61613)],
"DEFAULT_STOMP_USERNAME": "guest",
"DEFAULT_STOMP_PASSCODE": "guest",
}
Rum migrations
python manage.py migrate
Usage/Examples
The publish decorator adds
the outbox table
to the model. publish accepts list of Config. The Config accepts four params the destination which is required,
fields which the default are all the fields of the model, serializer which by default adds the id in the message
to be sent and version which by default is empty.
Note:
fieldsandserializerare mutually exclusive, serializer overwrites the fields.
The Config typing
from typing import List
from typing import NamedTuple
from typing import Optional
class Config(NamedTuple):
destination: str
fields: Optional[List[str]] = None
serializer: Optional[str] = None
version: Optional[str] = None
Only destination in config
from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish
@publish([Config(destination='/topic/my_route_key')])
class MyModel(models.Model):
field_one = models.CharField(max_length=100)
field_two = models.CharField(max_length=100)
This generates the following data to be sent.
Published(destination='/topic/my_route_key', body='{"id": 1, "field_one": "Field One", "field_two": "Field Two"}')
Change destination version in config
from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish
@publish([Config(destination='/topic/my_route_key', version="v1")])
class MyModel(models.Model):
field_one = models.CharField(max_length=100)
field_two = models.CharField(max_length=100)
This generates the following data to be sent.
Published(destination='/topic/my_route_key.v1', body='{"id": 1, "field_one": "One", "field_two": "Two"}', version="v1")
With destinations and fields
from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish
@publish([Config(destination='/topic/my_route_key', fields=["field_one"])])
class MyModel(models.Model):
field_one = models.CharField(max_length=100)
field_two = models.CharField(max_length=100)
This generates the following data to be sent.
Published(destination='/topic/my_route_key', body='{"id": 1, "field_one": "Field One"}')
With destinations and serializer
from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish
@publish([Config(destination='/topic/my_route_key', serializer='my_serializer')])
class MyModel(models.Model):
field_one = models.CharField(max_length=100)
field_two = models.CharField(max_length=100)
def my_serializer(self):
return {
"id": self.id,
"one": self.field_one,
"two": self.field_two
}
This generates the following data to be sent.
Published(destination='/topic/my_route_key', body='{"id": 1, "one": "Field One", "two": "Field Two"}')
With multi destinations and serializers
from django.db import models
from django_outbox_pattern.decorators import Config
from django_outbox_pattern.decorators import publish
@publish([
Config(destination='/topic/my_route_key_1', serializer="my_serializer_1"),
Config(destination='/topic/my_route_key_2', serializer="my_serializer_2"),
])
class MyModel(models.Model):
field_one = models.CharField(max_length=100)
field_two = models.CharField(max_length=100)
def my_serializer_1(self):
return {
"id": self.id,
"one": self.field_one,
}
def my_serializer_2(self):
return {
"id": self.id,
"two": self.field_two
}
This generates the following data to be sent.
Published(destination='/topic/my_route_key_1', body='{"id": 1, "one": "Field One"}')
Published(destination='/topic/my_route_key_2', body='{"id": 1, "two": "Field Two"}')
Publish/Subscribe commands
Publish command
To send the messages added to the Published model it is necessary to start the producer with the following command.
python manage.py publish
Publish message via outbox
It is possible to use the outbox pattern with a custom logic before sending the message to the outbox table.
from django.db import transaction
from django_outbox_pattern.models import Published
def custom_business_logic() -> None:
# run your custom business logic
with transaction.atomic():
YourBusinessModel.objects.create()
Published.objects.create(destination="your_destination", body={"some": "data"})
With this you can ensure that the messages can be published in the same database transaction of your business logic.
Publish message directly
It is possible to send messages directly without using the outbox table
# send.py
from django_outbox_pattern.factories import factory_producer
def send_event(destination, body, headers):
with factory_producer() as producer:
producer.send_event(destination=destination, body=body, headers=headers)
Subscribe command
Consumers created through the library implement the idempotency pattern using the header attribute message-id. The
library configures it as unique in the database. This ensures a given message is only processed once, no matter what.
To correctly implement this, you must open a transaction with the database to persist the data from your logic and
execute the save method of the payload object. Once the code is performed correctly, the library guarantees the
message is removed from the broker.
If you need to discard the message due to a business rule, use the nack method of the Payload object. This call
removes the message from the broker. This method performs no persistence in the database and can be called outside your
database transaction. If it fails for any reason, the message is resent to the consumer.
Alert:
You need to use either save or nack to process of your message. The library cannot make the decision for the
developer, and it is up to the developer to determine whether to use the save or nack method. However, in case of an
exception during the operation, the nack method will be triggered automatically
The same service (code + database) cannot consume the same message even with different consumers.
Create a function that receives an instance of django_outbox_pattern.payloads.Payload
# callbacks.py
from django.db import transaction
from django_outbox_pattern.payloads import Payload
def callback(payload: Payload):
if message_is_invalid(payload.body):
payload.nack()
return
with transaction.atomic():
persist_your_data()
payload.save()
To start the consumer, after creating the callback, it is necessary to execute the following command.
python manage.py subscribe 'dotted.path.to.callback` 'destination' 'queue_name'
The command takes three parameters:
callback : the path to the callback function.
destination : the destination where messages will be consumed following one of
the stomp patterns
queue_name(optional): the name of the queue that will be consumed. If not provided, the routing_key of the destination
will be used.
Settings
DEFAULT_CONNECTION_CLASS
The stomp.py class responsible for connecting to the broker. Default: stomp.StompConnection12
DEFAULT_CONSUMER_LISTENER_CLASS
The consumer listener class. Default: django_outbox_pattern.listeners.ConsumerListener
DEFAULT_GENERATE_HEADERS
A function to add headers to the message. Default: django_outbox_pattern.headers.generate_headers
DEFAULT_MAXIMUM_BACKOFF:
Maximum wait time for connection attempts in seconds. Default: 3600 (1 hour)
DEFAULT_MAXIMUM_RETRY_ATTEMPTS
Maximum number of message resend attempts. Default: 50
DEFAULT_PAUSE_FOR_RETRY
Pausing for attempts to resend messages in seconds. Defualt: 240 (4 minutes)
DEFAULT_WAIT_RETRY
Time between attempts to send messages after the pause. Default: 60 (1 minute)
DEFAULT_PRODUCER_LISTENER_CLASS:
The producer listener class. Default: django_outbox_pattern.listeners.ProducerListener
DEFAULT_STOMP_HOST_AND_PORTS
List of host and port tuples to try to connect to the broker. Default [("127.0.0.1", 61613)]
DEFAULT_STOMP_QUEUE_HEADERS
Headers for queues. Default: {"durable": "true", "auto-delete": "false", "prefetch-count": "1"}
DEFAULT_STOMP_HEARTBEATS
Time tuples for input and output heartbeats. Default: (10000, 10000)
Optional overrides:
- DEFAULT_STOMP_OUTGOING_HEARTBEAT: Overrides the outgoing heartbeat (ms) if set; otherwise falls back to DEFAULT_STOMP_HEARTBEATS and/or top-level STOMP_OUTGOING_HEARTBEAT.
- DEFAULT_STOMP_INCOMING_HEARTBEAT: Overrides the incoming heartbeat (ms) if set; otherwise falls back to DEFAULT_STOMP_HEARTBEATS and/or top-level STOMP_INCOMING_HEARTBEAT.
Top-level Django settings supported (compat with django-stomp):
STOMP_OUTGOING_HEARTBEATandSTOMP_INCOMING_HEARTBEATcan be defined at settings.py root to control heartbeats without touching DJANGO_OUTBOX_PATTERN.
warning: The heartbeat only works at consumer connection. The publisher process open and close connection for each batch message to publish.
DEFAULT_STOMP_VHOST
Virtual host. Default: "/"
DEFAULT_STOMP_USERNAME
Username for connection. Default: "guest"
DEFAULT_STOMP_PASSCODE
Password for connection. Default: "guest"
DEFAULT_STOMP_USE_SSL
For ssl connections. Default: False
DEFAULT_STOMP_KEY_FILE
The path to a X509 key file. Default: None
DEFAULT_STOMP_CERT_FILE
The path to a X509 certificate. Default: None
DEFAULT_STOMP_CA_CERTS
The path to the a file containing CA certificates to validate the server against. If this is not set, server side certificate validation is not done. Default: None
DEFAULT_STOMP_CERT_VALIDATOR
Function which performs extra validation on the client certificate, for example checking the returned certificate has a commonName attribute equal to the hostname (to avoid man in the middle attacks). The signature is: (OK, err_msg) = validation_function(cert, hostname) where OK is a boolean, and cert is a certificate structure as returned by ssl.SSLSocket.getpeercert(). Default: None
DEFAULT_STOMP_SSL_VERSION
SSL protocol to use for the connection. This should be one of the PROTOCOL_x constants provided by the ssl module. The default is ssl.PROTOCOL_TLSv1.
DEFAULT_STOMP_SSL_PASSWORD
SSL password
DEFAULT_EXCLUSIVE_QUEUE
For exclusive queue feature. Default: False
DAYS_TO_KEEP_DATA
The total number of days that the system will keep a message in the database history. Default: 30
REMOVE_DATA_CACHE_TTL
This variable defines the time-to-live (TTL) value in seconds for the cache used by the _remove_old_messages method in
the django_outbox_pattern application. The cache is used to prevent the method from deleting old data every time it is
run, and the TTL value determines how long the cache entry should remain valid before being automatically deleted. It
can be customized by setting the REMOVE_DATA_CACHE_TTL variable. Default: 86400 seconds (1 day)
OUTBOX_PATTERN_PUBLISHER_CACHE_KEY
The OUTBOX_PATTERN_PUBLISHER_CACHE_KEY variable controls the key name of the cache used to store the outbox pattern
publisher. Default: remove_old_messages_django_outbox_pattern_publisher.
OUTBOX_PATTERN_CONSUMER_CACHE_KEY
The OUTBOX_PATTERN_CONSUMER_CACHE_KEY variable controls the key name of the cache used to store the outbox pattern
publisher. Default: remove_old_messages_django_outbox_pattern_consumer.
DEFAULT_PUBLISHED_CHUNK_SIZE
The DEFAULT_PUBLISHED_CHUNK_SIZE variable controls chunk size for the publish command in get message to publish
action. Default: 200
DEFAULT_CONSUMER_PROCESS_MSG_ON_BACKGROUND
Controls whether Consumer processes incoming messages on a background thread pool.
When set to True, handle_incoming_message submits work to a ThreadPoolExecutor and returns immediately;
The user callback (via message_handler) runs asynchronously. This helps keep the listener responsive (e.g.,
heartbeats) when callbacks are slow or blocking. When False (default), messages are processed synchronously and any
exception raised by the callback will propagate to the caller.
Default: False.
Notes:
- The worker pool is recreated automatically if it was previously shut down and a new message arrives.
- Ensure your callback calls
payload.save()(orpayload.nack()when appropriate); otherwise a warning is logged and the message may not be acked/nacked automatically unless an exception occurs.
Example configuration:
# settings.py
DJANGO_OUTBOX_PATTERN = {
# ... other options ...
"DEFAULT_CONSUMER_PROCESS_MSG_ON_BACKGROUND": True
}
DEFAULT_PRODUCER_WAITING_TIME
The DEFAULT_PRODUCER_WAITING_TIME variable controls the waiting time in seconds for the producer to check for new
messages to be sent.
Default: 1 second
DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT
The DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT variable controls the maximum time in seconds that the consumer will wait for
in-flight message processing to complete during graceful shutdown. This is critical for containerized environments
(Docker, Kubernetes) to ensure messages are processed before the container is forcefully terminated.
When set to None, the consumer waits indefinitely for message processing to complete.
When set to a numeric value (seconds), the consumer will:
- Stop accepting new messages (unsubscribe from queue)
- Wait up to the specified timeout for the current message to finish processing
- If timeout is reached, log a warning and proceed with shutdown (message will be redelivered)
- Clean up connections and exit gracefully
Default: 90 seconds
Important for Kubernetes/Docker deployments:
To ensure graceful shutdown in containerized environments, configure this value to be less than your container's termination grace period, leaving a buffer for cleanup operations:
# settings.py
DJANGO_OUTBOX_PATTERN = {
# ... other options ...
"DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT": 115, # seconds
}
Kubernetes deployment example:
apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer
spec:
template:
spec:
terminationGracePeriodSeconds: 120 # Total time before SIGKILL
containers:
- name: consumer
image: your-image
# Use array form to ensure Python receives signals (runs as PID 1)
command: ["python"]
args: ["manage.py", "subscribe", "callback", "destination"]
env:
- name: DJANGO_SETTINGS_MODULE
value: "your_project.settings"
Signal handling:
The consumer handles the following signals for graceful shutdown:
SIGTERM: Sent by Kubernetes/Docker on pod terminationSIGINT: Sent by Ctrl+C in terminalSIGQUIT: Graceful shutdown signal
If a second signal is received during shutdown, the process will exit immediately (force quit).
Best practices:
- Set
DEFAULT_CONSUMER_SHUTDOWN_TIMEOUTto accommodate your longest expected message processing time - Ensure
DEFAULT_CONSUMER_SHUTDOWN_TIMEOUT<terminationGracePeriodSeconds(leave 5-10 second buffer) - Use array-form command in container definitions to ensure Python runs as PID 1 and receives signals
- Monitor logs for timeout warnings to identify messages that exceed processing expectations
Example timeline (with 90-second message processing):
T=0s: SIGTERM received
T=0s: Unsubscribe from queue (no new messages accepted)
T=0s: Wait up to 115 seconds for message processing
T=90s: Message processing completes
T=90s: Shutdown thread pool executor
T=90s: Disconnect from broker
T=90s: Process exits cleanly ✓
If message processing exceeds the timeout, the process will exit and the message will be redelivered when the consumer restarts.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file django_outbox_pattern-3.1.0.tar.gz.
File metadata
- Download URL: django_outbox_pattern-3.1.0.tar.gz
- Upload date:
- Size: 23.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.5.1 CPython/3.10.19 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bc22a6c34f18773cc1188ee6b3bf3a7b54ac018fa66d94c0023188c33ba0c6bc
|
|
| MD5 |
4138749031f5b30c39ec59590124f4e9
|
|
| BLAKE2b-256 |
7cbf7c06c25fe820226e16e46b58a97ce0cce9988e32dd6a712d23f57b92a4ea
|
File details
Details for the file django_outbox_pattern-3.1.0-py3-none-any.whl.
File metadata
- Download URL: django_outbox_pattern-3.1.0-py3-none-any.whl
- Upload date:
- Size: 25.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.5.1 CPython/3.10.19 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a72a335e760d1a877e5e7cc2446d33af493fd5abf82be961f994eb579303b069
|
|
| MD5 |
308b4c7e6362a6c759d6cf1fb3481e8b
|
|
| BLAKE2b-256 |
793ee37bb06eb0e9995c161c020ce527c485784e559bcf68cf1609e84a1c99ff
|