Ondine - The LLM Dataset Engine. SDK for processing tabular datasets using LLMs with reliability, observability, and cost control

LLM Dataset Engine

Batch process millions of rows with LLMs — 100x fewer API calls, 40-50% cost savings, 99.9% completion rate

For data engineers and ML practitioners who need to process millions of tabular rows with LLMs, Ondine is an open-source SDK that delivers 100x fewer API calls via multi-row batching and 40-50% cost reduction via prefix caching — with cost estimation, budget limits, checkpointing, and 100+ provider support built in.

Features

  • Quick API: 3-line hello world with smart defaults and auto-detection
  • Simple API: Fluent builder pattern for full control when needed
  • Multi-Row Batching: Process N rows per API call for 100× speedup (NEW!)
  • Prefix Caching: 40-50% cost reduction by caching system prompts across millions of rows
  • Reliability: Automatic retries, checkpointing, error policies (99.9% completion rate)
  • Cost Control: Pre-execution estimation, budget limits, real-time tracking
  • Observability: Progress bars, structured logging, metrics, cost reports
  • Extensibility: Plugin architecture, custom stages, multiple LLM providers
  • Fault Tolerant: Zero data loss on crashes, resume from checkpoint
  • 100+ Providers: Native LiteLLM integration supporting OpenAI, Azure, Anthropic, Groq, Cerebras, Moonshot, and 100+ others
  • Smart Routing: Built-in LiteLLM Router with latency-based routing (fastest wins) and automatic failover for high availability
  • Local Inference: Run models locally with MLX (Apple Silicon) or Ollama - 100% free, private, offline-capable
  • Multi-Column Processing: Generate multiple output columns with composition or JSON parsing
  • Custom Providers: Integrate any OpenAI-compatible API (Together.AI, vLLM, Ollama, custom endpoints)
  • Context Store (Anti-Hallucination): Post-LLM quality layer with grounding verification, contradiction detection, and confidence scoring (Rust/SQLite/FTS5)
  • Knowledge Store (RAG): Pre-LLM knowledge retrieval with document ingestion, hybrid search, and reranking
  • Evidence Priming: Pre-LLM injection of prior validated answers for cross-run consistency

Quick Start

Option 1: Quick API (Recommended)

The simplest way to get started - just provide your data, prompt, and model:

from ondine import QuickPipeline

# Process data with smart defaults
pipeline = QuickPipeline.create(
    data="data.csv",
    prompt="Clean this text: {description}",
    model="gpt-4o-mini"
)

# Execute pipeline
result = pipeline.execute()
print(f"Processed {result.metrics.processed_rows} rows")
print(f"Total cost: ${result.costs.total_cost:.4f}")

What's auto-detected:

  • Input columns from {placeholders} in prompt
  • Provider from model name (gpt-4 → openai, claude → anthropic)
  • Parser type (JSON for multi-column, text for single column)
  • Sensible batch size and concurrency for the provider
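
The first two detections can be approximated in a few lines. This is only an illustrative sketch: `detect_input_columns`, `guess_provider`, and the prefix table are hypothetical names chosen for this example, not Ondine's internals.

```python
import re
from typing import Optional

# Hypothetical prefix table for illustration; Ondine's real mapping may differ.
PROVIDER_PREFIXES = {"gpt-": "openai", "claude": "anthropic"}

def detect_input_columns(prompt: str) -> list[str]:
    """Return placeholder names such as {description}, in order, without duplicates."""
    # Only simple identifiers count, so literal JSON braces in a prompt are ignored.
    names = re.findall(r"\{([A-Za-z_][A-Za-z0-9_]*)\}", prompt)
    return list(dict.fromkeys(names))

def guess_provider(model: str) -> Optional[str]:
    """Map a model name to a provider by prefix; None when nothing matches."""
    for prefix, provider in PROVIDER_PREFIXES.items():
        if model.startswith(prefix):
            return provider
    return None

print(detect_input_columns("Clean this text: {description}"))  # ['description']
print(guess_provider("gpt-4o-mini"))  # openai
```

When nothing matches, falling back to the Builder API with an explicit provider is the safer choice.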

Option 2: Builder API (Full Control)

For advanced use cases requiring explicit configuration:

from ondine import PipelineBuilder

# Build with explicit settings
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["description"],
              output_columns=["cleaned"])
    .with_prompt("Clean this text: {description}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_batch_size(100)
    .with_concurrency(5)
    .build()
)

# Estimate cost before running
estimate = pipeline.estimate_cost()
print(f"Estimated cost: ${estimate.total_cost:.4f}")

# Execute pipeline
result = pipeline.execute()
print(f"Total cost: ${result.costs.total_cost:.4f}")

Why Ondine?

Feature | Ondine | LangChain | DSPy | Custom Scripts
Purpose-built for tabular data | | ❌ General purpose | ❌ Prompt optimization | ⚠️ Manual
Multi-row batching (100x fewer calls) | ✅ Built-in | | | ⚠️ DIY
Prefix caching (40-50% savings) | ✅ Automatic | | | ⚠️ DIY
Pre-run cost estimation | | | |
Budget limits & real-time tracking | | | |
Checkpointing & resume | ✅ Automatic | | | ⚠️ DIY
100+ LLM providers | ✅ Via LiteLLM | ✅ Via LangChain Hub | ⚠️ Limited | ⚠️ Manual
Structured output (Pydantic) | ✅ Via Instructor | | | ⚠️ DIY
Setup complexity | pip install ondine | Complex chains | Research-oriented | Significant engineering

Installation

Using uv (recommended)

# Basic installation
uv add ondine

# With observability support
uv add "ondine[observability]"

# With Excel support
uv add "ondine[excel]"

# With Parquet support
uv add "ondine[parquet]"

# With MLX support (Apple Silicon only)
uv add "ondine[mlx]"

# Everything included
uv add "ondine[all]"

Using pip

# Basic installation
pip install ondine

# With observability support
pip install "ondine[observability]"

# With Excel support
pip install "ondine[excel]"

# With Parquet support
pip install "ondine[parquet]"

# With MLX support (Apple Silicon only)
pip install "ondine[mlx]"

# Everything included
pip install "ondine[all]"

Set up API keys

# For cloud providers
export OPENAI_API_KEY="your-key-here"  # pragma: allowlist secret
# or
export AZURE_OPENAI_API_KEY="your-key-here"  # pragma: allowlist secret
export AZURE_OPENAI_ENDPOINT="https://your-endpoint.openai.azure.com/"
# or
export ANTHROPIC_API_KEY="your-key-here"
# or
export GROQ_API_KEY="your-key-here"
# or
export TOGETHER_API_KEY="your-key-here"

# For MLX (Apple Silicon)
export HUGGING_FACE_HUB_TOKEN="your-token-here"  # For model downloads

# Local providers (Ollama, vLLM) don't need API keys

Documentation

Complete documentation is available at: https://ptimizeroracle.github.io/ondine

The documentation includes:

  • Installation and setup guides
  • Quickstart tutorial (build your first pipeline in 5 minutes)
  • Core concepts and architecture
  • Execution modes (sync, async, streaming)
  • Structured output with Pydantic
  • Cost control and optimization
  • Provider-specific guides
  • Complete API reference (auto-generated from source)

Usage Examples

1. Simple Data Processing

from ondine import DatasetProcessor

# Minimal configuration for simple use cases
processor = DatasetProcessor(
    data="reviews.csv",
    input_column="customer_review",
    output_column="sentiment",
    prompt="Classify sentiment as: Positive, Negative, or Neutral\nReview: {customer_review}\nSentiment:",
    llm_config={"provider": "openai", "model": "gpt-4o-mini"}
)

# Test on sample first
sample = processor.run_sample(n=10)
print(sample)

# Process full dataset
result = processor.run()

2. Structured Data Extraction (JSON)

from ondine import PipelineBuilder

# Assumes `df` is an existing DataFrame with a "product_description" column
pipeline = (
    PipelineBuilder.create()
    .from_dataframe(
        df,
        input_columns=["product_description"],
        output_columns=["brand", "model", "price", "condition"]
    )
    .with_prompt("""
        Extract structured information and return JSON:
        {
          "brand": "...",
          "model": "...",
          "price": "...",
          "condition": "new|used|refurbished"
        }

        Description: {product_description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .build()
)

result = pipeline.execute()

3. Prefix Caching for Cost Reduction (NEW!)

Reduce costs by 40-50% on large datasets by caching system prompts:

from ondine import PipelineBuilder

# Define shared context once (cached across all stages and rows)
SHARED_CONTEXT = """You are an expert data analyst.
[General domain knowledge and principles - 1024+ tokens for OpenAI caching]
"""

# Stage 1: First transformation
pipeline1 = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result1"])
    .with_prompt("TASK: Analyze text\nINPUT: {text}\nOUTPUT:")
    .with_system_prompt(SHARED_CONTEXT)  # Cached!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Stage 2: Second transformation (reuses Stage 1's cache!)
pipeline2 = (
    PipelineBuilder.create()
    .from_csv("data_stage1.csv",
              input_columns=["text", "result1"],
              output_columns=["result2"])
    .with_prompt("TASK: Further analysis\nINPUT: {text}, {result1}\nOUTPUT:")
    .with_system_prompt(SHARED_CONTEXT)  # Same cache!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Execute both stages
result1 = pipeline1.execute()
result2 = pipeline2.execute()

# Cost savings: 40-50% reduction from caching!

How it works:

  • System prompt (1024+ tokens) cached automatically by provider
  • Subsequent requests reuse the cache (no warm-up needed)
  • Only pay full price for dynamic data (your row-specific content)
  • 50% discount on cached tokens (OpenAI), up to 90% (Anthropic)
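
To see where a figure like 40-50% can come from, the sketch below works through the input-token arithmetic. It is a simplification under stated assumptions: it ignores output tokens, cache misses, and any cache-write surcharge, and the token counts are made-up examples. `input_cost_savings` is a hypothetical helper, not part of Ondine.

```python
def input_cost_savings(system_tokens: int, row_tokens: int, cached_discount: float) -> float:
    """Fraction of input-token cost saved when the system prompt is served from cache."""
    full = system_tokens + row_tokens
    cached = system_tokens * (1 - cached_discount) + row_tokens
    return 1 - cached / full

# A 1024-token system prompt with 100 row-specific tokens, 50% cached-token discount:
print(f"{input_cost_savings(1024, 100, 0.5):.1%}")   # 45.6%
# Same system prompt, but rows as long as the prompt itself:
print(f"{input_cost_savings(1024, 1024, 0.5):.1%}")  # 25.0%
```

The saving approaches the provider's cached-token discount only when the static system prompt dominates each request.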

Requirements:

  • OpenAI: System prompt of at least 1024 tokens for automatic caching
  • Anthropic: System message separation (automatic caching)
  • Groq: Model-specific support (check provider docs)

Use cases:

  • Multi-stage pipelines (classification, enrichment, validation)
  • Large datasets with repeated instructions
  • Any workflow with static context + dynamic data

See examples/20_prefix_caching.py for complete example.

4. Multi-Row Batching for 100× Speedup (NEW!)

Process 100 rows in a single API call to reduce API calls by 100×:

from ondine import PipelineBuilder

# Traditional (slow): 5M rows = 5M API calls
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# With batching (fast): 5M rows = 50K API calls (100× fewer!)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["sentiment"])
    .with_prompt("Classify: {text}")
    .with_batch_size(100)  # Process 100 rows per API call!
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

How it works:

  • Aggregates N rows into a single JSON-formatted prompt
  • LLM processes all rows in one call and returns JSON array
  • Automatically disaggregates response back to individual rows
  • Handles partial failures (retries failed rows individually)
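
The aggregate and disaggregate steps above can be sketched with the standard library alone. The prompt wording, the `id`/`result` field names, and both helper functions are assumptions made for this example; Ondine's actual batch format is not shown in this README.

```python
import json

def build_batch_prompt(instruction: str, rows: list[str]) -> str:
    """Pack several rows into one prompt as a minified JSON array with ids."""
    payload = [{"id": i, "text": text} for i, text in enumerate(rows)]
    return (
        f"{instruction}\n"
        'Return a JSON array of objects with "id" and "result".\n'
        + json.dumps(payload, separators=(",", ":"))
    )

def split_batch_response(raw: str, n_rows: int) -> tuple[dict[int, str], list[int]]:
    """Map results back to row ids; return (results, ids that need a retry)."""
    try:
        items = json.loads(raw)
    except json.JSONDecodeError:
        return {}, list(range(n_rows))  # whole batch unusable, retry every row
    results = {}
    if isinstance(items, list):
        for item in items:
            if isinstance(item, dict) and item.get("id") in range(n_rows) and "result" in item:
                results[item["id"]] = item["result"]
    missing = [i for i in range(n_rows) if i not in results]
    return results, missing

rows = ["great product", "arrived broken", "it is fine"]
prompt = build_batch_prompt("Classify sentiment for each row.", rows)
# Simulated model reply in which row 1 is missing:
reply = '[{"id":0,"result":"Positive"},{"id":2,"result":"Neutral"}]'
results, retry_ids = split_batch_response(reply, len(rows))
print(results)    # {0: 'Positive', 2: 'Neutral'}
print(retry_ids)  # [1]
```

Carrying an explicit id per row is what makes partial failures recoverable: only the ids absent from the reply need to be sent again.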

Benefits:

  • 100× fewer API calls (5M → 50K with batch_size=100)
  • 100× faster processing (69 hours → 42 minutes)
  • Same token cost, eliminates API overhead
  • Automatic context window validation
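
The call-count and timing figures above follow from simple arithmetic, reproduced here. The timing line assumes wall-clock time scales only with the number of calls, which is optimistic because larger prompts usually take longer per call.

```python
import math

def api_calls(total_rows: int, batch_size: int) -> int:
    """Number of requests needed when each request carries batch_size rows."""
    return math.ceil(total_rows / batch_size)

calls_unbatched = api_calls(5_000_000, 1)
calls_batched = api_calls(5_000_000, 100)

# Scale the 69-hour figure by the ratio of call counts (assumption: equal time per call).
minutes_batched = 69 * 60 * calls_batched / calls_unbatched
print(calls_batched, round(minutes_batched, 1))  # 50000 41.4
```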

Requirements:

  • Batch size limited by model context window (auto-validated)
  • Works with all providers (OpenAI, Anthropic, Groq, custom)
  • Recommended: Start with batch_size=10-50, increase based on results

See examples/21_multi_row_batching.py for complete examples and benchmarks.

5. Type-Safe Structured Output (Pydantic)

from pydantic import BaseModel
from ondine import PipelineBuilder
from ondine.stages.response_parser_stage import PydanticParser

# Define your Pydantic model for type-safe validation
class ProductInfo(BaseModel):
    brand: str
    model: str
    price: float
    condition: str

pipeline = (
    PipelineBuilder.create()
    .from_dataframe(
        df,
        input_columns=["product_description"],
        output_columns=["brand", "model", "price", "condition"]
    )
    .with_prompt("""
        Extract product information and return JSON:
        {
          "brand": "manufacturer name",
          "model": "product model",
          "price": 999.99,
          "condition": "new|used|refurbished"
        }

        Description: {product_description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .with_parser(PydanticParser(ProductInfo, strict=True))  # Type-safe validation!
    .build()
)

result = pipeline.execute()
# Results are validated against ProductInfo model

6. With Cost Control

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("large_dataset.csv",
              input_columns=["text"],
              output_columns=["summary"])
    .with_prompt("Summarize in 10 words: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Cost control settings
    .with_max_budget(10.0)  # Maximum $10
    .with_batch_size(100)
    .with_concurrency(5)
    .with_rate_limit(60)  # 60 requests/min
    .with_checkpoint_interval(500)  # Checkpoint every 500 rows
    .build()
)

# Estimate first
estimate = pipeline.estimate_cost()
if estimate.total_cost > 10.0:
    # Stop with a non-zero exit status instead of relying on the interactive exit() helper
    raise SystemExit("Cost too high!")

result = pipeline.execute()
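
For intuition about what a pre-run estimate involves, here is a rough sketch of the arithmetic. The token counts and per-1k prices are placeholder values for illustration, not real provider pricing, and `estimate_cost` below is a standalone hypothetical function, not the `pipeline.estimate_cost()` method.

```python
def estimate_cost(rows, input_tokens_per_row, output_tokens_per_row,
                  input_cost_per_1k_tokens, output_cost_per_1k_tokens):
    """Rough pre-run estimate in USD: per-row token cost multiplied by row count."""
    per_row = (
        (input_tokens_per_row / 1000) * input_cost_per_1k_tokens
        + (output_tokens_per_row / 1000) * output_cost_per_1k_tokens
    )
    return rows * per_row

# Placeholder numbers for illustration only.
estimate = estimate_cost(rows=100_000, input_tokens_per_row=200,
                         output_tokens_per_row=20,
                         input_cost_per_1k_tokens=0.001,
                         output_cost_per_1k_tokens=0.002)
MAX_BUDGET = 10.0
print(f"${estimate:.2f}", "over budget" if estimate > MAX_BUDGET else "within budget")
# $24.00 over budget
```

Running this kind of check on a sample first gives realistic token counts to plug in before committing to the full dataset.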

7. Multiple Input Columns

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("products.csv",
              input_columns=["title", "description", "category"],
              output_columns=["optimized_title"])
    .with_prompt("""
        Optimize this product title for SEO.

        Current Title: {title}
        Description: {description}
        Category: {category}

        Optimized Title:
    """)
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_output("optimized_products.csv", format="csv")
    .build()
)

result = pipeline.execute()

8. Azure OpenAI

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(
        provider="azure_openai",
        model="gpt-4",
        azure_endpoint="https://your-endpoint.openai.azure.com/",
        azure_deployment="your-deployment-name",
        api_version="2024-02-15-preview"
    )
    .build()
)

9. Anthropic Claude

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["analysis"])
    .with_prompt("Analyze: {text}")
    .with_llm(
        provider="anthropic",
        model="claude-3-opus-20240229",
        temperature=0.0,
        max_tokens=1024
    )
    .build()
)

10. Local Inference with MLX (Apple Silicon)

from ondine import PipelineBuilder

# 100% free, private, offline-capable inference on M1/M2/M3/M4 Macs
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["summary"])
    .with_prompt("Summarize: {text}")
    .with_llm(
        provider="mlx",
        model="mlx-community/Qwen3-1.7B-4bit",  # Fast, small model
        max_tokens=100,
        input_cost_per_1k_tokens=0.0,  # Free!
        output_cost_per_1k_tokens=0.0
    )
    .with_concurrency(1)  # MLX works best with concurrency=1
    .build()
)

Requirements:

  • macOS with Apple Silicon (M1/M2/M3/M4)
  • Install with: pip install "ondine[mlx]" (the quotes keep zsh from interpreting the brackets)

11. Provider Presets (Simplified Configuration)

from ondine import PipelineBuilder
from ondine.core.specifications import LLMProviderPresets

# Use pre-configured providers (80% less boilerplate!)
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm_spec(LLMProviderPresets.TOGETHER_AI_LLAMA_70B)  # One line!
    .build()
)

# Available presets:
# - LLMProviderPresets.GPT4O_MINI
# - LLMProviderPresets.GPT4O
# - LLMProviderPresets.TOGETHER_AI_LLAMA_70B
# - LLMProviderPresets.TOGETHER_AI_LLAMA_8B
# - LLMProviderPresets.OLLAMA_LLAMA_70B (free, local)
# - LLMProviderPresets.OLLAMA_LLAMA_8B (free, local)
# - LLMProviderPresets.GROQ_LLAMA_70B
# - LLMProviderPresets.CLAUDE_SONNET_4

# Override preset settings:
custom = LLMProviderPresets.GPT4O_MINI.model_copy(
    update={"temperature": 0.9, "max_tokens": 500}
)
# Pass `custom` to the builder before .build(): .with_llm_spec(custom)

# Custom provider via factory:
custom_vllm = LLMProviderPresets.create_custom_openai_compatible(
    provider_name="My vLLM Server",
    model="mistral-7b-instruct",
    base_url="http://my-server:8000/v1"
)
# Pass `custom_vllm` to the builder before .build(): .with_llm_spec(custom_vllm)

Benefits:

  • Zero configuration errors (pre-validated)
  • Correct pricing and URLs built-in
  • IDE autocomplete for discovery
  • 80% code reduction vs parameter-based config

12. Custom OpenAI-Compatible APIs (Parameter-Based)

from ondine import PipelineBuilder

# Alternative: Configure providers with individual parameters
pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(
        provider="openai_compatible",
        provider_name="Together.AI",  # Or "Ollama", "vLLM", etc.
        model="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo",
        base_url="https://api.together.xyz/v1",  # Custom endpoint
        api_key="${TOGETHER_API_KEY}",
        input_cost_per_1k_tokens=0.0006,
        output_cost_per_1k_tokens=0.0006
    )
    .build()
)

Supported APIs:

  • Ollama (local): http://localhost:11434/v1
  • Together.AI (cloud): https://api.together.xyz/v1
  • vLLM (self-hosted): Your custom endpoint
  • Any OpenAI-compatible API

13. Multi-Column Output with JSON Parsing

from ondine import PipelineBuilder

# Single LLM call generates multiple output columns
pipeline = (
    PipelineBuilder.create()
    .from_csv("products.csv",
              input_columns=["description"],
              output_columns=["brand", "category", "price"])  # Multiple outputs!
    .with_prompt("""
        Extract structured data from this product description.
        Return JSON format:
        {
          "brand": "...",
          "category": "...",
          "price": "..."
        }

        Description: {description}
    """)
    .with_llm(provider="openai", model="gpt-4o-mini", temperature=0.0)
    .build()
)

result = pipeline.execute()
# Result has 3 new columns: brand, category, price

14. Pipeline Composition (Multi-Column with Dependencies)

from ondine import PipelineBuilder, PipelineComposer

# Create multiple pipelines with dependencies
# (`df` is assumed to be an existing DataFrame with a "review" column)
composer = PipelineComposer(input_data=df)

# Pipeline 1: Generate sentiment score
sentiment_pipeline = (
    PipelineBuilder.create()
    .from_dataframe(df, input_columns=["review"], output_columns=["sentiment"])
    .with_prompt("Rate sentiment (0-100): {review}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Pipeline 2: Generate explanation (depends on sentiment)
explanation_pipeline = (
    PipelineBuilder.create()
    .from_dataframe(df,
                    input_columns=["review", "sentiment"],
                    output_columns=["explanation"])
    .with_prompt("Explain why this review has {sentiment}% sentiment: {review}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .build()
)

# Compose and execute
result = (
    composer
    .add_column("sentiment", sentiment_pipeline)
    .add_column("explanation", explanation_pipeline, depends_on=["sentiment"])
    .execute()
)
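
How the composer resolves `depends_on` internally is not described here. One standard way to order columns by their dependencies is a topological sort, sketched below with Python's standard library; the dictionary layout is an assumption made for this example.

```python
from graphlib import TopologicalSorter

# column -> columns it depends on (mirrors the example above)
dependencies = {"sentiment": [], "explanation": ["sentiment"]}

# static_order() yields each column only after everything it depends on.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['sentiment', 'explanation']
```

A circular dependency would raise `graphlib.CycleError` at this step, which is a convenient place to reject an invalid composition before any LLM call is made.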

15. Context Store — Anti-Hallucination Quality Layer (NEW!)

Add post-LLM grounding, contradiction detection, and confidence scoring to catch hallucinations before they reach your output:

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["summary"])
    .with_prompt("Summarize: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Anti-hallucination quality layer (post-LLM)
    .with_context_store()                # Enable evidence graph (Rust/SQLite/FTS5)
    .with_grounding()                    # Verify claims against source text
    .with_contradiction_detection()      # Flag conflicting statements across rows
    .with_confidence_scoring()           # Score each output 0.0-1.0
    .build()
)

result = pipeline.execute()
# Outputs include grounding status, contradiction flags, and confidence scores

How it works:

  • Stores validated claims in a persistent evidence graph (SQLite + FTS5 full-text search)
  • Grounding stage checks LLM output against input text to flag unsupported claims
  • Contradiction detection compares new outputs against previously validated evidence
  • Confidence scoring combines grounding + contradiction signals into a 0-1 score
  • Powered by a Rust backend via PyO3 for sub-millisecond lookups
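
As a rough intuition for grounding and confidence scoring, the toy sketch below uses token overlap as a crude stand-in for claim verification. It is not Ondine's Rust implementation; the function names, the overlap measure, and the penalty factor are all assumptions made for illustration.

```python
import re

def tokens(text: str) -> set[str]:
    """Lowercased alphanumeric tokens of a text."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def grounding_score(output: str, source: str) -> float:
    """Share of output tokens that also occur in the source text (0.0 to 1.0)."""
    out = tokens(output)
    return len(out & tokens(source)) / len(out) if out else 0.0

def confidence(grounding: float, contradicts_prior: bool, penalty: float = 0.5) -> float:
    """Toy combination: scale grounding down when a contradiction was flagged."""
    score = grounding * (penalty if contradicts_prior else 1.0)
    return max(0.0, min(1.0, score))

source = "The laptop has 16 GB of RAM and a 512 GB SSD."
print(grounding_score("laptop with 16 GB RAM", source))  # 0.8
print(grounding_score("tablet with 8 GB RAM", source))   # 0.4
print(confidence(0.8, contradicts_prior=True))           # 0.4
```

A real grounding check would compare claims rather than individual tokens, but the shape is the same: a support signal and a contradiction signal folded into one 0-1 score.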

16. Knowledge Store — RAG Retrieval (NEW!)

Inject relevant knowledge from your documents into each prompt before the LLM call:

from ondine import PipelineBuilder
from ondine.knowledge import KnowledgeStore, SentenceTransformerEmbedder

# Build a knowledge store from your documents
kb = KnowledgeStore(embedder=SentenceTransformerEmbedder())
kb.ingest("docs/")

pipeline = (
    PipelineBuilder.create()
    .from_csv("questions.csv", input_columns=["question"], output_columns=["answer"])
    .with_prompt("Answer using the provided context: {question}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # RAG: retrieve top-k chunks and inject as {_kb_context} (pre-LLM)
    .with_knowledge_base(kb, top_k=5)
    .build()
)

result = pipeline.execute()
# Each row's prompt is augmented with the most relevant knowledge chunks

How it works:

  • Ingests documents (PDF, text, images via OCR) into chunked embeddings
  • Hybrid search combines BM25 keyword matching with vector similarity
  • Optional reranking stage scores candidates for higher precision
  • Retrieved chunks are injected into the system prompt before each LLM call
  • Rust-powered chunking and indexing for fast ingestion at scale
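
The README does not say how keyword and vector results are merged. Reciprocal rank fusion is one common technique for hybrid search and is sketched here as an assumption, with made-up document ids and the conventional constant k=60.

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge ranked id lists; items ranked high in several lists rise to the top."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Sort by descending score, breaking ties by id for determinism.
    return sorted(scores, key=lambda d: (-scores[d], d))

keyword_hits = ["doc_a", "doc_b", "doc_c"]  # e.g. from BM25 keyword matching
vector_hits = ["doc_b", "doc_d", "doc_a"]   # e.g. from embedding similarity
print(reciprocal_rank_fusion([keyword_hits, vector_hits])[:3])
# ['doc_b', 'doc_a', 'doc_d']
```

Because it works on ranks rather than raw scores, this fusion needs no normalization between the BM25 and vector score scales.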

17. Evidence Priming — Cross-Run Consistency (NEW!)

Re-inject prior validated answers so the LLM stays consistent across pipeline runs:

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["category"])
    .with_prompt("Classify: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_context_store()  # Required: evidence graph stores validated answers
    # Inject top-k prior answers matching the current input (pre-LLM)
    .with_evidence_priming(
        query_columns=["text"],  # Columns to search against evidence graph
        top_k=3,                 # Number of prior answers to inject
        min_score=0.1            # Minimum similarity threshold
    )
    .build()
)

result = pipeline.execute()
# LLM sees relevant prior answers, improving consistency across runs

How it works:

  • Queries the evidence graph for previously validated outputs similar to the current input
  • Matched evidence is prepended to the LLM prompt as reference examples
  • Helps the model produce consistent classifications and extractions over time
  • Works with with_context_store() to build a growing knowledge loop
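
The selection step controlled by `top_k` and `min_score` can be pictured as follows. This sketch substitutes `difflib` string similarity for the evidence graph's search, and `select_evidence` and `prime_prompt` are hypothetical helpers, so treat it as an illustration of the idea rather than Ondine's behavior.

```python
from difflib import SequenceMatcher

def select_evidence(query, prior, top_k=3, min_score=0.1):
    """Return up to top_k (score, text, answer) tuples scoring at least min_score."""
    scored = [
        (SequenceMatcher(None, query.lower(), text.lower()).ratio(), text, answer)
        for text, answer in prior
    ]
    kept = [item for item in scored if item[0] >= min_score]
    return sorted(kept, key=lambda item: item[0], reverse=True)[:top_k]

def prime_prompt(prompt, evidence):
    """Prepend prior answers to the prompt as reference examples."""
    if not evidence:
        return prompt
    lines = [f"- {text!r} -> {answer}" for _, text, answer in evidence]
    return "Previously validated answers:\n" + "\n".join(lines) + "\n\n" + prompt

prior = [("refund for damaged item", "Billing"), ("zzz", "Other")]
evidence = select_evidence("Refund for damaged item", prior, top_k=3, min_score=0.1)
print(prime_prompt("Classify: Refund for damaged item", evidence))
```

Raising `min_score` trades recall for precision: fewer prior answers are injected, but the ones that are tend to be closer matches.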

CLI Usage

Ondine includes a powerful command-line interface for processing datasets without writing code.

List Available Providers

# See all supported LLM providers
ondine list-providers

This shows:

  • Provider IDs (openai, azure_openai, anthropic, groq, mlx, openai_compatible)
  • Platform requirements
  • Cost estimates
  • Use cases
  • Required environment variables

Process Datasets

# Basic usage
ondine process --config config.yaml

# Override input/output
ondine process --config config.yaml --input data.csv --output results.csv

# Override provider and model
ondine process --config config.yaml --provider groq --model llama-3.3-70b-versatile

# Set budget limit
ondine process --config config.yaml --max-budget 10.0

# Dry run (estimate only, don't execute)
ondine process --config config.yaml --dry-run

# Estimate cost
ondine estimate --config config.yaml --input data.csv

# Inspect data
ondine inspect --input data.csv --head 10

Example Config File

# config.yaml
dataset:
  source_type: csv
  source_path: data.csv
  input_columns: [text]
  output_columns: [sentiment]

prompt:
  template: "Classify sentiment: {text}"

llm:
  provider: openai
  model: gpt-4o-mini
  temperature: 0.0

processing:
  batch_size: 100
  concurrency: 5
  max_budget: 10.0

output:
  destination_type: csv
  destination_path: output.csv

Architecture

The SDK follows a layered architecture:

┌─────────────────────────────────────────┐
│  Layer 5: High-Level API                │
│  (Pipeline, PipelineBuilder)            │
├─────────────────────────────────────────┤
│  Layer 4: Orchestration Engine          │
│  (PipelineExecutor, StateManager)       │
├─────────────────────────────────────────┤
│  Layer 3: Quality & Knowledge (NEW)     │
│  Pre-LLM: KnowledgeRetrieval,          │
│           EvidencePriming               │
│  Post-LLM: Grounding, Contradiction,   │
│            ConfidenceScoring            │
├─────────────────────────────────────────┤
│  Layer 2: Processing Stages             │
│  (DataLoader, LLMInvocation, Parser)    │
├─────────────────────────────────────────┤
│  Layer 1: Infrastructure Adapters       │
│  (LLMClient, DataReader, Checkpoint,   │
│   ContextStore, KnowledgeStore)         │
├─────────────────────────────────────────┤
│  Layer 0: Core Utilities (Rust/PyO3)    │
│  (RetryHandler, RateLimiter, Logging,   │
│   SQLite/FTS5, EvidenceGraph)           │
└─────────────────────────────────────────┘

Key Design Principles

  • Simple: Straightforward solutions
  • DRY: No code duplication
  • Type Safe: Type hints throughout
  • Separation of Concerns: Configuration vs. execution

Supported LLM Providers

Provider | Platform | Cost | Use Case | Setup
OpenAI | Cloud (All) | $$ | Production, high quality | OPENAI_API_KEY
Azure OpenAI | Cloud (All) | $$ | Enterprise, compliance, Managed Identity support | AZURE_OPENAI_API_KEY or Managed Identity
Anthropic | Cloud (All) | $$$ | Long context, Claude models | ANTHROPIC_API_KEY
Groq | Cloud (All) | Free tier | Fast inference, development | GROQ_API_KEY
MLX | macOS (M1/M2/M3/M4) | Free | Local, private, offline | pip install "ondine[mlx]"
OpenAI-Compatible | Custom/Local/Cloud | Varies | Ollama, vLLM, Together.AI | base_url + optional API key

Run ondine list-providers to see detailed information about each provider.

Use Cases

  • Data Cleaning: Clean, normalize, standardize text data
  • Sentiment Analysis: Classify sentiment at scale
  • Information Extraction: Extract structured data from unstructured text
  • Categorization: Auto-categorize products, documents, emails
  • Content Generation: Generate descriptions, summaries, titles
  • Translation: Translate content to multiple languages
  • Data Enrichment: Enhance datasets with LLM-generated insights
  • Product Matching: Compare and score product similarity
  • Content Moderation: Flag inappropriate content at scale

Performance

  • Throughput: Process 1,000 rows in < 5 minutes (GPT-4o-mini, concurrency=5)
  • Reliability: 99.9% completion rate with automatic retries
  • Cost Efficiency: Pre-execution estimation within 10% accuracy
  • Memory: < 500MB for datasets up to 50K rows

Observability & Debugging

Ondine supports multiple observability backends via LiteLLM callbacks for automatic instrumentation of all LLM calls. Add observability with a single line:

from ondine import PipelineBuilder

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", input_columns=["text"], output_columns=["result"])
    .with_prompt("Process: {text}")
    .with_llm(provider="openai", model="gpt-4o-mini")
    # Add observability - automatically tracks ALL LLM calls!
    .with_observer("langfuse", config={
        "public_key": "pk-lf-...",
        "secret_key": "sk-lf-..."  # pragma: allowlist secret
    })
    .build()
)

result = pipeline.execute()

Supported Observers

Langfuse - LLM-specific observability (recommended):

.with_observer("langfuse", config={
    "public_key": "pk-lf-...",
    "secret_key": "sk-lf-...",  # pragma: allowlist secret
    "host": "https://cloud.langfuse.com"  # optional
})

Tracks: prompts, completions, tokens, costs, latency, model info

OpenTelemetry - Infrastructure monitoring:

.with_observer("opentelemetry", config={})

Tracks: spans, traces, metrics - works with Jaeger, Datadog, Grafana

Logging - Simple console output:

.with_observer("logging", config={})

Tracks: basic LLM call logs to console

Multiple observers simultaneously:

pipeline = (
    PipelineBuilder.create()
    .from_csv("data.csv", ...)
    .with_prompt("...")
    .with_llm(provider="openai", model="gpt-4o-mini")
    .with_observer("langfuse", config={...})
    .with_observer("opentelemetry", config={...})
    .with_observer("logging", config={})
    .build()
)

What's Tracked Automatically

Powered by LiteLLM callbacks:

  • Full prompt and completion text
  • Token usage (input, output, cached tokens)
  • Cost per call (automatic via litellm.completion_cost)
  • Latency metrics
  • Model and provider information
  • Router failover events
  • Cache hit/miss metrics

Examples

See complete examples:

  • examples/15_observability_logging.py - Simple console logging
  • examples/16_observability_opentelemetry.py - OpenTelemetry + Jaeger
  • examples/17_observability_langfuse.py - Langfuse integration
  • examples/18_observability_multi.py - Multiple observers

Setup Langfuse (Recommended for LLM Observability)

  1. Sign up at https://cloud.langfuse.com (free tier available)
  2. Get your API keys
  3. Add to your pipeline:
.with_observer("langfuse", config={
    "public_key": "pk-lf-...",
    "secret_key": "sk-lf-..."  # pragma: allowlist secret
})
  4. View detailed traces in Langfuse dashboard

Configuration Options

Execution Modes

Standard Execution (default)

pipeline = PipelineBuilder.create().from_csv(...).build()
result = pipeline.execute()

Use when: Dataset fits in memory (< 50K rows typical), straightforward processing.

Async Execution (concurrent processing)

pipeline = (
    PipelineBuilder.create()
    .from_csv(...)
    .with_async_execution(max_concurrency=10)
    .build()
)
# Run inside an async function (e.g. a FastAPI handler), or wrap the call with asyncio.run()
result = await pipeline.execute_async()

Use when: Need high throughput, LLM API supports async, running in async context (FastAPI, aiohttp).
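
The effect of a `max_concurrency` setting can be illustrated with a semaphore that caps the number of in-flight calls. This is a generic asyncio pattern, not Ondine's executor; `run_bounded` and `fake_llm_call` are hypothetical names, and the sleep stands in for network latency.

```python
import asyncio

async def run_bounded(items, worker, max_concurrency=10):
    """Run worker(item) for all items with at most max_concurrency in flight; keeps input order."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))

async def fake_llm_call(text):
    await asyncio.sleep(0.01)  # stands in for network latency
    return text.upper()

print(asyncio.run(run_bounded(["a", "b", "c"], fake_llm_call, max_concurrency=2)))
# ['A', 'B', 'C']
```

`asyncio.gather` returns results in input order regardless of completion order, which keeps outputs aligned with their rows.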

Streaming Execution (memory-efficient)

pipeline = (
    PipelineBuilder.create()
    .from_csv(...)
    .with_streaming(chunk_size=1000)
    .build()
)
for i, chunk_result in enumerate(pipeline.execute_stream()):
    # Append each chunk as it completes; write the CSV header only for the first chunk
    chunk_result.data.to_csv("output_chunk.csv", mode="a", header=(i == 0))

Use when: Large datasets (100K+ rows), limited memory, need constant memory footprint, early results desired.

When NOT to use streaming:

  • Dataset under 50K rows (overhead not justified)
  • Need entire dataset in memory for post-processing
  • Pipeline has dependencies between rows

See examples/08_streaming_large_files.py for detailed streaming example.
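
The constant memory footprint of streaming comes from consuming the input lazily, one chunk at a time. A minimal sketch of that chunking idea, independent of Ondine (`iter_chunks` is a hypothetical helper):

```python
from itertools import islice

def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size items without materializing the whole input."""
    it = iter(rows)
    while chunk := list(islice(it, chunk_size)):
        yield chunk

print(list(iter_chunks(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```

Only one chunk is held in memory at a time, so peak usage depends on `chunk_size` rather than on the size of the dataset.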

Processing Configuration

.with_batch_size(100)          # Rows per batch
.with_concurrency(5)            # Parallel requests
.with_checkpoint_interval(500)  # Checkpoint frequency
.with_rate_limit(60)            # Requests per minute
.with_max_budget(10.0)          # Maximum USD budget
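
To make the checkpoint interval concrete, the sketch below saves progress to a JSON file every `interval` rows and resumes from it on the next run. The file layout and `process_with_checkpoints` are assumptions for illustration; Ondine's checkpoint format is not documented in this README.

```python
import json
import os
import tempfile

def process_with_checkpoints(rows, handler, checkpoint_path, interval=500):
    """Process rows in order, saving progress every `interval` rows; resume if a checkpoint exists."""
    state = {"next_index": 0, "results": []}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)
    for i in range(state["next_index"], len(rows)):
        state["results"].append(handler(rows[i]))
        state["next_index"] = i + 1
        if state["next_index"] % interval == 0 or state["next_index"] == len(rows):
            tmp_path = checkpoint_path + ".tmp"
            with open(tmp_path, "w") as f:
                json.dump(state, f)
            os.replace(tmp_path, checkpoint_path)  # atomic swap avoids half-written files
    return state["results"]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "checkpoint.json")
    print(process_with_checkpoints(["a", "b", "c"], str.upper, path, interval=2))
    # ['A', 'B', 'C']
```

In this sketch, rows processed after the last checkpoint are redone after a crash, so a smaller interval means less repeated work at the price of more frequent writes.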

LLM Configuration

.with_llm(
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.0,        # 0.0-2.0
    max_tokens=1024,        # Max output tokens
    api_key="..."           # Or from env
)

Output Configuration

.with_output(
    path="output.csv",
    format="csv",              # csv, excel, parquet
    merge_strategy="replace"   # replace, append, update
)

Testing

# Run tests
uv run pytest

# With coverage
uv run pytest --cov=src --cov-report=html

# Run specific test
uv run pytest tests/test_pipeline.py

Documentation

Full documentation: https://ptimizeroracle.github.io/ondine

Additional resources:

  • README.md (this file): Quick start and usage guide
  • examples/: Example scripts demonstrating various features
  • Code docstrings: Inline documentation for all public APIs

Contributing

Contributions are welcome. Please follow these steps:

  1. Fork the repository at https://github.com/ptimizeroracle/Ondine
  2. Create a feature branch
  3. Follow the existing code style (Black, Ruff)
  4. Add tests for new features
  5. Update documentation
  6. Submit a pull request

License

MIT License - see LICENSE file for details

Acknowledgments

  • Built with LiteLLM for native multi-provider LLM integration (100+ providers)
  • Uses Instructor for type-safe structured output with Pydantic models
  • LlamaIndex integration preserved for future RAG capabilities
  • Ondine adds: batch processing, Router for load balancing, automatic cost tracking, checkpointing, YAML configuration, and dataset orchestration
  • Thanks to the open-source community

Support

Recent Updates

Version 1.4.1 (November 27, 2025)

Native Caching Upgrade:

  • 🚀 Native LiteLLM Caching: Switched to litellm.cache for robust, multi-backend caching (Redis, S3, Memory)
  • 💾 Disk Caching Support: New with_disk_cache() for zero-setup local persistence (using diskcache)
  • 🧹 Code Cleanup: Removed custom caching implementation for better maintainability and feature parity with LiteLLM

Version 1.4.0 (November 27, 2025)

Router Optimization & Robust Batching:

  • 🚀 Latency-Based Routing: Automatically routes traffic to the fastest provider (Groq, Cerebras, etc.) in real-time
  • 🛡️ Resilient Router Fallback: Fixed critical bug where single-node failures (404/NotFound) stopped pipelines; now automatically retries on other healthy providers
  • 📦 Minified JSON Batching: Optimized batch prompt payload (indent=None) to save tokens and context window
  • 🔄 Smart Auto-Retry: Improved logic to only retry rows where all output columns failed, preserving valid partial data
  • High-Concurrency Defaults: Optimized default settings for multi-provider pools (concurrency=10, batch_size=50)
  • 📊 Enhanced Progress Tracking: Fixed UI glitches (freezing bars, 1% stuck) and improved per-provider cost attribution
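
The minified JSON batching item above comes down to how the batch payload is serialized. The sketch below compares an indented payload with a single-line one using only the standard library. The sample rows are made up, and the compact `separators` argument is an extra assumption: the changelog only mentions `indent=None`.

```python
import json

# Made-up sample rows standing in for one batch of a dataset.
rows = [
    {"id": 1, "text": "Great product, fast shipping"},
    {"id": 2, "text": "Arrived broken"},
]

# Indented output spends characters on newlines and leading spaces.
pretty = json.dumps(rows, indent=2)

# indent=None keeps the payload on one line; compact separators
# (an assumption here) also drop the spaces after "," and ":".
minified = json.dumps(rows, indent=None, separators=(",", ":"))

print(len(pretty), len(minified))

# Both forms decode to the same data, so nothing is lost for the model.
assert json.loads(pretty) == json.loads(minified)
```

Character count is only a proxy for token count, but fewer whitespace characters generally means fewer tokens in the prompt.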

Version 1.3.4 (November 24, 2025)

LiteLLM Native Integration:

  • 🚀 Native LiteLLM Integration: Replaced LlamaIndex wrappers with direct litellm.acompletion for 100+ provider support
  • 📦 Instructor for Structured Output: Type-safe Pydantic models with auto-detection (JSON mode for Groq, function calling for OpenAI/Anthropic)
  • 🔄 Router for Load Balancing: Built-in multi-provider failover and latency-based routing via LiteLLM Router
  • 💾 Redis Caching: Native LiteLLM response caching to avoid duplicate API calls
  • 📊 Prefix Caching Detection: Automatic logging of cached tokens (40-50% cost savings)
  • Async-First Design: Native async throughout with litellm.acompletion
  • 🧹 Code Reduction: Removed 673 lines of wrapper code (-11%), cleaner architecture
  • 100% Tests Passing: 461/461 unit tests and 103/103 integration tests

Breaking Changes:

  • None! Fully backward compatible with existing code

Version 1.2.1 (November 12, 2025)

Previous Release:

  • Progress tracking enhancements
  • Bug fixes and stability improvements
  • Enhanced error handling

Version 1.2.0 (November 9, 2025)

New Features:

  • Enhanced API documentation with examples
  • Fixed broken documentation references
  • Improved code organization

Version 1.1.0 (November 9, 2025)

New Features:

  • Additional provider improvements
  • Enhanced testing coverage
  • Documentation updates

Version 1.0.x (October 2025)

Initial Release Features:

  • Provider Presets: Pre-configured LLMSpec objects for common providers (80% code reduction)
  • Simplified Configuration: New with_llm_spec() method accepting LLMSpec objects
  • MLX Integration: Local inference on Apple Silicon (M1/M2/M3/M4) - 100% free, private, offline
  • OpenAI-Compatible Provider: Support for Ollama, vLLM, Together.AI, and custom APIs
  • Multi-Column Processing: Generate multiple output columns with JSON parsing
  • Pipeline Composition: Chain pipelines with dependencies between columns
  • CLI Provider Discovery: ondine list-providers command to explore all providers
  • Auto-Retry for Multi-Column: Automatic retry now checks all output columns for failures
  • Custom LLM Clients: Extend LLMClient base class for exotic APIs
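
The multi-column auto-retry item above can be pictured as a row filter over the output columns. The sketch below is one possible reading, not Ondine's code: `is_failed` and `rows_to_retry` are hypothetical names, and treating `None` or blank strings as failures is an assumption. The `"all"` mode mirrors the v1.4.0 note about retrying only rows where every output column failed.

```python
def is_failed(value):
    """Assumed failure marker: a missing or blank output value."""
    return value is None or (isinstance(value, str) and not value.strip())


def rows_to_retry(rows, output_columns, mode="all"):
    """Return indices of rows to retry.

    mode="all": retry only when every output column failed (keeps partial data).
    mode="any": retry when at least one output column failed.
    """
    if mode not in ("all", "any"):
        raise ValueError("mode must be 'all' or 'any'")
    check = all if mode == "all" else any
    return [
        index
        for index, row in enumerate(rows)
        if check(is_failed(row.get(column)) for column in output_columns)
    ]


rows = [
    {"sentiment": "positive", "topic": "shipping"},  # fully valid
    {"sentiment": "negative", "topic": None},        # partially valid
    {"sentiment": None, "topic": ""},                # every column failed
]

print(rows_to_retry(rows, ["sentiment", "topic"]))              # [2]
print(rows_to_retry(rows, ["sentiment", "topic"], mode="any"))  # [1, 2]
```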

Improvements:

  • Zero configuration errors with validated presets
  • Enhanced error handling for multi-column outputs
  • Better streaming implementation
  • Improved documentation with provider comparison guide
  • More examples (14+ example files including provider presets demo)

Roadmap

Recently Completed (v1.3.4 - November 24, 2025)

LiteLLM Native Integration - AGGRESSIVE REFACTOR

  • Native LiteLLM: Direct litellm.acompletion() integration (removed 673 lines of wrappers)
  • Instructor: Type-safe structured output with Pydantic (auto-retry on validation errors)
  • Router: Load balancing + failover across providers
  • Redis Caching: Response deduplication via litellm.cache
  • 100+ Providers: Expanded from 5 to 100+ supported providers
  • Async-First: Native async throughout (true non-blocking I/O)
  • Prefix Caching: Detection and logging (40-50% cost savings)
  • 100% Tests: 461 unit + 103 integration tests passing

Completed (v1.3.0 - November 2025)

Performance & Cost Optimizations

  • ✅ Multi-row batching (100× speedup)
  • ✅ Prefix caching support (40-50% cost reduction)
  • ✅ Flatten-then-concurrent pattern for true parallelism
  • ✅ Cache hit detection and monitoring
  • ✅ Shared context caching across pipeline stages
  • ✅ Optimized prompt formatting (10× faster with itertuples)
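
One way to read the flatten-then-concurrent item above: instead of processing batches one after another, all work items are flattened into a single list and run under one shared concurrency limit. The sketch below illustrates that idea with `asyncio`. `fake_llm_call` is a hypothetical stand-in for a real request, and this is not Ondine's actual implementation.

```python
import asyncio


async def fake_llm_call(prompt):
    """Hypothetical stand-in for an async LLM request."""
    await asyncio.sleep(0.01)
    return prompt.upper()


async def process_batches(batches, concurrency=5):
    # Flatten: collapse the nested batches into one list of work items.
    flat = [prompt for batch in batches for prompt in batch]

    # A single semaphore caps in-flight requests across all items.
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(prompt):
        async with semaphore:
            return await fake_llm_call(prompt)

    # gather returns results in input order, so rows stay aligned.
    return await asyncio.gather(*(bounded(prompt) for prompt in flat))


batches = [["a", "b"], ["c"], ["d", "e", "f"]]
results = asyncio.run(process_batches(batches, concurrency=2))
print(results)  # ['A', 'B', 'C', 'D', 'E', 'F']
```

With a single flat task list, a small or slow batch no longer leaves concurrency slots idle while other batches wait their turn.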

Upcoming Features

Performance & Cost Optimizations

  • Smart model selection and cost comparison
  • Automatic prompt optimization
  • Dynamic batch size optimization based on context window

New Capabilities

  • Enhanced streaming execution (async streaming)
  • Multi-modal support (images, PDFs)
  • RAG integration using LlamaIndex (vector stores, embeddings, retrieval)
  • Distributed processing (Spark/Dask integration)

Developer Experience

  • Web UI for pipeline management
  • Enhanced Router strategies (cost-based routing)
  • Redis caching analytics dashboard

Built with LiteLLM and Instructor
