# aio-pika-batch

Batch consumer for aio-pika with per-message acknowledgment, retry tracking, dead-letter routing, and worker scaling.
**The problem:** No Python library provides a batch consumer for RabbitMQ. aio-pika gives you queue iterators and manual ack; Celery, dramatiq, FastStream, and pika offer no batch processing with per-message result semantics. You end up rebuilding this from scratch every time.

**The solution:** Subclass `BatchConsumer`, implement `process_messages` as an async generator, and yield a `MessageResult` per message. The library handles batching, acknowledgment, retries, dead-letter routing, and worker scaling.
## Quick Start

```shell
pip install aio-pika-batch
```

```python
import json

from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

class MyConsumer(BatchConsumer):
    async def process_messages(self, messages):
        for msg in messages:
            try:
                data = json.loads(msg.body)
                await publish(data)
                yield MessageResult.ACK
            except json.JSONDecodeError:
                yield MessageResult.NACK      # Bad data, send to DLX
            except ServiceUnavailable:
                yield MessageResult.REQUEUE   # Retry later

consumer = MyConsumer(BatchConsumerSettings(
    url="amqp://guest:guest@localhost:5672/",
    queue_name="my-queue",
    exchange_name="my-exchange",
    exchange_type="fanout",
    batch_size=50,
    batch_timeout=2.0,
    prefetch_count=300,
    num_workers=6,
))

await consumer.run()
```
## Features

| Feature | Description |
|---|---|
| Batch collection | Flush by size or timeout, whichever comes first |
| Per-message results | `ACK`, `NACK`, `REQUEUE`, or `REQUEUE_IMMEDIATE` per message in a batch |
| Retry tracking | `x-retry-count` header, configurable max attempts, then DLX |
| Dead-letter routing | Exhausted retries and NACKed messages go to DLX |
| Worker scaling | `num_workers=N` spawns N independent consumers on the same queue |
| Connection recovery | Exponential backoff reconnection per worker |
| Graceful shutdown | SIGINT/SIGTERM drain the current batch before stopping |
| Lifecycle hooks | `on_start()` / `on_stop()` for setup and cleanup |
| Health checks | `is_healthy` / `is_running` properties |
| Configuration | pydantic-settings — load from env vars, .env files, or code |
## Message Results

| Result | When to use | Broker action |
|---|---|---|
| `ACK` | Success | Remove from queue |
| `NACK` | Bad data, don't retry | Send to DLX (or discard) |
| `REQUEUE` | Transient failure | Republish to back of queue with retry count |
| `REQUEUE_IMMEDIATE` | Immediate retry (use sparingly) | Native requeue to front, no retry tracking |
### How REQUEUE works

`REQUEUE` republishes the message to the back of the queue with an incremented `x-retry-count` header. This provides natural backoff — other messages are processed while the failed one waits. When `retry_count >= max_requeue_attempts` (default: 3), the message is sent to the Dead Letter Exchange instead.

`REQUEUE_IMMEDIATE` uses RabbitMQ's native `nack(requeue=True)`, which puts the message at the front of the queue with no retry tracking. This can cause infinite retry storms — only use it when you have external retry tracking or need immediate redelivery for very short transient failures.
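Before choosing between `REQUEUE` and `NACK`, a handler can inspect the current attempt number. A minimal sketch, assuming the retry count is exposed through the message's headers mapping (as aio-pika does via `message.headers`) — the `retry_count` helper is illustrative, not part of the library:

```python
def retry_count(headers: dict) -> int:
    """Read the x-retry-count header set on republish; absent on first delivery."""
    return int(headers.get("x-retry-count", 0))

# First delivery carries no header; each REQUEUE republish increments it.
print(retry_count({}))                    # 0
print(retry_count({"x-retry-count": 2}))  # 2
```

This lets a handler, for example, log a warning only on the final attempt before the message would be dead-lettered.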
## Chaining Consumers: Fan-In Pipelines

The most powerful pattern is chaining batch consumers into a pipeline. Multiple source consumers read from different exchanges, merge into a single staging queue, and a writer consumer drains to an external sink (Pub/Sub, BigQuery, S3, etc.):

```
RabbitMQ Exchanges          Staging              Sink

┌──────────────┐
│ user-events  │──┐
├──────────────┤  │       ┌──────────┐       ┌──────────────┐
│ impressions  │──┼──────→│ staging  │──────→│ Pub/Sub / BQ │
├──────────────┤  │       │  queue   │       │ / S3 / DB    │
│ search-logs  │──┘       └──────────┘       └──────────────┘
└──────────────┘

SourceConsumer (×N)       WriterConsumer (×1)
batch_size=50             batch_size=100
num_workers=6             num_workers=4
```

Each consumer in the chain is a `BatchConsumer`. The source consumers read from their exchange, wrap the message, and publish to a shared fanout staging exchange. The writer consumer reads from the staging queue and pushes to the final destination:
```python
import asyncio
import json

from aio_pika import ExchangeType, Message

from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

STAGING_EXCHANGE = "my-app.staging"

class SourceConsumer(BatchConsumer):
    """Read from a source exchange, forward to staging."""

    async def _run_internal(self, channel, queue, settings, stop_event, log):
        # Declare the shared staging exchange on our channel
        self._staging = await channel.declare_exchange(
            STAGING_EXCHANGE, type=ExchangeType.FANOUT, durable=True,
        )
        await super()._run_internal(channel, queue, settings, stop_event, log)

    async def process_messages(self, messages):
        for msg in messages:
            envelope = json.dumps({
                "source": self.settings.exchange_name,
                "payload": msg.body.decode(),
            }).encode()
            await self._staging.publish(Message(body=envelope), routing_key="")
            yield MessageResult.ACK

class WriterConsumer(BatchConsumer):
    """Read from staging, write to external sink."""

    async def process_messages(self, messages):
        # Publish all concurrently
        results = await asyncio.gather(
            *[write_to_sink(msg.body) for msg in messages],
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                yield MessageResult.REQUEUE
            else:
                yield MessageResult.ACK

# Start source consumers for each exchange
async def main():
    exchanges = ["user-events", "impressions", "search-logs"]
    consumers = []
    for exchange in exchanges:
        consumers.append(SourceConsumer(BatchConsumerSettings(
            queue_name=f"ingestor:{exchange}",
            exchange_name=exchange,
            exchange_type="fanout",
            batch_size=50, num_workers=6, prefetch_count=300,
        )))

    # Writer reads from the staging queue
    consumers.append(WriterConsumer(BatchConsumerSettings(
        queue_name="staging-writer",
        exchange_name=STAGING_EXCHANGE,
        exchange_type="fanout",
        batch_size=100, num_workers=4, prefetch_count=400,
    )))

    await asyncio.gather(*[c.run() for c in consumers])
```
This pattern gives you:

- Fan-in: N sources merge into one staging queue
- Backpressure: each stage has independent batch size, workers, and prefetch
- Isolation: source failures don't block the writer, and writer failures don't block sources
- Per-message guarantees: ACK/NACK/REQUEUE at every stage

See `examples/rmq_to_pubsub/` for a complete Google Cloud Pub/Sub implementation of this pattern.
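The `write_to_sink` helper used by the writer is left abstract above; whatever it does, it will typically begin by unwrapping the envelope that `SourceConsumer` built. A minimal sketch (the `unwrap` helper is hypothetical, not part of the library):

```python
import json

def unwrap(envelope_body: bytes) -> tuple[str, str]:
    """Split a staging envelope back into (source exchange, original payload)."""
    envelope = json.loads(envelope_body)
    return envelope["source"], envelope["payload"]

body = json.dumps({"source": "user-events", "payload": '{"id": 1}'}).encode()
source, payload = unwrap(body)
print(source)   # user-events
print(payload)  # {"id": 1}
```

Keeping the source exchange name in the envelope lets the sink route or partition records per source without re-inspecting payloads.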
## Single Message Consumer

For simple cases where you don't need batching:

```python
import json

from aio_pika_batch import Consumer, ConsumerSettings, MessageResult

class MyConsumer(Consumer):
    async def process_message(self, message) -> MessageResult:
        data = json.loads(message.body)
        await save(data)
        return MessageResult.ACK

consumer = MyConsumer(ConsumerSettings(
    queue_name="my-queue",
    exchange_name="my-exchange",
))

await consumer.run()
```
## Configuration

All settings can be loaded from environment variables via pydantic-settings:

```python
from pydantic_settings import SettingsConfigDict

from aio_pika_batch import BatchConsumerSettings

class MySettings(BatchConsumerSettings):
    model_config = SettingsConfigDict(env_prefix="MY_APP_")

# Reads MY_APP_QUEUE_NAME, MY_APP_BATCH_SIZE, etc. from the environment
settings = MySettings()
```
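With a subclass like this, the consumer can then be configured entirely from the environment. For example (variable names mirror the `BatchConsumerSettings` field names with the `MY_APP_` prefix applied):

```shell
export MY_APP_QUEUE_NAME=my-queue
export MY_APP_BATCH_SIZE=50
export MY_APP_NUM_WORKERS=6
export MY_APP_URL="amqp://guest:guest@localhost:5672/"
```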
## Settings Reference

| Setting | Default | Description |
|---|---|---|
| `url` | `amqp://guest:guest@localhost:5672/` | AMQP connection URL |
| `queue_name` | (required) | Queue to consume from |
| `exchange_name` | `None` | Exchange to declare and bind |
| `exchange_type` | `direct` | Exchange type (direct, fanout, topic, headers) |
| `routing_key` | `None` | Routing key for binding |
| `prefetch_count` | `10` | QoS prefetch per channel |
| `num_workers` | `1` | Parallel consumer workers |
| `batch_size` | `10` | Max messages per batch |
| `batch_timeout` | `5.0` | Seconds before flushing an incomplete batch |
| `max_requeue_attempts` | `3` | Retries before sending to DLX |
| `unhandled_exception_action` | `REQUEUE` | Default action for unhandled exceptions |
| `queue_durable` | `True` | Queue survives broker restarts |
| `exchange_durable` | `True` | Exchange survives broker restarts |
| `queue_arguments` | `None` | Extra queue arguments (TTL, DLX, etc.) |
## Tuning Tips

- Set `prefetch_count >= batch_size * num_workers` so the broker delivers enough messages to fill batches
- `batch_timeout` controls latency for low-traffic periods — lower means faster flushes of partial batches
- `num_workers` scales consumption linearly up to the point where the broker or downstream becomes the bottleneck
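The first tip is simple arithmetic; as a quick sanity check when sizing a deployment (a sketch for illustration, not a library API):

```python
def min_prefetch(batch_size: int, num_workers: int) -> int:
    """Smallest prefetch_count that lets every worker fill a full batch."""
    return batch_size * num_workers

# With the Quick Start settings (batch_size=50, num_workers=6),
# prefetch_count=300 is the minimum that avoids starving batches.
print(min_prefetch(50, 6))  # 300
```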
## Lifecycle

Three ways to run:

```python
# 1. run() — blocks with signal handling
await consumer.run()

# 2. start()/stop() — manual control
await consumer.start()
# ... later ...
await consumer.stop()

# 3. Async context manager
async with consumer:
    await asyncio.Event().wait()
```
### Hooks

Override `on_start` and `on_stop` for setup/cleanup:

```python
import aiohttp

class MyConsumer(BatchConsumer):
    async def on_start(self):
        self.http = aiohttp.ClientSession()

    async def on_stop(self):
        await self.http.close()

    async def process_messages(self, messages):
        ...
```
## Comparison

| Feature | aio-pika-batch | aio-pika | MassTransit (.NET) | Spring AMQP (Java) |
|---|---|---|---|---|
| Batch consumer | Yes | No | Yes | Yes |
| Per-message results in batch | Yes | N/A | No (atomic) | No (atomic) |
| Retry tracking | `x-retry-count` header | Manual | Built-in | Built-in |
| DLX routing | Automatic | Manual | Automatic | Automatic |
| Worker scaling | `num_workers` | Manual | Concurrent consumers | `concurrentConsumers` |
| Async Python | Yes | Yes | No (C#) | No (Java) |
## Examples

- `examples/simple/` — Single-message consumer
- `examples/batch/` — Batch consumer with concurrent processing
- `examples/rmq_to_pubsub/` — Full RabbitMQ → Google Cloud Pub/Sub pipeline
## Requirements

- Python 3.12+
- RabbitMQ 3.8+

## License

MIT