# pirn

A pipeline framework where everything is a knot.
pirn builds typed, async, observable data and computation pipelines. You wire
work into a tapestry of knots, run it, and get back a structured result —
including content-addressed lineage records you can join across runs.
```bash
pip install pirn  # not yet on PyPI; this repo is the source
```
Requires Python 3.11+.
## Quickstart
```python
import asyncio

from pirn import Tapestry, Parameter, KnotConfig, knot, RunRequest

@knot
async def double(x: int) -> int:
    return x * 2

@knot
async def add(a: int, b: int) -> int:
    return a + b

async def main():
    with Tapestry() as t:
        x = Parameter("x", int)
        d = double(x=x, _config=KnotConfig(id="d"))
        answer = add(a=x, b=d, _config=KnotConfig(id="answer"))
        result = await t.run(RunRequest(parameters={"x": 5}))
        print(result.outputs)  # {'param:x': 5, 'd': 10, 'answer': 15}

asyncio.run(main())
```
That's the whole shape: declare knots inside a `Tapestry()` context, wire them
by passing one knot as a kwarg of another, and run.
## The constructor convention
When you construct a knot, pirn looks at every kwarg:
- If the value is itself a knot, it becomes a parent — this knot depends on the other knot's output.
- Otherwise, the value is config — a constant fed in at run time.
So `add(a=x, b=d, _config=KnotConfig(id="answer"))` makes `x` and `d` parents
of `answer`. There's no separate `parents={...}` dict to remember.

Framework metadata (the knot's id, error-handling policy, validation toggle)
goes in the reserved `_config=` kwarg, which keeps the framework's namespace
out of yours.
Every knot needs an explicit `id` — pirn doesn't auto-generate them, because auto-generated ids make lineage records unreadable.
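The sorting rule is simple enough to illustrate with a plain-Python sketch. This is not pirn's implementation — the `Knot` class and attribute names here are invented for the example:

```python
# Illustrative sketch of the constructor convention; not pirn's actual code.
class Knot:
    def __init__(self, **kwargs):
        self.parents = {}   # kwargs whose value is another knot
        self.config = {}    # everything else: constants fed in at run time
        for name, value in kwargs.items():
            if isinstance(value, Knot):
                self.parents[name] = value
            else:
                self.config[name] = value

x = Knot()
add = Knot(a=x, b=3)

print(sorted(add.parents))  # ['a']  -- a knot-valued kwarg becomes a parent
print(add.config)           # {'b': 3}  -- a plain value is config
```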
## Tapestry
A `Tapestry` is the workspace your knots live in. Constructing knots inside
`with Tapestry() as t:` auto-registers them. You can also pass `tapestry=`
explicitly, or hand a knot directly to `t.register(knot)`.

`t.run(request)` walks from the tapestry's terminal knots (those with no
downstream consumers) and runs the whole graph reachable from them. To run a
specific subset, pass `terminals=knot_or_list`.
A tapestry holds three backends:

| Backend | What it stores | Defaults / Phase 3 options |
|---|---|---|
| `TapestryStore` | the canonical knot definitions | `InMemoryStore`, `SQLiteStore`, `PostgresStore`, `ValKeyStore` |
| `RunHistory` | run results and lineage records | `InMemoryHistory`, `SQLiteHistory`, `DuckDBHistory`, `PostgresHistory` |
| `DataStore` | intermediate values, keyed by content hash | `InMemoryDataStore`, `LocalDiskDataStore`, `ValKeyDataStore`, `S3DataStore` |
They're separate so each can be picked for its strength: Postgres for both store and history when you want one durable database; SQLite store + DuckDB history when you want OLAP-fast lineage queries; ValKey for the data store where content-addressed values fit a key-value store naturally; S3 when intermediate values are large or shared across many workers.
Each backend lives behind an extra: `pip install pirn[sqlite]`,
`pirn[postgres]`, `pirn[valkey]`, `pirn[duckdb]`, `pirn[s3]`, or
`pirn[all]` for everything.
## Result is three-way
Every knot produces an `Ok`, an `Err`, or a `Skipped`:

- `Ok(value)` — success.
- `Err(record)` — failure; the record is a Pydantic `ExceptionRecord` with the type, message, traceback, and a stable id.
- `Skipped(reason)` — opted out, branch not selected, gate closed, or parent failed under the default policy. Distinct from `Err` so downstream knots can react differently to "didn't run" vs "crashed".
By default, a knot whose parent produced `Err` or `Skipped` is itself
skipped (`SKIP_IF_PARENT_FAILED`). Other policies:

- `RECEIVE_ERRORS` — your `process()` is called with `Result` objects directly, so you handle errors yourself.
- `REQUIRE_ALL_PARENTS` — any failed/skipped parent makes this knot fail too.

Set per-knot via `_config=KnotConfig(id="...", error_policy=...)`.
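To see why the third outcome matters, here is a plain-Python sketch of the pattern — illustrative only, not pirn's actual classes or policy machinery:

```python
from dataclasses import dataclass
from typing import Any

# Illustrative three-way result, mirroring the Ok/Err/Skipped idea.
@dataclass
class Ok:
    value: Any

@dataclass
class Err:
    record: str  # pirn stores a structured ExceptionRecord here

@dataclass
class Skipped:
    reason: str

def default_policy(parent_results: list) -> Any:
    """Sketch of SKIP_IF_PARENT_FAILED: any non-Ok parent skips this knot.

    Combining by sum() stands in for the knot's real body."""
    for r in parent_results:
        if isinstance(r, Err):
            return Skipped("parent errored")
        if isinstance(r, Skipped):
            return Skipped("parent skipped")
    return Ok(sum(r.value for r in parent_results))

print(default_policy([Ok(2), Ok(3)]))        # Ok(value=5)
print(default_policy([Ok(2), Err("boom")]))  # Skipped(reason='parent errored')
```

A "skipped" parent and a "crashed" parent both stop the default knot, but a `RECEIVE_ERRORS`-style policy could branch on which of the two it received.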
## Optional knots
If you want an `Err` from a particular knot to behave like a `Skipped`
downstream, mix in `Optional`:
```python
from pirn import Optional, Knot

class FetchPrefs(Optional, Knot):
    async def process(self, user_id: str) -> dict:
        ...
```
`Optional` is a mixin, not a flag, so it composes cleanly with subclasses
that have their own behaviour.
## Lineage, content-addressed
Every knot execution produces a `KnotLineage` record:
```python
KnotLineage(
    run_id="run-abc",
    knot_id="answer",
    knot_class="my_pkg.knots.Add",
    knot_config_hash="sha256:…",   # the knot's config at run time
    parent_input_hashes={          # what it consumed
        "a": "sha256:…",
        "b": "sha256:…",
    },
    output_hash="sha256:…",        # what it produced
    outcome="ok",
    dispatcher="LocalDispatcher",
    started_at=…, finished_at=…,
)
```
Because hashes are content-addressed (sha256 of a stable canonicalisation), the same value always hashes to the same string regardless of which run produced it. This makes cross-run lineage queries trivial:
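The canonicalisation pirn uses is an internal detail, but the idea can be sketched in plain Python: hash a stable serialisation, so equal values hash equally regardless of dict ordering or which run produced them.

```python
import hashlib
import json

def content_hash(value) -> str:
    """Sketch of content addressing: sha256 over a canonical JSON form.

    pirn's real canonicalisation may differ; this only illustrates the
    property that equal values yield equal hashes."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode()).hexdigest()

# Same value, different key order: identical hash.
a = content_hash({"x": 1, "y": 2})
b = content_hash({"y": 2, "x": 1})
assert a == b
```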
```python
# Did anything else in any past run produce this same output?
matches = await tapestry.history.query_lineage_by_output_hash(out_hash)

# Who else consumed this value as input?
consumers = await tapestry.history.query_lineage_by_input_hash(in_hash)

# What's this knot's run history?
records = await tapestry.history.query_lineage_by_knot_id("answer")
```
Lineage records reference values by hash; the `DataStore` holds the values.
You can scrub values from the data store (TTL, GDPR, whatever) without
losing the lineage graph.
## The node taxonomy
Beyond `Knot`, pirn ships a handful of specialised classes:
| Class | Shape |
|---|---|
| `Source` | zero parents → produces a value (file, DB query, fetch, …) |
| `Sink` | terminal consumer; output conventionally `None` |
| `Aggregator` | N parents combined via a `combine` callable |
| `Branch` | one input + selector → tagged path; non-selected paths are skipped |
| `Gate` | one input + predicate → pass through or skip |
| `Map` | wraps an inner knot, applying it per-element of a parent's list |
| `Reduce` | folds a list parent into one value (whole-list or pairwise) |
`Optional` is a mixin (not a node).
```python
from pirn import Map, Reduce, Aggregator, Gate, Branch

# Map an inner knot over a collection-producing parent.
users = Map(
    over=record_ids,
    each=enrich_record,
    bind="record_id",
    _config=KnotConfig(id="users"),
)

# Reduce a list to one value.
total = Reduce(of=users, combine=sum, _config=KnotConfig(id="total"))

# Branch on a selector.
b = Branch(
    input=msg,
    selector=lambda m: m["type"],
    branches=("tool", "response"),
    _config=KnotConfig(id="route"),
)
handle_tool(payload=b["tool"], _config=KnotConfig(id="t"))
handle_resp(payload=b["response"], _config=KnotConfig(id="r"))
```
## Dispatchers
The dispatcher decides where work runs.
- `LocalDispatcher` — runs in the current event loop. The default.
- `ThreadDispatcher(max_workers=...)` — offloads each knot to a global thread pool, useful for CPU-bound or sync-heavy work.
- `DaskDispatcher` — runs each knot on a Dask cluster (`pip install pirn[dask]`).
- `RayDispatcher` — runs each knot as a Ray task (`pip install pirn[ray]`).
- `CeleryDispatcher` — submits each knot through Celery (`pip install pirn[celery]`).
```python
from pirn import ThreadDispatcher
from pirn.engine.dask_dispatcher import DaskDispatcher

# In-process scaling.
with Tapestry(dispatcher=ThreadDispatcher(max_workers=8)) as t:
    ...

# Distributed scaling.
dispatcher = DaskDispatcher.local()  # or DaskDispatcher(scheduler="tcp://...")
with Tapestry(dispatcher=dispatcher) as t:
    ...
```
All dispatchers honor the same `Dispatcher` protocol; switching between
them doesn't change the rest of your pipeline.
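Why a shared protocol makes dispatchers interchangeable can be shown with a generic sketch. pirn's real protocol methods are not documented here; `submit` is an invented name for this example:

```python
import asyncio
from typing import Any, Awaitable, Callable, Protocol

# Illustrative only: pirn's actual Dispatcher protocol has its own methods.
class Dispatcher(Protocol):
    async def submit(self, fn: Callable[..., Awaitable[Any]], **kwargs: Any) -> Any: ...

class InlineDispatcher:
    """Runs the coroutine directly in the current event loop."""
    async def submit(self, fn, **kwargs):
        return await fn(**kwargs)

class ThreadedDispatcher:
    """Runs the coroutine on a worker thread with its own event loop."""
    async def submit(self, fn, **kwargs):
        return await asyncio.to_thread(lambda: asyncio.run(fn(**kwargs)))

async def double(x: int) -> int:
    return x * 2

async def main() -> list[int]:
    # The call site is identical for both dispatchers — the point of
    # coding against a protocol rather than a concrete class.
    return [await d.submit(double, x=21)
            for d in (InlineDispatcher(), ThreadedDispatcher())]

results = asyncio.run(main())
print(results)  # [42, 42]
```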
## Triggers and emitters
A trigger starts a run when an external event arrives. An emitter observes runs as they happen and fans events out to logs, metrics, message buses, or traces.
### Triggers
```python
from pirn.triggers import CronTrigger, KafkaTrigger, WebhookTrigger, run_forever

# Run every five minutes.
trigger = CronTrigger(every_seconds=300)
await run_forever(trigger, tapestry)

# Run on each Kafka message.
trigger = KafkaTrigger(topic="orders", bootstrap_servers="kafka:9092")
await run_forever(trigger, tapestry)

# Run on each HTTP POST. trigger.app is a Starlette ASGI app you mount on
# any ASGI server (uvicorn, hypercorn, FastAPI).
trigger = WebhookTrigger(path="/run")
import uvicorn
uvicorn.run(trigger.app, host="0.0.0.0", port=8080)
```
`ValKeyTrigger` (pub/sub) is also available; see `pirn.triggers` for the
full list.
### Emitters
```python
from pirn import LogEmitter, KafkaEmitter, OpenTelemetryEmitter

# Stream structured logs.
log_emitter = LogEmitter(with_payload=False)

# Publish to Kafka for downstream observability tools.
kafka_emitter = KafkaEmitter(
    bootstrap_servers="kafka:9092",
    topic_status="pirn.status",
    topic_lineage="pirn.lineage",
    topic_result="pirn.result",
)

# OpenTelemetry trace spans per knot.
otel_emitter = OpenTelemetryEmitter()

with Tapestry(emitters=[log_emitter, kafka_emitter, otel_emitter]) as t:
    ...
```
`WebhookEmitter` and `ValKeyEmitter` are also available. A broken
emitter never breaks a run — exceptions inside emitters are isolated.
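The isolation guarantee is easy to picture with a plain-Python sketch — illustrative, not pirn's code, and `on_event` is an invented method name: each emitter call is wrapped so a raising emitter is logged and swallowed while the run continues.

```python
import logging

logger = logging.getLogger("sketch")

class BrokenEmitter:
    def on_event(self, event):
        raise RuntimeError("emitter bug")

class ListEmitter:
    def __init__(self):
        self.events = []
    def on_event(self, event):
        self.events.append(event)

def emit_all(emitters, event):
    """Fan an event out; a failing emitter is logged, never propagated."""
    for e in emitters:
        try:
            e.on_event(event)
        except Exception:
            logger.exception("emitter %r failed; run continues", e)

good = ListEmitter()
emit_all([BrokenEmitter(), good], {"knot_id": "answer", "outcome": "ok"})
print(good.events)  # the event still reached the healthy emitter
```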
## Streaming sources
Triggers fire whole runs (request/response). Streaming sources feed continuous data into a single long-running pipeline — ETL-style.
```python
from pirn.streaming import IterableSource, FileTailSource, run_stream

# Tail a log file forever.
source = FileTailSource("/var/log/app.log", parameter_name="line")
await run_stream(source, tapestry, on_result=handle)

# Wrap any iterable.
source = IterableSource([1, 2, 3], parameter_name="x")
await run_stream(source, tapestry)
```
`KafkaStreamingSource` is available too. If you want to drive
trigger-based machinery from a stream, wrap it with
`StreamingSourceTrigger`.
## Mid-run extension
For dynamic pipelines (e.g., a knot decides to spawn N more knots based on its output), opt into extensible runs:
```python
result = await tapestry.run(
    RunRequest(parameters={"x": 5}),
    extensible=True,
)
```
When extensible, knots registered to the store while the run is in
progress are merged into the running shed and execute if reachable
from a terminal. This requires a store that supports change notification
(`InMemoryStore` does; `SQLiteStore` doesn't yet — Phase 4).
## Visualization
```python
from pathlib import Path

from pirn import mermaid_for_tapestry, mermaid_for_run, html_for_run

# Mermaid for embedding in docs.
print(mermaid_for_tapestry(t))  # structure only
print(mermaid_for_run(result))  # structure + outcome colors

# Standalone HTML/SVG for browsing.
Path("run.html").write_text(html_for_run(result))
```
The HTML renderer produces a single self-contained file with hover tooltips (knot id, class, outcome, hashes, duration), filter buttons by outcome, and a longest-path layout — no server, no external assets.
## YAML pipelines
Pipelines can be declared in YAML and loaded with `load_pipeline`.
```yaml
name: simple
nodes:
  - id: x
    type: parameter
    type_: int
    has_default: true
    default: 5
  - id: doubled
    type: knot
    callable: double
    parents:
      x: x
  - id: answer
    type: knot
    callable: add
    parents:
      a: x
      b: doubled
```
```python
from pirn import load_pipeline, RunRequest

t = load_pipeline(
    yaml_text,
    known_callables={"double": double, "add": add},
)
result = await t.run(RunRequest())
```
Strict by default: every `callable`, `predicate`, `selector`, `combine`, or `each`
reference must be in `known_callables`. Set `allow_callable_refs: true` at
the top level to opt into dotted-path imports (loose mode).
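As a sketch of loose mode, reusing the node schema from the example above (the dotted-path form shown here is an assumption, not confirmed syntax):

```yaml
name: loose
allow_callable_refs: true   # opt-in: dotted-path imports allowed (loose mode)
nodes:
  - id: x
    type: parameter
    type_: int
    has_default: true
    default: 5
  - id: doubled
    type: knot
    callable: my_pkg.knots.double   # hypothetical dotted-path reference,
                                    # imported at load time — trusted YAML only
    parents:
      x: x
```

Because this imports arbitrary Python named in the YAML, keep it to developer-authored files (see Security).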
## Security
pirn uses pickle to serialize intermediate values in the `S3DataStore`, `ValKeyDataStore`, and `LocalDiskDataStore` backends. Pickle is an arbitrary-code-execution primitive: only use these backends when the backing store is not writable by adversaries.

The `WebhookTrigger` has no built-in authentication. Always place an authenticating reverse proxy or middleware in front of it before exposing it to any network.

Setting `allow_callable_refs: true` in a YAML pipeline enables dynamic Python imports from YAML content. Only use this with YAML authored by trusted developers — never with user-supplied YAML.
For a full security analysis, findings, and deployment hardening checklist, see planning/security-analysis.md.
To report a vulnerability, see SECURITY.md.
## Documentation
| Document | Contents |
|---|---|
| docs/architecture.md | Full architecture and design reference: execution model, backend matrix, extension points, Mermaid diagrams |
| planning/security-analysis.md | Security findings, threat model, deployment hardening checklist |
| docs/choosing-backends.md | When to use each storage backend |
| docs/deployment-sizing.md | Sizing guidance for different deployment scales |
| docs/observability.md | Emitters, OTel, Kafka, log structure |
| docs/schema-migrations.md | Database schema migration procedures |
| docs/subscribable-stores.md | Mid-run extension and subscribable store protocol |
| SECURITY.md | Responsible disclosure policy |
## Philosophy
- **Declarative wiring, imperative bodies.** Wiring happens in `Tapestry` context blocks; bodies are normal Python `async` functions.
- **Three-way results from the start.** Skip is not failure; both deserve first-class handling.
- **Lineage by default, not as an add-on.** Every run produces structured, content-addressed records that join across runs.
- **Backends are protocols.** SQLite, Postgres, DuckDB, ValKey, S3, local disk — pick the shape that fits your deployment without API churn.
- **Optional deps stay optional.** Each backend, dispatcher, trigger, and emitter is gated behind a `[bracket]` extra; install only what you use.
## Status
Phase 3 (current). Public API stable: every protocol from Phase 2 still works, and Phase 3 adds the networked backends, distributed dispatchers, event-driven triggers and emitters, streaming sources, mid-run extension, and visualization on top.
For testing real backends (Postgres, ValKey, Kafka, S3) end-to-end, see
planning/real-backend-testing-plan.md.
## License

Apache-2.0.