
aio-pika-batch

Batch consumer for aio-pika with per-message acknowledgment, retry tracking, dead-letter routing, and worker scaling.

The problem: No Python library provides a batch consumer for RabbitMQ. aio-pika gives you queue iterators and manual ack. Celery, dramatiq, FastStream, pika — none offer batch processing with per-message result semantics. You end up rebuilding this from scratch every time.

The solution: Subclass BatchConsumer, implement process_messages as an async generator, yield MessageResult per message. The library handles batching, acknowledgment, retries, dead-letter routing, and worker scaling.

Quick Start

pip install aio-pika-batch

import json

from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

class MyConsumer(BatchConsumer):
    async def process_messages(self, messages):
        for msg in messages:
            try:
                data = json.loads(msg.body)
                await publish(data)  # your downstream call
                yield MessageResult.ACK
            except json.JSONDecodeError:
                yield MessageResult.NACK  # Bad data, send to DLX
            except ServiceUnavailable:  # your transient-failure exception
                yield MessageResult.REQUEUE  # Retry later

consumer = MyConsumer(BatchConsumerSettings(
    url="amqp://guest:guest@localhost:5672/",
    queue_name="my-queue",
    exchange_name="my-exchange",
    exchange_type="fanout",
    batch_size=50,
    batch_timeout=2.0,
    prefetch_count=300,
    num_workers=6,
))
await consumer.run()

Features

| Feature | Description |
| --- | --- |
| Batch collection | Flush by size or timeout, whichever comes first |
| Per-message results | ACK, NACK, REQUEUE, or REQUEUE_IMMEDIATE per message in a batch |
| Retry tracking | x-retry-count header, configurable max attempts, then DLX |
| Dead-letter routing | Exhausted retries and NACKed messages go to the DLX |
| Worker scaling | num_workers=N spawns N independent consumers on the same queue |
| Connection recovery | Per-worker reconnection with exponential backoff |
| Graceful shutdown | SIGINT/SIGTERM drain the current batch before stopping |
| Lifecycle hooks | on_start() / on_stop() for setup and cleanup |
| Health checks | is_healthy / is_running properties |
| Configuration | pydantic-settings: load from env vars, .env files, or code |

Message Results

| Result | When to use | Broker action |
| --- | --- | --- |
| ACK | Success | Remove from queue |
| NACK | Bad data, don't retry | Send to DLX (or discard) |
| REQUEUE | Transient failure | Republish to back of queue with retry count |
| REQUEUE_IMMEDIATE | Immediate retry (use sparingly) | Native requeue to front, no retry tracking |

How REQUEUE works

REQUEUE republishes the message to the back of the queue with an incremented x-retry-count header. This provides natural backoff — other messages are processed while the failed one waits. When retry_count >= max_requeue_attempts (default: 3), the message is sent to the Dead Letter Exchange instead.

REQUEUE_IMMEDIATE uses RabbitMQ's native nack(requeue=True) which puts the message at the front of the queue with no retry tracking. This can cause infinite retry storms — only use it when you have external retry tracking or need immediate redelivery for very short transient failures.
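The retry bookkeeping described above can be pictured as a small decision function. This is a hypothetical sketch of the logic, not the library's internal code; `next_action` is an illustrative name:

```python
# Sketch of the REQUEUE decision: read x-retry-count, then either
# republish with an incremented counter or hand off to the DLX.
MAX_REQUEUE_ATTEMPTS = 3  # mirrors the max_requeue_attempts default


def next_action(headers: dict) -> tuple[str, dict]:
    """Return ('republish' | 'dead-letter', headers to send)."""
    retries = int(headers.get("x-retry-count", 0))
    if retries >= MAX_REQUEUE_ATTEMPTS:
        return "dead-letter", headers
    # Back of the queue, counter bumped — natural backoff
    return "republish", {**headers, "x-retry-count": retries + 1}
```

A fresh message (no header) gets `x-retry-count: 1` on its first REQUEUE; after the third failed attempt it is dead-lettered instead of republished.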

Chaining Consumers: Fan-In Pipelines

The most powerful pattern is chaining batch consumers into a pipeline. Multiple source consumers read from different exchanges, merge into a single staging queue, and a writer consumer drains to an external sink (Pub/Sub, BigQuery, S3, etc.):

RabbitMQ Exchanges              Staging              Sink
┌──────────────┐
│ user-events  │──┐
├──────────────┤  │         ┌──────────┐       ┌──────────────┐
│ impressions  │──┼────────→│ staging  │──────→│ Pub/Sub / BQ │
├──────────────┤  │         │  queue   │       │  / S3 / DB   │
│ search-logs  │──┘         └──────────┘       └──────────────┘
└──────────────┘
  SourceConsumer (×N)      WriterConsumer (×1)
  batch_size=50             batch_size=100
  num_workers=6             num_workers=4

Each consumer in the chain is a BatchConsumer. The source consumers read from their exchange, wrap the message, and publish to a shared fanout staging exchange. The writer consumer reads from the staging queue and pushes to the final destination:

import asyncio
import json

from aio_pika import ExchangeType, Message
from aio_pika_batch import BatchConsumer, BatchConsumerSettings, MessageResult

STAGING_EXCHANGE = "my-app.staging"

class SourceConsumer(BatchConsumer):
    """Read from a source exchange, forward to staging."""

    async def _run_internal(self, channel, queue, settings, stop_event, log):
        # Declare the shared staging exchange on our channel
        self._staging = await channel.declare_exchange(
            STAGING_EXCHANGE, type=ExchangeType.FANOUT, durable=True,
        )
        await super()._run_internal(channel, queue, settings, stop_event, log)

    async def process_messages(self, messages):
        for msg in messages:
            envelope = json.dumps({
                "source": self.settings.exchange_name,
                "payload": msg.body.decode(),
            }).encode()
            await self._staging.publish(Message(body=envelope), routing_key="")
            yield MessageResult.ACK

class WriterConsumer(BatchConsumer):
    """Read from staging, write to external sink."""

    async def process_messages(self, messages):
        # Write the whole batch to the sink concurrently
        results = await asyncio.gather(
            *[write_to_sink(msg.body) for msg in messages],  # your async sink call
            return_exceptions=True,
        )
        for result in results:
            if isinstance(result, Exception):
                yield MessageResult.REQUEUE
            else:
                yield MessageResult.ACK

# Start source consumers for each exchange
async def main():
    exchanges = ["user-events", "impressions", "search-logs"]
    consumers = []

    for exchange in exchanges:
        consumers.append(SourceConsumer(BatchConsumerSettings(
            queue_name=f"ingestor:{exchange}",
            exchange_name=exchange,
            exchange_type="fanout",
            batch_size=50, num_workers=6, prefetch_count=300,
        )))

    # Writer reads from the staging queue
    consumers.append(WriterConsumer(BatchConsumerSettings(
        queue_name="staging-writer",
        exchange_name=STAGING_EXCHANGE,
        exchange_type="fanout",
        batch_size=100, num_workers=4, prefetch_count=400,
    )))

    await asyncio.gather(*[c.run() for c in consumers])

asyncio.run(main())

This pattern gives you:

  • Fan-in: N sources merge into 1 staging queue
  • Backpressure: each stage has independent batch size, workers, and prefetch
  • Isolation: source failures don't block the writer, writer failures don't block sources
  • Per-message guarantees: ACK/NACK/REQUEUE at every stage

See examples/rmq_to_pubsub/ for a complete Google Cloud Pub/Sub implementation of this pattern.

Single Message Consumer

For simple cases where you don't need batching:

import json

from aio_pika_batch import Consumer, ConsumerSettings, MessageResult

class MyConsumer(Consumer):
    async def process_message(self, message) -> MessageResult:
        data = json.loads(message.body)
        await save(data)  # your persistence call
        return MessageResult.ACK

consumer = MyConsumer(ConsumerSettings(
    queue_name="my-queue",
    exchange_name="my-exchange",
))
await consumer.run()

Configuration

All settings can be loaded from environment variables via pydantic-settings:

from pydantic_settings import SettingsConfigDict
from aio_pika_batch import BatchConsumerSettings

class MySettings(BatchConsumerSettings):
    model_config = SettingsConfigDict(env_prefix="MY_APP_")

# Reads MY_APP_QUEUE_NAME, MY_APP_BATCH_SIZE, etc. from environment
settings = MySettings()
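For example, with the MY_APP_ prefix above, a .env file (or exported environment variables) might look like this — the variable names follow the settings listed in the reference below, and the values are placeholders:

```shell
# .env — picked up by pydantic-settings with env_prefix="MY_APP_"
MY_APP_URL=amqp://guest:guest@localhost:5672/
MY_APP_QUEUE_NAME=my-queue
MY_APP_EXCHANGE_NAME=my-exchange
MY_APP_BATCH_SIZE=50
MY_APP_NUM_WORKERS=4
```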

Settings Reference

| Setting | Default | Description |
| --- | --- | --- |
| url | amqp://guest:guest@localhost:5672/ | AMQP connection URL |
| queue_name | (required) | Queue to consume from |
| exchange_name | None | Exchange to declare and bind |
| exchange_type | direct | Exchange type (direct, fanout, topic, headers) |
| routing_key | None | Routing key for binding |
| prefetch_count | 10 | QoS prefetch per channel |
| num_workers | 1 | Parallel consumer workers |
| batch_size | 10 | Max messages per batch |
| batch_timeout | 5.0 | Seconds before flushing an incomplete batch |
| max_requeue_attempts | 3 | Retries before sending to the DLX |
| unhandled_exception_action | REQUEUE | Default action for unhandled exceptions |
| queue_durable | True | Queue survives broker restarts |
| exchange_durable | True | Exchange survives broker restarts |
| queue_arguments | None | Extra queue arguments (TTL, DLX, etc.) |
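queue_arguments is passed through to the queue declaration, so any standard RabbitMQ x-argument works. A hedged sketch — the keys are standard RabbitMQ queue arguments, while "my-dlx" and the limits are placeholder values:

```python
# Standard RabbitMQ x-arguments, forwarded verbatim at queue declaration.
queue_arguments = {
    "x-message-ttl": 60_000,             # expire messages after 60 s
    "x-dead-letter-exchange": "my-dlx",  # where expired/NACKed messages go
    "x-max-length": 100_000,             # cap queue depth
}
```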

Tuning Tips

  • Set prefetch_count >= batch_size * num_workers so the broker delivers enough messages to fill batches
  • batch_timeout controls latency for low-traffic periods — lower means faster flushes of partial batches
  • num_workers scales consumption linearly up to the point where the broker or downstream becomes the bottleneck
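As a worked example, the prefetch rule of thumb applied to the Quick Start numbers:

```python
# Worst case: every worker flushes a full batch at once, so the broker
# must keep at least batch_size * num_workers messages in flight.
batch_size = 50
num_workers = 6
min_prefetch = batch_size * num_workers
print(min_prefetch)  # 300 — the prefetch_count used in Quick Start
```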

Lifecycle

Three ways to run:

# 1. run() — blocks with signal handling
await consumer.run()

# 2. start()/stop() — manual control
await consumer.start()
# ... later ...
await consumer.stop()

# 3. Async context manager
async with consumer:
    await asyncio.Event().wait()

Hooks

Override on_start and on_stop for setup/cleanup:

class MyConsumer(BatchConsumer):
    async def on_start(self):
        self.http = aiohttp.ClientSession()

    async def on_stop(self):
        await self.http.close()

    async def process_messages(self, messages):
        ...

Comparison

| Feature | aio-pika-batch | aio-pika | MassTransit (.NET) | Spring AMQP (Java) |
| --- | --- | --- | --- | --- |
| Batch consumer | Yes | No | Yes | Yes |
| Per-message results in batch | Yes | N/A | No (atomic) | No (atomic) |
| Retry tracking | x-retry-count header | Manual | Built-in | Built-in |
| DLX routing | Automatic | Manual | Automatic | Automatic |
| Worker scaling | num_workers | Manual | Concurrent consumers | concurrentConsumers |
| Async Python | Yes | Yes | No (C#) | No (Java) |

Examples

See examples/rmq_to_pubsub/ in the repository for a complete fan-in pipeline targeting Google Cloud Pub/Sub.

Requirements

  • Python 3.12+
  • RabbitMQ 3.8+

License

MIT
