pyconveyor

Deterministic YAML pipeline engine for structured LLM extraction.

pyconveyor lets you build reliable LLM extraction pipelines by declaring them in YAML. It handles prompt rendering, schema validation, self-correcting retries, parallel steps, batch processing, and benchmarking — so your code handles the domain logic, not the plumbing.

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      invoice_number: str
      vendor: str
      amount: float
    max_attempts: 3
pyconveyor run pipeline.yaml --input '{"document": "Invoice from Acme Corp…"}'

Install

pip install pyconveyor

For Anthropic Claude support:

pip install "pyconveyor[anthropic]"

Quickstart

Bootstrap a working project interactively — no Python files needed:

pyconveyor init my_project/ --interactive
cd my_project/
export OPENAI_API_KEY=sk-...
pyconveyor run pipeline.yaml --input '{"document": "The quick brown fox."}'

Or use the static layout with schemas.py:

pyconveyor init my_project/

How it works

You write three files. pyconveyor owns the runner.

your_project/
├── pipeline.yaml       # what to do and in what order
├── schemas.py          # what shape the output must have (Pydantic models)
└── prompts/
    └── extract.j2      # what to ask the model (Jinja2 templates)
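
A prompt file is an ordinary Jinja2 template. As a minimal sketch of what prompts/extract.j2 might contain (the wording is illustrative; ctx is the documented handle on the input dict):

Extract the invoice fields from the following document.

{{ ctx.document }}

Respond with a JSON object containing invoice_number, vendor, and amount.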

Or skip schemas.py and write the schema inline in YAML:

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      title: str
      key_points: list[str]
      confidence: float | None

When runner.run(input_data) is called:

  1. The input dict becomes ctx — available in every prompt template and expression
  2. Steps execute in declaration order
  3. Each step's result is stored and can be referenced by later steps as {{ steps.name.value }} (see the sketch below)
  4. A RunContext is returned with all results, attempt logs, and timing
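
For example, a later step's prompt template can consume an earlier step's validated output (prompts/summarise.j2 is a hypothetical file name used for illustration):

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      vendor: str
      amount: float
  - name: summarise
    type: llm
    model: default
    prompt: prompts/summarise.j2   # inside the template: {{ steps.extract.value }}
    schema:
      summary: str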

Features

Structured output with automatic retries

Every llm step validates the model's response against a schema. If validation fails, pyconveyor feeds the error back to the model and retries automatically.

- name: extract
  type: llm
  model: default
  prompt: prompts/extract.j2
  schema: schemas:ArticleSummary
  max_attempts: 3
  on_error: continue   # "raise" | "continue" | "skip_remaining"

All step types

Step type   What it does
---------   ------------
llm         Call a model, validate output against a schema, retry on failure
ensemble    Run N models in parallel, auto-judge and merge results
transform   Call a Python function with step outputs as inputs
validate    Assert a condition; fail or skip remaining steps if it's false
io          Call a Python function for side effects (DB write, file save)
parallel    Run multiple sub-pipelines concurrently
condition   Branch to different steps based on a runtime expression
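
The llm and ensemble shapes are shown elsewhere in this README. As a hedged sketch of a validate step, assuming a condition key and reusing the documented on_error values (neither is confirmed above for this step type):

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      amount: float
  - name: check_amount
    type: validate
    condition: steps.extract.value.amount > 0   # assumed key and expression syntax
    on_error: skip_remaining                    # assumed to apply to validate steps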

Inline schemas — no Python required

Define your output schema directly in the YAML file:

schema:
  label: str
  confidence: float
  notes: str | None

Or generate a schemas.py stub from sample output:

pyconveyor run pipeline.yaml --input sample.json > output.json
pyconveyor schema infer pipeline.yaml --sample output.json --output schemas.py

Benchmarking and reports

Measure pipeline accuracy against golden-standard cases and generate shareable HTML reports:

# Run benchmark, compare two pipelines, open report
pyconveyor benchmark benchmarks/ \
  --pipeline pipeline_v1.yaml \
  --pipeline pipeline_v2.yaml \
  --report comparison.html

open comparison.html

The report includes per-step accuracy tables, a pipeline comparison delta, a Mermaid graph with accuracy annotations, Chart.js bar charts, and a per-case collapsible breakdown.

from pyconveyor import BenchmarkRunner, generate_report

runner = BenchmarkRunner(
    benchmark_dir="benchmarks/",
    pipelines=["pipeline_v1.yaml", "pipeline_v2.yaml"],
    pass_threshold=0.8,
)
summary = runner.run()
generate_report(summary, "report.html", pdf=True)
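
The expected layout of benchmarks/ is not documented in this README; a plausible shape, assuming one golden-standard case per JSON file pairing an input with expected step outputs, would be:

benchmarks/
├── case_001.json   # e.g. {"input": {...}, "expected": {...}}  (assumed field names)
└── case_002.json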

Ensemble steps — multi-model consensus

Run N models in parallel and automatically merge their outputs with a judge model:

steps:
  - name: extract
    type: ensemble
    schema: schemas:Record
    prompt: prompts/extract.j2
    members:
      - model: gpt4o
      - model: claude
        required: false   # pipeline continues if this model fails
    judge:
      model: gpt4o        # reviews all outputs and returns the merged result
      condition: all_succeeded

Member results are also accessible individually as steps.extract.gpt4o and steps.extract.claude. If the judge is skipped or fails, the result of the first successful member is returned.

Provider support

Provider                    How
--------                    ---
OpenAI                      provider: openai_compat
Anthropic Claude            provider: anthropic + pip install pyconveyor[anthropic]
Ollama / vLLM / LM Studio   provider: openai_compat + base_url: override
Custom                      @register_provider("name") decorator
Tests                       provider: mock — no API calls
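
Steps refer to models by name (model: default, model: gpt4o), which implies a named-model configuration block. A hedged sketch of pointing the default model at a local Ollama server; only provider: and base_url: are documented above, while the models: nesting and the model: key are assumptions:

models:
  default:
    provider: openai_compat
    base_url: http://localhost:11434/v1   # local OpenAI-compatible endpoint
    model: llama3                         # assumed key for the served model name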

Batch processing

Process thousands of documents with parallel workers:

pyconveyor batch pipeline.yaml --input documents.jsonl --output results.jsonl --workers 8

from pyconveyor import BatchRunner

runner = BatchRunner("pipeline.yaml", max_workers=8)
for item_id, result in runner.run(records):   # records: your iterable of input dicts
    if not result.failed:
        save(result.steps["extract"].value)   # save() stands in for your own persistence code

Vocabulary-constrained fields

VocabField constrains a Pydantic field to a controlled vocabulary, normalises fuzzy matches, and grows the vocabulary over time.

from pyconveyor.vocab import Vocabulary, VocabField
from pydantic import BaseModel

PlasticVocab = Vocabulary(
    known={"PET", "PE", "PLA", "PP"},
    label="plastic_type",
    growth_policy="human",   # queue novel terms for CLI review
    persist=True,
)

class Record(BaseModel):
    plastic: str = VocabField(vocab=PlasticVocab)
    quantity: int

Review pending terms interactively:

pyconveyor vocab review pipeline.yaml

Load-time validation

PipelineRunner("pipeline.yaml") validates everything before spending any tokens:

pyconveyor validate pipeline.yaml
# ✓ pipeline.yaml is valid

Errors include the YAML line number and "did you mean?" suggestions.

Response caching

Cache LLM responses during development to avoid burning tokens on repeated runs:

pyconveyor run pipeline.yaml --input input.json
# subsequent runs use cached responses by default

DAG visualisation

pyconveyor visualise pipeline.yaml
# Outputs Mermaid diagram — paste into GitHub, GitLab, or Notion

CLI reference

pyconveyor init <dir>                  Bootstrap a new project
pyconveyor init <dir> --interactive    Guided setup — define fields interactively
pyconveyor run <pipeline.yaml>         Run a pipeline
pyconveyor validate <pipeline>         Validate without running
pyconveyor batch <pipeline>            Batch process a JSONL file
pyconveyor benchmark <dir>             Benchmark against golden-standard cases
pyconveyor vocab review <pipeline>     Review pending vocabulary suggestions
pyconveyor schema                      Emit JSONSchema for editor autocomplete
pyconveyor schema infer <pipeline>     Infer schemas.py from sample output
pyconveyor visualise <pipeline>        Print Mermaid DAG diagram

Python API

from pyconveyor import PipelineRunner, BatchRunner, BenchmarkRunner, generate_report

# Single run
runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": "…"})

result.failed                          # bool
result.steps["extract"].value          # Pydantic model or dict
result.steps["extract"].last_attempt   # AttemptLog with timing and token counts
result.summary()                       # RunSummary with aggregates

# Batch
batch_runner = BatchRunner("pipeline.yaml", max_workers=8)
for item_id, result in batch_runner.run(records):
    save(result.steps["extract"].value)

# Benchmark
bench = BenchmarkRunner("benchmarks/", pipelines=["pipeline.yaml"])
summary = bench.run()
generate_report(summary, "report.html")

Versioning policy

The YAML pipeline format (pipeline.yaml) is treated as a public API subject to the same semver rules as the Python API. A breaking change to the YAML schema will increment the major version.


Documentation

Full documentation at pyconveyor.readthedocs.io


License

MIT
