Core utilities for AI-powered processing pipelines using Prefect

AI Pipeline Core

Production framework for building type-safe AI pipelines — designed to be developed and used by AI coding agents. Open-sourced by research.tech.

Python Version License: MIT Code Style: Ruff Type Checked: Basedpyright

Overview

AI Pipeline Core is a production-ready framework that combines document processing, LLM integration, and workflow orchestration into a unified system. Built with strong typing (Pydantic), automatic retries, cost tracking, and database-backed execution tracking, it enforces best practices while keeping application code minimal and straightforward.

This framework is the foundation of AI projects at research.tech. It is an internal-first solution, open-sourced because we believe in sharing production infrastructure publicly. The design prioritizes strictness over flexibility — all data structures are immutable, all inputs are validated at definition time, and all prompts are typed Python classes. These constraints exist because the framework is primarily developed and maintained by AI coding agents, which require rigid guardrails rather than flexible guidelines.

Breaking Changes / Migration Notes

Conversation model identity: Conversation now accepts only AIModel instances at the model boundary; raw strings are rejected. Wrap model names with AIModel(name=...) before constructing conversations or passing model identity through options. See Conversation and AIModel.

LLM transport: OpenRouter is no longer a supported transport path. The AIPL-compatible LiteLLM proxy is the only production transport, and it is required for deployment routing, fallback chains, group exhaustion, and trace fetch. See Required Proxy Capabilities.

LLM exceptions: The LLM core now exposes TerminalError, RetryableError, GroupExhaustedError, ContentPolicyError, LLMValidationError, and StreamWatchdogError for routing and failure handling. See Exceptions.

Top-level exception exports: EmptyResponseError and OutputDegenerationError are no longer exported from the top-level package. They remain importable from ai_pipeline_core.exceptions for internal retry handling details. See Exceptions.

Retry defaults: The default for Settings.task_retries changed from 2 to 0, and the default for Settings.conversation_retries changed from 3 to 2. A class-level None still resolves through the deployment/settings fallback. See PipelineTask, ModelOptions, and Environment Variables.

Document summary model: doc_summary_model is now typed as AIModel | None; empty string and None both disable generated summaries. Set DOC_SUMMARY_MODEL to an AIPL model name to enable summaries. See Database-backed Persistence and Environment Variables.

Replay deployment pinning: ai-replay run and ai-replay batch now accept --force-deployment-id to pin replayed LLM rounds to a specific AIPL deployment ID. See Replay.

List structured outputs: List-typed structured outputs must use ListOf; direct list[Model] response formats are not supported. See Structured Output and the internal request primitives noted under AIModel.

Key Features

  • Document System: Single Document base class with immutable content, SHA256-based identity, automatic MIME type detection, provenance tracking, multi-part attachments, and optional typed content via Document[ModelType]
  • Database Storage: Unified database backends (ClickHouse production, filesystem CLI/download/replay, in-memory testing) with automatic deduplication
  • Conversation Class: Immutable, stateful multi-turn LLM conversations with context caching, automatic URL/address shortening, and eager response restoration
  • LLM Integration: Unified interface to any model via an AIPL-compatible LiteLLM proxy with context caching (default 300s TTL). Production execution requires fallback chains, group exhaustion, deployment routing, and trace fetch — capabilities only the AIPL proxy provides
  • Tool Calling: Define tools as typed Python classes with import-time validation, automatic schema generation, and a built-in auto-loop that executes tools and re-sends results until the LLM produces a final answer
  • Structured Output: Type-safe generation with Pydantic model validation via Conversation.send_structured()
  • Workflow Orchestration: Class-based PipelineTask, PipelineFlow, and PipelineDeployment with annotation-driven document types and import-time validation
  • Auto-Persistence: PipelineTask saves returned documents to the active database backend automatically with provenance tracking
  • Image Processing: Automatic image tiling/splitting for LLM vision models with model-specific presets
  • External Providers: Base classes for integrating external services — ExternalProvider for synchronous HTTP APIs and StatelessPollingProvider for submit-then-poll patterns, with managed HTTP lifecycle, transport retries, exponential backoff, cost tracking, and ContextVar-based test overrides
  • Observability: Database-backed execution DAGs, logs, replay payloads, and ai-trace download support
  • Prompt Compiler: Type-safe prompt specifications replacing Jinja2 templates — typed Python classes for roles, rules, guides, and output formats with definition-time validation and a CLI tool for inspection
  • Replay: Capture and re-execute any LLM conversation, pipeline task, or flow from recorded span JSON files or database-backed runs with document resolution via SHA256 references
  • Deployment: Unified pipeline execution for local, CLI, and production environments with per-flow resume, idempotent remote Prefect submissions, and framework-specific remote error handling

Installation

pip install ai-pipeline-core

This installs four CLI commands:

  • ai-prompt-compiler — discover, inspect, render, and compile prompt specifications
  • ai-pipeline-deploy — build and deploy pipelines to Prefect Cloud
  • ai-replay — execute or inspect replayable span JSON files, or replay directly from database-backed runs
  • ai-trace — list, show, and download execution data from the database

Development Tools for Downstream Projects

Projects built on ai-pipeline-core install the companion development tools as dev dependencies:

pip install ai-dev-cli==0.2.0
pip install "trace-inspector @ git+https://github.com/researchtech-inc/ai-pipeline-core.git#subdirectory=tools/trace-inspector"

Or add them to your project's pyproject.toml:

[project.optional-dependencies]
dev = [
    "ai-dev-cli==0.2.0",
    "trace-inspector @ git+https://github.com/researchtech-inc/ai-pipeline-core.git#subdirectory=tools/trace-inspector",
]

ai-dev-cli ships on PyPI; pin the exact version to lock the workflow contract. To pin trace-inspector to a specific revision, append @v0.23.2 (or a commit hash) before #subdirectory:

    "trace-inspector @ git+https://github.com/researchtech-inc/ai-pipeline-core.git@v0.23.2#subdirectory=tools/trace-inspector"

This installs two additional CLI commands:

  • dev — development workflow CLI (test, lint, typecheck, format, check, probe)
  • ai-trace-inspect — trace inspection and markdown debug bundle generation

Probe model capabilities through the dev probe interface:

dev probe capabilities -- --models <csv-of-model-names> [--cost-limit <usd>]

Requirements

  • Python 3.14 or higher
  • Linux/macOS (Windows via WSL2)
  • uv (recommended)

Versioning

This is an internal framework under active development. No backward compatibility is guaranteed between versions — pin your dependency to an exact version. There is no changelog; the git commit history serves as the changelog.

Crash Visibility

With stdout logging, expect near-zero tail loss on SIGKILL, not zero-loss durability. The platform logging agent buffers recent log lines locally before shipping them, so an OOM kill or SIGKILL can drop the very end of the stream. That is still materially better than deferred database log flushing. Truly zero-loss logging would require a synchronous network write on every log line, which is too expensive for the framework's hot path.

Development Installation

git clone https://github.com/researchtech-inc/ai-pipeline-core.git
cd ai-pipeline-core
make install-dev     # Initializes uv environment and installs pre-commit hooks

Quick Start

Basic Pipeline

from pydantic import BaseModel, Field

from ai_pipeline_core import (
    DeploymentPlan,
    DeploymentResult,
    Document,
    FlowOptions,
    FlowStep,
    PipelineDeployment,
    PipelineFlow,
    PipelineTask,
)

import logging

logger = logging.getLogger(__name__)


# 1. Define document types (subclass Document)
class InputDocument(Document):
    """Pipeline input."""


class AnalysisDocument(Document):
    """Per-document analysis result."""


class ReportDocument(Document):
    """Final compiled report."""


# 2. Structured output model
class AnalysisSummary(BaseModel):
    word_count: int
    top_keywords: list[str] = Field(default_factory=list)


# 3. Pipeline task -- class-based, auto-saves returned documents to the active database backend
class AnalyzeDocument(PipelineTask):
    """Analyze a single document."""

    @classmethod
    async def run(cls, documents: tuple[InputDocument, ...]) -> tuple[AnalysisDocument, ...]:
        # Placeholder analysis: a real task would compute these values from doc content
        doc = documents[0]
        return (
            AnalysisDocument.derive(
                name=f"analysis_{doc.sha256[:12]}.json",
                content=AnalysisSummary(word_count=42, top_keywords=["ai", "pipeline"]),
                derived_from=(doc,),
            ),
        )


# 4. Pipeline flow -- type contract is in the run() annotations
class AnalysisFlow(PipelineFlow):
    """Analyze all input documents."""

    async def run(
        self, input_documents: tuple[InputDocument, ...], options: FlowOptions
    ) -> tuple[AnalysisDocument, ...]:
        results: list[AnalysisDocument] = []
        for doc in input_documents:
            results.extend(await AnalyzeDocument.run(documents=(doc,)))
        return tuple(results)


class ReportFlow(PipelineFlow):
    """Generate final report from analyses."""

    async def run(self, analyses: tuple[AnalysisDocument, ...], options: FlowOptions) -> tuple[ReportDocument, ...]:
        report = ReportDocument.derive(
            name="report.md",
            content="# Report\n\nAnalysis complete.",
            derived_from=analyses,
        )
        return (report,)


# 5. Deployment -- build the execution plan up front
class MyResult(DeploymentResult):
    report_count: int = 0


class MyPipeline(PipelineDeployment[FlowOptions, MyResult]):
    def build_plan(self, options: FlowOptions) -> DeploymentPlan:
        return DeploymentPlan(
            steps=(
                FlowStep(AnalysisFlow()),
                FlowStep(ReportFlow()),
            )
        )

    @staticmethod
    def build_result(
        run_id: str,
        documents: tuple[Document, ...],
        options: FlowOptions,
    ) -> MyResult:
        reports = [d for d in documents if isinstance(d, ReportDocument)]
        return MyResult(success=True, report_count=len(reports))


# 6. CLI initializer provides run ID and initial documents
def initialize(options: FlowOptions) -> tuple[str, tuple[Document, ...]]:
    docs: tuple[Document, ...] = (
        InputDocument.create_root(name="input.txt", content="Sample data", reason="CLI input"),
    )
    return "my-project", docs


# Run from CLI (requires positional working_directory arg: python script.py ./output)
pipeline = MyPipeline()
pipeline.run_cli(initializer=initialize)

Conversation (Multi-Turn LLM)

from ai_pipeline_core.llm import AIModel, Conversation, ModelOptions

# Create a conversation with model and optional context
conv = Conversation(model=AIModel(name="gpt-5.4-mini"))

# Add documents to cacheable context prefix (shared across forks)
conv = conv.with_context(doc1, doc2, doc3)

# Add a document to dynamic messages suffix (NOT cached, per-fork content)
conv = conv.with_document(my_document)

# Send a message (returns NEW immutable Conversation instance — always capture!)
conv = await conv.send("Analyze the document")
print(conv.content)  # Response text

# Structured output
conv = await conv.send_structured("Extract key points", response_format=KeyPoints)
print(conv.parsed)  # KeyPoints instance

# Multi-turn: each send() appends to conversation history
conv = await conv.send("Now summarize the key points")
print(conv.content)

# Access response properties
print(conv.reasoning_content)  # Thinking/reasoning text (if available)
print(conv.usage)  # Token usage with input/output counts
print(conv.cost)  # Estimated cost
print(conv.citations)  # Citation objects (for search models)

Tool Calling

from pydantic import BaseModel, Field
from ai_pipeline_core import AIModel, Conversation, Tool


# 1. Define a tool — docstring becomes the LLM description
class GetWeather(Tool):
    """Get current weather for a city."""

    class Input(BaseModel):
        city: str = Field(description="City name")
        unit: str = Field(default="celsius", description="Temperature unit")

    class Output(BaseModel):
        weather: str

    async def run(self, input: Input) -> Output:
        # Call your API, database, or any async operation here
        return self.Output(weather=f"Sunny, 22°C in {input.city}")


# 2. Pass tools to send() — auto-loop handles everything
conv = Conversation(model=AIModel(name="gemini-3-flash"))
conv = await conv.send(
    "What's the weather in Paris?",
    tools=[GetWeather()],
)
print(conv.content)  # "It's sunny and 22°C in Paris!"

# 3. Inspect what tools were called
for record in conv.tool_call_records:
    print(f"Tool: {record.tool.__name__}, Round: {record.round}")
    print(f"Input: {record.input}")
    print(f"Output JSON: {record.output.content}")
    print(f"Output model: {record.output.data}")

How the auto-loop works: send() calls the LLM → if the LLM requests tool calls, the framework executes them in parallel → sends results back → repeats until the LLM produces a final text answer or max_tool_rounds is exhausted.
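The loop described above can be sketched without the framework. This is a minimal illustration, not the library's implementation: FakeReply, fake_llm, TOOLS, and auto_loop are hypothetical stand-ins, and raising RuntimeError when max_tool_rounds is exhausted is an assumption, since the framework's behavior at that point is not described here.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeReply:
    """Stand-in for an LLM response: either final text or tool requests."""

    text: str = ""
    tool_calls: list = field(default_factory=list)  # list of (tool_name, kwargs)


async def get_weather(city: str) -> str:
    return f"Sunny, 22C in {city}"


TOOLS = {"get_weather": get_weather}  # hypothetical tool registry


async def fake_llm(messages: list) -> FakeReply:
    # Request a tool on the first round, answer once a tool result is present.
    if not any(m["role"] == "tool" for m in messages):
        return FakeReply(tool_calls=[("get_weather", {"city": "Paris"})])
    return FakeReply(text=f"Weather report: {messages[-1]['content']}")


async def auto_loop(prompt: str, max_tool_rounds: int = 10) -> str:
    messages = [{"role": "user", "content": prompt}]
    rounds = 0
    while True:
        reply = await fake_llm(messages)
        if not reply.tool_calls:
            return reply.text  # a final text answer ends the loop
        if rounds >= max_tool_rounds:
            # Assumption: the real framework may handle exhaustion differently.
            raise RuntimeError("max_tool_rounds exhausted")
        rounds += 1
        # Execute every tool requested in this round in parallel.
        results = await asyncio.gather(
            *(TOOLS[name](**args) for name, args in reply.tool_calls)
        )
        messages.extend({"role": "tool", "content": r} for r in results)


answer = asyncio.run(auto_loop("What's the weather in Paris?"))
print(answer)
```

Counting tool rounds separately from LLM calls leaves room for one last LLM call after the final permitted round of tool execution.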

Tool definition rules (validated at import time):

  • Must have a non-empty docstring (becomes the tool description for the LLM)
  • Must define an Input inner class (BaseModel with Field(description=...) on every field)
  • Must define an Output inner class (BaseModel)
  • Must define an async def run(self, input: Input) -> Output method
  • execute() is sealed and framework-owned; do not override it (it handles retry, timeout, error handling, and serialization)
  • Lifecycle behavior is configurable through ClassVars: retries, retry_delay_seconds, timeout_seconds, max_response_bytes, handled_exceptions
  • dict[str, V] field types are forbidden in Input (OpenAI strict mode incompatible)
  • Field names strict and additionalProperties are reserved and cannot be used (collide with LiteLLM's recursive key stripping)

Tool naming: Class names are auto-converted to snake_case for the LLM (GetWeather → get_weather). Duplicate names after conversion raise ValueError.
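As a rough illustration of the naming rule, the conversion and the duplicate check could look like the sketch below. The helper names to_tool_name and build_registry are hypothetical, and the regular expression is one simple way to do the conversion; the framework's exact algorithm (for example, how it treats acronyms such as HTTPTool) is not shown in this document.

```python
import re


def to_tool_name(class_name: str) -> str:
    # Insert "_" before every uppercase letter except the first, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()


def build_registry(class_names: list) -> dict:
    """Map converted tool names to class names, rejecting collisions."""
    registry = {}
    for class_name in class_names:
        tool_name = to_tool_name(class_name)
        if tool_name in registry:
            raise ValueError(f"duplicate tool name {tool_name!r}")
        registry[tool_name] = class_name
    return registry


registry = build_registry(["GetWeather", "SearchNews"])
print(registry)
```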

Additional options:

  • tool_choice="required" — force the LLM to call a tool on the first round
  • tool_choice="none" — prevent tool calls (useful for final summarization)
  • max_tool_rounds=N — limit the number of tool call rounds (default 10)
  • Tools work with both send() and send_structured() — structured output is produced on the final response

Structured Output

from pydantic import BaseModel
from ai_pipeline_core import AIModel, Conversation


class Analysis(BaseModel):
    summary: str
    sentiment: float
    key_points: list[str]


# Generate structured output via Conversation
conv = Conversation(model=AIModel(name="gpt-5.4-mini"))
conv = await conv.send_structured(
    "Analyze this product review: ...",
    response_format=Analysis,
)

# Access parsed result with type safety
analysis = conv.parsed
print(f"Sentiment: {analysis.sentiment}")
for point in analysis.key_points:
    print(f"- {point}")

Document Handling

from ai_pipeline_core import Document


class MyDocument(Document):
    """Custom document type -- must subclass Document."""


# Create documents from external sources (URI provenance)
doc = MyDocument.create_external(
    name="data.json",
    content={"key": "value"},  # Automatically converted to JSON bytes
    from_sources=["https://api.example.com/data"],
)

# Parse back to original type
data = doc.parse(dict)  # Returns {"key": "value"}

# Document provenance tracking
source_doc = MyDocument.create_root(name="source.txt", content="original data", reason="user upload")
plan_doc = MyDocument.derive(name="plan.txt", content="research plan", derived_from=(source_doc,))
derived = MyDocument.create_external(
    name="derived.json",
    content={"result": "processed"},
    from_sources=["https://api.example.com/data"],  # Content came from this URL
    triggered_by=(plan_doc,),  # Created because of this plan (causal, not content)
)

# Check provenance
for sha256 in derived.content_documents:  # avoid shadowing the built-in hash()
    print(f"Derived from document: {sha256}")
for ref in derived.content_references:
    print(f"External source: {ref}")

Typed Content (Document[T])

Declare a Pydantic BaseModel as the content schema for a Document subclass. Content is validated at creation time and accessible via .parsed:

from pydantic import BaseModel
from ai_pipeline_core import Document


class ResearchDefinition(BaseModel, frozen=True):
    topic: str
    max_sources: int = 10


class ResearchPlanDocument(Document[ResearchDefinition]):
    """Plan document with typed content schema."""


# Content is validated against the schema at creation time
plan = ResearchPlanDocument.derive(
    name="plan.json",
    content=ResearchDefinition(topic="AI safety"),
    derived_from=(input_doc,),
)

# Zero-boilerplate typed access (cached, returns ResearchDefinition)
definition = plan.parsed
print(definition.topic)  # "AI safety"
print(definition.max_sources)  # 10


# Wrong schema type is rejected at creation time
class WrongModel(BaseModel, frozen=True):
    x: int


plan = ResearchPlanDocument.derive(
    name="plan.json",
    content=WrongModel(x=1),  # TypeError: Expected content of type ResearchDefinition
    derived_from=(input_doc,),
)

# Introspection
ResearchPlanDocument.get_content_type()  # ResearchDefinition

List content is also supported — Document[list[ModelType]] validates each item:

class FindingItem(BaseModel, frozen=True):
    title: str
    severity: str


class FindingsDocument(Document[list[FindingItem]]):
    """Document containing a list of findings."""


doc = FindingsDocument.create_root(
    name="findings.json",
    content=[FindingItem(title="XSS", severity="high")],
    reason="audit output",
)
doc.parsed  # list[FindingItem]
doc.content_is_list()  # True

Co-location rule: The content model must be defined in the same module as the Document subclass. Cross-module content models raise TypeError at import time.

Core Concepts

Documents

Documents are immutable Pydantic models that wrap binary content with metadata. There is a single Document base class -- subclass it to define your document types:

class MyDocument(Document):
    """All documents subclass Document directly."""


# Use derive() for content transformations
doc = MyDocument.derive(
    name="data.json",
    content={"key": "value"},  # Auto-converts to JSON
    derived_from=(source,),
)

# Access content
if doc.is_text:
    print(doc.text)

# Parse structured data
data = doc.as_json()  # or as_yaml()
model = doc.as_pydantic_model(MyModel)  # Requires model_type argument

# Content-addressed identity
print(doc.sha256)  # Full SHA256 hash (base32)
print(doc.id)  # Short 6-char identifier

Typed content — declare a content schema via generic parameter for automatic validation and typed access:

class PlanDocument(Document[PlanModel]):
    """Content is validated against PlanModel at creation time."""


plan = PlanDocument.derive(name="plan.json", content=PlanModel(...), derived_from=(source_doc,))
plan.parsed  # → PlanModel (cached, typed)
PlanDocument.get_content_type()  # → PlanModel

Document fields:

  • name: Filename (validated -- no path separators / or \)
  • description: Optional human-readable description
  • content: Raw bytes (auto-converted from str, dict, list, BaseModel via create())
  • derived_from: Content provenance — SHA256 hashes of source documents (stored internally). Constructors (derive(), create()) accept Sequence[Document] and extract hashes automatically. URI-style references use create_external(from_sources=...) instead.
  • triggered_by: Causal provenance — SHA256 hashes of documents that caused this document to be created without contributing to its content. Constructors accept Sequence[Document] and extract hashes automatically.
  • attachments: Tuple of Attachment objects for multi-part content

Documents support:

  • Automatic content serialization based on file extension: .json → JSON, .yaml/.yml → YAML, others → UTF-8 text. Structured data (dict, list, BaseModel) requires .json or .yaml extension.
  • Optional typed content schema via Document[ModelType] with creation-time validation and .parsed access
  • MIME type detection via mime_type cached property, with is_text/is_image/is_pdf helpers
  • SHA256-based identity and deduplication
  • Provenance tracking (derived_from for content sources, triggered_by for causal lineage)
  • FILES enum for filename restrictions (definition-time validation)
  • Four construction paths: create_root() (pipeline inputs), derive(derived_from=...) (content transformations), create(triggered_by=...) (causal provenance), create_external(from_sources=...) (URI provenance). All accept Sequence[Document] and extract SHA256 hashes automatically
  • Token count estimation via approximate_tokens_count
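To make the extension-based serialization and content-addressed deduplication concrete, here is a stdlib-only sketch. It is not the framework's code: serialize and content_id are hypothetical helpers, YAML is left out to avoid a third-party dependency, sorted JSON keys are an assumption, and the sketch hashes content bytes only, whereas the framework's document identity may cover more fields.

```python
import base64
import hashlib
import json


def serialize(name: str, content: object) -> bytes:
    """Turn content into bytes, choosing the format from the file extension."""
    if isinstance(content, bytes):
        return content
    if isinstance(content, str):
        return content.encode("utf-8")
    if isinstance(content, (dict, list)):
        if not name.endswith(".json"):
            # The framework also accepts .yaml/.yml; omitted here (stdlib only).
            raise ValueError(f"structured content needs a .json name, got {name!r}")
        return json.dumps(content, sort_keys=True).encode("utf-8")
    raise TypeError(f"unsupported content type: {type(content).__name__}")


def content_id(data: bytes) -> str:
    """SHA256 digest of the bytes, base32-encoded with padding stripped."""
    digest = hashlib.sha256(data).digest()
    return base64.b32encode(digest).decode("ascii").rstrip("=")


# Identical content maps to the same key, so it is stored only once.
store = {}
for name, content in [
    ("a.json", {"key": "value"}),
    ("b.json", {"key": "value"}),
    ("c.txt", "hello"),
]:
    data = serialize(name, content)
    store.setdefault(content_id(data), data)

print(len(store))
```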

Database-backed Persistence

Documents are automatically persisted by PipelineTask to the active database backend. Application code typically reads through DatabaseReader; write operations stay framework-internal.

Backend implementations (internal, auto-selected by execution mode):

  • ClickHouseDatabase: Production backend
  • FilesystemDatabase: CLI, download, and replay-friendly filesystem snapshot backend
  • MemoryDatabase: Testing and local in-memory execution

Backend selection depends on the execution mode:

  • run_cli(): Uses FilesystemDatabase by default, or ClickHouseDatabase when ClickHouse is configured
  • run_local(): Uses MemoryDatabase
  • as_prefect_flow(): Auto-selects the configured production backend

Public API — DatabaseReader (read-only protocol):

  • get_span(span_id) — Retrieve a span by its ID
  • get_child_spans(parent_span_id) — Retrieve direct child spans ordered by sequence number
  • get_document(document_sha256) — Load a document record by SHA256
  • get_documents_batch(sha256s) — Retrieve multiple document records keyed by SHA256
  • get_document_with_content(document_sha256) — Load document metadata plus content and attachment blobs
  • find_documents_by_name(names, document_type) — Find document records by exact name match (takes list[str])
  • get_deployment_tree(deployment_id) — Retrieve every span in a deployment tree as a flat list
  • get_deployment_by_run_id(run_id) — Find the newest deployment span for a run ID
  • get_span_logs(span_id) / get_deployment_logs(deployment_id) — Retrieve execution logs
  • list_deployments(limit, status) — List deployment spans ordered by newest first
  • get_cached_completion(cache_key, max_age) — Find a cached completed span within the max age window
  • get_deployment_cost_totals(deployment_id) — Aggregate cost and token totals for a deployment tree
  • get_blob(content_sha256) / get_blobs_batch(content_sha256s) — Retrieve blob content by SHA256
  • Replay and download helpers resolve document/blob content through the same interface

Write operations (insert_span, save_document, save_blob, save_document_batch, save_blob_batch, save_logs_batch, flush, shutdown) are framework-internal — the framework handles persistence automatically. DatabaseWriter exposes a supports_remote property indicating whether the backend supports Prefect-based remote deployment execution.

Document summaries: Persisted summary storage is supported via Document.create(..., summary=...) and update_document_summary(). Summaries are stored as metadata on document records. Configure via DOC_SUMMARY_ENABLED and DOC_SUMMARY_MODEL. Summary generation is automatically disabled when OPENAI_BASE_URL, OPENAI_API_KEY, or DOC_SUMMARY_MODEL is unset. DOC_SUMMARY_MODEL defaults to empty (None) and must be set to an AIPL model name to enable generated summaries.

LLM Integration

The primary interface is the Conversation class for multi-turn interactions.

Conversation (Recommended)

The Conversation class provides immutable, stateful conversation management:

from ai_pipeline_core.llm import AIModel, Conversation, ModelOptions

# Create with model and optional configuration
conv = Conversation(model=AIModel(name="gpt-5.4-mini"))

# Add documents to cacheable context prefix (shared across forks)
conv = conv.with_context(doc1, doc2, doc3)

# Add a document to dynamic messages suffix (NOT cached, per-fork content)
conv = conv.with_document(my_document)

# Configure model options
conv = conv.with_model_options(
    ModelOptions(
        system_prompt="You are a research analyst.",
        reasoning_effort="high",
    )
)

# Send a message (returns NEW Conversation instance)
conv = await conv.send("Analyze the document")
print(conv.content)  # Response text

# Multi-turn: conversation history is preserved
conv = await conv.send("Now summarize the key points")
print(conv.content)

# Structured output — single model
conv = await conv.send_structured("Extract entities", response_format=Entities)
print(conv.parsed)  # Entities instance

# Structured output for a list of models: direct list[Finding] response formats
# are not supported; use ListOf instead (see Breaking Changes / Migration Notes)

# Add multiple documents at once
conv = conv.with_documents([doc1, doc2, doc3])

# Inject prior assistant output (e.g., from another conversation)
conv = conv.with_assistant_message("Previous analysis result...")

# Approximate token count for all context and messages
print(conv.approximate_tokens_count)

# Tool calling — LLM can call tools, framework auto-loops
conv = await conv.send("Search for recent news", tools=[SearchTool()])
print(conv.content)  # Final answer after tool execution
print(conv.tool_call_records)  # Records of all tool calls made

send_spec() — sends a PromptSpec to the LLM. Handles document placement, stop sequences, and auto-extraction of <result> tags. For structured specs (PromptSpec[SomeModel]), dispatches to send_structured() automatically.

Content protection (automatic): URLs, blockchain addresses, and high-entropy strings in context documents are automatically shortened to prefix...suffix forms to save tokens. Both .content and .parsed are eagerly restored after every send()/send_structured() call — no manual restoration needed. A fuzzy fallback handles LLM-mangled forms (dropped suffix, prefix/suffix truncated by 1-2 chars).
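The shorten-then-restore idea can be illustrated with a small sketch. It handles only URLs and exact-match restoration; the names shorten and restore, the keep and min_length parameters, and the regular expression are all assumptions for illustration, and the framework's fuzzy fallback is not reproduced here.

```python
import re

URL_PATTERN = re.compile(r"https?://\S+")


def shorten(text: str, keep: int = 10, min_length: int = 40):
    """Replace long URLs with prefix...suffix forms and remember the originals."""
    mapping = {}

    def replace(match):
        original = match.group(0)
        if len(original) < min_length:
            return original  # short URLs are left alone
        short = f"{original[:keep]}...{original[-keep:]}"
        mapping[short] = original
        return short

    return URL_PATTERN.sub(replace, text), mapping


def restore(text: str, mapping: dict) -> str:
    """Expand every shortened form back to its original string."""
    for short, original in mapping.items():
        text = text.replace(short, original)
    return text


url = "https://example.com/reports/2024/quarterly/summary-final-v2.pdf"
source = f"See {url} for details."
shortened, mapping = shorten(source)
restored = restore(shortened, mapping)
print(shortened)
```

A real implementation also has to handle two strings that share the same prefix and suffix, which this sketch ignores.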

ModelOptions key fields (all optional with sensible defaults):

  • cache_ttl: Context cache TTL in integer minutes (default 5, set None to inherit from the model)
  • system_prompt: System-level instructions
  • reasoning_effort: Optional per-call override, one of "none" | "minimal" | "low" | "medium" | "high" | "xhigh" for models with explicit reasoning
  • retries: Retry attempts (default None → uses Settings.conversation_retries, default 2)
  • retry_delay_seconds: Flat delay between retries (default None → exponential backoff starting at Settings.conversation_retry_delay_seconds=30, multiplied by conversation_retry_backoff_multiplier=3 on each retry, and capped at conversation_retry_max_delay_seconds=300)
  • timeout: Max wait seconds (default 600)
  • max_completion_tokens: Max output tokens
  • temperature: Generation randomness (usually omit -- use provider defaults)
  • stop: Stop sequences (tuple of strings, used internally by send_spec for </result> tags)
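The retry delay settings listed above imply a delay schedule along the following lines. This is a sketch under stated assumptions: retry_delay is a hypothetical helper, attempts are counted from zero, and the delay is assumed to be base × multiplier^attempt capped at the maximum; the framework may add jitter or count attempts differently.

```python
def retry_delay(
    attempt: int,
    *,
    base: float = 30.0,        # conversation_retry_delay_seconds
    multiplier: float = 3.0,   # conversation_retry_backoff_multiplier
    max_delay: float = 300.0,  # conversation_retry_max_delay_seconds
    flat_override=None,        # ModelOptions.retry_delay_seconds
) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    if flat_override is not None:
        return float(flat_override)  # a flat delay disables backoff
    return min(base * multiplier**attempt, max_delay)


delays = [retry_delay(n) for n in range(4)]
print(delays)  # [30.0, 90.0, 270.0, 300.0]
```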

Model names are configured through AIModel(...)/AIModel(name=...). Model capabilities such as fallbacks, vision preset, cache TTL, stop-sequence support, structured output, tool calling, image input, PDF input, and URL preservation live on AIModel, not on a string alias. The framework never infers capabilities from the model name — set the relevant supports_* / preserve_input_urls fields explicitly when constructing an AIModel.

AIModel

AIModel is the model identity object for every LLM call. Put it on FlowOptions, config models, and tool constructors; pass raw strings only to AIModel(name=...) at the configuration boundary.

from ai_pipeline_core import AIModel

primary = AIModel(
    name="gpt-5.4-mini",
    fallback=AIModel(
        name="gemini-3-flash",
        fallback=AIModel(name="gpt-5.4-mini"),
    ),
    cache_ttl=5,
    timeout_s=600.0,
)

Key fields:

  • name: Model name recognized by the AIPL/LiteLLM proxy
  • fallback: Next AIModel when the proxy reports group exhaustion
  • timeout_s: Per-hop wall-clock timeout in seconds
  • temperature, reasoning_effort, verbosity, max_completion_tokens: Generation options carried with the model. reasoning_effort defaults to "medium".
  • supports_stop_sequences: Whether the model accepts stop sequences
  • supports_structured_output: Whether the model accepts a native response_format (declared, never inferred). Default True. When False, a request that carries a response format raises TerminalError at preflight.
  • supports_tools: Whether the model supports tool/function calling. Default True. When False, a request that carries tool schemas raises TerminalError at preflight.
  • supports_images: Whether the model accepts ImageContent parts. Default True. When False, any image in the request raises TerminalError at preflight.
  • supports_pdfs: Whether the model accepts PDFContent parts. Default True. When False, any PDF in the request raises TerminalError at preflight.
  • vision_preset: Image processing preset (ImagePreset.DEFAULT, HIGH_RES, BALANCED, COMPACT)
  • preserve_input_urls: Keep URLs intact (e.g. for search-style models). Default False; set explicitly to True — the framework does not infer it from the model name.
  • cache_ttl: Prompt cache TTL in integer minutes; use 0 to disable explicit cache markers
  • skip_cost_optimized: Ask the AIPL proxy to avoid cost-optimized deployments
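The fallback field forms a linked chain of models. A minimal sketch of walking such a chain when a model group is exhausted might look like this; Model, GroupExhausted, call_with_fallback, and fake_call are hypothetical stand-ins rather than the framework's AIModel or GroupExhaustedError, and the real routing logic lives in the framework and the proxy.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class GroupExhausted(Exception):
    """Stand-in for the proxy reporting that a model group is exhausted."""


@dataclass(frozen=True)
class Model:
    name: str
    fallback: Optional["Model"] = None


def call_with_fallback(model: Model, call: Callable[[str], str]):
    """Try each model in the chain until one succeeds; return result and names tried."""
    tried = []
    current: Optional[Model] = model
    while current is not None:
        tried.append(current.name)
        try:
            return call(current.name), tried
        except GroupExhausted:
            current = current.fallback  # advance to the next model in the chain
    raise GroupExhausted(f"all model groups exhausted: {tried}")


def fake_call(name: str) -> str:
    # Pretend the primary group is exhausted and the fallback works.
    if name == "gpt-5.4-mini":
        raise GroupExhausted(name)
    return f"ok from {name}"


primary = Model("gpt-5.4-mini", fallback=Model("gemini-3-flash"))
result, tried = call_with_fallback(primary, fake_call)
print(result, tried)
```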

See examples/showcase_aimodel.py for a runnable constructor-focused example.

Advanced request primitives such as GenerationSpec, CacheSpec, RoutingSpec, ResponseSpec, ToolSpec, DebugSpec, RetrySpec, LLMRequest, ListOf, TransportMetadata, AIPLInfo, and LiteLLMInfo live under ai_pipeline_core._llm_core for framework internals, replay, and observability. Application code should use Conversation, AIModel, and ModelOptions instead of importing those internal request types from the package root.

Image Processing

Image processing for LLM vision models is available from the llm._images module:

from ai_pipeline_core.llm._images import process_image, ImagePreset

# Process an image with model-specific presets
result = process_image(screenshot_bytes, preset=ImagePreset.HIGH_RES)
for part in result.parts:
    print(part.label, len(part.data))

Available presets: HIGH_RES (3000px, 9M pixels), COMPACT (1568px, 1.15M pixels), BALANCED (2048px, 4M pixels), DEFAULT (1000px, 1M pixels).

Token cost: A single image is estimated at 1080 tokens for token counting purposes (actual usage depends on provider).

The Conversation class automatically splits oversized images when documents are added to context — you typically don't need to call process_image directly.

Exceptions

The framework re-exports key exceptions at the top level for convenient catching:

from ai_pipeline_core import (
    ContentPolicyError,
    DocumentNameError,
    DocumentSizeError,
    DocumentValidationError,
    GroupExhaustedError,
    LLMError,
    LLMValidationError,
    NonRetriableError,
    PipelineCoreError,
    ProviderAuthError,
    ProviderError,
    RetryableError,
    StreamWatchdogError,
    StubNotImplementedError,
    TerminalError,
)
  • PipelineCoreError — Base for all framework exceptions
  • NonRetriableError — Signals that an operation must not be retried (stops task/flow retry loops immediately)
  • LLMError — LLM generation failures after framework retries are exhausted
  • TerminalError — LLM failure that requires an external fix and should not be retried by task/flow retry loops
  • RetryableError — Retry-eligible integration failure for tools and provider adapters
  • GroupExhaustedError — AIPL proxy exhausted one logical model group and fallback should advance
  • ContentPolicyError — Provider blocked the response for content-policy reasons
  • LLMValidationError — Structured LLM output failed validation after retries
  • StreamWatchdogError — Streaming response violated the configured watchdog policy
  • StubNotImplementedError — Executed a stub=True placeholder task, flow, or spec
  • ProviderError — External provider call failed after all retries (subclass of PipelineCoreError)
  • ProviderAuthError — Authentication/authorization failure (401/403), never retried (subclass of both ProviderError and NonRetriableError)
  • DocumentValidationError — Document validation failures
  • DocumentSizeError — Document exceeds size limits
  • DocumentNameError — Invalid document name (path separators, reserved suffixes, etc.)

Output degeneration (token repetition loops) is detected automatically and triggers an internal retry with cache disabled; the framework surfaces it as LLMError only after retries exhaust. EmptyResponseError and OutputDegenerationError are framework-internal retry signals and are not part of the top-level public surface.
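
The hierarchy above suggests a simple catch order in calling code: stop on NonRetriableError, retry other framework errors. The sketch below uses local stand-in classes that mirror the documented relationships rather than importing the real exceptions. It assumes NonRetriableError derives from PipelineCoreError, and run_with_retries is a hypothetical helper, not the framework's own retry loop.

```python
class PipelineCoreError(Exception):
    """Stand-in for the framework's base exception."""


class NonRetriableError(PipelineCoreError):
    """Stand-in: the operation must not be retried."""


class ProviderError(PipelineCoreError):
    """Stand-in: an external provider call failed."""


class ProviderAuthError(ProviderError, NonRetriableError):
    """Stand-in: 401/403 from a provider, never retried."""


def run_with_retries(operation, retries):
    """Call operation() until it succeeds; return (result, attempts_used)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return operation(), attempts
        except NonRetriableError:
            raise  # stop immediately, regardless of remaining retries
        except PipelineCoreError:
            if attempts > retries:
                raise  # retry budget exhausted
```

Because ProviderAuthError inherits from both ProviderError and NonRetriableError, the first except clause catches it before the generic retry path is considered.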

Pipeline Classes

The pipeline system uses a three-tier class hierarchy: PipelineTask → PipelineFlow → PipelineDeployment. All classes use __init_subclass__ for import-time validation, so definition errors surface when the module is imported rather than when a task or flow first executes.

PipelineTask

Base class for pipeline tasks with automatic execution-node tracking, document persistence, and lifecycle events:

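from ai_pipeline_core import Document, PipelineTask


class InputDocument(Document):
    """Input document type used by the task examples."""


class OutputDocument(Document):
    """Output document type used by the task examples."""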


class ProcessChunk(PipelineTask):
    """Process a single document chunk."""

    @classmethod
    async def run(cls, documents: tuple[InputDocument, ...]) -> tuple[OutputDocument, ...]:

        doc = documents[0]
        return (
            OutputDocument.derive(
                name="result.json",
                content={"processed": True},
                derived_from=(doc,),
            ),
        )


class ExpensiveTask(PipelineTask):
    retries = 3
    timeout_seconds = 600
    estimated_minutes = 5

    @classmethod
    async def run(cls, documents: tuple[InputDocument, ...]) -> tuple[OutputDocument, ...]: ...

Invocation patterns:

# Sequential — await TaskClass.run(...)
result = await ProcessChunk.run(documents)

# Parallel — TaskClass.run() without await returns TaskHandle
handle = ProcessChunk.run(documents)
result = await handle.result()

ClassVar configuration:

  • retries: Retry attempts on failure (default None → uses Settings.task_retries, default 0; exponential backoff)
  • retry_delay_seconds: Base delay between retries (default None → uses Settings.task_retry_delay_seconds, default 30; doubles each attempt, capped at 300s)
  • timeout_seconds: Task execution timeout (default None)
  • estimated_minutes: Duration estimate for progress tracking (default 1, must be >= 1)
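
The delay rule for retry_delay_seconds (double each attempt, cap at 300s) works out as follows. This is a sketch of the arithmetic only, assuming the first retry waits exactly the base delay; retry_delays is a hypothetical helper, not a framework function.

```python
def retry_delays(base_seconds: float, retries: int, cap_seconds: float = 300.0) -> list[float]:
    """Delay before each retry: base, 2x base, 4x base, ... capped at cap_seconds."""
    return [min(base_seconds * 2**attempt, cap_seconds) for attempt in range(retries)]
```

With the default base of 30 seconds, five retries would wait 30, 60, 120, 240, and then 300 seconds, since the uncapped fifth delay of 480 seconds exceeds the cap.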

Key features:

  • Import-time validation of run() signature and document type annotations
  • Async-only enforcement (raises TypeError if run is not async def)
  • Rejects class names starting with Test (reserved for pytest)
  • Rejects required __init__ parameters (tasks use documents-only invocation)
  • Automatic execution-node tracking
  • Document auto-save to the active database backend (returned documents are persisted)
  • Source validation (warns if referenced SHA256s don't exist in the database)
  • Task-level lifecycle events (TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent)
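
For readers unfamiliar with import-time validation, the following sketch shows the general __init_subclass__ technique behind two of the checks listed above (async-only run and the Test name prefix). ValidatedTask is a stand-in written for this example; the framework's PipelineTask performs more checks and its internals may differ.

```python
import inspect


class ValidatedTask:
    """Stand-in base class that validates subclasses when they are defined."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if cls.__name__.startswith("Test"):
            raise TypeError(f"{cls.__name__}: names starting with 'Test' are reserved for pytest")
        # getattr_static returns the raw classmethod object without binding it.
        run = inspect.getattr_static(cls, "run", None)
        func = getattr(run, "__func__", run)
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"{cls.__name__}.run must be declared with 'async def'")


class SummarizeTask(ValidatedTask):
    """Valid: run() is an async classmethod."""

    @classmethod
    async def run(cls, documents: tuple) -> tuple:
        return documents


def define_sync_task():
    """Defining this class fails immediately because run() is not async."""

    class SyncTask(ValidatedTask):
        @classmethod
        def run(cls, documents: tuple) -> tuple:
            return documents

    return SyncTask
```

The error is raised while the class statement executes, which for a module-level class means at import.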

PipelineFlow

Base class for pipeline flows that orchestrate tasks:

from ai_pipeline_core import PipelineFlow, FlowOptions


class AnalysisFlow(PipelineFlow):
    """Analyze input documents."""

    async def run(
        self,
        input_docs: tuple[InputDoc, ...],  # Collection input: all matching docs
        options: MyFlowOptions,  # Must be FlowOptions or subclass
    ) -> tuple[OutputDoc, ...]:  # Output types extracted from annotation
        results: list[OutputDoc] = []
        for doc in input_docs:
            results.extend(await AnalyzeTask.run(documents=(doc,)))
        return tuple(results)

Flow input parameter annotations determine input resolution from the blackboard:

  • source: MyDocument means the latest matching document is injected
  • source: MyDocument | None means latest-or-None
  • sources: tuple[MyDocument, ...] means all matching documents
  • options: MyFlowOptions receives the deployment options object

The flow return annotation determines output types. run() must be an instance method with self first, at least one named input parameter, and a tuple[DocumentSubclass, ...] return annotation. Use get_run_id() inside a flow when you need the current run ID.

Constructor parameters for per-instance configuration:

class ConfigurableFlow(PipelineFlow):
    """Flow with per-instance model configuration."""

    async def run(self, input_docs: tuple[InputDoc, ...], options: FlowOptions) -> tuple[OutputDoc, ...]:
        _ = (input_docs, options)
        model = self.model  # Access constructor params as attributes
        ...


flow = ConfigurableFlow(model=AIModel("gpt-5.4-mini"), temperature=0.7)
flow.get_params()  # {"model": AIModel("gpt-5.4-mini"), "temperature": 0.7}

Flow ClassVar configuration:

  • retries: Per-flow retry override (default None — defers to deployment's flow_retries, then to Settings.flow_retries, default 0)
  • retry_delay_seconds: Per-flow delay override (default None — defers to deployment, then Settings.flow_retry_delay_seconds, default 30)
  • estimated_minutes: Duration estimate for progress tracking (default 1, must be >= 1)

Flow retries are controlled by the deployment (see flow_retries on PipelineDeployment). A flow can override with an explicit retries = N in its class body. Raise NonRetriableError to stop retries immediately.

FlowOptions is a frozen BaseModel for pipeline configuration. Subclass it to add flow-specific parameters:

class ResearchOptions(FlowOptions):
    analysis_model: AIModel = AIModel("gpt-5.4-mini")
    verification_model: AIModel = AIModel("gemini-3-flash")
    synthesis_model: AIModel = AIModel("gpt-5.4-mini")
    max_sources: int = 10

PipelineDeployment

Orchestrates multi-flow pipelines with resume, per-flow uploads, and event publishing:

class MyPipeline(PipelineDeployment[MyOptions, MyResult]):
    pubsub_service_type = "research"  # Enables Pub/Sub event publishing

    def build_plan(self, options: MyOptions) -> DeploymentPlan:
        return DeploymentPlan(
            steps=(
                FlowStep(AnalysisFlow()),
                FlowStep(ReportFlow(model=AIModel("gpt-5.4-mini"))),
            )
        )

    @staticmethod
    def build_result(
        run_id: str,
        documents: tuple[Document, ...],
        options: MyOptions,
    ) -> MyResult: ...

Dynamic flow control with build_plan() and FieldGate:

from pydantic import BaseModel
from ai_pipeline_core import DeploymentPlan, Document, FieldGate, FlowStep


class SynthesisDecision(BaseModel, frozen=True):
    should_synthesize: bool


class SynthesisDecisionDocument(Document[SynthesisDecision]):
    pass


class MyPipeline(PipelineDeployment[MyOptions, MyResult]):
    def build_plan(self, options: MyOptions) -> DeploymentPlan:
        return DeploymentPlan(
            steps=(
                FlowStep(ExtractFlow()),
                FlowStep(AnalyzeFlow()),
                FlowStep(
                    SynthesisFlow(),
                    run_if=FieldGate(SynthesisDecisionDocument, "should_synthesize", op="truthy", on_missing="skip"),
                ),
            )
        )

Execution modes:

pipeline = MyPipeline()

# CLI mode: parses sys.argv, requires positional working_directory argument
# Usage: python script.py ./output [--start N] [--end N] [--max-keywords 8]
pipeline.run_cli(initializer=init_fn)

# Local mode: in-memory store, returns result directly (synchronous)
result = pipeline.run_local(
    run_id="test",
    documents=input_docs,
    options=MyOptions(),
)

# Production: generates a Prefect flow for deployment
prefect_flow = pipeline.as_prefect_flow()

Features:

  • Per-flow resume: Skips flows with a cached completed execution node in the database (explicit completion tracking, not document-presence inference). Configurable cache_ttl (default 24h)
  • Type chain validation: At runtime, validates that at least one of each flow's declared input types is producible by preceding flows (union semantics)
  • Event publishing: 11 lifecycle events (run started/completed/failed, flow started/completed/failed/skipped, task started/completed/failed, heartbeat) via Pub/Sub. Enabled by setting pubsub_service_type ClassVar. Requires PUBSUB_PROJECT_ID and PUBSUB_TOPIC_ID env vars
  • Dynamic flow control: build_plan() returns FlowStep entries, and FieldGate / group_stop_if drive runtime skips and loop stops from typed control documents
  • Flow retries: Configurable via flow_retries (default None → uses Settings.flow_retries, default 0) and flow_retry_delay_seconds (default None → uses Settings.flow_retry_delay_seconds, default 30) ClassVars on the deployment. Exponential backoff, capped at 300s. Individual flows can override with explicit retries = N
  • Concurrency limits: Cross-run enforcement via Prefect global concurrency limits
  • CLI mode: --start N / --end N for step control with the configured database backend
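
The union semantics of type chain validation can be illustrated with a short sketch. It assumes document types are compared by name and that the deployment's initial inputs count as available before the first flow; FlowSignature and find_unsatisfied_flows are hypothetical names, and the framework's real check may consider more than this.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowSignature:
    name: str
    input_types: frozenset
    output_types: frozenset


def find_unsatisfied_flows(flows, initial_types=frozenset()):
    """Return names of flows for which none of the declared input types is available."""
    available = set(initial_types)
    unsatisfied = []
    for flow in flows:
        # Union semantics: one producible input type is enough.
        if flow.input_types and not (flow.input_types & available):
            unsatisfied.append(flow.name)
        available |= flow.output_types
    return unsatisfied


PLAN = [
    FlowSignature("extract", frozenset({"RawDoc"}), frozenset({"CleanDoc"})),
    FlowSignature("analyze", frozenset({"CleanDoc", "HintDoc"}), frozenset({"ReportDoc"})),
    FlowSignature("publish", frozenset({"SummaryDoc"}), frozenset()),
]
```

Here analyze passes even though nothing produces HintDoc, because CleanDoc is available; publish is reported because no earlier step produces SummaryDoc.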

Concurrency Limits

Declare cross-run concurrency and rate limits on PipelineDeployment to prevent exceeding external API quotas across all concurrent pipeline runs:

from ai_pipeline_core import LimitKind, PipelineLimit, PipelineDeployment, pipeline_concurrency


class MyPipeline(PipelineDeployment[MyOptions, MyResult]):
    concurrency_limits = {
        "provider-a": PipelineLimit(500, LimitKind.CONCURRENT),  # max 500 simultaneous
        "provider-b": PipelineLimit(15, LimitKind.PER_MINUTE, timeout=300),  # 15/min token bucket
    }
    ...

Use pipeline_concurrency() at call sites to acquire slots:

from ai_pipeline_core import pipeline_concurrency


async def fetch_data(url: str) -> Data:
    async with pipeline_concurrency("provider-a"):
        return await provider.fetch(url)

Limit kinds:

  • CONCURRENT — Lease-based slots held during operation, released on exit
  • PER_MINUTE — Token bucket with limit/60 decay per second (allows bursting)
  • PER_HOUR — Token bucket with limit/3600 decay per second
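
A token bucket of this kind can be sketched in a few lines. The example models the limit/60 rate as a refill of free capacity, which is equivalent to occupied slots decaying at that rate; it takes the current time as an argument so the behavior is deterministic. TokenBucket is a stand-in for illustration, since the actual enforcement happens in the Prefect server.

```python
class TokenBucket:
    """Allows bursts up to `limit`, then refills at limit/period tokens per second."""

    def __init__(self, limit: int, period_seconds: float) -> None:
        self.capacity = float(limit)
        self.refill_per_second = limit / period_seconds
        self.tokens = float(limit)  # start full, so an initial burst is allowed
        self.updated_at = 0.0

    def try_acquire(self, now: float) -> bool:
        """Consume one token if available; `now` is a timestamp in seconds."""
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

For a limit of 15 per minute, 15 calls can succeed back to back, after which one further call becomes possible every 4 seconds.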

Behavior:

  • Limits are auto-created in Prefect server at pipeline start (idempotent upsert)
  • A timeout raises AcquireConcurrencySlotTimeoutError; this means the limit is being enforced, not that the framework failed
  • When Prefect is unavailable, operations proceed unthrottled and a warning is logged
  • Limit names are validated at class definition time (alphanumeric, dashes, underscores)

Stub Classes for Incremental Development

Use stub=True to define placeholder classes with correct type signatures that pass all validation but block deployment. This enables multi-agent development where an architect defines the pipeline structure and individual agents implement pieces incrementally:

from ai_pipeline_core import PipelineTask, PipelineFlow, PromptSpec, FlowOptions


# Stub task — type contract defined, implementation pending
class AnalyzeDataTask(PipelineTask, stub=True):
    """Analyze cleaned data and produce structured findings.

    Input: CleanedDataDoc with normalized content.
    Output: AnalysisResultDoc with analysis findings.
    """

    @classmethod
    async def run(cls, cleaned_docs: tuple[CleanedDataDoc, ...]) -> tuple[AnalysisResultDoc, ...]: ...


# Stub flow — same pattern
class AnalysisFlow(PipelineFlow, stub=True):
    """Run analysis pipeline."""

    async def run(
        self, cleaned_docs: tuple[CleanedDataDoc, ...], options: FlowOptions
    ) -> tuple[AnalysisResultDoc, ...]: ...


# Stub PromptSpec — uses keyword arg syntax
class AnalysisSpec(PromptSpec, stub=True):
    """Analyze data for themes and anomalies."""

    role = AnalystRole
    input_documents = (CleanedDataDoc,)
    task = "Analyze the provided data for key themes and anomalies."

Behavior:

  • Stubs pass all definition-time validation (ruff, basedpyright, semgrep, import-time checks)
  • Type contracts are fully preserved — input_document_types and output_document_types are extracted from annotations
  • _validate_flow_chain works normally with stub flows (type chain is verified)
  • Executing a stub raises StubNotImplementedError (subclass of NonRetriableError — no retries)
  • deploy.py blocks deployment if any stubs exist in the import graph
  • To implement: fill in the run() body and remove stub=True from the class declaration

See examples/showcase_stubs.py for a complete working example.

Parallel Execution

Dispatch tasks for parallel execution within a flow:

from ai_pipeline_core import PipelineTask, collect_tasks, as_task_completed

# Dispatch tasks for parallel execution (run without await returns TaskHandle)
handle_a = ExtractTask.run(docs_a)
handle_b = ExtractTask.run(docs_b)
handle_c = ExtractTask.run(docs_c)

# Collect all results with optional deadline
batch = await collect_tasks(handle_a, handle_b, handle_c, deadline_seconds=120.0)
# batch.completed = [result_a, result_b, ...]
# batch.incomplete = [handle_c]  # timed out or failed

# Iterate in completion order
async for handle in as_task_completed(handle_a, handle_b, handle_c):
    result = await handle.result()
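
The completed/incomplete split under a deadline can be approximated with plain asyncio. This is a sketch of the general pattern, not the implementation of collect_tasks: collect_with_deadline and work are hypothetical names, incomplete items are reported by index rather than as handles, and unfinished work is cancelled at the deadline (the framework may behave differently).

```python
import asyncio


async def collect_with_deadline(coroutines, deadline_seconds):
    """Return (results of successful tasks, indexes of failed or unfinished tasks)."""
    tasks = [asyncio.ensure_future(c) for c in coroutines]
    done, pending = await asyncio.wait(tasks, timeout=deadline_seconds)
    completed, incomplete = [], []
    for index, task in enumerate(tasks):
        if task in done and task.exception() is None:
            completed.append(task.result())
        else:
            incomplete.append(index)  # timed out or raised
    for task in pending:
        task.cancel()
    # Let cancelled tasks finish unwinding before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return completed, incomplete


async def work(value, delay, fail=False):
    """Example coroutine that sleeps, then returns or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError("boom")
    return value
```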

safe_gather and safe_gather_indexed run coroutines concurrently with fault tolerance:

from ai_pipeline_core import safe_gather, safe_gather_indexed

# safe_gather: returns successes only, filters out failures
results = await safe_gather(
    process(doc1),
    process(doc2),
    process(doc3),
    label="processing",
)  # Returns list of successful results (order may shift)

# safe_gather_indexed: preserves positional correspondence (None for failures)
results = await safe_gather_indexed(
    process(doc1),
    process(doc2),
    process(doc3),
    label="processing",
)  # Returns [result1, None, result3] if doc2 failed

Both raise if every coroutine fails; pass raise_if_all_fail=False to disable this.
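
The difference between the two return shapes is easy to see in a stand-alone sketch built on asyncio.gather(return_exceptions=True). The functions gather_successes and gather_indexed below are illustrative stand-ins, not the framework's safe_gather implementations; in particular, the RuntimeError raised when everything fails is an assumption, and the label argument is omitted.

```python
import asyncio


async def _run_all(coroutines, raise_if_all_fail):
    """Run coroutines concurrently, capturing exceptions as values."""
    outcomes = await asyncio.gather(*coroutines, return_exceptions=True)
    if raise_if_all_fail and outcomes and all(isinstance(o, BaseException) for o in outcomes):
        raise RuntimeError("all coroutines failed")
    return outcomes


async def gather_successes(*coroutines, raise_if_all_fail=True):
    """Successful results only; positions no longer match the inputs."""
    outcomes = await _run_all(coroutines, raise_if_all_fail)
    return [o for o in outcomes if not isinstance(o, BaseException)]


async def gather_indexed(*coroutines, raise_if_all_fail=True):
    """One entry per input; failures become None."""
    outcomes = await _run_all(coroutines, raise_if_all_fail)
    return [None if isinstance(o, BaseException) else o for o in outcomes]


async def process(n):
    """Example coroutine that fails for one input."""
    if n == 2:
        raise ValueError("bad input")
    return n * 10
```

Use the indexed form when each result must be matched back to the input that produced it.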

Deploying to Prefect Cloud

The framework includes a deploy script that builds a fully bundled deployment (project wheel + all dependency wheels), uploads to GCS, and creates a Prefect deployment. The worker installs fully offline with --no-index — no PyPI contact, no stale cache issues.

# From your project root (where pyproject.toml lives)
ai-pipeline-deploy

# Also available as module:
python -m ai_pipeline_core.deployment.deploy

Requirements:

  • uv (dependency resolution) and pip (wheel download) on the deploy machine
  • PREFECT_API_URL, PREFECT_GCS_BUCKET configured
  • uv on the worker (for offline install)

Remote Deployment Client

RemoteDeployment is a typed client for calling a remote PipelineDeployment via Prefect. Name the client class identically to the server's deployment class so the auto-derived deployment name matches:

from ai_pipeline_core import RemoteDeployment, DeploymentResult, FlowOptions, Document


class RemoteInputDocument(Document):
    """Mirror type -- class_name must match the remote pipeline's document type."""


class RemoteResult(DeploymentResult):
    """Result type matching the remote pipeline's result."""

    report_count: int = 0


class MyPipeline(RemoteDeployment[FlowOptions, RemoteResult]):
    """Client for the remote MyPipeline deployment."""


client = MyPipeline()
result = await client.run(
    documents=input_docs,
    options=FlowOptions(),
)

The client defines local Document subclasses ("mirror types") whose class_name must match the remote pipeline's document types exactly. run_remote_deployment() is also available as a lower-level function. Remote execution only happens when the active database backend reports supports_remote=True; otherwise it falls back to inline execution.

Deterministic run_id: RemoteDeployment derives a deterministic run_id from the caller's run_id + a combined fingerprint hash of all document SHA256s and serialized options (format: {run_id}-{fingerprint[:8]}). Same inputs always produce the same derived run_id, enabling worker-side flow resume.

run_id validation: All run_id values are validated at entry points — alphanumeric characters, underscores, and hyphens only, max 100 characters.
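
The two run_id rules can be sketched as follows. The exact fingerprint recipe is not documented here, so this example assumes SHA-256 over a JSON payload of the sorted document hashes and the options; only the {run_id}-{fingerprint[:8]} format and the character and length rules come from the text above. derive_run_id and is_valid_run_id are hypothetical helper names.

```python
import hashlib
import json
import re

# Alphanumeric characters, underscores, and hyphens; 1 to 100 characters.
RUN_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,100}")


def is_valid_run_id(run_id: str) -> bool:
    return RUN_ID_PATTERN.fullmatch(run_id) is not None


def derive_run_id(run_id: str, document_sha256s, options: dict) -> str:
    """Same inputs always yield the same derived run_id."""
    payload = json.dumps(
        {"documents": sorted(document_sha256s), "options": options},
        sort_keys=True,  # stable serialization regardless of key order
    )
    fingerprint = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{run_id}-{fingerprint[:8]}"
```

Sorting the hashes makes the result independent of document order, which is a design choice of this sketch rather than documented framework behavior.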

Prompt Compiler

Type-safe prompt specifications that replace Jinja2 templates. Every piece of prompt content is a class or class attribute, validated at definition time (import time).

Components — define once, reuse across specs:

from ai_pipeline_core import Role, Rule, OutputRule, Guide


class ResearchAnalyst(Role):
    """Analyst role for research pipelines."""

    text = "experienced research analyst with expertise in data synthesis"


class CiteEvidence(Rule):
    """Citation rule."""

    text = "Always cite specific evidence from the source documents.\nInclude document IDs when referencing."


class DontUseMarkdownTables(OutputRule):
    """Table formatting rule."""

    text = "Do not use markdown tables in the output."


class RiskFrameworkGuide(Guide):
    """Risk assessment framework guide."""

    template = "guides/risk_framework.md"  # Relative to module file, loaded at import time

Specs — typed prompt definitions with full validation:

from ai_pipeline_core import PromptSpec, Document
from pydantic import Field


class SourceDocument(Document):
    """Source material for analysis."""


class AnalysisSpec(PromptSpec):
    """Analyze source documents for key findings."""

    role = ResearchAnalyst
    input_documents = (SourceDocument,)
    task = "Analyze the provided documents and identify key findings."
    rules = (CiteEvidence,)
    guides = (RiskFrameworkGuide,)
    output_structure = "## Key Findings\n## Evidence\n## Gaps"
    output_rules = (DontUseMarkdownTables,)

    # Dynamic fields — become template variables
    project_name: str = Field(description="Project name")

Multi-line fields — use MultiLineField for long or multiline content (e.g., review feedback, website content). All multi-line fields are combined into a single XML-tagged user message sent before the main prompt, not inlined in the Context section. Regular Field() values must be short, single-line strings (up to 500 chars) — longer or multiline values are auto-promoted to multi-line treatment with a warning:

from ai_pipeline_core import PromptSpec, MultiLineField
from pydantic import Field


class ReviewSpec(PromptSpec):
    """Analyze a review."""

    role = ResearchAnalyst
    input_documents = (SourceDocument,)
    task = "Analyze the review and identify key themes."

    project_name: str = Field(description="Project name")  # Short, inline in prompt
    review: str = MultiLineField(description="Review text")  # Sent as <review>...</review> message
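
The promotion rule for regular Field() values can be stated as a small predicate. This is a sketch of the rule as described, assuming "up to 500 chars" is inclusive; needs_multiline_treatment and wrap_as_tagged_block are hypothetical helpers, and the exact whitespace inside the XML-style tags is an assumption.

```python
MAX_INLINE_CHARS = 500  # limit for regular Field() values, per the text above


def needs_multiline_treatment(value: str) -> bool:
    """True if a value is too long or spans several lines for inline use."""
    return "\n" in value or len(value) > MAX_INLINE_CHARS


def wrap_as_tagged_block(field_name: str, value: str) -> str:
    """Wrap a multi-line value in tags named after the field."""
    return f"<{field_name}>\n{value}\n</{field_name}>"
```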

Rendering and sending:

from ai_pipeline_core import AIModel, Conversation, render_preview, render_text

# Create spec instance with dynamic field values
spec = AnalysisSpec(project_name="ACME")

# Render prompt text (for inspection/debugging)
prompt = render_text(spec, documents=[source_doc])

# Preview with placeholder values (for debugging)
preview = render_preview(AnalysisSpec)

# Send to LLM via Conversation
conv = await Conversation(model=AIModel(name="gemini-3-flash")).send_spec(spec, documents=[source_doc])
print(conv.content)  # <result> tags auto-extracted by send_spec()

Structured output: output_structure automatically enables <result> tag wrapping, sets a stop sequence at </result>, and auto-extracts the content in Conversation.send_spec(). conv.content returns clean text without tags. Structured output (PromptSpec[SomeModel]) uses send_structured() automatically.
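
The tag extraction can be pictured with a short sketch. Because generation stops at the </result> stop sequence, the raw model text may lack the closing tag, so the example tolerates both cases. extract_result is a hypothetical helper written for illustration; the extraction inside send_spec() may differ in detail.

```python
def extract_result(raw: str) -> str:
    """Return the text inside <result>...</result>, or the stripped input if untagged."""
    start = raw.find("<result>")
    if start == -1:
        return raw.strip()
    body = raw[start + len("<result>"):]
    end = body.find("</result>")
    if end != -1:  # closing tag present (no stop sequence applied)
        body = body[:end]
    return body.strip()
```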

Follow-up specs — use follows=ParentSpec to declare follow-up specs. Follow-up specs inherit context from the parent conversation and don't require role or input_documents.

CLI tool for discovery, inspection, rendering, and compilation:

# Inspect a spec's anatomy (role, docs, fields, rules, output config, token estimate)
ai-prompt-compiler inspect AnalysisSpec

# Render a prompt preview
ai-prompt-compiler render AnalysisSpec

# Discover, list, and compile all specs to .prompts/ directory as markdown files
ai-prompt-compiler compile

# Explicit module:class reference
ai-prompt-compiler render my_package.specs:AnalysisSpec

# Also available as module:
python -m ai_pipeline_core.prompt_compiler inspect AnalysisSpec

Replay

Every LLM conversation call, pipeline task, and pipeline flow is automatically captured as a replayable span in the unified database. When you download a deployment with ai-trace download, the snapshot is a portable FilesystemDatabase; ai-replay can load one recorded span JSON file from that bundle, or replay directly from the database with --from-db. Document references are resolved from the database backend by SHA256 at replay time.

Inspect a recorded span file:

ai-replay show ./downloaded_bundle/runs/.../01_conv-a1b2c3d4.json --db-path ./downloaded_bundle

Re-execute with the same parameters:

ai-replay run ./downloaded_bundle/runs/.../01_task-build-summary.json --db-path ./downloaded_bundle --import my_app.tasks

Override fields before execution:

# Switch model for a recorded conversation span
ai-replay run ./downloaded_bundle/runs/.../01_conv-a1b2c3d4.json --db-path ./downloaded_bundle --import my_app --model gemini-3-flash

# Override model_options or response_format
ai-replay run ./downloaded_bundle/runs/.../01_conv-a1b2c3d4.json --db-path ./downloaded_bundle --import my_app --set reasoning_effort=low

# Pin a replayed LLM round to one AIPL deployment ID
ai-replay run ./downloaded_bundle/runs/.../01_conv-a1b2c3d4.json --db-path ./downloaded_bundle --force-deployment-id deployment-123

The --import flag is required when the original script was run as __main__ — it imports the module so Document subclasses and functions are registered, and automatically remaps __main__:X references to the correct module path.

Output directory:

By default, replay writes results to {replay_file_stem}_replay/ next to the replay file. The output directory is a full FilesystemDatabase snapshot containing:

conversation_replay/
    output.yaml     # Execution result summary (names, SHA256 references)
    runs/           # Span records
    documents/      # Document metadata
    blobs/          # Raw document content (by SHA256)
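
The default location follows directly from the {replay_file_stem}_replay/ rule. A minimal sketch with pathlib, where default_replay_output_dir is a hypothetical helper and the example path is illustrative:

```python
from pathlib import Path


def default_replay_output_dir(replay_file: Path) -> Path:
    """Sibling directory named after the replay file's stem plus '_replay'."""
    return replay_file.with_name(f"{replay_file.stem}_replay")
```

For a replay file named 01_conv-a1b2c3d4.json this gives a directory named 01_conv-a1b2c3d4_replay in the same folder.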

Override with --output-dir:

ai-replay run ./downloaded_bundle/runs/.../01_task-build-summary.json --db-path ./downloaded_bundle --import my_app --output-dir ./my_output

Database-backed replay:

Replay resolves document references from a database backend. Use --db-path for local snapshots or --from-db to replay directly from a recorded span. show supports the same database-backed path, and file-backed show / run reject directories with an actionable error telling you to pass one span JSON file or use --from-db.

ai-replay show --from-db 550e8400-e29b-41d4-a716-446655440000 --db-path ./downloaded_bundle
ai-replay run --from-db 550e8400-e29b-41d4-a716-446655440000 --db-path ./downloaded_bundle

Programmatic replay:

from pathlib import Path

from ai_pipeline_core.database.filesystem import FilesystemDatabase
from ai_pipeline_core.replay import execute_span

# Open a downloaded bundle read-only
database = FilesystemDatabase(Path("./downloaded_bundle"), read_only=True)

# Replay one recorded span by UUID
result = await execute_span(span_id, source_db=database)
print(result)

Replayable span kinds are the same runtime boundaries the framework records: conversation, task, and flow.

Deployment Downloads

ai-trace download exports a deployment as a portable FilesystemDatabase snapshot:

downloaded_bundle/
  summary.md
  costs.md
  logs.jsonl
  llm_calls.jsonl
  validation.json
  schema_meta.json
  errors.md
  documents.md
  runs/
  documents/
  blobs/

errors.md is created only when failed nodes exist, and documents.md is created only when documents exist. validation.json summarizes the staged bundle validation that runs before publish. Under runs/, each deployment is stored under a date/name directory with deployment.json at the root plus sequence-prefixed child files such as 01_flow-analyze-flow.json, 01_task-build-summary.json, and 01_conv-a1b2c3d4.json. The snapshot is opened read-only by ai-trace --db-path and ai-replay --db-path.
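
When scripting over a downloaded bundle, the sequence-prefixed file names can be parsed with a regular expression. The pattern below is inferred from the three example names above and is an assumption about the naming scheme, not a documented contract; parse_span_filename is a hypothetical helper.

```python
import re

# <sequence>_<kind>-<slug>.json, where kind is flow, task, or conv.
SPAN_FILE = re.compile(r"(?P<seq>\d+)_(?P<kind>flow|task|conv)-(?P<slug>.+)\.json")


def parse_span_filename(name: str):
    """Return (sequence, kind, slug), or None for files that are not span records."""
    match = SPAN_FILE.fullmatch(name)
    if match is None:
        return None
    return int(match["seq"]), match["kind"], match["slug"]
```

Files such as deployment.json do not match and are returned as None, which makes it easy to skip them while iterating over a run directory.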

ai-trace CLI

The ai-trace command-line tool provides access to pipeline execution data from the configured database backend or a downloaded FilesystemDatabase snapshot.

# List recent pipeline runs
ai-trace list --limit 10 --status completed

# Show execution summary without downloading
ai-trace show 550e8400-e29b-41d4-a716-446655440000

# Download a deployment as a portable FilesystemDatabase snapshot
ai-trace download 550e8400-e29b-41d4-a716-446655440000 -o ./debug/

By default, the connection is configured from the CLICKHOUSE_* environment variables; pass --db-path to read a local FilesystemDatabase snapshot instead.

Configuration

Environment Variables

# LLM Configuration (via LiteLLM proxy)
OPENAI_BASE_URL=http://localhost:4000
OPENAI_API_KEY=your-api-key

# Optional: Orchestration
PREFECT_API_URL=http://localhost:4200/api
PREFECT_API_KEY=your-prefect-key
PREFECT_API_AUTH_STRING=your-auth-string
PREFECT_WORK_POOL_NAME=default
PREFECT_WORK_QUEUE_NAME=default
PREFECT_GCS_BUCKET=your-gcs-bucket

# Optional: GCS (for remote storage)
GCS_SERVICE_ACCOUNT_FILE=/path/to/service-account.json

# Optional: Unified Database / Execution Tracking (ClickHouse -- omit for local filesystem store)
CLICKHOUSE_HOST=your-clickhouse-host
CLICKHOUSE_PORT=8443
CLICKHOUSE_DATABASE=default
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your-password
CLICKHOUSE_SECURE=true
CLICKHOUSE_CONNECT_TIMEOUT=30
CLICKHOUSE_SEND_RECEIVE_TIMEOUT=66

# Optional: Retry Configuration (override class-level None defaults)
TASK_RETRIES=0
TASK_RETRY_DELAY_SECONDS=30
FLOW_RETRIES=0
FLOW_RETRY_DELAY_SECONDS=30
CONVERSATION_RETRIES=2
CONVERSATION_RETRY_DELAY_SECONDS=30
CONVERSATION_RETRY_BACKOFF_MULTIPLIER=3
CONVERSATION_RETRY_MAX_DELAY_SECONDS=300

# Optional: Document Summaries (store-level, LLM-generated)
DOC_SUMMARY_ENABLED=true
# Empty = disabled; set to an AIPL model name to enable generated summaries.
DOC_SUMMARY_MODEL=

# Optional: Pub/Sub event delivery (deployment progress/status)
# Requires pubsub_service_type ClassVar on the PipelineDeployment subclass
PUBSUB_PROJECT_ID=your-gcp-project
PUBSUB_TOPIC_ID=pipeline-events

Required Proxy Capabilities

The framework can speak to a plain LiteLLM-compatible endpoint for basic completions, but production behavior assumes an AIPL-compatible LiteLLM proxy. These capabilities power fallback chains, deployment pinning, workload routing, and trace debugging:

  • AIPL response headers parsed by the framework, including x-aipl-call-id, x-aipl-deployment-id, x-aipl-group-status, x-aipl-tried, x-aipl-failed, x-aipl-cc-dedup, x-aipl-cc-stale, cache headers, limiter headers, and provider metadata.
  • GET /aipl/trace/{call_id} for best-effort trace fetch after failed or degraded calls.
  • Request metadata keys accepted by the proxy: aipl_force_deployment_id, aipl_prefer_deployment_id, aipl_skip_model_ids, aipl_skip_cost_optimized, and aipl_warmup_strategy_override. Workload-scoped demotion is framework-side; the proxy no longer reads a workload id from metadata.
  • Group exhaustion signaling that lets AIModel.fallback advance to the next model.

Without these proxy features, ordinary LLM calls may still work, but fallback routing, deployment observability, and AIPL trace diagnostics are unavailable.

Settings Management

Create custom settings by inheriting from the base Settings class:

from ai_pipeline_core import Settings


class ProjectSettings(Settings):
    """Project-specific configuration."""

    app_name: str = "my-app"
    max_retries: int = 3


# Create singleton instance
settings = ProjectSettings()

# Access configuration (all env vars above are available)
print(settings.openai_base_url)
print(settings.app_name)

Best Practices

Framework Rules

  1. Pipeline classes: Subclass PipelineTask for tasks and PipelineFlow for flows. Use PipelineDeployment.build_plan() as the primary deployment authoring path; build_flows() remains a fallback wrapper
  2. Task invocation: Use await TaskClass.run(...) for sequential execution and TaskClass.run(...) (without await) when you want a parallel TaskHandle
  3. Logging: Use logging.getLogger(__name__) -- never print(). Logging is auto-configured on import
  4. LLM calls: Use Conversation for all LLM interactions (multi-turn and single-shot). Use tools= for function calling
  5. Options: Omit ModelOptions unless specifically needed (defaults are production-optimized)
  6. Documents: Use create_root() for pipeline inputs, derive() for content transformations, create() for causally triggered documents, create_external() for URI provenance. Always subclass Document
  7. Type annotations: Input/output types are in the run() method signature. Flows use named Document parameters resolved from the deployment blackboard, and tasks use explicit typed inputs with document return annotations
  8. Initialization: Logger at module scope, not in functions
  9. Document collections: Use typed tuples like tuple[MyDocument, ...] when a task needs a collection input. Flows declare named document parameters, and deployment entrypoints still accept generic document sequences

Import Convention

Always import from the top-level package when possible:

# Top-level imports (preferred)
from ai_pipeline_core import Document, PipelineTask, PipelineFlow, PipelineDeployment, Conversation, Tool
from ai_pipeline_core import (
    ExternalProvider,
    StatelessPollingProvider,
    ProviderOutcome,
    ProviderError,
    ProviderAuthError,
)
from ai_pipeline_core import collect_tasks, as_task_completed, TaskHandle, TaskBatch

# Sub-package imports for symbols not at top level
from ai_pipeline_core.llm import ModelOptions

Development

Dev CLI

All test, lint, and type-check workflows go through the dev CLI. It captures full output to .tmp/dev-runs/ (auto-cleaned after 24h), prints concise summaries with per-step wall-clock durations, auto-detects affected tests from git changes via testmon, warns about slow tests (>30s), and skips reruns when code hasn't changed. Running bare dev (no subcommand) shows the info guide.

# Testing
dev test                # Run affected tests (auto-scoped from git changes, uses testmon)
dev test pipeline       # Run tests for a specific module
dev test --rerun-failed # Rerun last-failed tests within the selected scope
dev test --lane=unit    # Full unit lane
dev test --lane=integration

# Code quality
dev format              # Auto-fix lint + formatting (ruff format + ruff check --fix)
dev lint                # Check lint without fixing
dev typecheck           # Type checking (basedpyright)

# All checks in order (lint → typecheck → deadcode → semgrep → docstrings → tests)
dev check               # Full validation pipeline

# Utilities
dev status              # Show changed files, last run results, suggested next command
dev info                # Detailed usage guide with auto-detected project config

Do not run pytest, ruff, or basedpyright directly — use dev commands instead. A Claude Code hook enforces this in the devcontainer. The dev CLI handles correct flags, output management, marker expressions, and idempotency automatically.

Recommended workflow:

  1. Write code
  2. dev format — auto-fix lint/formatting
  3. dev test — run affected tests
  4. If tests fail: fix code, then dev test --rerun-failed
  5. dev check — full validation before commit

Infrastructure tests (ClickHouse, Pub/Sub, LLM integration) auto-skip when their requirements are unavailable. dev test is always safe to run. Use dev info to see which infrastructure is detected.
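
The auto-skip behaviour described above can be approximated with the standard library. This is a sketch under stated assumptions, not the framework's actual mechanism: the `CLICKHOUSE_HOST` variable name and the `requires_env` helper are hypothetical, and the real test suite may detect infrastructure in a different way.

```python
import os
import unittest


def requires_env(*names: str):
    """Skip the decorated test unless every named environment variable is set.

    Hypothetical helper; the variable names used below are assumptions.
    """
    missing = [name for name in names if not os.environ.get(name)]
    return unittest.skipIf(bool(missing), f"missing environment: {', '.join(missing)}")


class ClickHouseSmokeTest(unittest.TestCase):
    @requires_env("CLICKHOUSE_HOST")
    def test_connection_settings_present(self) -> None:
        # Runs only when the variable is set; otherwise it is reported as skipped.
        self.assertTrue(os.environ["CLICKHOUSE_HOST"])


def run_suite() -> unittest.TestResult:
    """Run the test case and return the result for inspection."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ClickHouseSmokeTest)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

A skipped test counts as run but not as a failure, so a suite built this way stays green on machines without the infrastructure.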

make targets (make test, make lint, make check, etc.) delegate to dev and remain available as shortcuts.

Static analysis tools:

  • Ruff — 28 rule sets including bugbear, security (bandit), complexity, async enforcement, exception patterns
  • Basedpyright — strict mode with reportUnusedCoroutine, reportUnreachable, reportImplicitStringConcatenation
  • Vulture — dead code detection with framework-aware whitelist
  • Semgrep — custom rules in .semgrep/ for frozen model mutable fields, async enforcement, docstring quality, architecture constraints
  • Interrogate — 100% docstring coverage enforcement

Examples

The examples/ directory contains:

  • showcase.py -- Full 3-stage pipeline: class-based PipelineTask/PipelineFlow, Conversation API, multi-turn LLM analysis, structured extraction, PipelineDeployment with CLI, resume/skip, progress tracking, image processing
  • showcase_database.py -- Database usage: run a real deployment into MemoryDatabase, inspect a recorded task span target plus parsed meta_json / metrics_json, and inspect output document ancestry
  • showcase_replay.py -- Replay system: record a real task span through PipelineDeployment.run(...), then replay that stored span with execute_span()
  • showcase_prompt_compiler.py -- Prompt compiler features: Role, Rule, OutputRule, Guide, PromptSpec, rendering, Conversation.send_spec() usage patterns, follow-up specs, definition-time validation
  • showcase_aimodel.py -- AIModel capabilities: fallback chains, cache TTL, cost-optimized deployment skipping, and stream watchdog settings
  • showcase_stubs.py -- Stub classes for incremental development: stub=True on PipelineTask, PipelineFlow, PromptSpec with preserved type contracts, runtime guards, and deployment blocking

Run examples:

# Full pipeline showcase (requires OPENAI_BASE_URL and OPENAI_API_KEY)
python examples/showcase.py ./output

# Database showcase (no arguments needed)
python examples/showcase_database.py

# Replay showcase (no arguments needed)
python examples/showcase_replay.py

# Prompt compiler showcase (no arguments needed)
python examples/showcase_prompt_compiler.py

# AIModel showcase (live send is opt-in)
python examples/showcase_aimodel.py
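
The full pipeline showcase needs both environment variables named above. A small preflight check, sketched here with the standard library only, can report what is missing before launching the script. The `missing_settings` helper is hypothetical and not part of the framework.

```python
import os
from collections.abc import Mapping

# Variables the full pipeline showcase is documented to require.
REQUIRED_VARS = ("OPENAI_BASE_URL", "OPENAI_API_KEY")


def missing_settings(
    environ: Mapping[str, str],
    required: tuple[str, ...] = REQUIRED_VARS,
) -> list[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]


# Inspect the current process environment; an empty list means it is complete.
missing = missing_settings(os.environ)
```

If `missing` is non-empty, set those variables before running `python examples/showcase.py ./output`.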

Project Structure

ai-pipeline-core/
|-- ai_pipeline_core/
|   |-- _llm_core/        # Internal LLM client, model types, and response handling
|   |-- deployment/        # Pipeline deployment, deploy script, CLI bootstrap, progress, remote
|   |-- database/          # Execution DAG, documents, blobs, logs, and download helpers
|   |-- documents/         # Document system (Document base class, attachments, context)
|   |-- llm/               # Conversation class, Tool base class, tool loop, URLSubstitutor, image processing
|   |-- logger/            # Logging infrastructure
|   |-- observability/     # Database-backed execution CLI (`ai-trace`)
|   |-- pipeline/          # PipelineTask, PipelineFlow, parallel primitives, FlowOptions, concurrency limits
|   |-- prompt_compiler/   # Type-safe prompt specs, rendering, and CLI tool
|   |-- replay/            # Replay system (capture, serialize, resolve, execute)
|   |-- providers.py       # External provider base classes (ExternalProvider, StatelessPollingProvider)
|   |-- settings.py        # Configuration management (Pydantic BaseSettings)
|   +-- exceptions.py      # Framework exceptions (LLMError, DocumentNameError, etc.)
|-- tools/
|   +-- trace-inspector/   # Trace inspection — generates markdown debug bundles from execution data
|-- tests/                 # Comprehensive test suite
|-- examples/              # Usage examples
+-- pyproject.toml         # Project configuration

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make changes — run dev check (must pass all linting, type checking, semgrep, and tests)
  4. Open a Pull Request

Note: This is an internal-first framework. External contributions are welcome, but the architecture and infrastructure choices (Prefect, ClickHouse) are fixed.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Built on Prefect for workflow orchestration
  • Uses an AIPL-compatible LiteLLM proxy for LLM provider abstraction, deployment routing, and trace fetch
  • Data validation with Pydantic and type checking with basedpyright
