Deterministic YAML pipeline engine for structured LLM extraction
Project description
pyconveyor
Deterministic YAML pipeline engine for structured LLM extraction.
pyconveyor lets you build reliable LLM extraction pipelines by declaring them in YAML. It handles prompt rendering, schema validation, self-correcting retries, parallel steps, and controlled-vocabulary normalisation — so your code handles the domain logic, not the plumbing.
steps:
- name: extract
type: llm
model: default
prompt: prompts/extract.j2
schema: schemas:ArticleSummary
max_attempts: 3
from pyconveyor import PipelineRunner
runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": open("article.txt").read()})
summary = result.steps["extract"].value # validated ArticleSummary instance
print(summary.title)
Install
pip install pyconveyor
For Anthropic Claude support:
pip install "pyconveyor[anthropic]"
Quickstart
Bootstrap a working project in one command:
pyconveyor init my_project/
cd my_project/
export OPENAI_API_KEY=sk-...
pyconveyor run pipeline.yaml --input '{"document": "The quick brown fox."}'
How it works
You write three files. pyconveyor owns the runner.
your_project/
├── pipeline.yaml # what to do and in what order
├── schemas.py # what shape the output must have (Pydantic models)
└── prompts/
└── extract.j2 # what to ask the model (Jinja2 templates)
When runner.run(input_data) is called:
- The input dict becomes
ctx— available in every prompt template and expression - Steps execute in declaration order
- Each step's result is stored and can be referenced by later steps as
{{ steps.name.value }} - A
RunContextis returned with all results, attempt logs, and timing
Features
Structured output with automatic retries
Every llm step validates the model's response against a Pydantic schema. If validation fails, pyconveyor feeds the error back to the model and retries — up to max_attempts times.
- name: extract
type: llm
model: default
prompt: prompts/extract.j2
schema: schemas:ArticleSummary
max_attempts: 3
on_error: continue # "raise" | "continue" | "skip_remaining"
All step types
| Step type | What it does |
|---|---|
llm |
Call a model, validate output against a Pydantic schema, retry on failure |
transform |
Call a Python function with step outputs as inputs |
validate |
Assert a condition; fail or skip remaining steps if it's false |
io |
Call a Python function for side effects (DB write, file save) |
parallel |
Run multiple sub-pipelines concurrently with ThreadPoolExecutor |
condition |
Branch to different steps based on a runtime expression |
Provider support
| Provider | How |
|---|---|
| OpenAI | provider: openai_compat |
| Anthropic Claude | provider: anthropic + pip install pyconveyor[anthropic] |
| Ollama / vLLM / LM Studio | provider: openai_compat + base_url: override |
| Custom | @register_provider("name") decorator |
| Tests | provider: mock — no API calls |
Vocabulary-constrained fields
VocabField constrains a Pydantic field to a controlled vocabulary, normalises fuzzy matches, and grows the vocabulary over time.
from pyconveyor.vocab import Vocabulary, VocabField
from pydantic import BaseModel
PlasticVocab = Vocabulary(
known={"PET", "PE", "PLA", "PP"},
label="plastic_type",
growth_policy="human", # queue novel terms for CLI review
persist=True, # save after each run
)
class Record(BaseModel):
plastic: str = VocabField(vocab=PlasticVocab)
quantity: int
Growth policies: "auto" (add immediately), "human" (queue for CLI review), "llm" (LLM decides), or any callable fn(VocabSuggestion) -> bool.
Review pending terms interactively:
pyconveyor vocab review pipeline.yaml
Batch processing
Process a JSONL file with configurable concurrency:
pyconveyor batch pipeline.yaml inputs.jsonl --concurrency 4 --output results.jsonl
from pyconveyor import BatchRunner
runner = BatchRunner("pipeline.yaml", concurrency=4)
batch = runner.run_all(records) # list of dicts
print(batch.summary()) # total, succeeded, failed, error_rate
Load-time validation
PipelineRunner("pipeline.yaml") validates everything before spending any tokens — all schema imports, model references, expression syntax, and field names. Errors include the YAML line number and "did you mean?" suggestions.
pyconveyor validate pipeline.yaml
# ✓ pipeline.yaml is valid
# Or on error:
# pipeline.yaml:14: unknown field 'max_attempt' on llm step — did you mean 'max_attempts'?
Hooks and observability
runner.on_llm_call = lambda model, prompt, response: log_to_db(model, prompt, response)
runner.on_run_end = lambda rctx: metrics.record(rctx.summary())
Response caching
Cache LLM responses during development to avoid burning tokens on repeated runs:
pyconveyor run pipeline.yaml --input '...' --cache
pyconveyor run pipeline.yaml --input '...' --cache --cache-ttl 3600
DAG visualisation
pyconveyor visualise pipeline.yaml
# Outputs Mermaid diagram
CLI reference
pyconveyor init <dir> Bootstrap a new project
pyconveyor run <pipeline.yaml> Run a pipeline
pyconveyor validate <pipeline> Validate without running
pyconveyor batch <pipeline> <jsonl> Batch process a JSONL file
pyconveyor vocab review <pipeline> Review pending vocabulary suggestions
pyconveyor schema Emit JSONSchema for editor autocomplete
pyconveyor visualise <pipeline> Print Mermaid DAG diagram
Python API
from pyconveyor import PipelineRunner, BatchRunner
# Single run
runner = PipelineRunner("pipeline.yaml")
result = runner.run({"text": "..."})
result.failed # bool
result.steps["extract"].value # Pydantic model instance
result.steps["extract"].last_attempt # AttemptLog with timing and token counts
result.summary() # RunSummary with aggregates
# Batch
batch_runner = BatchRunner("pipeline.yaml", concurrency=8)
batch = batch_runner.run_all(records)
for record in batch.successes:
save(record)
Versioning policy
The YAML pipeline format (pipeline.yaml) is treated as a public API subject to the same semver rules as the Python API. A breaking change to the YAML schema will increment the major version.
Documentation
Full documentation at pyconveyor.readthedocs.io
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pyconveyor-1.0.1.tar.gz.
File metadata
- Download URL: pyconveyor-1.0.1.tar.gz
- Upload date:
- Size: 203.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e5cda6cf5d35271506d5aa65522696583343bc425ee39932a7228bd87edda98f
|
|
| MD5 |
45d2041f108f3f22b55b34093cae5b20
|
|
| BLAKE2b-256 |
92272d1ca09c8d8cdee9c78bca8041f962c24caa3d2ff738f0d23e2fca5e43e6
|
Provenance
The following attestation bundles were made for pyconveyor-1.0.1.tar.gz:
Publisher:
publish.yml on VictorGambarini/pyconveyor
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyconveyor-1.0.1.tar.gz -
Subject digest:
e5cda6cf5d35271506d5aa65522696583343bc425ee39932a7228bd87edda98f - Sigstore transparency entry: 1487416781
- Sigstore integration time:
-
Permalink:
VictorGambarini/pyconveyor@456f613a7ac9890de16764d6e5f35d2abbcaaba6 -
Branch / Tag:
refs/tags/v1.0.1 - Owner: https://github.com/VictorGambarini
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@456f613a7ac9890de16764d6e5f35d2abbcaaba6 -
Trigger Event:
push
-
Statement type:
File details
Details for the file pyconveyor-1.0.1-py3-none-any.whl.
File metadata
- Download URL: pyconveyor-1.0.1-py3-none-any.whl
- Upload date:
- Size: 50.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7ead400917ada14a588218c2597df4373cbb26d74958fa86ff9c4daf931addfb
|
|
| MD5 |
39a01b9f4a64bdf167d7b9890a4a6985
|
|
| BLAKE2b-256 |
ba577fc52f21fcdf493b8d963ebdbd533b843e5116b8a2c5f339ac16bdad1687
|
Provenance
The following attestation bundles were made for pyconveyor-1.0.1-py3-none-any.whl:
Publisher:
publish.yml on VictorGambarini/pyconveyor
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
pyconveyor-1.0.1-py3-none-any.whl -
Subject digest:
7ead400917ada14a588218c2597df4373cbb26d74958fa86ff9c4daf931addfb - Sigstore transparency entry: 1487416818
- Sigstore integration time:
-
Permalink:
VictorGambarini/pyconveyor@456f613a7ac9890de16764d6e5f35d2abbcaaba6 -
Branch / Tag:
refs/tags/v1.0.1 - Owner: https://github.com/VictorGambarini
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@456f613a7ac9890de16764d6e5f35d2abbcaaba6 -
Trigger Event:
push
-
Statement type: