Meta tool for managing and deploying data pipelines.
kptn
Lightweight, cacheable data pipelines in Python, R, and SQL.
kptn lets you define data pipelines as composable Python functions, SQL files, and R scripts. Pipelines are hash-cached by default — unchanged tasks are skipped automatically on re-runs, so you only pay for what changed. Profiles let you parameterize and filter runs by environment or dataset without changing code. Cloud deployment to AWS is on the roadmap.
Installation
pip install kptn
Optional extras:
pip install kptn[duckdb] # DuckDB state store and SQL tasks
pip install kptn[aws] # AWS deployment (work in progress)
CLI setup — add to your project's pyproject.toml:
[tool.kptn]
pipeline = "your_package.pipeline" # module that exposes a `pipeline` attribute
Concepts
- Tasks: the unit of work. A task is a Python function decorated with @kptn.task, a SQL file registered with kptn.sql_task(), or an R script registered with kptn.r_task(). Each task declares its output files.
- Graphs: tasks are composed into a directed acyclic graph using the >> operator for sequential chaining and functions like kptn.parallel(), kptn.Stage(), and kptn.map() for branching.
- Caching: kptn hashes each task's outputs and source code. On re-run, tasks whose outputs and dependencies haven't changed are skipped. Use kptn plan to preview what will run before executing anything.
- Profiles: named configurations in kptn.yaml that filter stages, override task arguments, and control execution cursors. Useful for running a subset of the pipeline for a specific dataset or environment.
Quick Start
# pipeline.py
import kptn
from pathlib import Path
def get_greeting() -> str:
    return "Hello, kptn!"

@kptn.task(outputs=["output/extract.txt"])
def extract(greeting: str) -> None:
    Path("output").mkdir(exist_ok=True)
    Path("output/extract.txt").write_text(greeting)

@kptn.task(outputs=["output/transform.txt"])
def transform() -> None:
    data = Path("output/extract.txt").read_text()
    Path("output/transform.txt").write_text(data.upper())

@kptn.task(outputs=["output/load.txt"])
def load() -> None:
    data = Path("output/transform.txt").read_text()
    Path("output/load.txt").write_text(f"Loaded: {data}")

deps = kptn.config(greeting=get_greeting)
graph = deps >> extract >> transform >> load
pipeline = kptn.Pipeline("hello_kptn", graph)
Preview the plan, then run:
kptn plan
kptn run
Or run directly from Python:
kptn.run(pipeline)
Core Features
Graph Composition
Chain tasks sequentially with >>:
graph = extract >> transform >> load
Fan out to parallel branches with kptn.parallel():
graph = ingest >> kptn.parallel(transform_a, transform_b) >> merge
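To make the composition model concrete, here is a minimal, library-independent sketch of how a >> operator and a parallel helper could build a dependency graph. The `Graph`, `node`, and `parallel` names below are hypothetical illustrations, not kptn's internals; kptn's actual graph types may work differently.

```python
from __future__ import annotations


class Graph:
    """Hypothetical DAG fragment: edges plus the fragment's entry and exit nodes."""

    def __init__(self, entries, exits, edges=()):
        self.entries = list(entries)  # nodes with no upstream inside this fragment
        self.exits = list(exits)      # nodes with no downstream inside this fragment
        self.edges = set(edges)       # (upstream, downstream) name pairs

    def __rshift__(self, other: Graph) -> Graph:
        # a >> b: every exit of `a` becomes an upstream of every entry of `b`.
        new_edges = {(u, d) for u in self.exits for d in other.entries}
        return Graph(self.entries, other.exits, self.edges | other.edges | new_edges)


def node(name: str) -> Graph:
    """A single task is a fragment whose entry and exit are the same node."""
    return Graph([name], [name])


def parallel(*branches: Graph) -> Graph:
    """Independent branches: union of entries, exits, and edges, with no new edges."""
    entries = [n for b in branches for n in b.entries]
    exits = [n for b in branches for n in b.exits]
    edges = set().union(*(b.edges for b in branches))
    return Graph(entries, exits, edges)


graph = node("ingest") >> parallel(node("transform_a"), node("transform_b")) >> node("merge")
for upstream, downstream in sorted(graph.edges):
    print(f"{upstream} -> {downstream}")
```

In this sketch the fan-out and fan-in edges fall out of the entry/exit bookkeeping, so chaining and branching compose without special cases.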
Use kptn.Stage() to define profile-selectable branches. The profile controls which branches are active at runtime:
datasets = kptn.Stage(
"datasets",
load_full,
load_subset,
)
graph = ingest >> datasets >> analyze
Use kptn.map() to fan out dynamically over a runtime collection:
graph = list_items >> kptn.map(process_item, over="items")
Profiles
Profiles are defined in kptn.yaml at your project root. They let you parameterize runs without changing code.
settings:
  db: duckdb
  db_path: pipeline.db
profiles:
  full:
    stage_selections:
      datasets: [load_full]
  subset:
    stage_selections:
      datasets: [load_subset]
    args:
      analyze:
        limit: 1000
  subset_test:
    extends: subset
    stop_after: transform
    optional_groups:
      qa_checks: false
Profile keys:
| Key | Description |
|---|---|
| `extends` | Inherit settings from another profile (or a list of profiles) |
| `stage_selections` | Map of stage name → list of branch names to activate |
| `args` | Per-task keyword argument overrides |
| `start_from` | Skip tasks before this task name |
| `stop_after` | Skip tasks after this task name |
| `optional_groups` | Enable or disable named optional task groups |
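One way to picture how `extends` could combine profiles is a recursive merge in which parents are applied first and the child's own keys win. The sketch below works on plain dictionaries shaped like the YAML above; the merge rules (deep merge of nested maps, later values override earlier ones) and the helper names `deep_merge` and `resolve_profile` are assumptions for illustration, not kptn's documented behavior.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of `base` with `override` merged in; nested dicts merge recursively."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def resolve_profile(name: str, profiles: dict, _seen: tuple = ()) -> dict:
    """Resolve `extends` (a name or a list of names): parents first, child last."""
    if name in _seen:
        raise ValueError(f"circular extends: {' -> '.join(_seen + (name,))}")
    profile = profiles[name]
    parents = profile.get("extends", [])
    if isinstance(parents, str):
        parents = [parents]
    resolved: dict = {}
    for parent in parents:
        resolved = deep_merge(resolved, resolve_profile(parent, profiles, _seen + (name,)))
    own = {k: v for k, v in profile.items() if k != "extends"}
    return deep_merge(resolved, own)


profiles = {
    "subset": {
        "stage_selections": {"datasets": ["load_subset"]},
        "args": {"analyze": {"limit": 1000}},
    },
    "subset_test": {
        "extends": "subset",
        "stop_after": "transform",
        "optional_groups": {"qa_checks": False},
    },
}
resolved = resolve_profile("subset_test", profiles)
print(resolved)
```

Under these assumptions, `subset_test` ends up with the stage selection and `args` of `subset` plus its own `stop_after` and `optional_groups`.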
Run with a profile:
kptn run --profile subset
kptn plan --profile subset
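The `start_from` and `stop_after` keys can be read as a window over the pipeline's execution order. The sketch below assumes the tasks are already in a linear topological order and treats both cursors as inclusive, which matches "skip tasks before" and "skip tasks after" in the key table. The `apply_cursors` helper is hypothetical; how kptn handles cursors in branching graphs is not covered here.

```python
def apply_cursors(order, start_from=None, stop_after=None):
    """Keep only the tasks in the inclusive [start_from, stop_after] window.

    `order` is a list of task names in execution order. An unknown task
    name raises ValueError (from list.index).
    """
    start = order.index(start_from) if start_from is not None else 0
    stop = order.index(stop_after) + 1 if stop_after is not None else len(order)
    if start >= stop:
        raise ValueError("start_from must not come after stop_after")
    return order[start:stop]


order = ["extract", "transform", "load"]
print(apply_cursors(order, stop_after="transform"))
print(apply_cursors(order, start_from="transform"))
```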
DuckDB Integration
Pass a DuckDB connection factory via kptn.config(). Tasks receive the connection as a keyword argument:
import duckdb
import kptn
from pathlib import Path
def get_engine():
    return duckdb.connect("pipeline.db")

@kptn.task(outputs=["output/summary.parquet"])
def summarize(engine) -> None:
    engine.execute("COPY (SELECT * FROM raw) TO 'output/summary.parquet'")

ingest = kptn.sql_task("sql/ingest.sql", outputs=["raw"])
deps = kptn.config(duckdb=(get_engine, "engine"))
graph = deps >> ingest >> summarize
pipeline = kptn.Pipeline("my_pipeline", graph)
The duckdb=(factory, "alias") tuple tells kptn to inject the connection returned by the factory under the given alias, which is "engine" in this example. SQL tasks receive the connection automatically.
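Injection by keyword name can be sketched with the standard library alone. The example below is an illustration of the general technique, not kptn's implementation: `build_factories` and `call_with_injection` are hypothetical helpers, and the rule that a `(factory, alias)` tuple registers the factory under the alias is inferred from the description above.

```python
import inspect


def build_factories(**config) -> dict:
    """Map injection names to factories; a (factory, alias) tuple registers under the alias."""
    factories = {}
    for key, value in config.items():
        if isinstance(value, tuple):
            factory, alias = value
            factories[alias] = factory
        else:
            factories[key] = value
    return factories


def call_with_injection(func, factories: dict):
    """Call `func`, supplying every parameter whose name matches a registered factory."""
    params = inspect.signature(func).parameters
    kwargs = {name: factories[name]() for name in params if name in factories}
    return func(**kwargs)


# Stand-ins for a connection factory and a task; no real database is opened.
def get_engine() -> str:
    return "fake-connection"


def summarize(engine) -> str:
    return f"summarize ran with {engine}"


factories = build_factories(duckdb=(get_engine, "engine"))
print(call_with_injection(summarize, factories))
```

Because matching is done on parameter names, a task that does not declare `engine` is simply called without it.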
Add duckdb_checkpoint=True to a task to persist a DuckDB checkpoint after it runs, enabling incremental restores:
@kptn.task(outputs=["output/final.parquet"], duckdb_checkpoint=True)
def finalize(engine) -> None:
    ...
Caching and Re-runs
kptn hashes each task's declared outputs and source code. On re-run:
- Tasks whose outputs exist and haven't changed are skipped.
- Tasks whose source code or upstream dependencies changed are re-run.
To bypass the cache for a single run:
kptn run --force
To preview what would run without executing:
kptn plan
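The skip decision described in this section can be approximated with a fingerprint over a task's source text and the bytes of its declared outputs. This is a simplified sketch under stated assumptions: the `fingerprint`, `needs_run`, and `record` helpers and the in-memory `state` dictionary are hypothetical, upstream dependency changes are not modeled, and kptn's real state store and hashing scheme may differ.

```python
from __future__ import annotations

import hashlib
import tempfile
from pathlib import Path


def fingerprint(source_code: str, outputs: list[Path]) -> str | None:
    """Hash the task's source text plus each declared output's name and bytes.

    Returns None when any output is missing, which always forces a run.
    """
    digest = hashlib.sha256(source_code.encode("utf-8"))
    for path in sorted(outputs):
        if not path.exists():
            return None
        digest.update(path.name.encode("utf-8"))
        digest.update(path.read_bytes())
    return digest.hexdigest()


def needs_run(task: str, source_code: str, outputs: list[Path], state: dict, force: bool = False) -> bool:
    """True when forced, when an output is missing, or when the fingerprint changed."""
    current = fingerprint(source_code, outputs)
    return force or current is None or state.get(task) != current


def record(task: str, source_code: str, outputs: list[Path], state: dict) -> None:
    """Store the post-run fingerprint so an unchanged task can be skipped next time."""
    state[task] = fingerprint(source_code, outputs)


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "extract.txt"
    src = "def extract(): return 1"
    state: dict = {}
    first = needs_run("extract", src, [out], state)    # output missing
    out.write_text("hello")
    record("extract", src, [out], state)
    second = needs_run("extract", src, [out], state)   # nothing changed
    third = needs_run("extract", "def extract(): return 2", [out], state)  # source changed
    forced = needs_run("extract", src, [out], state, force=True)
print(first, second, third, forced)  # True False True True
```

The `force` flag plays the role of `kptn run --force` here: it short-circuits the comparison without touching the stored state.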
API Reference
Tasks
| Symbol | Description |
|---|---|
| `@kptn.task(outputs, optional=None, compute=None, duckdb_checkpoint=False)` | Decorate a Python function as a kptn task |
| `kptn.sql_task(path, outputs, optional=None, duckdb_checkpoint=False)` | Register a SQL file as a task |
| `kptn.r_task(path, outputs, compute=None, optional=None, duckdb_checkpoint=False)` | Register an R script as a task |
| `kptn.noop()` | Placeholder / synchronization node |
Graph Composition
| Symbol | Description |
|---|---|
| `>>` | Chain tasks or graphs sequentially |
| `kptn.parallel(*branches)` | Fan out to parallel branches. Accepts an optional name as the first argument: `kptn.parallel("name", a, b)` |
| `kptn.Stage(name, *branches)` | Profile-selectable branches grouped under a named stage |
| `kptn.map(task_fn, over="key")` | Dynamic fanout over a runtime collection |
Pipeline & Execution
| Symbol | Description |
|---|---|
| `kptn.config(**kwargs)` | Declare dependency injection factories. Use `duckdb=(factory, "alias")` for DuckDB connections |
| `kptn.Pipeline(name, graph)` | Wrap a graph in a named pipeline |
| `kptn.run(pipeline, *, profile=None, keep_db_open=False, no_cache=False, force=False)` | Execute the pipeline |
| `kptn.plan(pipeline, *, profile=None)` | Dry-run: print which tasks would run or be skipped |
CLI Reference
The kptn CLI discovers your pipeline from [tool.kptn] pipeline = "..." in pyproject.toml. The referenced module must expose a pipeline attribute of type Pipeline.
kptn plan [--profile PROFILE] # preview what will run or be skipped
kptn run [--profile PROFILE] [--force] # execute the pipeline
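The discovery step described above (read the module path from pyproject.toml, import it, take its `pipeline` attribute) can be sketched with the standard library. The helper names `pipeline_module_name` and `load_pipeline` are hypothetical, the demo module `your_package_demo` is an in-memory stand-in, and the CLI's real lookup and error handling may differ. `tomllib` is in the standard library from Python 3.11; the fallback import assumes the third-party `tomli` backport on older interpreters.

```python
import importlib
import sys
import types

try:
    import tomllib  # standard library in Python 3.11+
except ModuleNotFoundError:  # older interpreters: third-party backport
    import tomli as tomllib


def pipeline_module_name(pyproject_text: str) -> str:
    """Return the module path configured under [tool.kptn] pipeline."""
    config = tomllib.loads(pyproject_text)
    try:
        return config["tool"]["kptn"]["pipeline"]
    except KeyError as exc:
        raise LookupError('missing [tool.kptn] pipeline = "..." in pyproject.toml') from exc


def load_pipeline(module_name: str):
    """Import the module and return its `pipeline` attribute."""
    module = importlib.import_module(module_name)
    try:
        return module.pipeline
    except AttributeError as exc:
        raise LookupError(f"{module_name} does not expose a `pipeline` attribute") from exc


# Demo: register an in-memory module instead of a real package on disk.
fake = types.ModuleType("your_package_demo")
fake.pipeline = "hello_kptn pipeline object"
sys.modules["your_package_demo"] = fake

text = '[tool.kptn]\npipeline = "your_package_demo"\n'
print(load_pipeline(pipeline_module_name(text)))
```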