Skip to main content

Meta tool for managing and deploying data pipelines.

Project description

kptn

Lightweight, cacheable data pipelines in Python, R, and SQL.

PyPI version Python versions License

kptn lets you define data pipelines as composable Python functions, SQL files, and R scripts. Pipelines are hash-cached by default — unchanged tasks are skipped automatically on re-runs, so you only pay for what changed. Profiles let you parameterize and filter runs by environment or dataset without changing code. Cloud deployment to AWS is on the roadmap.


Installation

pip install kptn

Optional extras:

pip install kptn[duckdb]   # DuckDB state store and SQL tasks
pip install kptn[aws]      # AWS deployment (work in progress)

CLI setup — add to your project's pyproject.toml:

[tool.kptn]
pipeline = "your_package.pipeline"  # module that exposes a `pipeline` attribute

Concepts

  • Tasks — the unit of work. A task is a Python function decorated with @kptn.task, a SQL file registered with kptn.sql_task(), or an R script registered with kptn.r_task(). Each task declares its output files.
  • Graphs — tasks are composed into a directed acyclic graph using the >> operator for sequential chaining and operators like parallel(), Stage(), and map() for branching.
  • Caching — kptn hashes each task's outputs and source code. On re-run, tasks whose outputs and dependencies haven't changed are skipped. Use kptn plan to preview what will run before committing.
  • Profiles — named configurations in kptn.yaml that filter stages, override task arguments, and control execution cursors. Useful for running a subset of the pipeline for a specific dataset or environment.

Quick Start

# pipeline.py
import kptn
from pathlib import Path


def get_greeting() -> str:
    return "Hello, kptn!"


@kptn.task(outputs=["output/extract.txt"])
def extract(greeting: str) -> None:
    Path("output").mkdir(exist_ok=True)
    Path("output/extract.txt").write_text(greeting)


@kptn.task(outputs=["output/transform.txt"])
def transform() -> None:
    data = Path("output/extract.txt").read_text()
    Path("output/transform.txt").write_text(data.upper())


@kptn.task(outputs=["output/load.txt"])
def load() -> None:
    data = Path("output/transform.txt").read_text()
    Path("output/load.txt").write_text(f"Loaded: {data}")


deps = kptn.config(greeting=get_greeting)
graph = deps >> extract >> transform >> load
pipeline = kptn.Pipeline("hello_kptn", graph)

Preview the plan, then run:

kptn plan
kptn run

Or run directly from Python:

kptn.run(pipeline)

Core Features

Graph Composition

Chain tasks sequentially with >>:

graph = extract >> transform >> load

Fan out to parallel branches with kptn.parallel():

graph = ingest >> kptn.parallel(transform_a, transform_b) >> merge

Use kptn.Stage() to define profile-selectable branches. The profile controls which branches are active at runtime:

datasets = kptn.Stage(
    "datasets",
    load_full,
    load_subset,
)
graph = ingest >> datasets >> analyze

Use kptn.map() to fan out dynamically over a runtime collection:

graph = list_items >> kptn.map(process_item, over="items")

Demand-Driven Dependencies

Declare prerequisites with requires=[...] instead of chaining them with >>. A required task is pulled into the run only when a consumer needs it, runs once even if several consumers require it, and is ordered before every requirer. This is ideal for expensive shared prerequisites:

@kptn.task(outputs=["duckdb://index"])
def build_index(engine) -> None:
    ...  # expensive; only worth running when something needs the index


@kptn.task(outputs=["output/report.txt"], requires=[build_index])
def report(engine) -> None:
    ...


# build_index is never chained with >> — `report` pulls it in:
pipeline = kptn.Pipeline("analysis", report)

requires is transitive: a required task's own requires are pulled in too. If you already place a task in the graph yourself (via >>), requires for that task is a no-op — your explicit wiring governs ordering. Note that conjunctive requires only injects and orders prerequisites — it never drops a consumer. If a user-placed prerequisite is later pruned by a profile, its consumer still runs; use kptn.any_of(...) when you need a missing prerequisite to skip the consumer.

Use kptn.any_of(...) for a disjunctive requirement — a gate that pulls nothing and is satisfied only if one of its members is already in the run. If none is present, the consumer is skipped:

@kptn.task(
    outputs=["output/combined.txt"],
    requires=[kptn.any_of(load_full, load_subset)],
)
def summarize(engine) -> None:
    ...

This pairs naturally with kptn.Stage(): whichever branch the active profile selects satisfies the any_of gate, and summarize runs against it.

Profiles

Profiles are defined in kptn.yaml at your project root. They let you parameterize runs without changing code.

settings:
  db: duckdb
  db_path: pipeline.db

profiles:
  full:
    stage_selections:
      datasets: [load_full]

  subset:
    stage_selections:
      datasets: [load_subset]
    args:
      analyze:
        limit: 1000

  subset_test:
    extends: subset
    stop_after: transform
    optional_groups:
      qa_checks: false

Profile keys:

Key Description
extends Inherit settings from another profile (or a list of profiles)
stage_selections Map of stage name → list of branch names to activate
args Per-task keyword argument overrides
start_from Skip tasks before this task name
stop_after Skip tasks after this task name
optional_groups Enable or disable named optional task groups

Run with a profile:

kptn run --profile subset
kptn plan --profile subset

DuckDB Integration

Pass a DuckDB connection factory via kptn.config(). Tasks receive the connection as a keyword argument:

import duckdb
import kptn
from pathlib import Path


def get_engine():
    return duckdb.connect("pipeline.db")


@kptn.task(outputs=["output/summary.parquet"])
def summarize(engine) -> None:
    engine.execute("COPY (SELECT * FROM raw) TO 'output/summary.parquet'")


ingest = kptn.sql_task("sql/ingest.sql", outputs=["raw"])
deps = kptn.config(duckdb=(get_engine, "engine"))
graph = deps >> ingest >> summarize
pipeline = kptn.Pipeline("my_pipeline", graph)

The duckdb=(factory, "alias") tuple tells kptn to inject the connection under the name "engine". SQL tasks receive it automatically.

Add duckdb_checkpoint=True to a task to persist a DuckDB checkpoint after it runs, enabling incremental restores:

@kptn.task(outputs=["output/final.parquet"], duckdb_checkpoint=True)
def finalize(engine) -> None:
    ...

Caching and Re-runs

kptn hashes each task's declared outputs and source code. On re-run:

  • Tasks whose outputs exist and haven't changed are skipped.
  • Tasks whose source code or upstream dependencies changed are re-run.

To bypass the cache for a single run:

kptn run --force

To preview what would run without executing:

kptn plan

API Reference

Tasks

Symbol Description
@kptn.task(outputs, optional=None, compute=None, duckdb_checkpoint=False, requires=None) Decorate a Python function as a kptn task
kptn.sql_task(path, outputs, optional=None, duckdb_checkpoint=False, requires=None) Register a SQL file as a task
kptn.r_task(path, outputs, compute=None, optional=None, duckdb_checkpoint=False, requires=None) Register an R script as a task
kptn.noop() Placeholder / synchronization node

Graph Composition

Symbol Description
>> Chain tasks or graphs sequentially
kptn.parallel(*branches) Fan out to parallel branches. Accepts an optional name as the first argument: kptn.parallel("name", a, b)
kptn.Stage(name, *branches) Profile-selectable branches grouped under a named stage
kptn.map(task_fn, over="key") Dynamic fanout over a runtime collection
kptn.any_of(*tasks) Disjunctive requirement group for requires= — satisfied if any member task is present in the run (otherwise the consumer is skipped)

Pipeline & Execution

Symbol Description
kptn.config(**kwargs) Declare dependency injection factories. Use duckdb=(factory, "alias") for DuckDB connections
kptn.Pipeline(name, graph) Wrap a graph in a named pipeline
kptn.run(pipeline, *, profile=None, keep_db_open=False, no_cache=False, force=False) Execute the pipeline
kptn.plan(pipeline, *, profile=None) Dry-run: print which tasks would run or be skipped

CLI Reference

The kptn CLI discovers your pipeline from [tool.kptn] pipeline = "..." in pyproject.toml. The referenced module must expose a pipeline attribute of type Pipeline.

kptn plan [--profile PROFILE]          # preview what will run or be skipped
kptn run  [--profile PROFILE] [--force] # execute the pipeline

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

kptn-0.2.8.tar.gz (8.5 MB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

kptn-0.2.8-py3-none-any.whl (287.2 kB view details)

Uploaded Python 3

File details

Details for the file kptn-0.2.8.tar.gz.

File metadata

  • Download URL: kptn-0.2.8.tar.gz
  • Upload date:
  • Size: 8.5 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.7

File hashes

Hashes for kptn-0.2.8.tar.gz
Algorithm Hash digest
SHA256 3922a59b8061509cda49b25d985cd1ce38d1aba7cbb3e6423ab74b5b3895ae90
MD5 1f8b6ac79f3413ed5e5aaf3e24fd70c4
BLAKE2b-256 78bc9cfcf4cc7fbfe25821dba17f6fcc6cd6b9a7f92bb7fe5a6e527b1eea7cfb

See more details on using hashes here.

File details

Details for the file kptn-0.2.8-py3-none-any.whl.

File metadata

  • Download URL: kptn-0.2.8-py3-none-any.whl
  • Upload date:
  • Size: 287.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.7

File hashes

Hashes for kptn-0.2.8-py3-none-any.whl
Algorithm Hash digest
SHA256 8467bbde2e7cd26a6e09e69ddcdd7cdcf167b4f043ce575fb0ad731a94cc538b
MD5 54c25ae1b42961e6020d2d86b64fdefc
BLAKE2b-256 1b1288d495f8950d5f290d507e6dee71646d11fa0b825c53ee25c7775544aac5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page