# AI Pipeline Core

A high-performance async framework for building type-safe AI pipelines with LLMs, document processing, and workflow orchestration.

## Overview
AI Pipeline Core is a production-ready framework that combines document processing, LLM integration, and workflow orchestration into a unified system. Built with strong typing (Pydantic), automatic retries, cost tracking, and distributed tracing, it enforces best practices while maintaining high performance through fully async operations.
## Key Features
- Document Processing: Type-safe handling of text, JSON, YAML, PDFs, and images with automatic MIME type detection and provenance tracking
- LLM Integration: Unified interface to any model via LiteLLM proxy with configurable context caching
- Structured Output: Type-safe generation with Pydantic model validation
- Workflow Orchestration: Prefect-based flows and tasks with automatic retries
- Observability: Built-in distributed tracing via Laminar (LMNR) with cost tracking for debugging and monitoring
- Local Development: Simple runner for testing pipelines without infrastructure
## Installation

```bash
pip install ai-pipeline-core
```
### Requirements
- Python 3.12 or higher
- Linux/macOS (Windows via WSL2)
### Development Installation

```bash
git clone https://github.com/bbarwik/ai-pipeline-core.git
cd ai-pipeline-core
pip install -e ".[dev]"
make install-dev  # Installs pre-commit hooks
```
## Quick Start

### Basic Pipeline

```python
from ai_pipeline_core import (
    pipeline_flow,
    FlowDocument,
    DocumentList,
    FlowOptions,
    FlowConfig,
    llm,
    AIMessages,
)

# Define document types
class InputDoc(FlowDocument):
    """Input document for processing."""

class OutputDoc(FlowDocument):
    """Analysis result document."""

# Define flow configuration
class AnalysisConfig(FlowConfig):
    INPUT_DOCUMENT_TYPES = [InputDoc]
    OUTPUT_DOCUMENT_TYPE = OutputDoc

# Create pipeline flow with required config
@pipeline_flow(config=AnalysisConfig)
async def analyze_flow(
    project_name: str,
    documents: DocumentList,
    flow_options: FlowOptions,
) -> DocumentList:
    # Process documents
    outputs = []
    for doc in documents:
        # Use AIMessages for LLM interaction
        response = await llm.generate(
            model="gpt-5",
            messages=AIMessages([doc]),
        )
        output = OutputDoc.create(
            name=f"analysis_{doc.name}",
            content=response.content,
        )
        outputs.append(output)
    # RECOMMENDED: Always validate output
    return AnalysisConfig.create_and_validate_output(outputs)
```
### Structured Output

```python
from pydantic import BaseModel
from ai_pipeline_core import llm

class Analysis(BaseModel):
    summary: str
    sentiment: float
    key_points: list[str]

# Generate structured output
response = await llm.generate_structured(
    model="gpt-5",
    response_format=Analysis,
    messages="Analyze this product review: ..."
)

# Access parsed result with type safety
analysis = response.parsed
print(f"Sentiment: {analysis.sentiment}")
for point in analysis.key_points:
    print(f"- {point}")
```
### Document Handling

```python
from ai_pipeline_core import FlowDocument, TemporaryDocument

# Create documents with automatic conversion
doc = MyDocument.create(
    name="data.json",
    content={"key": "value"}  # Automatically converted to JSON bytes
)

# Parse back to original type
data = doc.parse(dict)  # Returns {"key": "value"}

# Document provenance tracking (new in v0.1.14)
doc_with_sources = MyDocument.create(
    name="derived.json",
    content={"result": "processed"},
    sources=[source_doc.sha256, "https://api.example.com/data"]
)

# Check provenance
for hash in doc_with_sources.get_source_documents():
    print(f"Derived from document: {hash}")
for ref in doc_with_sources.get_source_references():
    print(f"External source: {ref}")

# Temporary documents (never persisted)
temp = TemporaryDocument.create(
    name="api_response.json",
    content={"status": "ok"}
)
```
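The hash-like `sources` entries above are SHA-256 content digests, so provenance survives renames: a derived document references its source by the hash of the source's bytes. A standalone sketch of that idea using only the standard library (an illustration of the concept, not the library's actual implementation; the encoding of the real `Document.sha256` property may differ):

```python
import hashlib

def content_digest(content: bytes) -> str:
    """Hex SHA-256 of raw document bytes, usable as a provenance key."""
    return hashlib.sha256(content).hexdigest()

raw = b'{"key": "value"}'
digest = content_digest(raw)

# Identical bytes always produce the identical digest, so a derived
# document can reference its source by hash alone, independent of name.
assert digest == content_digest(b'{"key": "value"}')
print(digest[:16])  # first 16 chars of a stable 64-hex-char identifier
```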
## Core Concepts

### Documents

Documents are immutable Pydantic models that wrap binary content with metadata:

- FlowDocument: Persists across flow runs, saved to filesystem
- TaskDocument: Temporary within task execution, not persisted
- TemporaryDocument: Never persisted, useful for sensitive data

```python
class MyDocument(FlowDocument):
    """Custom document type."""

# Use create() for automatic conversion
doc = MyDocument.create(
    name="data.json",
    content={"key": "value"}  # Auto-converts to JSON
)

# Access content
if doc.is_text:
    print(doc.text)

# Parse structured data
data = doc.as_json()  # or as_yaml(), as_pydantic_model()

# Convert between document types (new in v0.2.1)
task_doc = flow_doc.model_convert(TaskDocument)  # FlowDocument to TaskDocument
new_doc = doc.model_convert(OtherDocType, content={"new": "data"})  # With content update

# Enhanced filtering (new in v0.1.14)
filtered = documents.filter_by([Doc1, Doc2, Doc3])  # Multiple types
named = documents.filter_by(["file1.txt", "file2.txt"])  # Multiple names

# Immutable collections (new in v0.2.1)
frozen_docs = DocumentList(docs, frozen=True)  # Immutable document list
frozen_msgs = AIMessages(messages, frozen=True)  # Immutable message list
```
### LLM Integration

The framework provides a unified interface for LLM interactions with smart caching:

```python
from ai_pipeline_core import llm, AIMessages, ModelOptions

# Simple generation
response = await llm.generate(
    model="gpt-5",
    messages="Explain quantum computing"
)
print(response.content)

# With context caching (saves 50-90% of tokens)
static_context = AIMessages([large_document])

# First call: caches context
r1 = await llm.generate(
    model="gpt-5",
    context=static_context,  # Cached for 120 seconds by default
    messages="Summarize"     # Dynamic query
)

# Second call: reuses cache
r2 = await llm.generate(
    model="gpt-5",
    context=static_context,  # Reused from cache!
    messages="Key points?"   # Different query
)

# Custom cache TTL (new in v0.1.14)
response = await llm.generate(
    model="gpt-5",
    context=static_context,
    messages="Analyze",
    options=ModelOptions(cache_ttl="300s")  # Cache for 5 minutes
)

# Disable caching for dynamic contexts
response = await llm.generate(
    model="gpt-5",
    context=dynamic_context,
    messages="Process",
    options=ModelOptions(cache_ttl=None)  # No caching
)
```
### Flow Configuration

Type-safe flow configuration ensures proper document flow:

```python
from ai_pipeline_core import FlowConfig

class ProcessingConfig(FlowConfig):
    INPUT_DOCUMENT_TYPES = [RawDataDocument]
    OUTPUT_DOCUMENT_TYPE = ProcessedDocument  # Must differ from the input types!

# Use in flows for validation
@pipeline_flow(config=ProcessingConfig)
async def process(
    project_name: str,
    documents: DocumentList,
    flow_options: FlowOptions,
) -> DocumentList:
    # ... processing logic ...
    return ProcessingConfig.create_and_validate_output(outputs)
```
### Pipeline Decorators

Enhanced decorators with built-in tracing and monitoring:

```python
from ai_pipeline_core import pipeline_flow, pipeline_task, set_trace_cost

@pipeline_task  # Automatic retry, tracing, and monitoring
async def process_chunk(data: str) -> str:
    result = await transform(data)
    set_trace_cost(0.05)  # Track costs (new in v0.1.14)
    return result

@pipeline_flow(
    config=MyFlowConfig,
    trace_trim_documents=True  # Trim large documents in traces (new in v0.2.1)
)
async def main_flow(
    project_name: str,
    documents: DocumentList,
    flow_options: FlowOptions,
) -> DocumentList:
    # Your pipeline logic.
    # Large documents are automatically trimmed to 100 chars in traces
    # for better observability without overwhelming the tracing UI.
    return DocumentList(results)
```
## Configuration

### Environment Variables

```bash
# LLM configuration (via LiteLLM proxy)
OPENAI_BASE_URL=http://localhost:4000
OPENAI_API_KEY=your-api-key

# Optional: observability
LMNR_PROJECT_API_KEY=your-lmnr-key
LMNR_DEBUG=true  # Enable debug traces

# Optional: orchestration
PREFECT_API_URL=http://localhost:4200/api
PREFECT_API_KEY=your-prefect-key

# Optional: storage (Google Cloud Storage)
GCS_SERVICE_ACCOUNT_FILE=/path/to/service-account.json  # GCS auth file
```
### Settings Management

Create custom settings by inheriting from the base Settings class:

```python
from ai_pipeline_core import Settings

class ProjectSettings(Settings):
    """Project-specific configuration."""
    app_name: str = "my-app"
    max_retries: int = 3
    enable_cache: bool = True

# Create singleton instance
settings = ProjectSettings()

# Access configuration
print(settings.openai_base_url)
print(settings.app_name)
```
## Best Practices

### Framework Rules (90% Use Cases)

- Decorators: use `@pipeline_task` WITHOUT parameters, `@pipeline_flow` WITH config
- Logging: use `get_pipeline_logger(__name__)` - NEVER `print()` or the `logging` module
- LLM calls: use `AIMessages` or `str`; wrap Documents in `AIMessages`
- Options: omit `ModelOptions` unless specifically needed (the defaults are optimal)
- Documents: create with just `name` and `content` - skip `description`
- FlowConfig: `OUTPUT_DOCUMENT_TYPE` must differ from all `INPUT_DOCUMENT_TYPES`
- Initialization: create `PromptManager` and loggers at module scope, not inside functions
- DocumentList: use the default constructor - no validation flags needed
- `setup_logging()`: call only in the application's `main()`, never at import time
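The logging and initialization rules above can be sketched as follows. `get_pipeline_logger` is stubbed with a hypothetical stand-in here so the snippet is self-contained; in real code it comes from `ai_pipeline_core`:

```python
import logging

def get_pipeline_logger(name: str) -> logging.Logger:
    """Hypothetical stand-in for ai_pipeline_core.get_pipeline_logger."""
    return logging.getLogger(name)

# Logger is created once, at module scope - not inside each function,
# and never replaced by print() calls.
logger = get_pipeline_logger(__name__)

async def process_chunk(data: str) -> str:
    # Diagnostics go through the module-scope logger.
    logger.info("processing %d bytes", len(data))
    return data.upper()
```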
### Import Convention

Always import from the top-level package:

```python
# CORRECT
from ai_pipeline_core import llm, pipeline_flow, FlowDocument

# WRONG - never import from submodules
from ai_pipeline_core.llm import generate             # NO!
from ai_pipeline_core.documents import FlowDocument   # NO!
```
## Development

### Running Tests

```bash
make test           # Run all tests
make test-cov       # Run with coverage report
make test-showcase  # Test showcase example
```

### Code Quality

```bash
make lint       # Run linting
make format     # Auto-format code
make typecheck  # Type checking with basedpyright
```

### Building Documentation

```bash
make docs-build  # Generate API.md
make docs-check  # Verify documentation is up to date
```
## Examples

The `examples/` directory contains:

- `showcase.py` - comprehensive example demonstrating all major features

Run it with:

```bash
cd examples && python showcase.py /path/to/documents
```
## API Reference

See `API.md` for complete API documentation.

### Navigation Tips

For humans:

```bash
grep -n '^##' API.md   # List all main sections
grep -n '^###' API.md  # List all classes and functions
```

For AI assistants:

- Use pattern `^##` to find module sections
- Use pattern `^###` for classes and functions
- Use pattern `^####` for methods and properties
## Project Structure

```
ai-pipeline-core/
├── ai_pipeline_core/
│   ├── documents/          # Document abstraction system
│   ├── flow/               # Flow configuration and options
│   ├── llm/                # LLM client and response handling
│   ├── logging/            # Logging infrastructure
│   ├── tracing.py          # Distributed tracing
│   ├── pipeline.py         # Pipeline decorators
│   ├── prompt_manager.py   # Jinja2 template management
│   └── settings.py         # Configuration management
├── tests/                  # Comprehensive test suite
├── examples/               # Usage examples
├── API.md                  # Complete API reference
└── pyproject.toml          # Project configuration
```
## Contributing

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Make changes following the project's style guide
4. Run tests and linting (`make test lint typecheck`)
5. Commit your changes
6. Push to the branch (`git push origin feature/amazing-feature`)
7. Open a Pull Request
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: API Reference
## Acknowledgments
- Built on Prefect for workflow orchestration
- Uses LiteLLM for LLM provider abstraction
- Integrates Laminar (LMNR) for observability
- Type checking with Pydantic and basedpyright