Add your description here
Project description
Batch Router
A Python package designed to facilitate batch LLM requests efficiently across multiple providers with a unified interface.
Overview
Batch Router provides a standardized way to send batch requests to different LLM providers (OpenAI, Anthropic, Google, and vLLM), abstracting away provider-specific formats and APIs. This allows you to:
- Write requests once in a unified format
- Switch between providers seamlessly
- Process large volumes of LLM requests cost-effectively
- Track batch status and retrieve results consistently
- Maintain full transparency with JSONL file logging
Features
- Unified Request Format: Single format for all providers
- Multi-Provider Support: OpenAI, Anthropic (Claude), Google (Gemini), and vLLM (local)
- Cost Reduction: Leverage batch APIs for up to 50% cost savings (OpenAI)
- Async Support: Fully asynchronous operations
- Type Safe: Comprehensive type hints throughout
- Transparent: All requests and responses saved as JSONL files
- Flexible: Support for text, images, and multimodal content
- Local Processing: vLLM provider for local batch processing
Installation
# Basic installation
pip install batch-router
# With specific provider dependencies
pip install batch-router[anthropic]
pip install batch-router[openai]
pip install batch-router[google]
# For local processing with vLLM
pip install vllm
Quick Start
import asyncio
from batch_router import (
UnifiedRequest,
UnifiedBatchMetadata,
UnifiedMessage,
TextContent,
OpenAIProvider,
)
async def main():
# Create provider
provider = OpenAIProvider(api_key="your-api-key")
# Create requests
requests = [
UnifiedRequest(
custom_id="request-1",
model="gpt-4o-mini",
messages=[
UnifiedMessage(
role="user",
content=[TextContent(text="What is the capital of France?")]
)
]
),
UnifiedRequest(
custom_id="request-2",
model="gpt-4o-mini",
messages=[
UnifiedMessage(
role="user",
content=[TextContent(text="What is 2+2?")]
)
],
system_prompt="You are a helpful math tutor."
)
]
# Create batch
batch = UnifiedBatchMetadata(
provider="openai",
requests=requests
)
# Send batch
batch_id = await provider.send_batch(batch)
print(f"Batch submitted: {batch_id}")
# Check status
status = await provider.get_status(batch_id)
print(f"Status: {status.status.value}")
print(f"Progress: {status.request_counts.succeeded}/{status.request_counts.total}")
# Get results (when complete)
if status.is_complete():
async for result in provider.get_results(batch_id):
print(f"{result.custom_id}: {result.status.value}")
if result.response:
print(f"Response: {result.response}")
asyncio.run(main())
Supported Providers
OpenAI
- API: Batch API for Chat Completions
- Cost: 50% reduction compared to sync API
- Completion: 24-hour window
- Models: All chat completion models (gpt-4o, gpt-4o-mini, etc.)
from batch_router import OpenAIProvider
provider = OpenAIProvider(api_key="sk-...")
Anthropic (Claude)
- API: Message Batches API
- Cost: 50% reduction compared to sync API
- Completion: 24-hour window
- Models: Claude models (claude-sonnet-4-5, claude-3-5-sonnet-20241022, etc.)
from batch_router import AnthropicProvider
provider = AnthropicProvider(api_key="sk-ant-...")
Google (Gemini)
- API: Batch Prediction API
- Cost: Varies by model
- Completion: Varies
- Models: Gemini models (gemini-2.0-flash-exp, etc.)
from batch_router import GoogleProvider
provider = GoogleProvider(api_key="...")
vLLM (Local)
- API: Local batch processing via vLLM CLI
- Cost: Free (runs locally)
- Completion: Immediate (based on hardware)
- Models: Any model compatible with vLLM
from batch_router import VLLMProvider
# Requires vLLM installed: pip install vllm
provider = VLLMProvider(
vllm_command="vllm",
additional_args=["--tensor-parallel-size", "2"]
)
Core Concepts
Unified Request Format
All providers use the same request structure:
UnifiedRequest(
custom_id="unique-identifier", # Your unique ID
model="model-name", # Provider-specific model
messages=[...], # Conversation messages
system_prompt="...", # Optional system prompt
generation_config=GenerationConfig(...), # Optional params
provider_kwargs={...} # Provider-specific options
)
System Prompts
System prompts are handled differently by each provider:
- OpenAI: Converted to a message with
role="system" - Anthropic: Uses the
systemparameter - Google: Uses
systemInstructionin config - vLLM: Converted to a message with
role="system"(OpenAI-compatible)
Batch Router abstracts this at the request level with system_prompt.
Messages
Messages use a unified content format:
from batch_router import UnifiedMessage, TextContent, ImageContent
# Text message
message = UnifiedMessage(
role="user",
content=[TextContent(text="Hello!")]
)
# Multimodal message
message = UnifiedMessage(
role="user",
content=[
TextContent(text="What's in this image?"),
ImageContent(
source_type="url",
media_type="image/jpeg",
data="https://example.com/image.jpg"
)
]
)
# Base64 image
message = UnifiedMessage(
role="user",
content=[
ImageContent(
source_type="base64",
media_type="image/png",
data="iVBORw0KGgoAAAANS..."
)
]
)
Generation Configuration
Control generation parameters uniformly:
from batch_router import GenerationConfig
config = GenerationConfig(
max_tokens=1024,
temperature=0.7,
top_p=0.9,
top_k=40,
stop_sequences=["END"],
presence_penalty=0.1,
frequency_penalty=0.1
)
request = UnifiedRequest(
custom_id="req-1",
model="gpt-4o",
messages=[...],
generation_config=config
)
Batch Operations
Sending a Batch
batch_id = await provider.send_batch(batch)
This will:
- Convert requests to provider format
- Save unified format JSONL to
.batch_router/generated/<provider>/ - Save provider-specific format JSONL
- Upload/submit to provider API
- Return a batch ID for tracking
Checking Status
status = await provider.get_status(batch_id)
print(f"Status: {status.status.value}")
print(f"Total: {status.request_counts.total}")
print(f"Succeeded: {status.request_counts.succeeded}")
print(f"Failed: {status.request_counts.errored}")
print(f"Processing: {status.request_counts.processing}")
print(f"Created: {status.created_at}")
print(f"Completed: {status.completed_at}")
Possible statuses:
VALIDATING: Initial validation (OpenAI only)IN_PROGRESS: Processing requestsCOMPLETED: All requests processedFAILED: Batch failedCANCELLED: Batch was cancelledEXPIRED: Batch expired before completion
Retrieving Results
async for result in provider.get_results(batch_id):
print(f"Request ID: {result.custom_id}")
print(f"Status: {result.status.value}")
if result.status == ResultStatus.SUCCEEDED:
# Access response based on provider format
response = result.response
print(f"Response: {response}")
elif result.status == ResultStatus.ERRORED:
print(f"Error: {result.error}")
Cancelling a Batch
cancelled = await provider.cancel_batch(batch_id)
if cancelled:
print("Batch cancelled successfully")
else:
print("Batch already complete")
Listing Batches
batches = await provider.list_batches(limit=10)
for batch_status in batches:
print(f"{batch_status.batch_id}: {batch_status.status.value}")
File Management
Batch Router maintains transparency by saving all data as JSONL files:
.batch_router/
└── generated/
├── openai/
│ ├── batch_<batch_id>_unified.jsonl # Unified format (reference)
│ ├── batch_<batch_id>_provider.jsonl # Provider format (sent)
│ ├── batch_<batch_id>_output.jsonl # Raw provider output
│ └── batch_<batch_id>_results.jsonl # Unified results
├── anthropic/
├── google/
└── vllm/
File Types
- unified.jsonl: Your original requests in unified format
- provider.jsonl: Converted to provider-specific format (what gets sent)
- output.jsonl: Raw results from the provider
- results.jsonl: Results converted back to unified format
Advanced Usage
Provider-Specific Options
Use provider_kwargs to pass provider-specific parameters:
# Anthropic: Enable thinking/reasoning
request = UnifiedRequest(
custom_id="reasoning-task",
model="claude-sonnet-4-5",
messages=[...],
provider_kwargs={
"thinking": {
"type": "enabled",
"budget_tokens": 2000
}
}
)
# OpenAI: Enable structured outputs
request = UnifiedRequest(
custom_id="structured-task",
model="gpt-4o",
messages=[...],
provider_kwargs={
"response_format": {
"type": "json_schema",
"json_schema": {...}
}
}
)
Switching Providers
The unified format makes it easy to switch providers:
# Same requests, different providers
requests = [...] # List of UnifiedRequest objects
# Try with OpenAI
openai_batch = UnifiedBatchMetadata(provider="openai", requests=requests)
openai_provider = OpenAIProvider()
batch_id = await openai_provider.send_batch(openai_batch)
# Or try with Anthropic
anthropic_batch = UnifiedBatchMetadata(provider="anthropic", requests=requests)
anthropic_provider = AnthropicProvider()
batch_id = await anthropic_provider.send_batch(anthropic_batch)
Custom Base URLs
For custom endpoints or proxies:
# OpenAI with custom base URL
provider = OpenAIProvider(
api_key="...",
base_url="https://custom-proxy.example.com/v1"
)
# Anthropic with custom base URL
provider = AnthropicProvider(
api_key="...",
base_url="https://custom-endpoint.example.com"
)
vLLM with Custom Arguments
provider = VLLMProvider(
vllm_command="vllm",
additional_args=[
"--tensor-parallel-size", "4",
"--gpu-memory-utilization", "0.9",
"--max-model-len", "8192"
]
)
API Reference
Core Classes
UnifiedRequest
custom_id: str - Unique identifier for the requestmodel: str - Model identifiermessages: list[UnifiedMessage] - Conversation messagessystem_prompt: Optional[str | list[str]] - System promptgeneration_config: Optional[GenerationConfig] - Generation parametersprovider_kwargs: dict[str, Any] - Provider-specific options
UnifiedMessage
role: str - Message role ("user" or "assistant")content: list[TextContent | ImageContent | DocumentContent] - Message content
GenerationConfig
max_tokens: Optional[int]temperature: Optional[float]top_p: Optional[float]top_k: Optional[int]stop_sequences: Optional[list[str]]presence_penalty: Optional[float]frequency_penalty: Optional[float]
UnifiedBatchMetadata
provider: str - Provider name ("openai", "anthropic", "google", "vllm")requests: list[UnifiedRequest] - List of requestsmetadata: dict[str, Any] - Optional metadata
Provider Methods
All providers implement:
async send_batch(batch: UnifiedBatchMetadata) -> str
Submit a batch for processing. Returns batch ID.
async get_status(batch_id: str) -> BatchStatusResponse
Get current status of a batch.
async get_results(batch_id: str) -> AsyncIterator[UnifiedResult]
Stream results from a completed batch.
async cancel_batch(batch_id: str) -> bool
Cancel a running batch. Returns True if cancelled, False if already complete.
async list_batches(limit: int = 20) -> list[BatchStatusResponse]
List recent batches (not supported by all providers).
Environment Variables
Set API keys via environment variables:
export OPENAI_API_KEY="sk-..."
export ANTHROPIC_API_KEY="sk-ant-..."
export GOOGLE_API_KEY="..."
Then initialize providers without explicit keys:
provider = OpenAIProvider() # Uses OPENAI_API_KEY
provider = AnthropicProvider() # Uses ANTHROPIC_API_KEY
provider = GoogleProvider() # Uses GOOGLE_API_KEY
Error Handling
from batch_router.exceptions import (
ProviderError,
BatchNotFoundError,
BatchNotCompleteError,
)
try:
batch_id = await provider.send_batch(batch)
except ProviderError as e:
print(f"Provider error: {e}")
except ValueError as e:
print(f"Invalid request: {e}")
try:
status = await provider.get_status(batch_id)
except BatchNotFoundError:
print("Batch not found")
try:
async for result in provider.get_results(batch_id):
process_result(result)
except BatchNotCompleteError:
print("Batch is still processing")
Best Practices
- Use Unique Custom IDs: Always provide unique
custom_idvalues to track individual requests - Monitor Status: Poll
get_status()before retrieving results - Handle Partial Failures: Some requests may succeed while others fail - check individual result statuses
- Leverage Local Files: Use saved JSONL files for debugging and audit trails
- Start Small: Test with a small batch before scaling up
- Set Appropriate Timeouts: Cloud providers typically complete within 24 hours
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the MIT License - see the LICENSE file for details.
Links
Support
For issues, questions, or contributions, please visit the GitHub repository.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file batch_router-0.1.0.tar.gz.
File metadata
- Download URL: batch_router-0.1.0.tar.gz
- Upload date:
- Size: 26.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ae45a0bda31965eb5c873f26102fb02eff8548c6967ae70663b7b8c390dbe14d
|
|
| MD5 |
8eb1548982cae348168833360fc49291
|
|
| BLAKE2b-256 |
87114c0a92cd05e1d12d624e943eb332b466a636b6daf6aa2ad1643b4e5942db
|
File details
Details for the file batch_router-0.1.0-py3-none-any.whl.
File metadata
- Download URL: batch_router-0.1.0-py3-none-any.whl
- Upload date:
- Size: 39.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
568104049b2cb6c58d22e4cf3ee063d9260c23aae428e84e0176c5486f735f74
|
|
| MD5 |
e4900df1943b75b535dd4530c768f645
|
|
| BLAKE2b-256 |
683b8f93d5ce2ca280381bc945dd71174c88088f41e1267bce1c7c321be07216
|