Skip to main content

Batch processing for Anthropic's Claude API with structured output

Project description

Batchata

Tests PyPI

Python SDK for AI batch processing with structured output and citation mapping.

  • 50% cost savings via Anthropic's batch API pricing (OpenAI coming soon)
  • Automatic cost tracking with token usage and pricing
  • Structured output with Pydantic models
  • Field-level citations map results to source documents
  • Type safety with full validation

Core Functions

  • batch() - Process message conversations or PDF files
  • BatchManager - Manage large-scale AI batch processing with parallel execution
  • BatchJob - Job object returned by both functions above

Quick Start

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with structured output + citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

# Wait for completion
while not job.is_complete():
    time.sleep(30)
    
results = job.results()
# Results now contain both data and citations together:
# [{"result": Invoice(...), "citations": {"company_name": [Citation(...)], ...}}, ...]

Installation

pip install batchata

or:

uv add batchata

Setup

Create a .env file in your project root:

ANTHROPIC_API_KEY=your-api-key

Supported Models

Model File Support Notes
claude-opus-4-20250514 Latest Claude 4 Opus
claude-sonnet-4-20250514 Latest Claude 4 Sonnet
claude-3-7-sonnet-20250219 Claude 3.7 Sonnet
claude-3-7-sonnet-latest Claude 3.7 Sonnet (latest)
claude-3-5-sonnet-20241022 Recommended for most tasks
claude-3-5-sonnet-latest Latest Claude 3.5 Sonnet
claude-3-5-sonnet-20240620 Legacy Claude 3.5 Sonnet
claude-3-5-haiku-20241022 Fast, cost-effective
claude-3-5-haiku-latest Latest Claude 3.5 Haiku
claude-3-haiku-20240307 Messages only, no files
claude-3-opus-20240229 Legacy (deprecated)
claude-3-sonnet-20240229 Legacy (deprecated)
claude-3-5-haiku-20240307 Legacy (deprecated)

API Reference

batch()

Process multiple message conversations with optional structured output.

from batchata import batch
from pydantic import BaseModel

class SpamResult(BaseModel):
    is_spam: bool
    confidence: float
    reason: str

# Process messages
job = batch(
    messages=[
        [{"role": "user", "content": "Is this spam? You've won $1000!"}],
        [{"role": "user", "content": "Meeting at 3pm tomorrow"}],
        [{"role": "user", "content": "URGENT: Click here now!"}]
    ],
    model="claude-3-haiku-20240307",
    response_model=SpamResult
)

# Wait for completion, then get results
while not job.is_complete():
    time.sleep(30)  # Check every 30 seconds
    
results = job.results()
# Results format: [{"result": SpamResult(...), "citations": None}, ...]

Response:

[
    SpamResult(is_spam=True, confidence=0.95, reason="Contains monetary prize claim"),
    SpamResult(is_spam=False, confidence=0.98, reason="Normal meeting reminder"),
    SpamResult(is_spam=True, confidence=0.92, reason="Urgent call-to-action pattern")
]

batch() with files

Process PDF files with optional structured output and citations.

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

results = job.results()
# Results now contain both data and citations together

Result Format:

# All results use this unified format
[
    {
        "result": Invoice(company_name="TechCorp", total_amount=12500.00),
        "citations": {
            "company_name": [Citation(...)],
            "total_amount": [Citation(...)]
        }
    }
]

BatchManager

Manage large-scale batch processing with automatic job splitting, parallel execution, state persistence, and cost management.

from batchata import BatchManager
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: float
    invoice_number: str

# Initialize BatchManager for large-scale processing
manager = BatchManager(
    files=["invoice1.pdf", "invoice2.pdf", ...],  # 100+ files
    prompt="Extract invoice data",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True,
    items_per_job=10,      # Process 10 files per job
    max_parallel_jobs=5,   # 5 jobs in parallel
    max_cost=50.0,         # Stop if cost exceeds $50
    state_path="batch_state.json",  # Auto-resume capability
    results_dir="results/"          # Save results (processed + raw)
)

# Run processing (auto-resumes if interrupted)
summary = manager.run(print_progress=True)

# Retry failed items
if summary['failed_items'] > 0:
    retry_summary = manager.retry_failed()

# Get statistics
stats = manager.stats
print(f"Completed: {stats['completed_items']}/{stats['total_items']}")
print(f"Total cost: ${stats['total_cost']:.2f}")
print(f"Results saved to: {stats['results_dir']}")

# Get results directly from BatchManager (returns unified format)
results = manager.results()  # List[{"result": Invoice(...), "citations": {...}}]
for entry in results:
    invoice = entry["result"]  # This is an Invoice instance  
    citations = entry["citations"]  # Citation objects
    print(f"Company: {invoice.company_name}")

# Or later: Load results from disk if program exited
from batchata import load_results_from_disk
results = load_results_from_disk("results", Invoice)
Response Format

The manager.run() method returns a processing summary dictionary:

{
    "total_items": 100,
    "completed_items": 95,
    "failed_items": 5,
    "total_cost": 12.34,
    "jobs_completed": 10,
    "cost_limit_reached": False
}

The manager.retry_failed() method returns the same format with an additional field:

{
    "total_items": 100,
    "completed_items": 98,
    "failed_items": 2,
    "total_cost": 13.45,
    "jobs_completed": 11,
    "cost_limit_reached": False,
    "retry_count": 5  # Number of items that were retried
}

Result Storage:

  • Results saved to {{results_dir}}/processed/ as JSON files
  • Raw API responses saved to {{results_dir}}/raw/ for debugging
  • Use load_results_from_disk() to reload results with full Pydantic model reconstruction

Key Features:

  • Automatic job splitting: Breaks large batches into smaller chunks
  • Parallel processing: Multiple jobs run concurrently with ThreadPoolExecutor
  • State persistence: Resume from interruptions with JSON state files
  • Cost management: Stop processing when budget limit is reached
  • Progress monitoring: Real-time progress updates with statistics
  • Retry mechanism: Easily retry failed items
  • Result saving: Organized directory structure for results

BatchJob

The job object returned by both batch() and used internally by BatchManager.

# Check completion status
if job.is_complete():
    results = job.results()

# Get processing statistics with cost tracking
stats = job.stats(print_stats=True)
# Output:
# 📊 Batch Statistics
#    ID: msgbatch_01BPtdnmEwxtaDcdJ2eUsq4T
#    Status: ended
#    Complete: ✅
#    Elapsed: 41.8s
#    Mode: Text + Citations
#    Results: 2
#    Citations: 6
#    Input tokens: 2,117
#    Output tokens: 81
#    Total cost: $0.0038
#    (50% batch discount applied)
#    Raw results: ./raw_responses

# BatchJob.results() returns unified format: List[{"result": ..., "citations": ...}]
for entry in results:
    result = entry["result"]  # Pydantic model instance, dict, or string
    citations = entry["citations"]  # Dict, list, or None
    print(f"Result: {result}")
    if citations:
        print(f"Citations: {len(citations) if isinstance(citations, (dict, list)) else 'Available'}")

# Save raw API responses (optional)
job = batch(..., raw_results_dir="./raw_responses")

Citations

Citations work in two modes depending on whether you use structured output:

1. Text + Citations (Flat List)

When enable_citations=True without a response model, citations are returned as a flat list:

job = batch(
    files=["document.pdf"],
    prompt="Summarize the key findings",
    enable_citations=True
)

results = job.results()   # List of {"result": str, "citations": List[Citation]}

# Example result structure:
[
    {
        "result": "Summary text...",
        "citations": [
            Citation(cited_text="AI reduces errors by 30%", start_page=2),
            Citation(cited_text="Implementation cost: $50,000", start_page=5)
        ]
    }
]

2. Structured + Field Citations (Mapping)

When using both response_model and enable_citations=True, citations are mapped to specific fields:

job = batch(
    files=["document.pdf"],
    prompt="Extract the data",
    response_model=MyModel,
    enable_citations=True
)

results = job.results()   # List of {"result": Model, "citations": Dict[str, List[Citation]]}

# Example result structure:
[
    {
        "result": MyModel(title="Annual Report 2024", revenue="$1.2M"),
        "citations": {
            "title": [Citation(cited_text="Annual Report 2024", start_page=1)],
            "revenue": [Citation(cited_text="Revenue: $1.2M", start_page=3)],
            "growth": [Citation(cited_text="YoY Growth: 25%", start_page=3)]
        }
    }
]

The field mapping allows you to trace exactly which part of the source document was used to populate each field in your structured output.

Robust Citation Parsing

Batchata uses proper JSON parsing for citation field mapping, ensuring reliability with complex JSON structures:

Handles Complex Scenarios:

  • ✅ Escaped quotes in JSON values: "name": "John \"The Great\" Doe"
  • ✅ URLs with colons: "website": "http://example.com:8080"
  • ✅ Nested objects and arrays: "metadata": {"nested": {"deep": "value"}}
  • ✅ Multi-line strings and special characters
  • ✅ Fields with numbers/underscores: user_name, age_2

Cost Tracking

Batchata automatically tracks token usage and costs for all batch operations:

from batchata import batch

job = batch(
    messages=[...],
    model="claude-3-5-sonnet-20241022"
)

# Get cost information
stats = job.stats()
print(f"Total cost: ${stats['total_cost']:.4f}")
print(f"Input tokens: {stats['total_input_tokens']:,}")
print(f"Output tokens: {stats['total_output_tokens']:,}")

# Or print formatted statistics
job.stats(print_stats=True)

Example Scripts

Run any example with uv run python -m examples.<script_name>:

# Email classification with structured output
uv run python -m examples.spam_detection

# PDF data extraction with citations  
uv run python -m examples.pdf_extraction

# Basic citation usage with text documents
uv run python -m examples.citation_example

# Structured output with field-level citations
uv run python -m examples.citation_with_pydantic

# Large-scale batch processing with BatchManager
uv run python -m examples.batch_manager_example

# Raw text responses without structured output
uv run python -m examples.raw_text_example

Example Files:

Error Handling

Batchata provides comprehensive error handling with specific exceptions and early validation:

File Validation

  • File Size Limits: Provider-specific limits (32MB for Anthropic) with early validation
  • Empty Files: Clear ValueError messages for empty files or bytes content
  • File Type Detection: Automatic detection of PDF, PNG, JPEG, GIF, WebP files
  • Missing Files: FileNotFoundError for non-existent file paths

Content-Specific Errors

  • Image Citations: UnsupportedContentError when requesting citations on images
  • Invalid Formats: UnsupportedFileFormatError for unsupported file types
  • Large Files: FileTooLargeError when files exceed provider limits

All validation happens early to save time and costs before expensive API operations.

Limitations

  • Citation mapping only works with flat Pydantic models (no nested models)
  • OpenAI support coming soon
  • PDFs require Opus/Sonnet models for best results
  • Batch jobs can take up to 24 hours to process
  • Use job.is_complete() to check status before getting results
  • Citations may not be available in all batch API responses
  • Cost limits: Best effort enforcement - costs are only known after job completion, so final costs may slightly exceed max_cost due to jobs already in progress

Comparison with Alternatives

Feature batchata LangChain Instructor PydanticAI
Batch Requests ✅ Native (50% cost savings) ❌ No native batch API ✅ Via OpenAI Batch API (#1092) ⚠️ Planned (#1771)
Structured Output ✅ Full support ✅ Via parsers ✅ Core feature ✅ Native
PDF File Input ✅ Native support ✅ Via document loaders ✅ Via multimodal models ✅ Via file handling
Citation Mapping ✅ Field-level citations ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Cost Tracking ✅ Automatic with tokencost ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Cost Limits ✅ max_cost parameter ❌ Manual implementation ❌ Manual implementation ❌ Manual implementation
Batch Providers 2/2 (Anthropic, OpenAI planned) 0/2 1/2 (OpenAI only) 0/2
Focus Streamlined batch requests General LLM orchestration Structured outputs CLI Agent framework

License

MIT

AI Documentation

📋 For AI systems: See llms.txt for comprehensive documentation optimized for AI consumption.

Todos

  • Add pricing metadata and max_spend controls (Cost tracking implemented)
  • Auto batch manager (parallel batches, retry, spend control) (BatchManager implemented)
  • Test mode to run on 1% sample before full batch
  • Quick batch - split into smaller chunks for faster results
  • Support text/other file types (not just PDFs)
  • Support for OpenAI

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

batchata-0.2.7.tar.gz (136.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

batchata-0.2.7-py3-none-any.whl (34.7 kB view details)

Uploaded Python 3

File details

Details for the file batchata-0.2.7.tar.gz.

File metadata

  • Download URL: batchata-0.2.7.tar.gz
  • Upload date:
  • Size: 136.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for batchata-0.2.7.tar.gz
Algorithm Hash digest
SHA256 41ce7c667e7991c486d918cf91c3d2662b3261624f8934f45b8806e0576940b2
MD5 fb168e26b25bf3e8a96da829dea9081b
BLAKE2b-256 7e7aaccfbf3d9db9a395cb8f3f647d4e4f2f61edb80bdd2ae56a891059412b2b

See more details on using hashes here.

Provenance

The following attestation bundles were made for batchata-0.2.7.tar.gz:

Publisher: publish.yml on agamm/batchata

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file batchata-0.2.7-py3-none-any.whl.

File metadata

  • Download URL: batchata-0.2.7-py3-none-any.whl
  • Upload date:
  • Size: 34.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for batchata-0.2.7-py3-none-any.whl
Algorithm Hash digest
SHA256 9674dc58efb818256ef5be0bb468df9bc17074ad694b7e1df0566a27f03c241c
MD5 53ebeaf0bf38a4459c4d3cecb44ad129
BLAKE2b-256 e5877a3027f1618fd567b6560d9a0de9f1d37771695e48aa8d03372a878258c0

See more details on using hashes here.

Provenance

The following attestation bundles were made for batchata-0.2.7-py3-none-any.whl:

Publisher: publish.yml on agamm/batchata

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page