
Batched (Dynamic Batching)

The Batched API provides a flexible and efficient way to process multiple requests in a batch, with a primary focus on dynamic batching of inference workloads. It optimizes throughput while maintaining low latency, which is especially useful when you need to handle a high volume of requests simultaneously, and it supports both synchronous and asynchronous execution.

Batch Performance

Why Dynamic Batching?

Dynamic batching is a technique that automatically groups multiple incoming inference requests into a single batch for processing. This is particularly beneficial for inference workloads, where processing multiple inputs together can significantly improve throughput and efficiency.

In machine learning models, dynamic batching matters because it optimizes hardware utilization, especially for GPUs and specialized AI hardware designed for parallel processing. By batching requests, we can fully leverage this parallel processing, leading to higher throughput. It also reduces overhead by amortizing fixed costs (such as data transfer and model initialization) across multiple requests, improving overall efficiency. Furthermore, dynamic batching enhances real-time performance by adapting to varying request rates, maintaining low latency during quiet periods while maximizing throughput during busy times.

This makes dynamic batching a crucial technique for deploying ML models in production environments where request patterns can be unpredictable and resource optimization is key.
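The core mechanism can be sketched in plain Python. This is a simplified illustration, not Batched's actual implementation; the helper name `collect_batch` and its parameters are hypothetical. A worker drains a request queue until the batch is full or a timeout expires, then processes everything it collected as one batch:

```python
import queue
import time

def collect_batch(q: "queue.Queue", max_batch_size: int = 8, timeout_s: float = 0.005) -> list:
    # Block until at least one request arrives, then keep collecting
    # until the batch is full or the timeout expires.
    batch = [q.get()]
    deadline = time.monotonic() + timeout_s
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            batch.append(q.get(timeout=remaining))
        except queue.Empty:
            break
    return batch
```

Under load the loop fills batches quickly (high throughput); during quiet periods the timeout bounds how long any single request waits (low latency).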

Installation

To install Batched, use pip:

pip install batched

Usage

Basic Example

Below is a basic example of how to use the Batched API to process text data in batches.

from sentence_transformers import SentenceTransformer
import numpy as np
import batched

class SentenceEmbedder:
    def __init__(self, model_name='mixedbread-ai/mxbai-embed-large-v1'):
        self.model = SentenceTransformer(model_name)

    @batched.dynamically
    def embed_sentences(self, sentences: list[str]) -> list[np.ndarray]:
        # Convert sentences to embeddings
        return self.model.encode(sentences)

# Create an instance of SentenceEmbedder
embedder = SentenceEmbedder()

# Embed a single sentence (sync call, or async via .acall)
single_sent = "This is a test sentence."
embedding = embedder.embed_sentences(single_sent)
awaited_embedding = await embedder.embed_sentences.acall(single_sent)

# Embed a batch of 1000 sentences
batch_sentences = [f"This is test sentence number {i}." for i in range(1000)]
batch_embeddings = embedder.embed_sentences(batch_sentences)
awaited_batch_embeddings = await embedder.embed_sentences.acall(batch_sentences)

# Check the statistics
stats = embedder.embed_sentences.stats

Advanced Usage

For more advanced usage, such as customizing the batch size and timeout, the Batched API's decorators accept parameters that give fine-grained control over the batching process:

  • Batch Size: the maximum number of requests to group into a single batch.
  • Timeout: the maximum time (in milliseconds) to wait for more requests before processing the batch.
  • Small Batch Threshold: the batch size below which a batch is given higher processing priority.
  • Pad Token: the token used for padding when batching tensors; applies only to @batched.inference.dynamically and @batched.aio.inference.dynamically.

For example:

@batched.dynamically(batch_size=64, timeout_ms=5.0, small_batch_threshold=2)
def custom_batch_function(data):
    # Custom processing logic here
    pass

API Reference

The API offers both thread and asyncio implementations for batching general tasks and inference tasks:

Thread Implementation

  • @batched.dynamically: Allows dynamic batching for general tasks (both sync and async calls are supported).
  • The decorated method should:
    • Take in a list of items (list[T]).
    • Return a list of results (list[U]) of the same length.
import batched


@batched.dynamically(batch_size=64)
def my_function(items: list[int]) -> list[str]:
    # Custom processing logic here
    return [f"{item * 2}" for item in items]

# Sync call with a single item
my_function(2)

# Sync call with a batch of items
my_function([2, 3, 4])

# Async call with asyncio
await my_function.acall(2)
await my_function.acall([2, 3, 4])

# Supports stat checking
print(my_function.stats)
  • @batched.inference.dynamically: Allows dynamic batching for inference tasks, handling numpy arrays and tensors with padding.
  • The decorated method should:
    • Take in a dictionary of tensors or numpy arrays (dict[str, Feature]), where each key is a feature name and each Feature is a batch of values for that feature.
    • Return a tensor or numpy array (Feature), where each row is a single inference result.
    • Feature can be any of the following types: np.ndarray, torch.Tensor, list[np.ndarray], or list[torch.Tensor].
    • The batch dimension must be preserved: features[feature_name].shape[0] == outputs.shape[0].
from batched import inference
import torch

@inference.dynamically(pad_token={"input_ids": 0})
def my_inference_function(features: dict[str, torch.Tensor]) -> torch.Tensor:
    # input_ids = features["input_ids"]
    # attention_mask = features["attention_mask"]
    # token_type_ids = features["token_type_ids"]

    # `model` is assumed to be defined elsewhere
    logits = model(**features)
    return logits

# Sync call
my_inference_function(data)

# Call with asyncio
await my_inference_function.acall(data)

print(my_inference_function.stats)
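To illustrate what the pad token is for, here is a standalone sketch using plain Python lists (the helper `pad_batch` is hypothetical, not part of the library): variable-length sequences are padded with the pad token to the length of the longest sequence, so the batch can be stacked into a single rectangular tensor.

```python
def pad_batch(sequences: list[list[int]], pad_token: int = 0) -> list[list[int]]:
    # Pad each sequence with pad_token up to the length of the longest one,
    # so all rows have equal length and can be stacked into one batch.
    max_len = max(len(seq) for seq in sequences)
    return [seq + [pad_token] * (max_len - len(seq)) for seq in sequences]

padded = pad_batch([[101, 7, 102], [101, 102]])
# padded == [[101, 7, 102], [101, 102, 0]]
```

This is why pad_token is keyed by feature name: each feature (e.g. input_ids vs. attention_mask) may need a different padding value.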

Asyncio Implementation

  • @batched.aio.dynamically: Allows dynamic batching for general tasks using asyncio.
  • The decorated method should:
    • Take in a list of items (list[T])
    • Return a list of results (list[U]) of the same length.
from batched import aio

@aio.dynamically(batch_size=64, timeout_ms=20.0, small_batch_threshold=10)
def my_function(items: list[int]) -> list[int]:  # can also be an async function: async def ...
    # Custom processing logic here
    return [item * 2 for item in items]


# Call with a single item
await my_function(2)

# Call with a batch of items
await my_function([2, 3, 4])

# Supports stat checking
print(my_function.stats)
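The top-level await calls in these examples assume a running event loop (e.g. a notebook or an async web framework); in a plain script, wrap them with asyncio.run. A self-contained sketch, using a stand-in coroutine so it runs without batched installed:

```python
import asyncio

async def my_function(items: list[int]) -> list[int]:
    # Stand-in for the @aio.dynamically-decorated function above
    return [item * 2 for item in items]

async def main():
    # Top-level `await` from the examples needs a running event loop
    return await my_function([2, 3, 4])

print(asyncio.run(main()))  # [4, 6, 8]
```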
  • @batched.aio.inference.dynamically: Allows dynamic batching for inference tasks, handling numpy arrays and tensors with padding, using asyncio.
  • The decorated method should:
    • Take in a dictionary of tensors or numpy arrays (dict[str, Feature]), where each key is a feature name and each Feature is a batch of values for that feature.
    • Return a tensor or numpy array (Feature), where each row is a single inference result.
    • Feature can be any of the following types: np.ndarray, torch.Tensor, list[np.ndarray], or list[torch.Tensor].
    • The batch dimension must be preserved: features[feature_name].shape[0] == outputs.shape[0].
from batched import aio
import torch

@aio.inference.dynamically(pad_token={"input_ids": 0})
async def my_inference_function(features: dict[str, torch.Tensor]) -> list[torch.Tensor]:
    # input_ids = features["input_ids"]
    # attention_mask = features["attention_mask"]
    # token_type_ids = features["token_type_ids"]

    # `model1` and `model2` are assumed to be defined elsewhere
    logits1 = await model1(**features)
    logits2 = await model2(**features)
    return [logits1, logits2]


await my_inference_function(data)

print(my_inference_function.stats)

Contributing

Contributions are welcome! Please feel free to submit a pull request or report an issue on GitHub.

Attribution

This project was inspired by the following projects:

License

This project is licensed under the Apache License, Version 2.0.
