Beque - Asynchronous In-Memory Batch Queue Processor
Beque (pronounced "Beck") is a lightweight, high-performance Python library for asynchronous batch processing. It accumulates items in memory and flushes them to an async sink when either:
- max_batch_size items are queued, or
- flush_interval seconds have passed since the last successful flush
Flushing is serialized and items are never lost: on failure, the batch is re-queued in original order.
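The two flush triggers described above can be sketched as a small predicate. This is a minimal illustration, not Beque's actual implementation: `should_flush` and its `now` parameter are hypothetical names, and the choice to skip flushing an empty queue is an assumption.

```python
import time
from typing import Optional


def should_flush(
    queued: int,
    last_flush: float,
    max_batch_size: int = 100,
    flush_interval: float = 10.0,
    now: Optional[float] = None,
) -> bool:
    """Return True when the size trigger or the time trigger has fired.

    Hypothetical helper for illustration; not part of the beque API.
    """
    if queued == 0:
        return False  # assumption: an empty queue is never flushed
    if now is None:
        now = time.monotonic()
    size_trigger = queued >= max_batch_size
    time_trigger = (now - last_flush) >= flush_interval
    return size_trigger or time_trigger


# Size trigger: 5 items queued with max_batch_size=5
print(should_flush(queued=5, last_flush=0.0, max_batch_size=5, now=1.0))
# Time trigger: 12 seconds elapsed with flush_interval=10.0
print(should_flush(queued=2, last_flush=0.0, flush_interval=10.0, now=12.0))
# Neither trigger: 2 items queued, 3 seconds elapsed
print(should_flush(queued=2, last_flush=0.0, flush_interval=10.0, now=3.0))
```

Passing `now` explicitly keeps the check deterministic in tests; a real queue would read the clock itself.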
Features
- Async/await support: Built for modern Python asyncio applications
- Batch processing: Efficiently processes items in configurable batches
- Time-based flushing: Automatic flushing based on configurable intervals
- Error resilience: Failed batches are re-queued and retried
- Thread-safe: Safe for concurrent access across async tasks
- Generic typing: Full type safety with Python's generic typing system
- Comprehensive stats: Built-in statistics and monitoring
- Zero dependencies: No external dependencies required
Installation
pip install beque
Quick Start
import asyncio
from beque import Beque


async def process_batch(items):
    """Your async batch processing function."""
    print(f"Processing {len(items)} items: {items}")
    # Simulate async work (database write, API call, etc.)
    await asyncio.sleep(0.1)


async def main():
    # Create a Beque that flushes every 5 items or every 10 seconds
    async with Beque(
        on_flush=process_batch,
        max_batch_size=5,
        flush_interval=10.0
    ) as queue:
        # Add items to the queue
        for i in range(12):
            await queue.add(f"item-{i}")
            await asyncio.sleep(0.5)
        # Items are automatically flushed in batches
        # Final flush happens when exiting the context manager


asyncio.run(main())
API Reference
Beque Class
class Beque(Generic[T]):
    def __init__(
        self,
        *,
        on_flush: Callable[[List[T]], Awaitable[None]],
        max_batch_size: int = 100,
        flush_interval: float = 10.0,
        name: str = "Beque",
        logger: logging.Logger = None,
    )
Parameters:
- on_flush: Async function that processes batches of items
- max_batch_size: Maximum items in a batch before auto-flush (default: 100)
- flush_interval: Seconds between time-based flushes (default: 10.0)
- name: Name for logging and identification (default: "Beque")
- logger: Custom logger instance (optional)
Methods
async add(item: T) -> None
Add a single item to the queue.
async add_many(items: List[T]) -> None
Add multiple items to the queue atomically.
async flush(*, force: bool = True) -> None
Manually trigger a flush. If force=True, flushes all queued items.
stats -> dict
Get current statistics:
{
    "flushes": int,            # Total successful flushes
    "items": int,              # Total items processed
    "failures": int,           # Total flush failures
    "queued": int,             # Current items in queue
    "last_flush_time": float,  # Timestamp of last flush
    "running": bool            # Whether queue is active
}
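As a rough illustration of how these counters might be used for monitoring, the sketch below derives two ratios from a stats snapshot. `summarize` is a hypothetical helper, the sample values are made up, and the arithmetic assumes that "flushes" counts only successful flushes while "failures" counts failed attempts, as the field comments suggest.

```python
from typing import Any, Dict


def summarize(stats: Dict[str, Any]) -> Dict[str, float]:
    """Derive simple health ratios from a stats snapshot (hypothetical helper)."""
    attempts = stats["flushes"] + stats["failures"]
    # Guard against division by zero before the first flush attempt
    failure_rate = stats["failures"] / attempts if attempts else 0.0
    avg_batch_size = stats["items"] / stats["flushes"] if stats["flushes"] else 0.0
    return {"failure_rate": failure_rate, "avg_batch_size": avg_batch_size}


# Made-up snapshot with the same keys as the stats dictionary above
sample = {
    "flushes": 8,
    "items": 40,
    "failures": 2,
    "queued": 3,
    "last_flush_time": 0.0,
    "running": True,
}
print(summarize(sample))  # {'failure_rate': 0.2, 'avg_batch_size': 5.0}
```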
Context Manager Usage
Beque is designed to be used as an async context manager:
async with Beque(on_flush=handler) as queue:
    await queue.add(item)
    # Automatic cleanup and final flush on exit
Or manually:
queue = Beque(on_flush=handler)
await queue.start()
try:
    await queue.add(item)
finally:
    await queue.stop()  # Ensures final flush
Advanced Examples
Database Batch Inserts
import asyncio
from beque import Beque


class DatabaseWriter:
    async def write_users(self, user_batch):
        # Simulate batch database insert
        print(f"INSERT INTO users VALUES {user_batch}")
        await asyncio.sleep(0.1)  # Simulated I/O


async def main():
    db = DatabaseWriter()
    async with Beque(
        on_flush=db.write_users,
        max_batch_size=10,
        flush_interval=5.0
    ) as user_queue:
        # Simulate receiving user data
        for i in range(25):
            user_data = {"id": i, "name": f"user-{i}"}
            await user_queue.add(user_data)
            await asyncio.sleep(0.2)


asyncio.run(main())
Error Handling and Recovery
import asyncio
import random
from beque import Beque


async def flaky_processor(batch):
    """Processor that occasionally fails."""
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Processing failed!")
    print(f"Successfully processed: {batch}")
    await asyncio.sleep(0.1)


async def main():
    async with Beque(
        on_flush=flaky_processor,
        max_batch_size=3,
        flush_interval=2.0
    ) as queue:
        for i in range(10):
            await queue.add(f"task-{i}")
            await asyncio.sleep(0.5)
        # Check stats to see failure/retry information
        print("Final stats:", queue.stats)


asyncio.run(main())
Multiple Concurrent Producers
import asyncio
from beque import Beque


async def log_processor(batch):
    print(f"Logged {len(batch)} events: {batch}")
    await asyncio.sleep(0.05)


async def producer(queue, producer_id):
    """Simulate concurrent event producers."""
    for i in range(10):
        event = f"producer-{producer_id}-event-{i}"
        await queue.add(event)
        await asyncio.sleep(0.1)


async def main():
    async with Beque(
        on_flush=log_processor,
        max_batch_size=5,
        flush_interval=1.0
    ) as event_queue:
        # Start multiple concurrent producers
        await asyncio.gather(
            producer(event_queue, 1),
            producer(event_queue, 2),
            producer(event_queue, 3),
        )


asyncio.run(main())
Type Safety
Beque is fully typed and supports generic type parameters:
from beque import Beque
from typing import Dict, List  # List is needed for the batch annotation


async def process_dicts(batch: List[Dict[str, int]]) -> None:
    for item in batch:
        print(f"Processing: {item}")


# Type-safe queue for dictionaries
queue: Beque[Dict[str, int]] = Beque(on_flush=process_dicts)
Performance Characteristics
- Memory: O(n) where n is the number of queued items
- Throughput: Optimized for high-frequency additions with batched processing
- Latency: Configurable via flush_interval and max_batch_size
- Concurrency: Thread-safe with asyncio locks, supports many concurrent producers
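To make the concurrency point concrete, here is a minimal sketch of several producer tasks sharing one buffer behind an `asyncio.Lock`. It does not use Beque and is not its implementation; `add`, `producer`, `pending`, and `batches` are illustrative names. Note that `asyncio.Lock` coordinates tasks on a single event loop and is documented as not thread-safe, so the sketch covers coroutine-level concurrency only.

```python
import asyncio
from typing import List


async def main() -> List[List[str]]:
    lock = asyncio.Lock()
    pending: List[str] = []        # shared buffer of queued items
    batches: List[List[str]] = []  # stands in for the async sink

    async def add(item: str) -> None:
        # The lock serializes appends and size-triggered flushes
        async with lock:
            pending.append(item)
            if len(pending) >= 5:
                batches.append(pending.copy())
                pending.clear()

    async def producer(producer_id: int) -> None:
        for i in range(10):
            await add(f"p{producer_id}-{i}")
            await asyncio.sleep(0)  # yield so producers interleave

    await asyncio.gather(*(producer(p) for p in range(3)))
    return batches


batches = asyncio.run(main())
# 3 producers x 10 items = 30 items, flushed in batches of 5
print(len(batches), sum(len(b) for b in batches))  # 6 30
```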
Error Handling
Beque provides robust error handling:
- Flush failures: Batches are re-queued in original order
- Automatic retry: Failed batches will be retried on next flush opportunity
- Graceful shutdown: Context manager ensures final flush even on exceptions
- Statistics tracking: Monitor success/failure rates via the stats property
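One way to picture "re-queued in original order" is with a `collections.deque`: a failed batch is pushed back onto the front so that it precedes anything still waiting. This is a sketch under that assumption, not Beque's internal code; `take_batch` and `requeue_front` are hypothetical helpers.

```python
from collections import deque
from typing import Deque, List


def take_batch(queue: Deque[str], max_batch_size: int) -> List[str]:
    """Pop up to max_batch_size items from the front of the queue."""
    batch: List[str] = []
    while queue and len(batch) < max_batch_size:
        batch.append(queue.popleft())
    return batch


def requeue_front(queue: Deque[str], batch: List[str]) -> None:
    """Put a failed batch back at the front, preserving its order."""
    # extendleft() reverses its input, so reverse the batch first
    queue.extendleft(reversed(batch))


pending: Deque[str] = deque(["a", "b", "c", "d", "e"])
batch = take_batch(pending, 3)   # ['a', 'b', 'c'] leaves ['d', 'e'] queued
requeue_front(pending, batch)    # simulate a failed flush
print(list(pending))             # ['a', 'b', 'c', 'd', 'e']
```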
Logging
Beque provides structured logging at various levels:
- INFO: Start/stop events with configuration
- DEBUG: Individual flush operations
- ERROR: Flush failures with full context
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my_app.queue")

async with Beque(on_flush=handler, logger=logger) as queue:
    # Custom logger will be used for all queue events
    pass
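Since individual flush operations are logged at DEBUG, a logger left at INFO would not show them. The sketch below uses only the standard `logging` module to lower the threshold for one named logger; the logger name "my_app.queue" comes from the example above, while `stream_handler` and the format string are illustrative choices.

```python
import logging

# Configure only the queue's logger, leaving the root logger untouched
logger = logging.getLogger("my_app.queue")
logger.setLevel(logging.DEBUG)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
logger.addHandler(stream_handler)
logger.propagate = False  # avoid duplicate lines if the root logger also has handlers

logger.debug("DEBUG records from this logger are now emitted")
```

A logger configured this way can then be passed as the logger argument shown in the example above.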
Requirements
- Python 3.8+
- No external dependencies
License
MIT License - see LICENSE file for details.
File details
Details for the file beque-0.1.0.tar.gz.
File metadata
- Download URL: beque-0.1.0.tar.gz
- Upload date:
- Size: 30.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8c87e0810dfab3b54e02ad6962bc1296673413f7b7a15cfc5c7be653d4c6c2ef |
| MD5 | 0557784433795a9824b57ad7dbebc0c7 |
| BLAKE2b-256 | 3458fc9dcecf714166b14f252936cf56e87d5105302d09ef12fe8f096c6851e7 |
Provenance
The following attestation bundles were made for beque-0.1.0.tar.gz:
Publisher: publish.yaml on punitarani/beque
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: beque-0.1.0.tar.gz
  - Subject digest: 8c87e0810dfab3b54e02ad6962bc1296673413f7b7a15cfc5c7be653d4c6c2ef
- Sigstore transparency entry: 451477752
- Sigstore integration time:
- Permalink: punitarani/beque@5b7231f5aa5b222621b06e197c4b6e836540a65c
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/punitarani
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yaml@5b7231f5aa5b222621b06e197c4b6e836540a65c
- Trigger Event: release
File details
Details for the file beque-0.1.0-py3-none-any.whl.
File metadata
- Download URL: beque-0.1.0-py3-none-any.whl
- Upload date:
- Size: 6.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 99f48d23405fb48de58a882970a48bf9433ccb9d769ca9c9269ac95163f84e76 |
| MD5 | 83775ca0aab69f7139ca75ddd4c0335e |
| BLAKE2b-256 | 6dacd397e332d31620ffa9a082ac47238242eb529aafd84b800fe4b429d31326 |
Provenance
The following attestation bundles were made for beque-0.1.0-py3-none-any.whl:
Publisher: publish.yaml on punitarani/beque
- Statement:
  - Statement type: https://in-toto.io/Statement/v1
  - Predicate type: https://docs.pypi.org/attestations/publish/v1
  - Subject name: beque-0.1.0-py3-none-any.whl
  - Subject digest: 99f48d23405fb48de58a882970a48bf9433ccb9d769ca9c9269ac95163f84e76
- Sigstore transparency entry: 451477755
- Sigstore integration time:
- Permalink: punitarani/beque@5b7231f5aa5b222621b06e197c4b6e836540a65c
- Branch / Tag: refs/tags/v0.1.0
- Owner: https://github.com/punitarani
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yaml@5b7231f5aa5b222621b06e197c4b6e836540a65c
- Trigger Event: release