Production-ready AMQP message broker abstraction with advanced retry logic, dead letter exchanges, and high availability features.
MRSAL AMQP
Breaking changes in 3.8.0
- `MrsalBlockingAMQP.start_consumer` default `auto_ack` flipped from `True` to `False`, matching the async consumer and every example in this README. Callers that relied on the previous default were silently opting out of DLX accountability while the library advertised reliability features. To keep the old behaviour, pass `auto_ack=True, dlx_enable=False` explicitly and accept that failed callbacks are dropped.
- `start_consumer` now rejects incompatible flag combinations at setup by raising `MrsalAbortedSetup`:
  - `auto_ack=True, dlx_enable=True` -- on both consumers. Once the broker has acked at delivery, failed messages cannot be routed to the DLX, so the combination is meaningless.
  - `auto_ack=True, threaded=True` -- on the blocking consumer. The executor's submit queue is unbounded, so a slow callback grows pending tasks until OOM. See §4.1.1 for full rationale and remediation paths.
Breaking changes in 3.7.0
- `MrsalAsyncAMQP.start_consumer` now wraps the queue iterator in `async with queue.iterator(...) as it:` so consumer cancellation is deterministically delivered to the broker on exception or generator GC. Subclasses or tests that monkey-patched `queue.iterator()` to return a bare async generator must update their mocks to also support the async context-manager protocol (`__aenter__`/`__aexit__`). See `tests/conftest.py::AsyncIteratorMock` for the reference shape.
- New `async def stop()` lifecycle method. Once `stop()` has been called on a `MrsalAsyncAMQP` instance, that instance cannot be restarted: the internal stop event remains set, so future `start_consumer` calls exit on the first iteration. To restart, construct a new instance. The persistent state is deliberate: it preserves a `stop()` request that arrives during a tenacity retry backoff, which would otherwise be silently dropped.
- New `max_concurrent_tasks: int | None` parameter on the async `start_consumer`. Default `None` preserves prior behaviour (sequential processing). When set to `N > 0`, up to N messages are dispatched as concurrent `asyncio` tasks bounded by a semaphore. Note that `prefetch_count` is broker-side buffering and does not parallelize the consumer; combine with `prefetch_count >= max_concurrent_tasks` for steady throughput.
- New `drain_timeout: float | None` parameter on the async `start_consumer`. Bounds how long the consumer waits for in-flight tasks to finish after a graceful stop. On timeout, remaining tasks are cancelled and their messages will be redelivered by the broker.
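The semaphore-bounded dispatch and the drain timeout described above can be sketched with plain `asyncio`. This is a minimal illustration, not Mrsal's implementation: `consume_bounded`, its arguments and its return value are hypothetical names, and a plain iterable stands in for the broker's delivery stream.

```python
import asyncio


async def consume_bounded(messages, handler, max_concurrent_tasks, drain_timeout=None):
    """Run handler(msg) with at most max_concurrent_tasks in flight.

    Returns the number of tasks cancelled because drain_timeout elapsed.
    (Hypothetical helper for illustration only.)
    """
    semaphore = asyncio.Semaphore(max_concurrent_tasks)
    tasks = set()

    async def run_one(msg):
        try:
            await handler(msg)
        finally:
            semaphore.release()  # free the slot even if the handler fails

    for msg in messages:
        await semaphore.acquire()  # waits while N handlers are in flight
        task = asyncio.create_task(run_one(msg))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    if not tasks:
        return 0
    # Graceful stop: give in-flight tasks drain_timeout seconds to finish.
    _, pending = await asyncio.wait(set(tasks), timeout=drain_timeout)
    for task in pending:
        task.cancel()  # with a real broker, these messages would be redelivered
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```

The semaphore only limits how many callbacks run at once. How many messages the broker hands over in advance is a separate knob, which is why the note above pairs it with `prefetch_count`.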
Breaking changes in 3.6.0
- `validate_payload` now returns the validated model instance (it previously returned `None`). When `payload_model` is passed to `start_consumer`, the callback receives the validated model instance as its body argument instead of the raw `bytes`. Callbacks that called `json.loads(body)` / `Model(**json.loads(body))` internally must drop that step and treat the third argument as an instance of `payload_model`. See the example in §4.5.
- Publishers (`publish_message`, `publish_messages`) and DLX republishes now always enable publisher confirms, so `NackError`/`UnroutableError` are raised (and retried) instead of silently dropped. DLX republishes also honor an explicit `dlx_routing_key` instead of falling back to the original routing key, fixing silent loss when the DLX bind used a different key.
- Internal-only: `_process_single_message` now reads its `runtime_config` dict with `[]` instead of `.get()`. Callers that build their own `runtime_config` (e.g. in tests) must include all keys produced by `start_consumer`: `callback`, `callback_args`, `auto_ack`, `payload_model`, `threaded`, `dlx_enable`, `enable_retry_cycles`, `retry_cycle_interval`, `max_retry_time_limit`, `exchange_name`, `routing_key`, `dlx_exchange_name`, `dlx_routing_key`, `queue_name`. Missing keys now raise `KeyError` instead of silently being `None`.
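For tests that hand-build a `runtime_config`, one way to avoid the new `KeyError` is a small fixture helper that always fills in every key. The sketch below is an assumption about how you might structure such a helper; `make_runtime_config` and `RUNTIME_CONFIG_KEYS` are hypothetical names, and only the key list itself comes from the note above.

```python
# Keys listed in the 3.6.0 note above; the helper itself is hypothetical.
RUNTIME_CONFIG_KEYS = (
    "callback", "callback_args", "auto_ack", "payload_model", "threaded",
    "dlx_enable", "enable_retry_cycles", "retry_cycle_interval",
    "max_retry_time_limit", "exchange_name", "routing_key",
    "dlx_exchange_name", "dlx_routing_key", "queue_name",
)


def make_runtime_config(**overrides):
    """Build a dict with every required key present, defaulting to None."""
    unknown = set(overrides) - set(RUNTIME_CONFIG_KEYS)
    if unknown:
        raise KeyError(f"unknown runtime_config keys: {sorted(unknown)}")
    config = dict.fromkeys(RUNTIME_CONFIG_KEYS)
    config.update(overrides)
    return config


config = make_runtime_config(queue_name="orders_queue", auto_ack=False)

# Why the change matters: .get() hides a missing key, [] surfaces it.
partial = {"queue_name": "orders_queue"}
hidden = partial.get("auto_ack")  # None, with no hint that the key is absent
try:
    partial["auto_ack"]
except KeyError as exc:
    surfaced = f"missing key: {exc}"
```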
Intro
Mrsal is a production-ready message broker abstraction on top of RabbitMQ, aio-pika and Pika.
Why Mrsal? Setting up robust AMQP in production is complex. You need dead letter exchanges, retry logic, quorum queues, proper error handling, queue management, and more. Mrsal gives you enterprise-grade messaging out of the box with just a few lines of code.
What makes Mrsal production-ready:
- Dead Letter Exchange: Automatic DLX setup with configurable retry cycles
- High Availability: Quorum queues for data safety across cluster nodes
- Performance Tuning: Queue limits, overflow behavior, lazy queues, prefetch control
- Zero Configuration: Sensible defaults that work in production
- Full Observability: Comprehensive logging and retry tracking
- Type Safety: Pydantic integration for payload validation
- Async & Sync: Both blocking and async implementations
- Threaded Consumers: Bounded thread pool for long-running callbacks
- Resource Safety: Context manager support for clean connection lifecycle
The goal is to make Mrsal trivial to re-use across all services in your distributed system and to make advanced message queuing protocols easy and safe. No more big chunks of repetitive code across your services or bespoke solutions to handle dead letters.
Perfect for:
- Microservices communication
- Event-driven architectures
- Background job processing
- Real-time data pipelines
- Mission-critical message processing
Mrsal is Arabic for a small arrow and is used to describe something that performs a task with lightness and speed.
Quick Start guide
0. Requirements
- RabbitMQ server up and running
- Python >= 3.10
- Tested on Linux only
1. Installing
First things first:
poetry add mrsal
Next, set the default username, password and server name for your RabbitMQ setup. It's advisable to use a `.env` file or your shell rc file (e.g. `.zshrc`) for persistence.
[RabbitEnvVars]
RABBITMQ_USER=******
RABBITMQ_PASSWORD=******
RABBITMQ_VHOST=******
RABBITMQ_DOMAIN=******
RABBITMQ_PORT=******
# FOR TLS
RABBITMQ_CAFILE=/path/to/file
RABBITMQ_CERT=/path/to/file
RABBITMQ_KEY=/path/to/file
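The examples further down refer to names such as `RABBITMQ_DOMAIN` and `RABBITMQ_PORT` directly. A minimal sketch of one way to load them from the environment is shown here; `load_rabbit_settings` is a hypothetical helper (not part of Mrsal), and treating "all three TLS paths set" as the signal for `ssl=True` is an assumption of this sketch.

```python
import os


def load_rabbit_settings(env=os.environ):
    """Collect the RABBITMQ_* variables into keyword arguments.

    Hypothetical helper: the dict keys mirror the constructor arguments
    used in the examples of this README.
    """
    settings = {
        "host": env["RABBITMQ_DOMAIN"],
        "port": int(env["RABBITMQ_PORT"]),  # env values are always strings
        "credentials": (env["RABBITMQ_USER"], env["RABBITMQ_PASSWORD"]),
        "virtual_host": env["RABBITMQ_VHOST"],
    }
    # Assumption of this sketch: enable TLS only when all three paths are set.
    tls_keys = ("RABBITMQ_CAFILE", "RABBITMQ_CERT", "RABBITMQ_KEY")
    settings["ssl"] = all(env.get(key) for key in tls_keys)
    return settings


# Example with made-up values instead of the real environment:
example = load_rabbit_settings({
    "RABBITMQ_DOMAIN": "localhost",
    "RABBITMQ_PORT": "5672",
    "RABBITMQ_USER": "guest",
    "RABBITMQ_PASSWORD": "guest",
    "RABBITMQ_VHOST": "/",
})
```

A missing required variable raises `KeyError` right away, which is usually easier to debug than a connection failure later on.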
Mrsal was first developed by NeoMedSys and the research group [CRAI](https://crai.no/) at the university hospital of Oslo.
2. Setup and connect
- Example 1: Let's create a blocking connection on localhost with no TLS encryption
from mrsal.amqp.subclass import MrsalBlockingAMQP
mrsal = MrsalBlockingAMQP(
host=RABBITMQ_DOMAIN, # Use a custom domain if you are using SSL e.g. mrsal.on-example.com
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
ssl=False # Set this to True for SSL/TLS (you will need to set the cert paths if you do so)
)
# boom you are staged for connection. This instantiation stages for connection only
# When done, call mrsal.close() to clean up — or use a context manager:
# with MrsalBlockingAMQP(...) as mrsal:
2.1 Publish
Now let's publish our message of friendship on the friendship exchange.
Note: `auto_declare=True` means that MrsalAMQP will create the specified exchange and queue, then bind them together using `routing_key`, all in one go. If you want to customize each step, turn off `auto_declare` and specify each step yourself with custom arguments.
import pika

# BasicProperties is used to set the message properties
prop = pika.BasicProperties(
    app_id='zoomer_app',
    message_id='zoomer_msg',
    content_type='application/json',
    content_encoding='utf-8',
    delivery_mode=pika.DeliveryMode.Persistent,
    headers=None)
message_body = {'zoomer_message': 'Get it yia bish'}
# For publishers, use a context manager so the connection is cleaned up after sending
with MrsalBlockingAMQP(
host=RABBITMQ_DOMAIN,
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
ssl=False
) as mrsal:
    mrsal.publish_message(exchange_name='zoomer_x',
                          exchange_type='direct',
                          queue_name='zoomer_q',
                          routing_key='zoomer_key',
                          message=message_body,
                          prop=prop,
                          auto_declare=True)
# Connection is automatically closed here
2.2 Consume
Now let's set up a consumer that will listen to our very important messages. If you are using scripts rather than notebooks, it's advisable to run consume and publish separately. We need a callback function, which is triggered when a message arrives on the queue we subscribe to. You can use the callback to trigger something in your system.
Note:
- Your callback is invoked as `callback(*callback_args, method_frame, properties, body)`. Anything you pass via `callback_args` is prepended; `method_frame`, `properties`, and `body` are always supplied by mrsal.
- When you pass `payload_model=YourModel`, `body` is the validated model instance instead of raw bytes (see §4.5).
- We can use pydantic BaseModel classes to enforce types in the body
import pika
from pydantic import BaseModel

class ZoomerNRJ(BaseModel):
    zoomer_message: str

class SadZoomerEnergyError(Exception):
    """Raised when a message lacks the expected zoomer energy."""

def consumer_callback_with_delivery_info(
    method_frame: pika.spec.Basic.Deliver,
    properties: pika.spec.BasicProperties,
    body: bytes  # raw bytes, because no payload_model is passed below
):
    if b'Get it' in body:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Slay with main character vibe')
    else:
        raise SadZoomerEnergyError('Zoomer sad now')
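mrsal.start_consumer(
    queue_name='zoomer_q',
    exchange_name='zoomer_x',
    exchange_type='direct',    # same exchange type as the publisher in 2.1
    routing_key='zoomer_key',  # same routing key as the publisher in 2.1
    callback_args=None, # no need to specify if you do not need it
    callback=consumer_callback_with_delivery_info,
    auto_declare=True,
    auto_ack=False
)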
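To make the calling convention from the note concrete, here is a minimal sketch of how prepending `callback_args` plays out. `dispatch` is a hypothetical stand-in for what the library does internally, `audit_callback` and the `"acme"` value are made up, and passing `callback_args` as a tuple is an assumption of this sketch.

```python
def dispatch(callback, callback_args, method_frame, properties, body):
    """Call the callback with the extra args first, then the broker-supplied ones."""
    extra = callback_args or ()
    return callback(*extra, method_frame, properties, body)


def audit_callback(tenant, method_frame, properties, body):
    # 'tenant' comes from callback_args; the other three stand in for
    # the values mrsal supplies on every delivery.
    return f"{tenant}:{body.decode()}"


def plain_callback(method_frame, properties, body):
    return body.decode()


with_args = dispatch(audit_callback, ("acme",), None, None, b"hello")
without_args = dispatch(plain_callback, None, None, None, b"hello")
```

So a callback that takes extra arguments declares them before `method_frame`, and a callback without extras keeps the three-argument shape used in the example above.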
Done! Your first message of zoomerism has been sent to the zoomer queue on the zoomer exchange over a blocking connection. Let's see how to do the same with async in the next step.
3. Setup and Connect Async
It's usually best practice to use async consumers when high throughput is expected. We can do this by adjusting the code slightly to fit Python's async framework.
from mrsal.amqp.subclass import MrsalAsyncAMQP
mrsal = MrsalAsyncAMQP(
host=RABBITMQ_DOMAIN, # Use a custom domain if you are using SSL e.g. mrsal.on-example.com
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
ssl=False # Set this to True for SSL/TLS (you will need to set the cert paths if you do so)
)
# boom you are staged for async connection.
# When done, call await mrsal.close() to clean up — or use an async context manager:
# async with MrsalAsyncAMQP(...) as mrsal:
3.1 Consume
Let's go turbo and set up the consumer in async for efficient AMQP handling.
import asyncio
import pika
from pydantic import BaseModel

class ZoomerNRJ(BaseModel):
    zoomer_message: str

class SadZoomerEnergyError(Exception):
    """Raised when a message lacks the expected zoomer energy."""

async def consumer_callback_with_delivery_info(
    method_frame: pika.spec.Basic.Deliver,
    properties: pika.spec.BasicProperties,
    body: bytes  # raw bytes, because no payload_model is passed below
):
    if b'Get it' in body:
        app_id = properties.app_id
        msg_id = properties.message_id
        print(f'app_id={app_id}, msg_id={msg_id}')
        print('Slay with main character vibe')
    else:
        raise SadZoomerEnergyError('Zoomer sad now')
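asyncio.run(mrsal.start_consumer(
    queue_name='zoomer_q',
    exchange_name='zoomer_x',
    exchange_type='direct',    # same exchange type as the publisher in 2.1
    routing_key='zoomer_key',  # same routing key as the publisher in 2.1
    callback_args=None, # no need to specify if you do not need it
    callback=consumer_callback_with_delivery_info,
    auto_declare=True,
    auto_ack=False
))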
That simple! You now have a full AMQP setup that you can use to promote friendship or other necessary communication between your services, over both blocking and async connections.
Note! There are many more parameters and settings for building a more sophisticated communication protocol over both blocking and async connections, including pydantic BaseModels to enforce data types in the expected payload.
4. Advanced Features
4.1 Dead Letter Exchange & Retry Logic with Cycles
Mrsal provides time-delayed retry cycles via a broker-managed <queue>.retry queue, with a terminal <queue>.dlx parking lot for messages that exhaust the retry budget:
mrsal = MrsalBlockingAMQP(
host=RABBITMQ_DOMAIN,
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
dlx_enable=True, # Default: creates '<exchange>.dlx' (+ '<queue>.retry' when retry cycles are on)
)
# Advanced retry configuration with cycles
mrsal.start_consumer(
queue_name='critical_queue',
exchange_name='critical_exchange',
exchange_type='direct',
routing_key='critical_key',
callback=my_callback,
auto_ack=False, # Required for retry logic
dlx_enable=True, # Enable DLX for this queue
dlx_exchange_name='custom_dlx', # Optional: custom DLX name
dlx_routing_key='dlx_key', # Optional: custom DLX routing
enable_retry_cycles=True, # Enable time-delayed retry cycles
retry_cycle_interval=10, # Minutes between retry cycles
max_retry_time_limit=60, # Total minutes before permanent failure
)
How the advanced retry logic works:
When enable_retry_cycles=True, Mrsal declares two queues alongside the DLX exchange:
- `<queue>.retry`: a delay queue with a broker-side `x-message-ttl` of `retry_cycle_interval` minutes and `x-dead-letter-exchange` / `x-dead-letter-routing-key` queue arguments pointing back to the original exchange/routing key. Failed messages land here, sit for the TTL, and are dead-lettered back to the original queue automatically by the broker.
- `<queue>.dlx`: the terminal parking lot. Messages whose `max_retry_time_limit` is exhausted are published here and stay until a human replays them.

A failing message then moves through these stages:
- First failure: Message is published to `<queue>.retry` with retry-tracking headers (`x-cycle-count`, `x-first-failure`, `x-total-elapsed`, `x-processing-error`).
- Retry cycles: After `retry_cycle_interval` minutes, RabbitMQ dead-letters the message back to the original queue. The consumer reprocesses it; if it fails again and `total_elapsed < max_retry_time_limit`, it cycles again.
- Permanent failure: Once `max_retry_time_limit` is exceeded, the message is published to `<queue>.dlx` with `x-retry-exhausted=True` and stays there for manual review.
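The flow above can be sketched in a few lines of plain Python. This is an illustration under stated assumptions, not Mrsal's code: `retry_queue_arguments` and `route_failure` are hypothetical names, and the header value formats (epoch seconds for `x-first-failure`, minutes for `x-total-elapsed`) are guesses made for the sake of the example. The queue argument names and the millisecond unit of `x-message-ttl` are standard RabbitMQ.

```python
import time


def retry_queue_arguments(exchange_name, routing_key, retry_cycle_interval):
    """Queue arguments for a '<queue>.retry' delay queue (interval in minutes)."""
    return {
        "x-message-ttl": int(retry_cycle_interval * 60 * 1000),  # RabbitMQ expects ms
        "x-dead-letter-exchange": exchange_name,      # send expired messages back...
        "x-dead-letter-routing-key": routing_key,     # ...to the original binding
    }


def route_failure(queue_name, headers, error, max_retry_time_limit, now=None):
    """Pick the destination for a failed message and update its tracking headers.

    Returns (destination_queue, new_headers). The limit is in minutes.
    """
    now = time.time() if now is None else now
    first_failure = headers.get("x-first-failure", now)
    elapsed_minutes = (now - first_failure) / 60
    new_headers = {
        **headers,
        "x-cycle-count": headers.get("x-cycle-count", 0) + 1,
        "x-first-failure": first_failure,
        "x-total-elapsed": elapsed_minutes,
        "x-processing-error": str(error),
    }
    if elapsed_minutes < max_retry_time_limit:
        return f"{queue_name}.retry", new_headers  # cycle again after the TTL
    new_headers["x-retry-exhausted"] = True
    return f"{queue_name}.dlx", new_headers        # park for manual review


args = retry_queue_arguments("critical_exchange", "critical_key", retry_cycle_interval=10)
first_dest, first_headers = route_failure(
    "critical_queue", {}, ValueError("boom"), max_retry_time_limit=60, now=1000.0)
last_dest, last_headers = route_failure(
    "critical_queue", first_headers, ValueError("boom"), max_retry_time_limit=60, now=4600.0)
```

In this sketch the first failure goes to `critical_queue.retry`, and a failure a full hour after the first one goes to `critical_queue.dlx`. Note that the waiting itself costs the consumer nothing, since the broker holds the message for the TTL.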
Benefits:
- Handles longer outages with time-delayed cycles
- Full observability with retry tracking
- Manual intervention capability for persistent failures
Operational note:
`retry_cycle_interval` is baked into the `<queue>.retry` queue declaration as `x-message-ttl`. Changing it between deployments will trip RabbitMQ's `PRECONDITION_FAILED - inequivalent arg 'x-message-ttl'` error on the existing queue, and Mrsal will abort startup with `MrsalAbortedSetup` rather than silently fall through to a misconfigured retry path. To roll out a new interval: delete the `<queue>.retry` queue on the broker, then redeploy.
Constraints (rejected at start_consumer time):
Because the two-queue retry topology relies on the broker honoring distinct binding keys for <queue>.retry and <queue>.dlx, some configurations would silently drop cycled messages and are rejected with MrsalAbortedSetup:
- `exchange_type='fanout'` or `'headers'` with `enable_retry_cycles=True`: fanout/headers exchanges ignore routing keys, so cycling and parking collapse into the same queue and exhausted messages re-cycle indefinitely. Use `enable_retry_cycles=False` for these exchange types (terminal DLX still works).
- `exchange_type='topic'` with a wildcard `routing_key` (`*` or `#` segments) and `enable_retry_cycles=True`: the broker would dead-letter expired messages back to the original exchange with the literal wildcard string as the routing key, which matches no binding. Use a concrete routing key, or `enable_retry_cycles=False`.
- `retry_cycle_interval <= 0`: produces a zero or negative TTL, which either makes the broker reject the declare or dead-letters every message immediately into a tight retry loop.
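If you want to catch these configurations in your own code or CI before a deploy, the rules above are easy to mirror. The following is a rough sketch only: `check_retry_topology` and `AbortedSetup` are hypothetical names standing in for the library's own check and its `MrsalAbortedSetup` exception, and applying the interval rule only when retry cycles are enabled is an assumption.

```python
class AbortedSetup(Exception):
    """Stand-in for MrsalAbortedSetup in this sketch."""


def check_retry_topology(exchange_type, routing_key, enable_retry_cycles, retry_cycle_interval):
    """Raise AbortedSetup for the retry configurations listed above."""
    if not enable_retry_cycles:
        return  # terminal DLX only, so the retry-queue constraints do not apply
    if exchange_type in ("fanout", "headers"):
        raise AbortedSetup(f"{exchange_type} exchanges ignore routing keys")
    # Topic wildcards are whole dot-separated segments: '*' or '#'.
    if exchange_type == "topic" and any(seg in ("*", "#") for seg in routing_key.split(".")):
        raise AbortedSetup("wildcard routing key matches no binding when dead-lettered back")
    if retry_cycle_interval <= 0:
        raise AbortedSetup("retry_cycle_interval must be positive")


# A concrete topic key with retry cycles passes without raising:
check_retry_topology("topic", "orders.eu.created", True, 10)
```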
4.1.1 auto_ack and reliability
auto_ack=True tells the broker to ack each message at delivery, before the consumer has done anything with it. That means you opt out of every reliability feature in this library:
- `auto_ack=True` is incompatible with `dlx_enable=True`. Once the broker has acked, the message no longer exists from RabbitMQ's perspective, so there is nothing left to route to the DLX on failure. Mrsal rejects this combination at `start_consumer` time with `MrsalAbortedSetup`. To use `auto_ack=True`, pass `dlx_enable=False` explicitly.
- `auto_ack=True` is incompatible with `threaded=True` (blocking consumer). With both flags, the consume loop hands each delivery to the `ThreadPoolExecutor` and immediately moves on; the broker has already acked, so `prefetch_count` no longer provides backpressure. A slow callback plus a fast broker grows the executor's pending-task queue without bound until the process runs out of memory. Mrsal rejects this combination at setup.
- `auto_ack=True` with `dlx_enable=False` is allowed and is fire-and-forget: failed callbacks are logged and the message is gone. Use it only when message loss is acceptable.
- `auto_ack=True` on the async consumer has no broker-side backpressure. AMQP's `no_ack=true` mode (what `auto_ack=True` translates to on the wire) tells the broker to disregard `prefetch_count` and push deliveries as fast as the TCP connection allows. aio-pika buffers those internally in an unbounded queue between the broker connection and the `async for ... in iterator` loop, so a slow callback plus a fast broker grows that buffer until the process runs out of memory. `max_concurrent_tasks` bounds mrsal's task set but does not bound aio-pika's receive buffer. The blocking consumer hits the same shape of bug under `threaded=True`, which is why that combination is rejected outright; the async equivalent is allowed for parity with the original API but should be considered unsafe for production. Use `auto_ack=False` if you need both async and reliability.
For production, use auto_ack=False (the default) so the consumer acks on success and routes failures through DLX/retry as configured.
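As a quick reference, the combinations above boil down to a small decision table. The sketch below is a hypothetical helper (`check_ack_flags` is not a Mrsal function) that returns the reasons a combination would be rejected, or an empty list when it is allowed.

```python
def check_ack_flags(auto_ack, dlx_enable, threaded=False, is_async=False):
    """Return the reasons a flag combination is rejected (empty list if allowed)."""
    problems = []
    if auto_ack and dlx_enable:
        # Applies to both consumers: an acked message cannot be dead-lettered.
        problems.append("auto_ack=True with dlx_enable=True")
    if auto_ack and threaded and not is_async:
        # Blocking consumer only: the executor's pending queue is unbounded.
        problems.append("auto_ack=True with threaded=True")
    return problems


production = check_ack_flags(auto_ack=False, dlx_enable=True, threaded=True)
fire_and_forget = check_ack_flags(auto_ack=True, dlx_enable=False)
rejected = check_ack_flags(auto_ack=True, dlx_enable=True, threaded=True)
```

Keep in mind that "allowed" is not the same as "safe": the fire-and-forget case passes this check but still drops failed messages, and the async `auto_ack=True` case passes while having no backpressure.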
4.2 Queue Management & Performance
Configure queues for optimal performance and resource management:
mrsal.start_consumer(
queue_name='high_performance_queue',
exchange_name='perf_exchange',
exchange_type='direct',
routing_key='perf_key',
callback=my_callback,
# Queue limits and overflow behavior
max_queue_length=10000, # Max messages before overflow
max_queue_length_bytes=None, # Optional: max queue size in bytes
queue_overflow="drop-head", # "drop-head" or "reject-publish"
# Performance settings
single_active_consumer=False, # Allow parallel processing
lazy_queue=False, # Keep messages in RAM for speed
use_quorum_queues=True, # High availability
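# Memory optimization alternative (for low-priority queues).
# Use these instead of the two settings above; a keyword can only be passed once:
# lazy_queue=True,              # Store messages on disk
# single_active_consumer=True,  # Sequential processing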
)
Queue Configuration Options:
- `max_queue_length`: Limit queue size to prevent memory issues
- `queue_overflow`:
- `"drop-head"`: Remove oldest messages when full
- `"reject-publish"`: Reject new messages when full
- `single_active_consumer`: Only one consumer processes at a time (good for ordered processing)
- `lazy_queue`: Store messages on disk instead of RAM (memory efficient)
- `use_quorum_queues`: Enhanced durability and performance in clusters
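Under the hood, options like these correspond to RabbitMQ queue arguments set at declaration time. The sketch below shows a plausible mapping using the standard RabbitMQ argument names. It is an assumption about the general shape, not Mrsal's actual code: `build_queue_arguments` is a hypothetical helper, and skipping the lazy mode for quorum queues is a choice of this sketch (lazy mode is a classic-queue setting).

```python
def build_queue_arguments(max_queue_length=None, max_queue_length_bytes=None,
                          queue_overflow=None, single_active_consumer=False,
                          lazy_queue=False, use_quorum_queues=True):
    """Translate README-style options into RabbitMQ 'x-' queue arguments."""
    args = {}
    if max_queue_length is not None:
        args["x-max-length"] = max_queue_length
    if max_queue_length_bytes is not None:
        args["x-max-length-bytes"] = max_queue_length_bytes
    if queue_overflow is not None:
        if queue_overflow not in ("drop-head", "reject-publish"):
            raise ValueError(f"unsupported overflow behaviour: {queue_overflow!r}")
        args["x-overflow"] = queue_overflow
    if single_active_consumer:
        args["x-single-active-consumer"] = True
    if use_quorum_queues:
        args["x-queue-type"] = "quorum"
    elif lazy_queue:
        args["x-queue-mode"] = "lazy"  # classic queues only
    return args


high_throughput = build_queue_arguments(max_queue_length=10000, queue_overflow="drop-head")
low_priority = build_queue_arguments(lazy_queue=True, single_active_consumer=True,
                                     use_quorum_queues=False)
```

Because these are declaration-time arguments, changing them for an existing queue generally means deleting and re-declaring the queue, the same caveat as for the retry interval in §4.1.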
4.3 Quorum Queues
Quorum queues provide better data safety and performance for production environments:
mrsal = MrsalBlockingAMQP(
host=RABBITMQ_DOMAIN,
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
use_quorum_queues=True # Default: enables quorum queues
)
# Per-queue configuration
mrsal.start_consumer(
queue_name='high_availability_queue',
exchange_name='ha_exchange',
exchange_type='direct',
routing_key='ha_key',
callback=my_callback,
use_quorum_queues=True # This queue will be highly available
)
Benefits:
- Better data replication across RabbitMQ cluster nodes
- Improved performance under high load
- Automatic leader election and failover
- Works great in Kubernetes and bare metal deployments
4.4 Threaded Consumer
For long-running callbacks that would otherwise block the heartbeat, use threaded=True. Messages are processed in a bounded thread pool instead of the main thread:
mrsal.start_consumer(
queue_name='heavy_queue',
exchange_name='heavy_exchange',
exchange_type='direct',
routing_key='heavy_key',
callback=slow_callback,
auto_ack=False,
threaded=True, # Process messages in a thread pool
max_workers=10, # Pool size (defaults to prefetch_count)
)
4.5 Production-Ready Example
from mrsal.amqp.subclass import MrsalBlockingAMQP
from pydantic import BaseModel
class OrderMessage(BaseModel):
    order_id: str
    customer_id: str
    amount: float

def process_order(method_frame, properties, order: OrderMessage):
    # When ``payload_model`` is set on ``start_consumer``, mrsal validates the
    # raw bytes against the model and passes the validated instance here as the
    # third argument. Validation failures are routed to DLX before the callback
    # runs, so this function only sees well-formed payloads.
    print(f"Processing order {order.order_id} for customer {order.customer_id}")
    if order.amount < 0:
        raise ValueError("Invalid order amount")  # triggers retry/DLX
# Production-ready setup with full retry cycles
mrsal = MrsalBlockingAMQP(
host=RABBITMQ_DOMAIN,
port=int(RABBITMQ_PORT),
credentials=(RABBITMQ_USER, RABBITMQ_PASSWORD),
virtual_host=RABBITMQ_VHOST,
dlx_enable=True, # Automatic DLX for failed orders
use_quorum_queues=True, # High availability
prefetch_count=10 # Broker delivers up to 10 unacked messages ahead (buffering, not parallelism)
)
mrsal.start_consumer(
queue_name='orders_queue',
exchange_name='orders_exchange',
exchange_type='direct',
routing_key='new_order',
callback=process_order,
payload_model=OrderMessage, # Automatic validation
auto_ack=False, # Manual ack for reliability
auto_declare=True, # Auto-create exchange/queue/DLX
# Advanced retry configuration
enable_retry_cycles=True, # Enable retry cycles
retry_cycle_interval=15, # 15 minutes between cycles
max_retry_time_limit=120, # 2 hours total retry time
# Queue performance settings
max_queue_length=50000, # Handle large order volumes
queue_overflow="reject-publish", # Reject when full (backpressure)
single_active_consumer=False # Parallel processing for speed
)