Batch processing for Anthropic's Claude API with structured output

Batchata

Python SDK for batch processing with structured output and citation mapping.

  • 50% cost savings via Anthropic's batch API pricing
  • Automatic cost tracking with token usage and pricing
  • Structured output with Pydantic models
  • Field-level citations map results to source documents
  • Type safety with full validation

Currently supports Anthropic Claude. OpenAI support coming soon.

API Reference

  • batch() - Process message conversations or PDF files
  • BatchJob - Job status and results
  • BatchManager - Manage large-scale batch processing with parallel execution

Quick Start

import time

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with structured output + citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf", "invoice3.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

# Wait for completion
while not job.is_complete():
    time.sleep(30)
    
results = job.results()
# Results now contain both data and citations together:
# [{"result": Invoice(...), "citations": {"company_name": [Citation(...)], ...}}, ...]
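
The polling loop above runs until the job finishes, and batch jobs can take hours. A minimal sketch of a wait helper with a timeout, assuming only the is_complete() method shown above. The names wait_for_job and FakeJob are hypothetical and not part of Batchata; FakeJob exists only so the snippet runs without an API key.

```python
import time


def wait_for_job(job, poll_interval=30.0, timeout=24 * 60 * 60, sleep=time.sleep):
    """Poll job.is_complete() until it returns True or the timeout elapses.

    Returns True if the job completed, False if the timeout was reached.
    """
    deadline = time.monotonic() + timeout
    while not job.is_complete():
        if time.monotonic() >= deadline:
            return False
        sleep(poll_interval)
    return True


class FakeJob:
    """Hypothetical stand-in for a BatchJob: completes after N status checks."""

    def __init__(self, checks_until_done):
        self.remaining = checks_until_done

    def is_complete(self):
        self.remaining -= 1
        return self.remaining <= 0


# Demo with a no-op sleep so the example finishes immediately.
done = wait_for_job(FakeJob(3), poll_interval=0.0, timeout=5.0, sleep=lambda s: None)
print(done)
```

With a real job you would call wait_for_job(job) and fetch job.results() only when it returns True.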

Installation

pip install batchata

Usage

Create a .env file in your project root:

ANTHROPIC_API_KEY=your-api-key
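
A missing key is easier to diagnose before a batch is submitted than after. The sketch below is a hypothetical pre-flight check, assuming the key ends up in the process environment under the name shown above; require_api_key is not a Batchata function, and how Batchata itself reads the .env file is not covered here.

```python
import os


def require_api_key(env=None, name="ANTHROPIC_API_KEY"):
    """Return the API key from the given mapping (default: os.environ) or raise."""
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; add it to your .env file or environment")
    return key


# Demo with an explicit mapping so the example does not depend on your environment.
print(require_api_key({"ANTHROPIC_API_KEY": "your-api-key"}))
```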

API Functions

batch()

Process multiple message conversations with optional structured output.

from batchata import batch
from pydantic import BaseModel

class SpamResult(BaseModel):
    is_spam: bool
    confidence: float
    reason: str

# Process messages
job = batch(
    messages=[
        [{"role": "user", "content": "Is this spam? You've won $1000!"}],
        [{"role": "user", "content": "Meeting at 3pm tomorrow"}],
        [{"role": "user", "content": "URGENT: Click here now!"}]
    ],
    model="claude-3-haiku-20240307",
    response_model=SpamResult
)

# Get results
results = job.results()

Response:

[
    SpamResult(is_spam=True, confidence=0.95, reason="Contains monetary prize claim"),
    SpamResult(is_spam=False, confidence=0.98, reason="Normal meeting reminder"),
    SpamResult(is_spam=True, confidence=0.92, reason="Urgent call-to-action pattern")
]
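
Once the results are back, a common next step is to act only on confident predictions. A minimal sketch, assuming results come back in the same order as the input messages (the example response suggests this, but it is an assumption). A dataclass stands in for the Pydantic model so the snippet runs without dependencies, and flag_spam is a hypothetical helper.

```python
from dataclasses import dataclass


@dataclass
class SpamResult:
    # Dataclass stand-in for the Pydantic SpamResult model above.
    is_spam: bool
    confidence: float
    reason: str


results = [
    SpamResult(is_spam=True, confidence=0.95, reason="Contains monetary prize claim"),
    SpamResult(is_spam=False, confidence=0.98, reason="Normal meeting reminder"),
    SpamResult(is_spam=True, confidence=0.92, reason="Urgent call-to-action pattern"),
]


def flag_spam(results, min_confidence=0.9):
    """Return the indexes of messages classified as spam with enough confidence."""
    return [
        index
        for index, result in enumerate(results)
        if result.is_spam and result.confidence >= min_confidence
    ]


flagged = flag_spam(results)
print(flagged)
```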

batch() with files

Process PDF files with optional structured output and citations.

from batchata import batch
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: str
    date: str

# Process PDFs with citations
job = batch(
    files=["invoice1.pdf", "invoice2.pdf"],
    prompt="Extract the company name, total amount, and date.",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True
)

results = job.results()
# Results now contain both data and citations together

Response:

[
    {
        "result": Invoice(company_name="TechCorp Solutions", total_amount="$12,500.00", date="March 15, 2024"),
        "citations": {
            "company_name": [Citation(cited_text="TechCorp Solutions", start_page=1)],
            "total_amount": [Citation(cited_text="TOTAL: $12,500.00", start_page=2)],
            "date": [Citation(cited_text="Date: March 15, 2024", start_page=1)]
        }
    },
    {
        "result": Invoice(company_name="DataFlow Systems", total_amount="$8,750.00", date="March 18, 2024"),
        "citations": {
            "company_name": [Citation(cited_text="DataFlow Systems", start_page=1)],
            "total_amount": [Citation(cited_text="Total Due: $8,750.00", start_page=3)],
            "date": [Citation(cited_text="Invoice Date: March 18, 2024", start_page=1)]
        }
    }
]
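
The nested structure above can be flattened into one row per citation, which is convenient for review or export. A minimal sketch using the first result from the response above; the dataclasses are stand-ins for Batchata's Citation and the Pydantic Invoice (only the two attributes shown in the output are assumed), and citation_rows is a hypothetical helper.

```python
from dataclasses import dataclass


@dataclass
class Citation:
    # Stand-in with only the attributes shown in the example output.
    cited_text: str
    start_page: int


@dataclass
class Invoice:
    company_name: str
    total_amount: str
    date: str


results = [
    {
        "result": Invoice("TechCorp Solutions", "$12,500.00", "March 15, 2024"),
        "citations": {
            "company_name": [Citation("TechCorp Solutions", 1)],
            "total_amount": [Citation("TOTAL: $12,500.00", 2)],
            "date": [Citation("Date: March 15, 2024", 1)],
        },
    }
]


def citation_rows(results):
    """Flatten results into (file_index, field, value, cited_text, page) tuples."""
    rows = []
    for index, item in enumerate(results):
        for field, citations in item["citations"].items():
            value = getattr(item["result"], field, None)
            for citation in citations:
                rows.append((index, field, value, citation.cited_text, citation.start_page))
    return rows


rows = citation_rows(results)
for row in rows:
    print(row)
```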

BatchJob

The job object returned by batch().

# Check completion status
if job.is_complete():
    results = job.results()

# Get processing statistics with cost tracking
stats = job.stats(print_stats=True)
# Output:
# 📊 Batch Statistics
#    ID: msgbatch_01BPtdnmEwxtaDcdJ2eUsq4T
#    Status: ended
#    Complete: ✅
#    Elapsed: 41.8s
#    Mode: Text + Citations
#    Results: 0
#    Citations: 0
#    Input tokens: 2,117
#    Output tokens: 81
#    Total cost: $0.0038
#    (50% batch discount applied)

# Citations are now included in results (if enabled)
# Access via: results[0]["citations"]

# Save raw API responses
job = batch(..., raw_results_dir="./raw_responses")

BatchManager

Manage large-scale batch processing with automatic job splitting, parallel execution, state persistence, and cost management.

from batchata import BatchManager
from pydantic import BaseModel

class Invoice(BaseModel):
    company_name: str
    total_amount: float
    invoice_number: str

# Initialize BatchManager for large-scale processing
manager = BatchManager(
    files=["invoice1.pdf", "invoice2.pdf", ...],  # 100+ files
    prompt="Extract invoice data",
    model="claude-3-5-sonnet-20241022",
    response_model=Invoice,
    enable_citations=True,
    items_per_job=10,      # Process 10 files per job
    max_parallel_jobs=5,   # 5 jobs in parallel
    max_cost=50.0,         # Stop if cost exceeds $50
    state_path="batch_state.json",  # Auto-resume capability
    save_results_dir="results/"     # Save results to disk
)

# Run processing (auto-resumes if interrupted)
summary = manager.run(print_progress=True)

# Retry failed items
if summary['failed_items'] > 0:
    retry_summary = manager.retry_failed()

# Get statistics
stats = manager.stats
print(f"Completed: {stats['completed_items']}/{stats['total_items']}")
print(f"Total cost: ${stats['total_cost']:.2f}")

# Load results from disk
results = manager.get_results_from_disk()

Key Features:

  • Automatic job splitting: Breaks large batches into smaller chunks
  • Parallel processing: Multiple jobs run concurrently with ThreadPoolExecutor
  • State persistence: Resume from interruptions with JSON state files
  • Cost management: Stop processing when budget limit is reached
  • Progress monitoring: Real-time progress updates with statistics
  • Retry mechanism: Easily retry failed items
  • Result saving: Organized directory structure for results
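
To make the first, second, and fourth features concrete, here is a simplified sketch of job splitting, parallel execution with ThreadPoolExecutor, and a budget check. It is not Batchata's implementation: split_into_jobs, run_jobs, and fake_process_job are hypothetical names, and the per-file cost is invented for the demo.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_jobs(items, items_per_job):
    """Break a list of items into consecutive chunks of at most items_per_job."""
    return [items[i:i + items_per_job] for i in range(0, len(items), items_per_job)]


def run_jobs(jobs, process_job, max_parallel_jobs, max_cost):
    """Run jobs in waves of max_parallel_jobs; stop scheduling once max_cost is reached."""
    total_cost = 0.0
    results = []
    with ThreadPoolExecutor(max_workers=max_parallel_jobs) as pool:
        for start in range(0, len(jobs), max_parallel_jobs):
            if total_cost >= max_cost:
                break  # budget exhausted: do not start another wave
            wave = jobs[start:start + max_parallel_jobs]
            for job_results, job_cost in pool.map(process_job, wave):
                results.extend(job_results)
                total_cost += job_cost
    return results, total_cost


def fake_process_job(files):
    # Hypothetical worker: pretend each file costs $0.01 to process.
    return [f"parsed:{name}" for name in files], 0.01 * len(files)


files = [f"invoice{i}.pdf" for i in range(1, 26)]
jobs = split_into_jobs(files, items_per_job=10)
results, total_cost = run_jobs(jobs, fake_process_job, max_parallel_jobs=5, max_cost=50.0)
print(len(jobs), len(results), round(total_cost, 2))
```

Because the budget is checked between waves in this sketch, spending can overshoot max_cost by up to one wave; a stricter limit would need a cost estimate before each job is submitted.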

Citations

Citations work in two modes depending on whether you use structured output:

1. Text + Citations (Flat List)

When enable_citations=True without a response model, citations are returned as a flat list:

job = batch(
    files=["document.pdf"],
    prompt="Summarize the key findings",
    enable_citations=True
)

results = job.results()   # List of {"result": str, "citations": List[Citation]}

# Example result structure:
[
    {
        "result": "Summary text...",
        "citations": [
            Citation(cited_text="AI reduces errors by 30%", start_page=2),
            Citation(cited_text="Implementation cost: $50,000", start_page=5)
        ]
    }
]
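
A flat citation list is often easier to review when grouped by page. A minimal sketch using the example citations above; Citation is a dataclass stand-in with only the two attributes shown, and citations_by_page is a hypothetical helper.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Citation:
    # Stand-in with only the attributes shown in the example output.
    cited_text: str
    start_page: int


def citations_by_page(citations):
    """Group cited text by start page, with pages in ascending order."""
    pages = defaultdict(list)
    for citation in citations:
        pages[citation.start_page].append(citation.cited_text)
    return dict(sorted(pages.items()))


citations = [
    Citation(cited_text="Implementation cost: $50,000", start_page=5),
    Citation(cited_text="AI reduces errors by 30%", start_page=2),
]
grouped = citations_by_page(citations)
print(grouped)
```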

2. Structured + Field Citations (Mapping)

When using both response_model and enable_citations=True, citations are mapped to specific fields:

job = batch(
    files=["document.pdf"],
    prompt="Extract the data",
    response_model=MyModel,
    enable_citations=True
)

results = job.results()   # List of {"result": Model, "citations": Dict[str, List[Citation]]}

# Example result structure:
[
    {
        "result": MyModel(title="Annual Report 2024", revenue="$1.2M", growth="25%"),
        "citations": {
            "title": [Citation(cited_text="Annual Report 2024", start_page=1)],
            "revenue": [Citation(cited_text="Revenue: $1.2M", start_page=3)],
            "growth": [Citation(cited_text="YoY Growth: 25%", start_page=3)]
        }
    }
]

The field mapping allows you to trace exactly which part of the source document was used to populate each field in your structured output.
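
One way to use the mapping is a sanity check that each extracted value actually appears in the text cited for it. The sketch below is a crude heuristic (a verbatim substring test), not something Batchata provides: unsupported_fields is a hypothetical helper, Report is a dataclass stand-in for MyModel, and the second item is deliberately altered to show a mismatch.

```python
from dataclasses import dataclass


@dataclass
class Citation:
    # Stand-in with only the attributes shown in the example output.
    cited_text: str
    start_page: int


@dataclass
class Report:
    # Stand-in for the MyModel used in the example above.
    title: str
    revenue: str


def unsupported_fields(item):
    """Return fields whose value does not appear verbatim in any of their citations."""
    missing = []
    for field, citations in item["citations"].items():
        value = getattr(item["result"], field, None)
        if value is None:
            continue  # cited key has no matching model field; nothing to compare
        if not any(str(value) in citation.cited_text for citation in citations):
            missing.append(field)
    return missing


citations = {
    "title": [Citation("Annual Report 2024", 1)],
    "revenue": [Citation("Revenue: $1.2M", 3)],
}
good = {"result": Report("Annual Report 2024", "$1.2M"), "citations": citations}
# Hypothetical mismatch: the extracted revenue differs from the cited text.
bad = {"result": Report("Annual Report 2024", "$1.5M"), "citations": citations}

print(unsupported_fields(good))
print(unsupported_fields(bad))
```

A substring test will miss values the model reformats (for example a date written differently from the source), so treat a flagged field as a prompt for manual review rather than as proof of an error.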

Robust Citation Parsing

Batchata uses proper JSON parsing for citation field mapping, ensuring reliability with complex JSON structures:

Handles Complex Scenarios:

  • ✅ Escaped quotes in JSON values: "name": "John \"The Great\" Doe"
  • ✅ URLs with colons: "website": "http://example.com:8080"
  • ✅ Nested objects and arrays: "metadata": {"nested": {"deep": "value"}}
  • ✅ Multi-line strings and special characters
  • ✅ Fields with numbers/underscores: user_name, age_2
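
The cases listed above are exactly where string splitting breaks and a real JSON parser does not. The following sketch illustrates the difference with Python's standard json module; it does not reproduce Batchata's internal parsing code, and the sample field values are made up for the demo.

```python
import json

# Raw JSON text containing the tricky cases listed above (values are illustrative).
raw = r'''{
  "name": "John \"The Great\" Doe",
  "website": "http://example.com:8080",
  "metadata": {"nested": {"deep": "value"}},
  "note": "line one\nline two",
  "user_name": "jdoe",
  "age_2": 41
}'''

parsed = json.loads(raw)
print(parsed["name"])
print(parsed["website"])
print(parsed["metadata"]["nested"]["deep"])

# A naive split on ":" cuts the URL into pieces instead of one key/value pair.
pair = '"website": "http://example.com:8080"'
naive_parts = pair.split(":")
print(len(naive_parts))
```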

Cost Tracking

Batchata automatically tracks token usage and costs for all batch operations:

from batchata import batch

job = batch(
    messages=[...],
    model="claude-3-5-sonnet-20241022"
)

# Get cost information
stats = job.stats()
print(f"Total cost: ${stats['total_cost']:.4f}")
print(f"Input tokens: {stats['total_input_tokens']:,}")
print(f"Output tokens: {stats['total_output_tokens']:,}")

# Or print formatted statistics
job.stats(print_stats=True)
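
The total in the sample statistics in the BatchJob section (2,117 input tokens, 81 output tokens, $0.0038) can be reproduced by hand. The sketch below assumes list prices of $3 per million input tokens and $15 per million output tokens (Claude 3.5 Sonnet pricing as an assumption, not a value taken from Batchata) and the 50% batch discount; batch_cost is a hypothetical helper.

```python
# Assumed list prices in USD per million tokens; check current pricing before relying on them.
INPUT_PRICE_PER_MTOK = 3.00
OUTPUT_PRICE_PER_MTOK = 15.00
BATCH_DISCOUNT = 0.50  # 50% batch discount, as stated in the feature list


def batch_cost(input_tokens, output_tokens,
               input_price=INPUT_PRICE_PER_MTOK,
               output_price=OUTPUT_PRICE_PER_MTOK,
               discount=BATCH_DISCOUNT):
    """Return the discounted cost in USD for the given token counts."""
    full_price = (input_tokens / 1_000_000 * input_price
                  + output_tokens / 1_000_000 * output_price)
    return full_price * (1 - discount)


cost = batch_cost(2117, 81)
print(f"${cost:.4f}")  # → $0.0038
```

Without the discount the same request would cost about $0.0076, so the printed total is consistent with the "(50% batch discount applied)" line.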

Example Scripts

Limitations

  • Citation mapping only works with flat Pydantic models (no nested models)
  • OpenAI is not yet supported
  • PDFs give the best results with Opus or Sonnet models
  • Batch jobs can take up to 24 hours to process
  • Use job.is_complete() to check status before getting results
  • Citations may not be available in all batch API responses

Comparison with Alternatives

Feature | batchata | LangChain | Instructor | PydanticAI
Batch Requests | ✅ Native (50% cost savings) | ❌ No native batch API | ✅ Via OpenAI Batch API (#1092) | ⚠️ Planned (#1771)
Structured Output | ✅ Full support | ✅ Via parsers | ✅ Core feature | ✅ Native
PDF File Input | ✅ Native support | ✅ Via document loaders | ✅ Via multimodal models | ✅ Via file handling
Citation Mapping | ✅ Field-level citations | ❌ Manual implementation | ❌ Manual implementation | ❌ Manual implementation
Cost Tracking | ✅ Automatic with tokencost | ❌ Manual implementation | ❌ Manual implementation | ❌ Manual implementation
Cost Limits | ✅ max_cost parameter | ❌ Manual implementation | ❌ Manual implementation | ❌ Manual implementation
Batch Providers | 1/2 (Anthropic; OpenAI planned) | 0/2 | 1/2 (OpenAI only) | 0/2
Focus | Streamlined batch requests | General LLM orchestration | Structured outputs CLI | Agent framework

License

MIT

Todos

  • Done: pricing metadata and max_spend controls (cost tracking implemented)
  • Done: auto batch manager with parallel batches, retry, and spend control (BatchManager implemented)
  • Test mode to run on 1% sample before full batch
  • Quick batch - split into smaller chunks for faster results
  • Support text/other file types (not just PDFs)
  • Support for OpenAI
