pyconveyor

Deterministic YAML pipeline engine for structured LLM extraction.

pyconveyor lets you build reliable LLM extraction pipelines by declaring them in YAML. It handles prompt rendering, schema validation, self-correcting retries, parallel steps, batch processing, and benchmarking — so your code handles the domain logic, not the plumbing.

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      invoice_number: str
      vendor: str
      amount: float
    max_attempts: 3

pyconveyor run pipeline.yaml --input '{"document": "Invoice from Acme Corp…"}'

Install

pip install pyconveyor

For Anthropic Claude support:

pip install "pyconveyor[anthropic]"

Quickstart

Bootstrap a working project interactively — no Python files needed:

pyconveyor init my_project/ --interactive
cd my_project/
export OPENAI_API_KEY=sk-...
pyconveyor run pipeline.yaml --input '{"document": "The quick brown fox."}'

Or use the static layout with schemas.py:

pyconveyor init my_project/

How it works

You write three files. pyconveyor owns the runner.

your_project/
├── pipeline.yaml       # what to do and in what order
├── schemas.py          # what shape the output must have (Pydantic models)
└── prompts/
    └── extract.j2      # what to ask the model (Jinja2 templates)

Or skip schemas.py and write the schema inline in YAML:

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      title: str
      key_points: list[str]
      confidence: float | None

When runner.run(input_data) is called:

  1. The input dict becomes ctx — available in every prompt template and expression
  2. Steps execute in declaration order
  3. Each step's result is stored and can be referenced by later steps as {{ steps.name.value }}
  4. A RunContext is returned with all results, attempt logs, and timing
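
The four points above can be sketched in a few lines of plain Python. This is a toy model of the execution order only, not pyconveyor's runner: the `Step` alias and `run_pipeline` function are hypothetical names, and the real `RunContext` also carries attempt logs and timing.

```python
from typing import Any, Callable

# Hypothetical stand-in for a step: a name plus a function of (ctx, earlier results).
Step = tuple[str, Callable[[dict, dict], Any]]


def run_pipeline(steps: list[Step], input_data: dict) -> dict[str, Any]:
    ctx = dict(input_data)           # the input dict becomes ctx
    results: dict[str, Any] = {}     # earlier results, keyed by step name
    for name, fn in steps:           # steps execute in declaration order
        results[name] = fn(ctx, results)
    return results


steps: list[Step] = [
    ("extract", lambda ctx, done: {"words": ctx["document"].split()}),
    # A later step reads an earlier step's result by name.
    ("count", lambda ctx, done: len(done["extract"]["words"])),
]

results = run_pipeline(steps, {"document": "The quick brown fox."})
print(results["count"])
```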

Features

Structured output with automatic retries

Every llm step validates the model's response against a schema. If validation fails, pyconveyor feeds the error back to the model and retries automatically.

- name: extract
  type: llm
  model: default
  prompt: prompts/extract.j2
  schema: schemas:ArticleSummary
  max_attempts: 3
  on_error: continue   # "raise" | "continue" | "skip_remaining"
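
As a rough illustration of the validate-and-retry idea, the sketch below feeds a validation error back into the next prompt. It assumes nothing about pyconveyor's internals: `validate`, `extract_with_retries`, and `fake_model` are hypothetical names, and the way the error is appended to the prompt is an assumption.

```python
import json
from typing import Callable


def validate(raw: str) -> dict:
    """Parse JSON and check the two fields this sketch expects."""
    data = json.loads(raw)
    if not isinstance(data.get("vendor"), str):
        raise ValueError("vendor must be a string")
    if not isinstance(data.get("amount"), (int, float)):
        raise ValueError("amount must be a number")
    return data


def extract_with_retries(call_model: Callable[[str], str], prompt: str,
                         max_attempts: int = 3) -> dict:
    last_error = None
    for _ in range(max_attempts):
        # On a retry, append the previous validation error to the prompt.
        full_prompt = prompt if last_error is None else (
            f"{prompt}\n\nPrevious error: {last_error}")
        try:
            return validate(call_model(full_prompt))
        except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
            last_error = exc
    raise RuntimeError(
        f"validation failed after {max_attempts} attempts: {last_error}")


def fake_model(prompt: str) -> str:
    """Stand-in model that only answers correctly after seeing an error."""
    if "Previous error" in prompt:
        return '{"vendor": "Acme Corp", "amount": 12.5}'
    return '{"vendor": "Acme Corp", "amount": "twelve"}'


result = extract_with_retries(fake_model, "Extract the invoice fields.")
```

Here the first attempt fails validation and the second succeeds. Raising after the last attempt roughly corresponds to `on_error: raise`.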

All step types

Step type    What it does
llm          Call a model, validate output against a schema, retry on failure
ensemble     Run N models in parallel, auto-judge and merge results
transform    Call a Python function with step outputs as inputs
validate     Assert a condition; fail or skip remaining steps if it's false
io           Call a Python function for side effects (DB write, file save)
parallel     Run multiple sub-pipelines concurrently
condition    Branch to different steps based on a runtime expression

Inline schemas — no Python required

Define your output schema directly in the YAML file:

schema:
  label: str
  confidence: float
  notes: str | None

Or generate a schemas.py stub from sample output:

pyconveyor run pipeline.yaml --input sample.json > output.json
pyconveyor schema infer pipeline.yaml --sample output.json --output schemas.py
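
To make the inline schema notation concrete, here is a small standard-library sketch that checks a dict against type expressions such as `str`, `float`, `list[str]`, and `str | None`. It is not how pyconveyor validates (the project uses Pydantic models); `matches` and `check` are hypothetical helpers, only this subset of expressions is handled, unions inside `list[...]` are not supported, and treating a missing field as an error is this sketch's choice.

```python
from typing import Any


def matches(value: Any, type_expr: str) -> bool:
    """Check a value against a tiny subset of type expressions."""
    type_expr = type_expr.strip()
    if "|" in type_expr:  # top-level union, e.g. "str | None"
        return any(matches(value, part) for part in type_expr.split("|"))
    if type_expr == "None":
        return value is None
    if type_expr.startswith("list[") and type_expr.endswith("]"):
        inner = type_expr[5:-1]
        return isinstance(value, list) and all(matches(v, inner) for v in value)
    if type_expr == "float":  # accept ints as floats, but not bools
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if type_expr == "str":
        return isinstance(value, str)
    raise ValueError(f"unsupported type expression: {type_expr}")


def check(data: dict, schema: dict[str, str]) -> list[str]:
    """Return one error message per field that is missing or mistyped."""
    errors = []
    for field, type_expr in schema.items():
        if field not in data:
            errors.append(f"{field}: missing")
        elif not matches(data[field], type_expr):
            errors.append(f"{field}: expected {type_expr}")
    return errors


schema = {"label": "str", "confidence": "float", "notes": "str | None"}
ok = check({"label": "spam", "confidence": 0.9, "notes": None}, schema)
bad = check({"label": "spam", "confidence": "high"}, schema)
```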

Benchmarking and reports

Measure pipeline accuracy against golden-standard cases and generate shareable HTML reports:

# Run benchmark, compare two pipelines, open report
pyconveyor benchmark benchmarks/ \
  --pipeline pipeline_v1.yaml \
  --pipeline pipeline_v2.yaml \
  --report comparison.html

open comparison.html

The report includes per-step accuracy tables, a pipeline comparison delta, a Mermaid graph with accuracy annotations, Chart.js bar charts, and a per-case collapsible breakdown.

The same benchmark can be run from Python:

from pyconveyor import BenchmarkRunner, generate_report

runner = BenchmarkRunner(
    benchmark_dir="benchmarks/",
    pipelines=["pipeline_v1.yaml", "pipeline_v2.yaml"],
    pass_threshold=0.8,
)
summary = runner.run()
generate_report(summary, "report.html", pdf=True)
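
The README does not spell out how accuracy is scored or what `pass_threshold` compares against. One plausible reading, sketched below purely as an assumption, is exact-match accuracy per field against the golden case, with a case passing when its score reaches the threshold. `field_accuracy` is a hypothetical helper, not part of pyconveyor.

```python
def field_accuracy(predicted: dict, expected: dict) -> float:
    """Fraction of expected fields whose predicted value matches exactly."""
    if not expected:
        return 1.0
    hits = sum(1 for key, value in expected.items()
               if predicted.get(key) == value)
    return hits / len(expected)


# (predicted, golden) pairs; the data is made up for illustration.
cases = [
    ({"vendor": "Acme Corp", "amount": 12.5},
     {"vendor": "Acme Corp", "amount": 12.5}),
    ({"vendor": "Acme", "amount": 12.5},
     {"vendor": "Acme Corp", "amount": 12.5}),
]

pass_threshold = 0.8
scores = [field_accuracy(pred, gold) for pred, gold in cases]
passed = [score >= pass_threshold for score in scores]
```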

Ensemble steps — multi-model consensus

Run N models in parallel and automatically merge their outputs with a judge model:

steps:
  - name: extract
    type: ensemble
    schema: schemas:Record
    prompt: prompts/extract.j2
    members:
      - model: gpt4o
      - model: claude
        required: false   # pipeline continues if this model fails
    judge:
      model: gpt4o        # reviews all outputs and returns the merged result
      condition: all_succeeded

Member results are also accessible individually as steps.extract.gpt4o and steps.extract.claude. If the judge is skipped or fails, the result of the first member that succeeded is returned.
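
The fallback behaviour described above can be sketched with a thread pool. This is an illustration under assumptions, not pyconveyor's implementation: `run_ensemble` is a hypothetical function, members are plain callables, the `required` flag is not modelled, and the judge only runs when every member succeeded (mirroring `condition: all_succeeded`).

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional


def run_ensemble(members: dict[str, Callable[[], dict]],
                 judge: Optional[Callable[[dict], dict]] = None) -> dict:
    # Run every member concurrently; leaving the block waits for all of them.
    with ThreadPoolExecutor() as pool:
        futures = {name: pool.submit(fn) for name, fn in members.items()}

    # Keep successful outputs in declaration order.
    outputs = {name: fut.result() for name, fut in futures.items()
               if fut.exception() is None}
    if not outputs:
        raise RuntimeError("all ensemble members failed")

    if judge is not None and len(outputs) == len(members):
        try:
            return judge(outputs)
        except Exception:
            pass  # judge failed: fall through to the first successful member
    return next(iter(outputs.values()))


def gpt4o() -> dict:  # stand-in for a model call
    return {"vendor": "Acme Corp"}


def claude() -> dict:  # stand-in for a model call that fails
    raise TimeoutError("model unavailable")


merged = run_ensemble({"gpt4o": gpt4o, "claude": claude},
                      judge=lambda outs: {"vendor": "judged"})
```

Because one member fails, the judge is skipped and the first successful member's output is returned.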

Provider support

Provider                    How
OpenAI                      provider: openai_compat
Anthropic Claude            provider: anthropic + pip install "pyconveyor[anthropic]"
Ollama / vLLM / LM Studio   provider: openai_compat + base_url: override
Custom                      @register_provider("name") decorator
Tests                       provider: mock (no API calls)

Batch processing

Process thousands of documents with parallel workers:

pyconveyor batch pipeline.yaml --input documents.jsonl --output results.jsonl --workers 8

from pyconveyor import BatchRunner

runner = BatchRunner("pipeline.yaml", max_workers=8)
for item_id, result in runner.run(records):
    if not result.failed:
        save(result.steps["extract"].value)
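
For intuition, the worker-pool pattern behind a loop like the one above can be sketched with the standard library. This is not BatchRunner itself: `run_batch` is a hypothetical function, the `id` key on each record is an assumption, and results are yielded in completion order rather than input order.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, Iterator


def run_batch(process: Callable[[dict], dict], records: Iterable[dict],
              max_workers: int = 8) -> Iterator[tuple[str, dict]]:
    """Yield (item_id, result) pairs as workers finish."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process, rec): rec["id"] for rec in records}
        for future in as_completed(futures):
            yield futures[future], future.result()


records = [
    {"id": "a", "document": "one two"},
    {"id": "b", "document": "three"},
]


def count_words(record: dict) -> dict:  # stand-in for running a pipeline
    return {"words": len(record["document"].split())}


results = dict(run_batch(count_words, records, max_workers=2))
```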

Vocabulary-constrained fields

VocabField constrains a Pydantic field to a controlled vocabulary, normalises fuzzy matches, and grows the vocabulary over time.

from pyconveyor.vocab import Vocabulary, VocabField
from pydantic import BaseModel

PlasticVocab = Vocabulary(
    known={"PET", "PE", "PLA", "PP"},
    label="plastic_type",
    growth_policy="human",   # queue novel terms for CLI review
    persist=True,
)

class Record(BaseModel):
    plastic: str = VocabField(vocab=PlasticVocab)
    quantity: int

Review pending terms interactively:

pyconveyor vocab review pipeline.yaml
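
The normalise-or-queue behaviour can be approximated with `difflib` from the standard library. The class below is a toy, not pyconveyor's `Vocabulary`: `SketchVocabulary`, its `cutoff` parameter, and the spelled-out plastic names are all assumptions chosen for illustration, since fuzzy matching on two-letter codes is unreliable.

```python
import difflib
from typing import Optional


class SketchVocabulary:
    """Toy controlled vocabulary; not pyconveyor's Vocabulary class."""

    def __init__(self, known: set[str], cutoff: float = 0.8):
        self.known = set(known)
        self.cutoff = cutoff
        self.pending: list[str] = []  # novel terms awaiting human review

    def normalise(self, term: str) -> Optional[str]:
        by_upper = {k.upper(): k for k in self.known}
        candidate = term.strip().upper()
        if candidate in by_upper:  # exact match, ignoring case
            return by_upper[candidate]
        close = difflib.get_close_matches(
            candidate, list(by_upper), n=1, cutoff=self.cutoff)
        if close:  # fuzzy match, e.g. a transposed letter
            return by_upper[close[0]]
        self.pending.append(term)  # unknown: queue for review
        return None


vocab = SketchVocabulary({"polyethylene", "polypropylene", "polylactic acid"})
exact = vocab.normalise("  Polyethylene ")
fuzzy = vocab.normalise("Polyproplyene")  # misspelled on purpose
novel = vocab.normalise("nylon")
```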

Automatic output saving

Add an outputs: block to any pipeline and pyconveyor will write step results to disk after each run — no io steps required:

outputs:
  dir: "./results/{{ ctx.doc_id }}"   # Jinja2 expression; default: ./outputs/
  final_as: result.json               # write the last non-None step result here

steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema:
      vendor: str
      amount: float
    # save: false          # suppress this step's file
    # save: raw.json       # or use a custom filename

Each step with a non-None result is saved as {step_name}.json. Ensemble members are saved as {step}.{member}.json. A failed write does not fail the run, and no files are written when the pipeline fails or runs in dry-run mode.
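
The file-naming rules can be sketched as follows. `save_outputs` is a hypothetical helper, not pyconveyor code: it writes one `{step_name}.json` per non-None result plus an optional `final_as` copy of the last non-None result, and leaves out ensemble members, Jinja2 rendering of `dir`, and the per-step `save:` override.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def save_outputs(results: dict, out_dir: Path,
                 final_as: Optional[str] = None) -> list[str]:
    """Write step results to out_dir and return the file names written."""
    out_dir.mkdir(parents=True, exist_ok=True)
    written = []
    last = None
    for step, value in results.items():
        if value is None:  # steps without a result produce no file
            continue
        (out_dir / f"{step}.json").write_text(json.dumps(value))
        written.append(f"{step}.json")
        last = value
    if final_as and last is not None:
        (out_dir / final_as).write_text(json.dumps(last))
        written.append(final_as)
    return written


with tempfile.TemporaryDirectory() as tmp:
    written = save_outputs(
        {"extract": {"vendor": "Acme Corp", "amount": 12.5}, "notify": None},
        Path(tmp) / "doc-1",
        final_as="result.json",
    )
```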

Load-time validation

PipelineRunner("pipeline.yaml") validates the pipeline at load time, before spending any tokens. You can also validate from the CLI:

pyconveyor validate pipeline.yaml
# ✓ pipeline.yaml is valid

Errors include the YAML line number and "did you mean?" suggestions.
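
A "did you mean?" hint of this kind can be produced with `difflib.get_close_matches`. The sketch below is an assumption about the general technique, not pyconveyor's validator: `KNOWN_STEP_KEYS` lists only keys that appear in this README's examples, `check_step_keys` is a hypothetical function, and line numbers are left out.

```python
import difflib

# Step keys seen in this README's examples; the real list may be longer.
KNOWN_STEP_KEYS = ["name", "type", "model", "prompt", "schema",
                   "max_attempts", "on_error", "save"]


def check_step_keys(step: dict) -> list[str]:
    """Return an error per unknown key, with a suggestion when one is close."""
    errors = []
    for key in step:
        if key in KNOWN_STEP_KEYS:
            continue
        hint = difflib.get_close_matches(key, KNOWN_STEP_KEYS, n=1)
        suffix = f" (did you mean '{hint[0]}'?)" if hint else ""
        errors.append(f"unknown key '{key}'{suffix}")
    return errors


errors = check_step_keys({"name": "extract", "type": "llm", "max_attemps": 3})
```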

Response caching

Cache LLM responses during development to avoid burning tokens on repeated runs:

pyconveyor run pipeline.yaml --input input.json
# subsequent runs use cached responses by default
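
The README does not describe how cache entries are keyed. A common approach, shown here only as an assumption, is to hash the model name and rendered prompt and reuse the stored response on a match. `ResponseCache` is a hypothetical in-memory class; a real cache would persist to disk.

```python
import hashlib
import json
from typing import Callable


class ResponseCache:
    """In-memory cache keyed by a hash of (model, prompt)."""

    def __init__(self) -> None:
        self._store: dict[str, str] = {}
        self.hits = 0

    @staticmethod
    def key(model: str, prompt: str) -> str:
        payload = json.dumps({"model": model, "prompt": prompt}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def get_or_call(self, model: str, prompt: str,
                    call: Callable[[str], str]) -> str:
        k = self.key(model, prompt)
        if k in self._store:  # repeated run: no model call
            self.hits += 1
            return self._store[k]
        self._store[k] = call(prompt)
        return self._store[k]


calls = []


def fake_model(prompt: str) -> str:  # stand-in for a paid API call
    calls.append(prompt)
    return '{"vendor": "Acme Corp"}'


cache = ResponseCache()
first = cache.get_or_call("default", "Extract the vendor.", fake_model)
second = cache.get_or_call("default", "Extract the vendor.", fake_model)
```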

DAG visualisation

pyconveyor visualise pipeline.yaml
# Outputs Mermaid diagram — paste into GitHub, GitLab, or Notion
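
For a purely sequential pipeline, a Mermaid diagram is just a node per step and an edge between neighbours. The sketch below shows that shape. `to_mermaid` is a hypothetical function, the step names are made up, and the actual command's output format (branches, parallel groups, labels) may differ.

```python
def to_mermaid(step_names: list[str]) -> str:
    """Render a linear chain of steps as a top-down Mermaid graph."""
    lines = ["graph TD"]
    for name in step_names:
        lines.append(f"    {name}[{name}]")       # one node per step
    for src, dst in zip(step_names, step_names[1:]):
        lines.append(f"    {src} --> {dst}")      # edge to the next step
    return "\n".join(lines)


diagram = to_mermaid(["extract", "check", "save"])
print(diagram)
```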

CLI reference

pyconveyor init <dir>                  Bootstrap a new project
pyconveyor init <dir> --interactive    Guided setup — define fields interactively
pyconveyor run <pipeline.yaml>         Run a pipeline
pyconveyor validate <pipeline>         Validate without running
pyconveyor batch <pipeline>            Batch process a JSONL file
pyconveyor benchmark <dir>             Benchmark against golden-standard cases
pyconveyor vocab review <pipeline>     Review pending vocabulary suggestions
pyconveyor schema                      Emit JSON Schema for editor autocomplete
pyconveyor schema infer <pipeline>     Infer schemas.py from sample output
pyconveyor visualise <pipeline>        Print Mermaid DAG diagram

Python API

from pyconveyor import PipelineRunner, BatchRunner, BenchmarkRunner, generate_report

# Single run
runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": "…"})

result.failed                          # bool
result.steps["extract"].value          # Pydantic model or dict
result.steps["extract"].last_attempt   # AttemptLog with timing and token counts
result.summary()                       # RunSummary with aggregates

# Batch
batch_runner = BatchRunner("pipeline.yaml", max_workers=8)
for item_id, result in batch_runner.run(records):
    save(result.steps["extract"].value)

# Benchmark
bench = BenchmarkRunner("benchmarks/", pipelines=["pipeline.yaml"])
summary = bench.run()
generate_report(summary, "report.html")

Versioning policy

The YAML pipeline format (pipeline.yaml) is treated as a public API subject to the same semver rules as the Python API. A breaking change to the YAML schema will increment the major version.


Documentation

Full documentation at pyconveyor.readthedocs.io


License

MIT
