Skip to main content

Batch processing for Anthropic's Claude API with structured output

Project description

Batchata

Tests PyPI

Python SDK for AI batch processing with structured output and citation mapping.

  • 50% cost savings via Anthropic's batch API pricing (OpenAI coming soon)
  • Automatic cost tracking with token usage and pricing
  • Structured output with Pydantic models
  • Field-level citations map results to source documents
  • Type safety with full validation

Core Functions

  • batch() - Process message conversations or PDF files
  • BatchManager - Manage large-scale AI batch processing with parallel execution
  • BatchJob - Job object returned by both functions above

Quick Start

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with structured output + citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

# Wait for completion
while not job.is_complete():
    time.sleep(30)
    
results = job.results()
# Results now contain both data and citations together:
# [{"result": Invoice(...), "citations": {"company_name": [Citation(...)], ...}}, ...]

Installation

pip install batchata

or:

uv add batchata

Setup

Create a .env file in your project root:

ANTHROPIC_API_KEY=your-api-key

API Reference

batch()

Process multiple message conversations with optional structured output.

from batchata import batch
from pydantic import BaseModel

class SpamResult(BaseModel):
    is_spam: bool
    confidence: float
    reason: str

# Process messages
job = batch(
    messages=[
        [{"role": "user", "content": "Is this spam? You've won $1000!"}],
        [{"role": "user", "content": "Meeting at 3pm tomorrow"}],
        [{"role": "user", "content": "URGENT: Click here now!"}]
    ],
    model="claude-3-haiku-20240307",
    response_model=SpamResult
)

# Wait for completion, then get results
while not job.is_complete():
    time.sleep(30)  # Check every 30 seconds
    
results = job.results()
# Results format: [{"result": SpamResult(...), "citations": None}, ...]

Response:

[
    SpamResult(is_spam=True, confidence=0.95, reason="Contains monetary prize claim"),
    SpamResult(is_spam=False, confidence=0.98, reason="Normal meeting reminder"),
    SpamResult(is_spam=True, confidence=0.92, reason="Urgent call-to-action pattern")
]

batch() with files

Process PDF files with optional structured output and citations.

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

results = job.results()
# Results now contain both data and citations together

Result Format:

# All results use this unified format
[
    {
        "result": Invoice(company_name="TechCorp", total_amount=12500.00),
        "citations": {
            "company_name": [Citation(...)],
            "total_amount": [Citation(...)]
        }
    }
]

BatchManager

Manage large-scale batch processing with automatic job splitting, parallel execution, state persistence, and cost management.

from batchata import BatchManager
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: float
    invoice_number: str

# Initialize BatchManager for large-scale processing
manager = BatchManager(
    files=["invoice1.pdf", "invoice2.pdf", ...],  # 100+ files
    prompt="Extract invoice data",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True,
    items_per_job=10,      # Process 10 files per job
    max_parallel_jobs=5,   # 5 jobs in parallel
    max_cost=50.0,         # Stop if cost exceeds $50
    state_path="batch_state.json",  # Auto-resume capability
    results_dir="results/"          # Save results (processed + raw)
)

# Run processing (auto-resumes if interrupted)
summary = manager.run(print_progress=True)

# Retry failed items
if summary['failed_items'] > 0:
    retry_summary = manager.retry_failed()

# Get statistics
stats = manager.stats
print(f"Completed: {stats['completed_items']}/{stats['total_items']}")
print(f"Total cost: ${stats['total_cost']:.2f}")
print(f"Results saved to: {stats['results_dir']}")

# Get results directly from BatchManager (returns unified format)
results = manager.results()  # List[{"result": Invoice(...), "citations": {...}}]
for entry in results:
    invoice = entry["result"]  # This is an Invoice instance  
    citations = entry["citations"]  # Citation objects
    print(f"Company: {invoice.company_name}")

# Or later: Load results from disk if program exited
from batchata import load_results_from_disk
results = load_results_from_disk("results", Invoice)
Response Format

The manager.run() method returns a processing summary dictionary:

{
    "total_items": 100,
    "completed_items": 95,
    "failed_items": 5,
    "total_cost": 12.34,
    "jobs_completed": 10,
    "cost_limit_reached": False
}

The manager.retry_failed() method returns the same format with an additional field:

{
    "total_items": 100,
    "completed_items": 98,
    "failed_items": 2,
    "total_cost": 13.45,
    "jobs_completed": 11,
    "cost_limit_reached": False,
    "retry_count": 5  # Number of items that were retried
}

Result Storage:

  • Results saved to {{results_dir}}/processed/ as JSON files
  • Raw API responses saved to {{results_dir}}/raw/ for debugging
  • Use load_results_from_disk() to reload results with full Pydantic model reconstruction

Key Features:

  • Automatic job splitting: Breaks large batches into smaller chunks
  • Parallel processing: Multiple jobs run concurrently with ThreadPoolExecutor
  • State persistence: Resume from interruptions with JSON state files
  • Cost management: Stop processing when budget limit is reached
  • Progress monitoring: Real-time progress updates with statistics
  • Retry mechanism: Easily retry failed items
  • Result saving: Organized directory structure for results

BatchJob

The job object returned by both batch() and used internally by BatchManager.

# Check completion status
if job.is_complete():
    results = job.results()

# Get processing statistics with cost tracking
stats = job.stats(print_stats=True)
# Output:
# 📊 Batch Statistics
#    ID: msgbatch_01BPtdnmEwxtaDcdJ2eUsq4T
#    Status: ended
#    Complete: ✅
#    Elapsed: 41.8s
#    Mode: Text + Citations
#    Results: 2
#    Citations: 6
#    Input tokens: 2,117
#    Output tokens: 81
#    Total cost: $0.0038
#    (50% batch discount applied)
#    Raw results: ./raw_responses

# BatchJob.results() returns unified format: List[{"result": ..., "citations": ...}]
for entry in results:
    result = entry["result"]  # Pydantic model instance, dict, or string
    citations = entry["citations"]  # Dict, list, or None
    print(f"Result: {result}")
    if citations:
        print(f"Citations: {len(citations) if isinstance(citations, (dict, list)) else 'Available'}")

# Save raw API responses (optional)
job = batch(..., raw_results_dir="./raw_responses")

Citations

Citations work in two modes depending on whether you use structured output:

1. Text + Citations (Flat List)

When enable_citations=True without a response model, citations are returned as a flat list:

job = batch(
    files=["document.pdf"],
    prompt="Summarize the key findings",
    enable_citations=True
)

results = job.results()   # List of {"result": str, "citations": List[Citation]}

# Example result structure:
[
    {
        "result": "Summary text...",
        "citations": [
            Citation(cited_text="AI reduces errors by 30%", start_page=2),
            Citation(cited_text="Implementation cost: $50,000", start_page=5)
        ]
    }
]

2. Structured + Field Citations (Mapping)

When using both response_model and enable_citations=True, citations are mapped to specific fields:

job = batch(
    files=["document.pdf"],
    prompt="Extract the data",
    response_model=MyModel,
    enable_citations=True
)

results = job.results()   # List of {"result": Model, "citations": Dict[str, List[Citation]]}

# Example result structure:
[
    {
        "result": MyModel(title="Annual Report 2024", revenue="$1.2M"),
        "citations": {
            "title": [Citation(cited_text="Annual Report 2024", start_page=1)],
            "revenue": [Citation(cited_text="Revenue: $1.2M", start_page=3)],
            "growth": [Citation(cited_text="YoY Growth: 25%", start_page=3)]
        }
    }
]

The field mapping allows you to trace exactly which part of the source document was used to populate each field in your structured output.

Robust Citation Parsing

Batchata uses proper JSON parsing for citation field mapping, ensuring reliability with complex JSON structures:

Handles Complex Scenarios:

  • ✅ Escaped quotes in JSON values: "name": "John \"The Great\" Doe"
  • ✅ URLs with colons: "website": "http://example.com:8080"
  • ✅ Nested objects and arrays: "metadata": {"nested": {"deep": "value"}}
  • ✅ Multi-line strings and special characters
  • ✅ Fields with numbers/underscores: user_name, age_2

Cost Tracking

Batchata automatically tracks token usage and costs for all batch operations:

from batchata import batch

job = batch(
    messages=[...],
    model="claude-3-5-sonnet-20241022"
)

# Get cost information
stats = job.stats()
print(f"Total cost: ${stats['total_cost']:.4f}")
print(f"Input tokens: {stats['total_input_tokens']:,}")
print(f"Output tokens: {stats['total_output_tokens']:,}")

# Or print formatted statistics
job.stats(print_stats=True)

Example Scripts

Run any example with uv run python -m examples.<script_name>:

# Email classification with structured output
uv run python -m examples.spam_detection

# PDF data extraction with citations  
uv run python -m examples.pdf_extraction

# Basic citation usage with text documents
uv run python -m examples.citation_example

# Structured output with field-level citations
uv run python -m examples.citation_with_pydantic

# Large-scale batch processing with BatchManager
uv run python -m examples.batch_manager_example

# Raw text responses without structured output
uv run python -m examples.raw_text_example

Example Files:

Limitations

  • Citation mapping only works with flat Pydantic models (no nested models)
  • OpenAI support coming soon
  • PDFs require Opus/Sonnet models for best results
  • Batch jobs can take up to 24 hours to process
  • Use job.is_complete() to check status before getting results
  • Citations may not be available in all batch API responses
  • Cost limits: Best effort enforcement - costs are only known after job completion, so final costs may slightly exceed max_cost due to jobs already in progress

Comparison with Alternatives

Feature batchata LangChain Instructor PydanticAI
Batch Requests ✅ Native (50% cost savings) ❌ No native batch API ✅ Via OpenAI Batch API (#1092) ⚠️ Planned (#1771)
Structured Output ✅ Full support ✅ Via parsers ✅ Core feature ✅ Native
PDF File Input ✅ Native support ✅ Via document loaders ✅ Via multimodal models ✅ Via file handling
Citation Mapping ✅ Field-level citations ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Cost Tracking ✅ Automatic with tokencost ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Cost Limits ✅ max_cost parameter ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Batch Providers 2/2 (Anthropic, OpenAI planned) 0/2 1/2 (OpenAI only) 0/2
Focus Streamlined batch requests General LLM orchestration Structured outputs CLI Agent framework

License

MIT

AI Documentation

📋 For AI systems: See llms.txt for comprehensive documentation optimized for AI consumption.

Todos

  • Add pricing metadata and max_spend controls (Cost tracking implemented)
  • Auto batch manager (parallel batches, retry, spend control) (BatchManager implemented)
  • Test mode to run on 1% sample before full batch
  • Quick batch - split into smaller chunks for faster results
  • Support text/other file types (not just PDFs)
  • Support for OpenAI

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

batchata-0.2.6.tar.gz (132.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

batchata-0.2.6-py3-none-any.whl (32.7 kB view details)

Uploaded Python 3

File details

Details for the file batchata-0.2.6.tar.gz.

File metadata

  • Download URL: batchata-0.2.6.tar.gz
  • Upload date:
  • Size: 132.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for batchata-0.2.6.tar.gz
Algorithm Hash digest
SHA256 23bd3f322879c5bec650fd1d576eaeb013b992dd9507c864367145adf7a2d469
MD5 212dacd68caf5f2fd7194122e071a5f6
BLAKE2b-256 85b6f170441ef89851ec8803ec1365e903a267574dde82b5e531978724c15c35

See more details on using hashes here.

Provenance

The following attestation bundles were made for batchata-0.2.6.tar.gz:

Publisher: publish.yml on agamm/batchata

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file batchata-0.2.6-py3-none-any.whl.

File metadata

  • Download URL: batchata-0.2.6-py3-none-any.whl
  • Upload date:
  • Size: 32.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for batchata-0.2.6-py3-none-any.whl
Algorithm Hash digest
SHA256 040d6faafdc63dbca16082a1688f438b19e78dbbd97ca7902a898fd67ac70a9f
MD5 a42d16040e5671809099d841388838a6
BLAKE2b-256 3a443e4099436dbd9fb0ecf087f5e72c69c46498503b10612ac60bbffbb1e88b

See more details on using hashes here.

Provenance

The following attestation bundles were made for batchata-0.2.6-py3-none-any.whl:

Publisher: publish.yml on agamm/batchata

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page