pirn

A pipeline framework where everything is a knot.

pirn builds typed, async, observable data and computation pipelines. You wire work into a tapestry of knots, run it, and get back a structured result — including content-addressed lineage records you can join across runs.

pip install pirn  # not yet on PyPI; this repo is the source

Requires Python 3.11+.

Quickstart

import asyncio
from pirn import Tapestry, Parameter, KnotConfig, knot, RunRequest

@knot
async def double(x: int) -> int:
    return x * 2

@knot
async def add(a: int, b: int) -> int:
    return a + b

async def main():
    with Tapestry() as t:
        x = Parameter("x", int)
        d = double(x=x, _config=KnotConfig(id="d"))
        answer = add(a=x, b=d, _config=KnotConfig(id="answer"))

    result = await t.run(RunRequest(parameters={"x": 5}))
    print(result.outputs)  # {'param:x': 5, 'd': 10, 'answer': 15}

asyncio.run(main())

That's the whole shape: declare knots inside a Tapestry() context, wire them by passing one knot as a kwarg of another, run.

The constructor convention

When you construct a knot, pirn looks at every kwarg:

  • If the value is itself a knot, it becomes a parent — this knot depends on the other knot's output.
  • Otherwise, the value is config — a constant fed in at run time.

So add(a=x, b=d, _config=KnotConfig(id="answer")) makes x and d parents of answer. There's no separate parents={...} dict to remember.

Framework metadata (the knot's id, error-handling policy, validation toggle) goes in the reserved _config= kwarg, which keeps the framework's namespace out of yours.

Every knot needs an explicit id — pirn doesn't auto-generate them, because auto-generated ids make lineage records unreadable.
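The convention is easy to picture as plain Python. The following is a minimal illustrative sketch (hypothetical shapes, not pirn's actual code): kwargs that are themselves knots become parent edges, everything else becomes config.

```python
# Minimal sketch of the constructor convention (illustrative, not pirn's code).
class Knot:
    def __init__(self, _config=None, **kwargs):
        self.id = _config["id"] if _config else None
        # Any kwarg that is itself a Knot becomes a parent edge;
        # everything else is treated as constant config.
        self.parents = {k: v for k, v in kwargs.items() if isinstance(v, Knot)}
        self.config = {k: v for k, v in kwargs.items() if not isinstance(v, Knot)}

x = Knot(_config={"id": "x"})
d = Knot(factor=2, _config={"id": "d"})
answer = Knot(a=x, b=d, retries=3, _config={"id": "answer"})
assert set(answer.parents) == {"a", "b"}
assert answer.config == {"retries": 3}
```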

Tapestry

A Tapestry is the workspace your knots live in. Constructing knots inside with Tapestry() as t: auto-registers them. You can also pass tapestry= explicitly, or hand a knot directly to t.register(knot).

t.run(request) walks from the tapestry's terminal knots (those with no downstream consumers) and runs the whole graph reachable from them. To run a specific subset, pass terminals=knot_or_list.
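The terminal-walking idea can be sketched with a plain parent map (an assumed shape, not pirn's internals): terminals are the knots no other knot consumes, and the graph that runs is whatever is reachable upward from them.

```python
# Sketch: finding terminal knots and the subgraph reachable from them.
# `parents` maps each knot id to the ids it depends on (assumed shape).
parents = {"x": [], "d": ["x"], "answer": ["x", "d"], "orphan": []}

consumed = {p for deps in parents.values() for p in deps}
terminals = [k for k in parents if k not in consumed]  # no downstream consumers

def reachable(start):
    seen, stack = set(), list(start)
    while stack:
        k = stack.pop()
        if k not in seen:
            seen.add(k)
            stack.extend(parents[k])
    return seen

assert set(terminals) == {"answer", "orphan"}
assert reachable(["answer"]) == {"x", "d", "answer"}
```

Passing `terminals=` to `t.run` amounts to choosing the `start` set here instead of all terminals.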

A tapestry holds three backends:

| Backend | What it stores | Defaults / Phase 3 options |
| --- | --- | --- |
| TapestryStore | the canonical knot definitions | InMemoryStore, SQLiteStore, PostgresStore, ValKeyStore |
| RunHistory | run results and lineage records | InMemoryHistory, SQLiteHistory, DuckDBHistory, PostgresHistory |
| DataStore | intermediate values, keyed by content hash | InMemoryDataStore, LocalDiskDataStore, ValKeyDataStore, S3DataStore |

They're separate so each can be picked for its strength: Postgres for both store and history when you want one durable database; SQLite store + DuckDB history when you want OLAP-fast lineage queries; ValKey for the data store where content-addressed values fit a key-value store naturally; S3 when intermediate values are large or shared across many workers.

Each backend lives behind an extra: pip install pirn[sqlite], pirn[postgres], pirn[valkey], pirn[duckdb], pirn[s3], or pirn[all] for everything.

Result is three-way

Every knot produces an Ok, an Err, or a Skipped:

  • Ok(value) — success.
  • Err(record) — failure; the record is a Pydantic ExceptionRecord with the type, message, traceback, and a stable id.
  • Skipped(reason) — opted out, branch not selected, gate closed, parent failed under the default policy. Distinct from Err so downstream knots can react differently to "didn't run" vs "crashed".

By default, a knot whose parent produced Err or Skipped is itself skipped (SKIP_IF_PARENT_FAILED). Other policies:

  • RECEIVE_ERRORS — your process() is called with Result objects directly, so you handle errors yourself.
  • REQUIRE_ALL_PARENTS — any failed/skipped parent makes this knot fail too.

Set per-knot via _config=KnotConfig(id="...", error_policy=...).
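The three-way shape and the default policy are simple to model in isolation. A minimal sketch (these dataclasses are illustrative stand-ins, not pirn's types):

```python
from dataclasses import dataclass

@dataclass
class Ok:
    value: object

@dataclass
class Err:
    record: object

@dataclass
class Skipped:
    reason: str

def apply_default_policy(parent_results):
    # SKIP_IF_PARENT_FAILED: any non-Ok parent means this knot is skipped
    # before its body ever runs.
    for name, r in parent_results.items():
        if not isinstance(r, Ok):
            return Skipped(f"parent {name!r} did not produce Ok")
    return None  # all parents Ok: run the knot normally

assert isinstance(apply_default_policy({"a": Ok(1), "b": Err("boom")}), Skipped)
assert apply_default_policy({"a": Ok(1), "b": Ok(2)}) is None
```

RECEIVE_ERRORS would hand the raw `parent_results` dict to the knot body instead; REQUIRE_ALL_PARENTS would return an `Err` rather than a `Skipped`.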

Optional knots

If you want an Err from a particular knot to behave like a Skipped downstream, mix in Optional:

from pirn import Optional, Knot

class FetchPrefs(Optional, Knot):
    async def process(self, user_id: str) -> dict:
        ...

Optional is a mixin, not a flag, so it composes cleanly with subclasses that have their own behaviour.
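The mixin mechanics come down to method resolution order. A hypothetical sketch of the idea (the `finalize` hook and these tiny result classes are invented for illustration, not pirn's API):

```python
class Err:
    def __init__(self, record):
        self.record = record

class Skipped:
    def __init__(self, reason):
        self.reason = reason

class Knot:
    def finalize(self, outcome):
        return outcome

class Optional:
    # Mixin: listed before Knot in the bases, so its finalize runs first
    # in the MRO and can rewrite an Err into a Skipped.
    def finalize(self, outcome):
        if isinstance(outcome, Err):
            return Skipped("optional knot failed")
        return super().finalize(outcome)

class FetchPrefs(Optional, Knot):
    pass

assert isinstance(FetchPrefs().finalize(Err("timeout")), Skipped)
assert FetchPrefs().finalize("ok") == "ok"
```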

Lineage, content-addressed

Every knot execution produces a KnotLineage record:

KnotLineage(
    run_id="run-abc",
    knot_id="answer",
    knot_class="my_pkg.knots.Add",
    knot_config_hash="sha256:…",       # the knot's config at run time
    parent_input_hashes={               # what it consumed
        "a": "sha256:…",
        "b": "sha256:…",
    },
    output_hash="sha256:…",            # what it produced
    outcome="ok",
    dispatcher="LocalDispatcher",
    started_at=, finished_at=,
)

Because hashes are content-addressed (sha256 of a stable canonicalisation), the same value always hashes to the same string regardless of which run produced it. This makes cross-run lineage queries trivial:

# Did anything else in any past run produce this same output?
matches = await tapestry.history.query_lineage_by_output_hash(out_hash)

# Who else consumed this value as input?
consumers = await tapestry.history.query_lineage_by_input_hash(in_hash)

# What's this knot's run history?
records = await tapestry.history.query_lineage_by_knot_id("answer")

Lineage records reference values by hash; the DataStore holds the values. You can scrub values from the data store (TTL, GDPR, whatever) without losing the lineage graph.
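The content-addressing property is worth seeing concretely. A minimal sketch of the idea using JSON as one possible stable canonicalisation (pirn's actual canonicalisation may differ):

```python
import hashlib
import json

def content_hash(value):
    # One possible stable canonicalisation: JSON with sorted keys and
    # no insignificant whitespace. Equal values -> equal bytes -> equal hash.
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode()).hexdigest()

# Same value, same hash -- regardless of dict ordering or which run produced it.
assert content_hash({"a": 1, "b": 2}) == content_hash({"b": 2, "a": 1})
assert content_hash(10) != content_hash(15)
```

This is why an `output_hash` from one run can match a `parent_input_hashes` entry from another: the hash depends only on the value, never on the run.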

The node taxonomy

Beyond Knot, pirn ships a handful of specialised classes:

| Class | Shape |
| --- | --- |
| Source | zero parents → produces a value (file, DB query, fetch, …) |
| Sink | terminal consumer; output conventionally None |
| Aggregator | N parents combined via a combine callable |
| Branch | one input + selector → tagged path; non-selected paths are skipped |
| Gate | one input + predicate → pass through or skip |
| Map | wraps an inner knot, applying it per-element of a parent's list |
| Reduce | folds a list parent into one value (whole-list or pairwise) |

Optional is a mixin (not a node).

from pirn import Map, Reduce, Aggregator, Gate, Branch

# Map an inner knot over a collection-producing parent.
users = Map(
    over=record_ids,
    each=enrich_record,
    bind="record_id",
    _config=KnotConfig(id="users"),
)

# Reduce a list to one value.
total = Reduce(of=users, combine=sum, _config=KnotConfig(id="total"))

# Branch on a selector.
b = Branch(
    input=msg,
    selector=lambda m: m["type"],
    branches=("tool", "response"),
    _config=KnotConfig(id="route"),
)
handle_tool(payload=b["tool"], _config=KnotConfig(id="t"))
handle_resp(payload=b["response"], _config=KnotConfig(id="r"))

Dispatchers

The dispatcher decides where work runs.

  • LocalDispatcher — runs in the current event loop. The default.
  • ThreadDispatcher(max_workers=...) — offloads each knot to a global thread pool, useful for CPU-bound or sync-heavy work.
  • DaskDispatcher — runs each knot on a Dask cluster (pip install pirn[dask]).
  • RayDispatcher — runs each knot as a Ray task (pip install pirn[ray]).
  • CeleryDispatcher — submits each knot through Celery (pip install pirn[celery]).

from pirn import ThreadDispatcher
from pirn.engine.dask_dispatcher import DaskDispatcher

# In-process scaling.
with Tapestry(dispatcher=ThreadDispatcher(max_workers=8)) as t:
    ...

# Distributed scaling.
dispatcher = DaskDispatcher.local()  # or DaskDispatcher(scheduler="tcp://...")
with Tapestry(dispatcher=dispatcher) as t:
    ...

All dispatchers honor the same Dispatcher protocol; switching between them doesn't change the rest of your pipeline.
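The protocol itself is small. A minimal sketch of what such a protocol and two in-process implementations could look like (illustrative shapes, not pirn's actual signatures):

```python
import asyncio
from typing import Any, Callable, Protocol

class Dispatcher(Protocol):
    async def dispatch(self, fn: Callable[..., Any], *args: Any) -> Any: ...

class LocalDispatcher:
    async def dispatch(self, fn, *args):
        return fn(*args)  # run inline on the current event loop

class ThreadDispatcher:
    async def dispatch(self, fn, *args):
        # Offload the callable to a worker thread so sync or CPU-heavy
        # work doesn't block the event loop.
        return await asyncio.to_thread(fn, *args)

async def demo():
    for d in (LocalDispatcher(), ThreadDispatcher()):
        assert await d.dispatch(lambda x: x * 2, 21) == 42

asyncio.run(demo())
```

Because both classes satisfy the same structural protocol, the engine can hold a `Dispatcher` and never care which concrete one it has.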

Triggers and emitters

A trigger starts a run when an external event arrives. An emitter observes runs as they happen and fans events out to logs, metrics, message buses, or traces.

Triggers

from pirn.triggers import CronTrigger, KafkaTrigger, WebhookTrigger, run_forever

# Run every five minutes.
trigger = CronTrigger(every_seconds=300)
await run_forever(trigger, tapestry)

# Run on each Kafka message.
trigger = KafkaTrigger(topic="orders", bootstrap_servers="kafka:9092")
await run_forever(trigger, tapestry)

# Run on each HTTP POST.  trigger.app is a Starlette ASGI app you mount on
# any ASGI server (uvicorn, hypercorn, FastAPI).
trigger = WebhookTrigger(path="/run")
import uvicorn
uvicorn.run(trigger.app, host="0.0.0.0", port=8080)

ValKeyTrigger (pubsub) is also available; full list in pirn.triggers.

Emitters

from pirn import LogEmitter, KafkaEmitter, OpenTelemetryEmitter

# Stream structured logs.
log_emitter = LogEmitter(with_payload=False)

# Publish to Kafka for downstream observability tools.
kafka_emitter = KafkaEmitter(
    bootstrap_servers="kafka:9092",
    topic_status="pirn.status",
    topic_lineage="pirn.lineage",
    topic_result="pirn.result",
)

# OpenTelemetry trace spans per knot.
otel_emitter = OpenTelemetryEmitter()

with Tapestry(emitters=[log_emitter, kafka_emitter, otel_emitter]) as t:
    ...

WebhookEmitter and ValKeyEmitter are also available. A broken emitter never breaks a run — exceptions inside emitters are isolated.

Streaming sources

Triggers fire whole runs (request/response). Streaming sources feed continuous data into a single long-running pipeline — ETL-style.

from pirn.streaming import IterableSource, FileTailSource, run_stream

# Tail a log file forever.
source = FileTailSource("/var/log/app.log", parameter_name="line")
await run_stream(source, tapestry, on_result=handle)

# Wrap any iterable.
source = IterableSource([1, 2, 3], parameter_name="x")
await run_stream(source, tapestry)

KafkaStreamingSource is available too. If you want to drive trigger-based machinery from a stream, wrap with StreamingSourceTrigger.

Mid-run extension

For dynamic pipelines (e.g., a knot decides to spawn N more knots based on its output), opt into extensible runs:

result = await tapestry.run(
    RunRequest(parameters={"x": 5}),
    extensible=True,
)

When extensible, knots registered to the store while the run is in progress get merged into the running shed, and execute if reachable from a terminal. Requires a store that supports change notification (InMemoryStore does; SQLiteStore doesn't yet — Phase 4).
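The "change notification" requirement is an observer pattern on the store. A hypothetical sketch of the idea (the class and method names here are invented for illustration):

```python
# Sketch of the subscribable-store idea behind extensible runs: the store
# notifies the running engine whenever a new knot is registered mid-run.
class SubscribableStore:
    def __init__(self):
        self.knots = {}
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def register(self, knot_id, knot):
        self.knots[knot_id] = knot
        for cb in self.subscribers:
            cb(knot_id, knot)  # engine merges the new knot into the run

seen = []
store = SubscribableStore()
store.subscribe(lambda kid, k: seen.append(kid))
store.register("late", object())
assert seen == ["late"]
```

A store without this hook (like SQLiteStore today) can still be written to mid-run; the engine just never hears about it.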

Visualization

from pirn import mermaid_for_tapestry, mermaid_for_run, html_for_run

# Mermaid for embedding in docs.
print(mermaid_for_tapestry(t))           # structure only
print(mermaid_for_run(result))           # structure + outcome colors

# Standalone HTML/SVG for browsing.
Path("run.html").write_text(html_for_run(result))

The HTML renderer produces a single self-contained file with hover tooltips (knot id, class, outcome, hashes, duration), filter buttons by outcome, and a longest-path layout — no server, no external assets.
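Generating Mermaid from a graph is mechanical. A tiny sketch of the idea from a parent map (illustrative only, not pirn's renderer):

```python
# Sketch: emitting Mermaid "graph TD" syntax from a parent map.
parents = {"d": ["x"], "answer": ["x", "d"]}

def to_mermaid(parents):
    lines = ["graph TD"]
    for child, deps in parents.items():
        for p in deps:
            lines.append(f"    {p} --> {child}")
    return "\n".join(lines)

print(to_mermaid(parents))
```

The run variant would additionally attach a CSS class per node keyed on its Ok/Err/Skipped outcome.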

YAML pipelines

Pipelines can be declared in YAML and loaded with load_pipeline.

name: simple
nodes:
  - id: x
    type: parameter
    type_: int
    has_default: true
    default: 5
  - id: doubled
    type: knot
    callable: double
    parents:
      x: x
  - id: answer
    type: knot
    callable: add
    parents:
      a: x
      b: doubled

from pirn import load_pipeline, RunRequest

t = load_pipeline(
    yaml_text,
    known_callables={"double": double, "add": add},
)
result = await t.run(RunRequest())

Strict by default: every callable, predicate, selector, combine, or each reference must be in known_callables. Set allow_callable_refs: true at the top level to opt into dotted-path imports (loose mode).
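The strict/loose distinction can be sketched as a single lookup function (a hypothetical helper, not pirn's loader): strict mode only consults the allowlist; loose mode falls back to importing a dotted path.

```python
import importlib

def resolve(name, known_callables, allow_callable_refs=False):
    # Strict by default: the name must be in the allowlist.
    if name in known_callables:
        return known_callables[name]
    # Loose mode: import "pkg.module.attr" dynamically -- this is why
    # allow_callable_refs is a security-sensitive switch.
    if allow_callable_refs and "." in name:
        module, _, attr = name.rpartition(".")
        return getattr(importlib.import_module(module), attr)
    raise KeyError(f"unknown callable {name!r} (strict mode)")

assert resolve("double", {"double": lambda x: x * 2})(3) == 6
assert resolve("math.sqrt", {}, allow_callable_refs=True)(9.0) == 3.0
```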

Security

pirn uses pickle to serialize intermediate values in the S3DataStore, ValKeyDataStore, and LocalDiskDataStore backends. Pickle is an arbitrary-code-execution primitive: only use these backends when the backing store is not writable by adversaries.

The WebhookTrigger has no built-in authentication. Always place an authenticating reverse proxy or middleware in front of it before exposing it to any network.

Setting allow_callable_refs: true in a YAML pipeline enables dynamic Python imports from YAML content. Only use this with YAML authored by trusted developers — never with user-supplied YAML.

For a full security analysis, findings, and deployment hardening checklist, see planning/security-analysis.md.
To report a vulnerability, see SECURITY.md.

Documentation

| Document | Contents |
| --- | --- |
| docs/architecture.md | Full architecture and design reference: execution model, backend matrix, extension points, Mermaid diagrams |
| planning/security-analysis.md | Security findings, threat model, deployment hardening checklist |
| docs/choosing-backends.md | When to use each storage backend |
| docs/deployment-sizing.md | Sizing guidance for different deployment scales |
| docs/observability.md | Emitters, OTel, Kafka, log structure |
| docs/schema-migrations.md | Database schema migration procedures |
| docs/subscribable-stores.md | Mid-run extension and subscribable store protocol |
| SECURITY.md | Responsible disclosure policy |

Philosophy

  • Declarative wiring, imperative bodies. Wiring happens in Tapestry context blocks; bodies are normal Python async functions.
  • Three-way results from the start. Skip is not failure; both deserve first-class handling.
  • Lineage by default, not as an add-on. Every run produces structured, content-addressed records that join across runs.
  • Backends are protocols. SQLite, Postgres, DuckDB, ValKey, S3, local disk — pick the shape that fits your deployment without API churn.
  • Optional deps stay optional. Each backend, dispatcher, trigger, and emitter is gated behind a [bracket] extra; install only what you use.

Status

Phase 3 (current). Public API stable: every protocol from Phase 2 still works, and Phase 3 adds the networked backends, distributed dispatchers, event-driven triggers and emitters, streaming sources, mid-run extension, and visualization on top.

For testing real backends (Postgres, ValKey, Kafka, S3) end-to-end, see planning/real-backend-testing-plan.md.

Apache-2.0.
