
pyconveyor

Deterministic YAML pipeline engine for structured LLM extraction.


pyconveyor lets you build reliable LLM extraction pipelines by declaring them in YAML. It handles prompt rendering, schema validation, self-correcting retries, parallel steps, and controlled-vocabulary normalisation — so your code handles the domain logic, not the plumbing.

# pipeline.yaml
steps:
  - name: extract
    type: llm
    model: default
    prompt: prompts/extract.j2
    schema: schemas:ArticleSummary
    max_attempts: 3

from pyconveyor import PipelineRunner

runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": open("article.txt").read()})

summary = result.steps["extract"].value  # validated ArticleSummary instance
print(summary.title)

Install

pip install pyconveyor

For Anthropic Claude support:

pip install "pyconveyor[anthropic]"

Quickstart

Bootstrap a working project in one command:

pyconveyor init my_project/
cd my_project/
export OPENAI_API_KEY=sk-...
pyconveyor run pipeline.yaml --input '{"document": "The quick brown fox."}'

How it works

You write three files. pyconveyor owns the runner.

your_project/
├── pipeline.yaml       # what to do and in what order
├── schemas.py          # what shape the output must have (Pydantic models)
└── prompts/
    └── extract.j2      # what to ask the model (Jinja2 templates)
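As a concrete starting point, here is a minimal sketch of what `schemas.py` might contain to satisfy the `schemas:ArticleSummary` reference in `pipeline.yaml`. The field names (`title`, `key_points`) are illustrative assumptions, not part of pyconveyor itself:

```python
# schemas.py: a minimal sketch of the Pydantic model that the
# "schemas:ArticleSummary" reference in pipeline.yaml resolves to.
# The fields below are illustrative assumptions.
from pydantic import BaseModel, Field

class ArticleSummary(BaseModel):
    title: str
    key_points: list[str] = Field(default_factory=list)
```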

When runner.run(input_data) is called:

  1. The input dict becomes ctx — available in every prompt template and expression
  2. Steps execute in declaration order
  3. Each step's result is stored and can be referenced by later steps as {{ steps.name.value }}
  4. A RunContext is returned with all results, attempt logs, and timing
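Points 1 and 3 above can be seen from inside a prompt template. The sketch below renders a `.j2` template with plain Jinja2 and stand-in objects; in a real run, pyconveyor supplies `ctx` and `steps` itself:

```python
# Sketch of what a prompt template sees: `ctx` holds the input dict and
# `steps.<name>.value` holds earlier step results. The objects passed to
# render() here are stand-ins for illustration.
from jinja2 import Template
from types import SimpleNamespace

tmpl = Template(
    "Document:\n{{ ctx.document }}\n\n"
    "Earlier summary: {{ steps.extract.value }}"
)
rendered = tmpl.render(
    ctx={"document": "The quick brown fox."},
    steps={"extract": SimpleNamespace(value="A fox story")},
)
```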

Features

Structured output with automatic retries

Every llm step validates the model's response against a Pydantic schema. If validation fails, pyconveyor feeds the error back to the model and retries — up to max_attempts times.

- name: extract
  type: llm
  model: default
  prompt: prompts/extract.j2
  schema: schemas:ArticleSummary
  max_attempts: 3
  on_error: continue   # "raise" | "continue" | "skip_remaining"
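The general shape of this feedback loop can be sketched as follows. This is an illustration, not pyconveyor's actual implementation; `call_model` is a stand-in for a real provider call:

```python
# Illustrative sketch of a validate-and-retry loop: on validation failure,
# the error text is appended to the prompt so the model can self-correct.
from pydantic import BaseModel, ValidationError

class ArticleSummary(BaseModel):
    title: str

def call_model(prompt: str) -> str:
    return '{"title": "Foxes"}'  # a real LLM call goes here

def run_llm_step(prompt: str, schema, max_attempts: int = 3):
    for attempt in range(1, max_attempts + 1):
        raw = call_model(prompt)
        try:
            return schema.model_validate_json(raw)
        except ValidationError as err:
            # feed the validation error back for the next attempt
            prompt += f"\n\nYour previous answer failed validation:\n{err}\nReturn corrected JSON."
    raise RuntimeError(f"validation failed after {max_attempts} attempts")
```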

All step types

Step type   What it does
llm         Call a model, validate output against a Pydantic schema, retry on failure
transform   Call a Python function with step outputs as inputs
validate    Assert a condition; fail or skip remaining steps if it's false
io          Call a Python function for side effects (DB write, file save)
parallel    Run multiple sub-pipelines concurrently with ThreadPoolExecutor
condition   Branch to different steps based on a runtime expression

Provider support

Provider                   How
OpenAI                     provider: openai_compat
Anthropic Claude           provider: anthropic + pip install "pyconveyor[anthropic]"
Ollama / vLLM / LM Studio  provider: openai_compat + a base_url: override
Custom                     @register_provider("name") decorator
Tests                      provider: mock (no API calls)

Vocabulary-constrained fields

VocabField constrains a Pydantic field to a controlled vocabulary, normalises fuzzy matches, and grows the vocabulary over time.

from pyconveyor.vocab import Vocabulary, VocabField
from pydantic import BaseModel

PlasticVocab = Vocabulary(
    known={"PET", "PE", "PLA", "PP"},
    label="plastic_type",
    growth_policy="human",   # queue novel terms for CLI review
    persist=True,            # save after each run
)

class Record(BaseModel):
    plastic: str = VocabField(vocab=PlasticVocab)
    quantity: int

Growth policies: "auto" (add immediately), "human" (queue for CLI review), "llm" (LLM decides), or any callable fn(VocabSuggestion) -> bool.
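A callable policy follows the documented `fn(VocabSuggestion) -> bool` signature. In the sketch below, the `term` attribute on the suggestion is an assumption for illustration; check the real `VocabSuggestion` fields before relying on it:

```python
# Hedged sketch of a callable growth policy matching the documented
# signature fn(VocabSuggestion) -> bool. The `term` attribute is assumed.
def allow_polymer_codes(suggestion) -> bool:
    term = suggestion.term
    # accept only short upper-case codes such as "PETG" or "HDPE"
    return term.isupper() and term.isalpha() and len(term) <= 5
```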

Review pending terms interactively:

pyconveyor vocab review pipeline.yaml

Batch processing

Process a JSONL file with configurable concurrency:

pyconveyor batch pipeline.yaml inputs.jsonl --concurrency 4 --output results.jsonl

from pyconveyor import BatchRunner

runner = BatchRunner("pipeline.yaml", concurrency=4)
batch = runner.run_all(records)  # list of dicts
print(batch.summary())           # total, succeeded, failed, error_rate
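Each line of the input file is one JSON object matching the pipeline's expected input keys. A sketch of preparing such a file (the `document` key here assumes the quickstart pipeline):

```python
# Sketch: building an inputs.jsonl file, one JSON object per line,
# each matching the input keys the pipeline expects.
import json

records = [
    {"document": "The quick brown fox."},
    {"document": "A second article."},
]
with open("inputs.jsonl", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```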

Load-time validation

PipelineRunner("pipeline.yaml") validates everything before spending any tokens — all schema imports, model references, expression syntax, and field names. Errors include the YAML line number and "did you mean?" suggestions.

pyconveyor validate pipeline.yaml
# ✓ pipeline.yaml is valid

# Or on error:
# pipeline.yaml:14: unknown field 'max_attempt' on llm step — did you mean 'max_attempts'?

Hooks and observability

runner.on_llm_call = lambda model, prompt, response: log_to_db(model, prompt, response)
runner.on_run_end  = lambda rctx: metrics.record(rctx.summary())

Response caching

Cache LLM responses during development to avoid burning tokens on repeated runs:

pyconveyor run pipeline.yaml --input '...' --cache
pyconveyor run pipeline.yaml --input '...' --cache --cache-ttl 3600

DAG visualisation

pyconveyor visualise pipeline.yaml
# Outputs Mermaid diagram

CLI reference

pyconveyor init <dir>                 Bootstrap a new project
pyconveyor run <pipeline.yaml>        Run a pipeline
pyconveyor validate <pipeline>        Validate without running
pyconveyor batch <pipeline> <jsonl>   Batch process a JSONL file
pyconveyor vocab review <pipeline>    Review pending vocabulary suggestions
pyconveyor schema                     Emit JSON Schema for editor autocomplete
pyconveyor visualise <pipeline>       Print Mermaid DAG diagram

Python API

from pyconveyor import PipelineRunner, BatchRunner

# Single run
runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": "..."})

result.failed                          # bool
result.steps["extract"].value          # Pydantic model instance
result.steps["extract"].last_attempt   # AttemptLog with timing and token counts
result.summary()                       # RunSummary with aggregates

# Batch
batch_runner = BatchRunner("pipeline.yaml", concurrency=8)
batch = batch_runner.run_all(records)
for record in batch.successes:
    save(record)

Versioning policy

The YAML pipeline format (pipeline.yaml) is treated as a public API subject to the same semver rules as the Python API. A breaking change to the YAML schema will increment the major version.


Documentation

Full documentation at pyconveyor.readthedocs.io


License

MIT
