
A service to batch your HTTP requests.

Project description

AsyncBatcher - Asynchronous Batching for Python

Overview

AsyncBatcher is a generic, asynchronous batch processor for Python that efficiently groups incoming items into batches and processes them asynchronously. It is designed for scenarios where multiple requests or tasks need to be handled in batches to improve efficiency and throughput.

Key Features

  • Asynchronous processing: Uses asyncio for non-blocking execution.
  • Batching mechanism: Groups items into batches based on size or time constraints.
  • Concurrency control: Limits the number of concurrent batch executions.
  • Custom processing logic: Users must implement the process_batch method to define batch behavior.
  • Queue management: Uses an asyncio.Queue to manage incoming items.
  • Error handling: Ensures robust error reporting and handling.

How it works

1. Receiving Items for Processing

  • Users call process(item), which adds the item to an internal queue.
  • A Future object is returned immediately and the result is awaited asynchronously.
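This handoff can be illustrated with a minimal stdlib-only sketch (not the library's actual internals): a caller pairs its item with a Future, puts both on a queue, and suspends until a background consumer resolves the Future.

```python
import asyncio

async def main():
    queue: asyncio.Queue = asyncio.Queue()

    async def process(item):
        future = asyncio.get_running_loop().create_future()
        await queue.put((item, future))  # hand the item to the background loop
        return await future              # caller suspends until the result is set

    async def consumer():
        while True:
            item, future = await queue.get()
            future.set_result(item * 2)  # resolve the caller's future
            queue.task_done()

    task = asyncio.create_task(consumer())
    value = await process(21)
    task.cancel()
    return value

result = asyncio.run(main())
print(result)  # → 42
```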

2. Queue Management and Batching

  • A background task (run()) continuously monitors the queue.
  • Items are collected into batches based on:
    • max_batch_size: Maximum items per batch.
    • max_queue_time: Maximum time an item can wait before being processed.
  • Once a batch is ready, it is passed to the processing function.
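The size/time trade-off above can be sketched with `asyncio.wait_for`: block for the first item, then keep draining until the batch is full or the time budget runs out. This is an illustrative stdlib sketch, not the library's code.

```python
import asyncio

async def collect_batch(queue, max_batch_size, max_queue_time):
    batch = [await queue.get()]  # block until at least one item arrives
    deadline = asyncio.get_running_loop().time() + max_queue_time
    while len(batch) < max_batch_size:
        remaining = deadline - asyncio.get_running_loop().time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # time budget exhausted; ship a partial batch
    return batch

async def main():
    queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait(i)
    return await collect_batch(queue, max_batch_size=5, max_queue_time=0.05)

batch = asyncio.run(main())
print(batch)  # → [0, 1, 2, 3, 4]
```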

3. Processing the Batch

  • If process_batch is asynchronous, it is awaited directly.
  • If process_batch is synchronous, it runs inside an Executor.
  • Each item’s future is resolved with the corresponding processed result.
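A hedged sketch of this dispatch step: an async handler is awaited directly, while a sync one is pushed to the default executor so it never blocks the event loop.

```python
import asyncio
import inspect

async def run_handler(handler, batch):
    # Await coroutine functions directly; offload plain functions to a thread.
    if inspect.iscoroutinefunction(handler):
        return await handler(batch)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, handler, batch)

async def double_async(batch):
    return [x * 2 for x in batch]

def double_sync(batch):
    return [x * 2 for x in batch]

async def main():
    a = await run_handler(double_async, [1, 2])
    b = await run_handler(double_sync, [3, 4])
    return a, b

results = asyncio.run(main())
print(results)  # → ([2, 4], [6, 8])
```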

4. Concurrency Control

  • If concurrency > 0, a semaphore ensures that only a limited number of batches are processed simultaneously.
  • Otherwise, all batches run concurrently.
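The gating can be pictured with a plain `asyncio.Semaphore` (an illustrative sketch, not the library's code): with a limit of 2, no more than two batches are ever in flight at once.

```python
import asyncio

async def main(concurrency=2, n_batches=6):
    sem = asyncio.Semaphore(concurrency)
    peak = 0
    active = 0

    async def run_batch(i):
        nonlocal peak, active
        async with sem:  # at most `concurrency` holders at a time
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate batch work
            active -= 1

    await asyncio.gather(*(run_batch(i) for i in range(n_batches)))
    return peak

peak = asyncio.run(main())
print(peak)  # → 2
```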

5. Stopping the Batcher

  • Calling stop(force=True) cancels all ongoing tasks.
  • Calling stop(force=False) waits for pending items to be processed before shutting down.
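A graceful shutdown of this kind can be sketched with stdlib primitives (again, an illustration of the idea rather than the library's implementation): drain the queue with `queue.join()` before cancelling the worker, whereas a forced stop would cancel immediately and drop pending items.

```python
import asyncio

async def main():
    queue = asyncio.Queue()
    processed = []

    async def worker():
        while True:
            item = await queue.get()
            processed.append(item)
            queue.task_done()

    task = asyncio.create_task(worker())
    for i in range(3):
        queue.put_nowait(i)

    await queue.join()  # graceful: wait until every pending item is processed
    task.cancel()       # a forced stop would cancel here without joining
    return processed

processed = asyncio.run(main())
print(processed)  # → [0, 1, 2]
```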
The overall flow is summarized in the following Mermaid sequence diagram:

sequenceDiagram
    participant User
    participant AsyncBatcher
    participant Queue as asyncio.Queue
    participant RunLoop
    participant Semaphore
    participant BatchProcessor

    User->>AsyncBatcher: process(item)
    activate AsyncBatcher
    AsyncBatcher->>Queue: put(QueueItem(item, future))
    AsyncBatcher-->>User: returns future
    deactivate AsyncBatcher

    Note over AsyncBatcher: Starts RunLoop on first process()

    loop Run Loop (run() method)
        RunLoop->>Queue: Collect items (max_batch_size/max_queue_time)
        activate Queue
        Queue-->>RunLoop: Batch [QueueItem1, QueueItem2...]
        deactivate Queue

        alt Concurrency Limited (concurrency > 0)
            RunLoop->>Semaphore: acquire()
            activate Semaphore
            Semaphore-->>RunLoop: acquired
            deactivate Semaphore
        end

        RunLoop->>BatchProcessor: create_task(_batch_run(batch))
        activate BatchProcessor

        alt Async process_batch
            BatchProcessor->>AsyncBatcher: await process_batch(batch)
        else Sync process_batch
            BatchProcessor->>Executor: run_in_executor(process_batch)
        end

        AsyncBatcher-->>BatchProcessor: results [S1, S2...]
        BatchProcessor->>QueueItem1.future: set_result(S1)
        BatchProcessor->>QueueItem2.future: set_result(S2)
        deactivate BatchProcessor

        alt Concurrency Limited
            RunLoop->>Semaphore: release()
        end
    end

    Note over User, BatchProcessor: User's await future gets resolved

How to use

To use the library, you need to install the package in your environment. You can install the package using pip:

pip install async-batcher

Then, you can create a new AsyncBatcher subclass by implementing the process_batch method:

import asyncio
import logging

from async_batcher.batcher import AsyncBatcher

class MyBatchProcessor(AsyncBatcher[int, int]):
    async def process_batch(self, batch: list[int]) -> list[int]:
        await asyncio.sleep(1)  # Simulate processing delay
        return [x * 2 for x in batch]  # Example: Doubling each item

async def main():
    batcher = MyBatchProcessor(max_batch_size=5, max_queue_time=2.0, concurrency=2)
    results = await asyncio.gather(*[batcher.process(i) for i in range(10)])
    print(results)  # Output: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    await batcher.stop()

# Set logging level to DEBUG if you want to see more details and understand the flow
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())

Benchmark

The benchmark is available in the BENCHMARK.md file.

When to Use AsyncBatcher?

The AsyncBatcher library is ideal for applications that need to efficiently handle asynchronous requests in batches, such as:

Machine Learning Model Serving

  • Batch-processing requests to optimize inference performance (e.g., TensorFlow, PyTorch, Scikit-learn).

Database Bulk Operations

  • Inserting multiple records in a single query to improve I/O efficiency and reduce costs (e.g., PostgreSQL, MySQL, AWS DynamoDB).

Messaging and Network Optimization

  • Sending multiple messages in a single API call to reduce latency and costs (e.g., Kafka, RabbitMQ, AWS SQS, AWS SNS).

Rate-Limited API Calls

  • Aggregating requests to comply with API rate limits (e.g., GitHub API, Twitter API, OpenAI API).

Final Notes

  • Implement process_batch according to your needs.
  • Ensure max_batch_size and max_queue_time are configured based on performance requirements.
  • Handle exceptions inside process_batch so that a failure in one batch does not affect other tasks.
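On the last point, a common pattern is to catch the batch-level error and propagate it to every waiting future instead of letting it kill the run loop. The sketch below uses only stdlib asyncio and hypothetical helper names (`resolve_batch`, `failing_batch`), not the library's API:

```python
import asyncio

async def resolve_batch(process_batch, items, futures):
    try:
        results = await process_batch(items)
    except Exception as exc:
        for future in futures:
            future.set_exception(exc)  # each caller's await re-raises it
        return
    for future, result in zip(futures, results):
        future.set_result(result)

async def failing_batch(items):
    raise ValueError("backend unavailable")

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.create_future() for _ in range(2)]
    await resolve_batch(failing_batch, [1, 2], futures)
    messages = []
    for future in futures:
        try:
            await future
        except ValueError as exc:
            messages.append(str(exc))
    return messages

errors = asyncio.run(main())
print(errors)  # → ['backend unavailable', 'backend unavailable']
```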

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_batcher-0.2.2.tar.gz (14.9 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_batcher-0.2.2-py3-none-any.whl (18.5 kB)

Uploaded Python 3

File details

Details for the file async_batcher-0.2.2.tar.gz.

File metadata

  • Download URL: async_batcher-0.2.2.tar.gz
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for async_batcher-0.2.2.tar.gz
  • SHA256: 9c97585245efacd4d97192f6b8e42d6d0de5f91f6b277942072a83f690f38db8
  • MD5: 0c2e0410b8c1d16f8abae6314ef28c0c
  • BLAKE2b-256: 1bd6ffa5349299aadc4e513f3f12da9751d37342ee8294a04c240697ad3e9eff

See more details on using hashes here.

Provenance

The following attestation bundles were made for async_batcher-0.2.2.tar.gz:

Publisher: release.yml on hussein-awala/async-batcher

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file async_batcher-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: async_batcher-0.2.2-py3-none-any.whl
  • Size: 18.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.8

File hashes

Hashes for async_batcher-0.2.2-py3-none-any.whl
  • SHA256: 4890f49166bfdc41136ca11cfb68415b78cf2bddef2b1c39c4782b312ba049ca
  • MD5: e34b44674eb54525d972966465eea298
  • BLAKE2b-256: be0e4a7389df44cebd8d674b2ee011bf44a7efd64a76047cea930904eea0a387

See more details on using hashes here.

Provenance

The following attestation bundles were made for async_batcher-0.2.2-py3-none-any.whl:

Publisher: release.yml on hussein-awala/async-batcher

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
