
Asynchronous In-Memory Batch Queue Processor

Project description

Beque - Asynchronous In-Memory Batch Queue Processor

Beque (pronounced "Beck") is a lightweight, high-performance Python library for asynchronous batch processing. It accumulates items in memory and flushes them to an async sink when either:

  • max_batch_size items are queued, or
  • flush_interval seconds have passed since the last successful flush

Flushing is serialized and items are never lost: on failure, the batch is re-queued in original order.
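The dual-trigger policy and re-queue behavior described above can be sketched in a few lines of plain asyncio. This is a minimal, self-contained illustration of the policy, not Beque's actual implementation; `sink` stands in for your `on_flush` callable.

```python
import asyncio
import time

async def run_batcher(items, sink, max_batch_size=3, flush_interval=1.0):
    """Illustrative sketch: flush when the batch fills or the interval fires."""
    queue = []
    last_flush = time.monotonic()
    for item in items:
        queue.append(item)
        size_due = len(queue) >= max_batch_size
        time_due = time.monotonic() - last_flush >= flush_interval
        if size_due or time_due:
            batch, queue = queue, []
            try:
                await sink(batch)
                last_flush = time.monotonic()
            except Exception:
                # On failure, re-queue the batch ahead of newer items,
                # preserving original order
                queue = batch + queue
    if queue:
        await sink(queue)  # final flush on shutdown

flushed = []

async def sink(batch):
    flushed.append(batch)

asyncio.run(run_batcher(list(range(7)), sink, max_batch_size=3))
print(flushed)  # [[0, 1, 2], [3, 4, 5], [6]]
```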

Features

  • Async/await support: Built for modern Python asyncio applications
  • Batch processing: Efficiently processes items in configurable batches
  • Time-based flushing: Automatic flushing based on configurable intervals
  • Error resilience: Failed batches are re-queued and retried
  • Thread-safe: Safe for concurrent access across async tasks
  • Generic typing: Full type safety with Python's generic typing system
  • Comprehensive stats: Built-in statistics and monitoring
  • Zero dependencies: No external dependencies required

Installation

pip install beque

Quick Start

import asyncio
from beque import Beque

async def process_batch(items):
    """Your async batch processing function."""
    print(f"Processing {len(items)} items: {items}")
    # Simulate async work (database write, API call, etc.)
    await asyncio.sleep(0.1)

async def main():
    # Create a Beque that flushes every 5 items or every 10 seconds
    async with Beque(
        on_flush=process_batch, 
        max_batch_size=5, 
        flush_interval=10.0
    ) as queue:
        
        # Add items to the queue
        for i in range(12):
            await queue.add(f"item-{i}")
            await asyncio.sleep(0.5)
        
        # Items are automatically flushed in batches
        # Final flush happens when exiting the context manager

asyncio.run(main())

API Reference

Beque Class

class Beque(Generic[T]):
    def __init__(
        self,
        *,
        on_flush: Callable[[List[T]], Awaitable[None]],
        max_batch_size: int = 100,
        flush_interval: float = 10.0,
        name: str = "Beque",
        logger: Optional[logging.Logger] = None,
    )

Parameters:

  • on_flush: Async function that processes batches of items
  • max_batch_size: Maximum items in a batch before auto-flush (default: 100)
  • flush_interval: Seconds between time-based flushes (default: 10.0)
  • name: Name for logging and identification (default: "Beque")
  • logger: Custom logger instance (optional)

Methods

async add(item: T) -> None

Add a single item to the queue.

async add_many(items: List[T]) -> None

Add multiple items to the queue atomically.
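Atomicity here means the whole list lands in the queue as one contiguous run, so batches from concurrent producers never interleave. A hypothetical sketch of the idea (not Beque's code), using a single lock-protected extend per call:

```python
import asyncio

class TinyQueue:
    """Illustration only: one critical section per add_many call."""
    def __init__(self):
        self._items = []
        self._lock = asyncio.Lock()

    async def add_many(self, items):
        async with self._lock:
            # The whole list is appended under one lock acquisition,
            # so another producer's items cannot land in the middle.
            self._items.extend(items)

async def main():
    q = TinyQueue()
    await asyncio.gather(
        q.add_many(["a1", "a2"]),
        q.add_many(["b1", "b2"]),
    )
    return q._items

items = asyncio.run(main())
print(items)  # each producer's pair stays contiguous
```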

async flush(*, force: bool = True) -> None

Manually trigger a flush. If force=True, flushes all queued items.

stats -> dict

Get current statistics:

{
    "flushes": int,      # Total successful flushes
    "items": int,        # Total items processed  
    "failures": int,     # Total flush failures
    "queued": int,       # Current items in queue
    "last_flush_time": float,  # Timestamp of last flush
    "running": bool      # Whether queue is active
}
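The stats dict is handy for health checks. A small helper over the documented shape shown above (the field names come from this README; the helper itself is ours, not part of the library):

```python
def failure_rate(stats: dict) -> float:
    """Fraction of flush attempts that failed, per the documented fields."""
    attempts = stats["flushes"] + stats["failures"]
    return stats["failures"] / attempts if attempts else 0.0

# Example stats snapshot in the documented shape
stats = {
    "flushes": 18, "items": 1740, "failures": 2,
    "queued": 5, "last_flush_time": 1700000000.0, "running": True,
}
print(f"failure rate: {failure_rate(stats):.1%}")  # failure rate: 10.0%
```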

Context Manager Usage

Beque is designed to be used as an async context manager:

async with Beque(on_flush=handler) as queue:
    await queue.add(item)
    # Automatic cleanup and final flush on exit

Or manually:

queue = Beque(on_flush=handler)
await queue.start()
try:
    await queue.add(item)
finally:
    await queue.stop()  # Ensures final flush

Advanced Examples

Database Batch Inserts

import asyncio
from beque import Beque

class DatabaseWriter:
    async def write_users(self, user_batch):
        # Simulate batch database insert
        print(f"INSERT INTO users VALUES {user_batch}")
        await asyncio.sleep(0.1)  # Simulated I/O

async def main():
    db = DatabaseWriter()
    
    async with Beque(
        on_flush=db.write_users,
        max_batch_size=10,
        flush_interval=5.0
    ) as user_queue:
        
        # Simulate receiving user data
        for i in range(25):
            user_data = {"id": i, "name": f"user-{i}"}
            await user_queue.add(user_data)
            await asyncio.sleep(0.2)

asyncio.run(main())

Error Handling and Recovery

import asyncio
import random
from beque import Beque

async def flaky_processor(batch):
    """Processor that occasionally fails."""
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Processing failed!")
    
    print(f"Successfully processed: {batch}")
    await asyncio.sleep(0.1)

async def main():
    async with Beque(
        on_flush=flaky_processor,
        max_batch_size=3,
        flush_interval=2.0
    ) as queue:
        
        for i in range(10):
            await queue.add(f"task-{i}")
            await asyncio.sleep(0.5)
        
        # Check stats to see failure/retry information
        print("Final stats:", queue.stats)

asyncio.run(main())

Multiple Concurrent Producers

import asyncio
from beque import Beque

async def log_processor(batch):
    print(f"Logged {len(batch)} events: {batch}")
    await asyncio.sleep(0.05)

async def producer(queue, producer_id):
    """Simulate concurrent event producers."""
    for i in range(10):
        event = f"producer-{producer_id}-event-{i}"
        await queue.add(event)
        await asyncio.sleep(0.1)

async def main():
    async with Beque(
        on_flush=log_processor,
        max_batch_size=5,
        flush_interval=1.0
    ) as event_queue:
        
        # Start multiple concurrent producers
        await asyncio.gather(
            producer(event_queue, 1),
            producer(event_queue, 2),
            producer(event_queue, 3),
        )

asyncio.run(main())

Type Safety

Beque is fully typed and supports generic type parameters:

from beque import Beque
from typing import Dict, List

async def process_dicts(batch: List[Dict[str, int]]) -> None:
    for item in batch:
        print(f"Processing: {item}")

# Type-safe queue for dictionaries
queue: Beque[Dict[str, int]] = Beque(on_flush=process_dicts)

Performance Characteristics

  • Memory: O(n) where n is the number of queued items
  • Throughput: Optimized for high-frequency additions with batched processing
  • Latency: Configurable via flush_interval and max_batch_size
  • Concurrency: Thread-safe with asyncio locks, supports many concurrent producers
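A back-of-envelope way to reason about the latency bullet above: an item waits at most until the batch fills or the interval fires, whichever comes first. This heuristic is our own tuning aid, not part of Beque:

```python
def worst_case_wait(arrival_rate_per_s: float,
                    max_batch_size: int,
                    flush_interval: float) -> float:
    """Upper bound on how long an item sits queued before a flush."""
    time_to_fill = max_batch_size / arrival_rate_per_s
    return min(time_to_fill, flush_interval)

# 50 items/s with batches of 100 and a 10 s interval:
print(worst_case_wait(50, 100, 10.0))  # 2.0  -> size trigger dominates
print(worst_case_wait(5, 100, 10.0))   # 10.0 -> interval trigger dominates
```

If the size trigger dominates, raising `max_batch_size` trades latency for throughput; if the interval dominates, lowering `flush_interval` bounds latency directly.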

Error Handling

Beque provides robust error handling:

  1. Flush failures: Batches are re-queued in original order
  2. Automatic retry: Failed batches will be retried on next flush opportunity
  3. Graceful shutdown: Context manager ensures final flush even on exceptions
  4. Statistics tracking: Monitor success/failure rates via stats property

Logging

Beque provides structured logging at various levels:

  • INFO: Start/stop events with configuration
  • DEBUG: Individual flush operations
  • ERROR: Flush failures with full context

Pass a custom logger to receive these messages:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my_app.queue")

async with Beque(on_flush=handler, logger=logger) as queue:
    # Custom logger will be used for all queue events
    pass

Requirements

  • Python 3.8+
  • No external dependencies

License

MIT License - see LICENSE file for details.

Project details


Download files

Download the file for your platform.

Source Distribution

beque-0.1.0.tar.gz (30.1 kB)

Built Distribution

beque-0.1.0-py3-none-any.whl (6.7 kB)

File details

Details for the file beque-0.1.0.tar.gz.

File metadata

  • Download URL: beque-0.1.0.tar.gz
  • Upload date:
  • Size: 30.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for beque-0.1.0.tar.gz:

  • SHA256: 8c87e0810dfab3b54e02ad6962bc1296673413f7b7a15cfc5c7be653d4c6c2ef
  • MD5: 0557784433795a9824b57ad7dbebc0c7
  • BLAKE2b-256: 3458fc9dcecf714166b14f252936cf56e87d5105302d09ef12fe8f096c6851e7


Provenance

The following attestation bundles were made for beque-0.1.0.tar.gz:

Publisher: publish.yaml on punitarani/beque

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file beque-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: beque-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 6.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for beque-0.1.0-py3-none-any.whl:

  • SHA256: 99f48d23405fb48de58a882970a48bf9433ccb9d769ca9c9269ac95163f84e76
  • MD5: 83775ca0aab69f7139ca75ddd4c0335e
  • BLAKE2b-256: 6dacd397e332d31620ffa9a082ac47238242eb529aafd84b800fe4b429d31326


Provenance

The following attestation bundles were made for beque-0.1.0-py3-none-any.whl:

Publisher: publish.yaml on punitarani/beque

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
