Meta tool for managing and deploying data pipelines.
kptn
kptn is a meta orchestration framework for building R and Python data pipelines. It right-sizes tasks, tests changes, and streamlines development and operation.
Define your tasks and their dependencies in YAML and kptn will render a pipeline runnable locally or on AWS with one click.
Features
- Testable tasks: Built-in change detection
- Right-sizing: CPU and memory requirements per task
- Branch-based development: Clone pipeline state across branches
- Update in-place: Tweak and run a subset of the pipeline
- Less boilerplate: Launch R scripts without writing any wrapper code
- Less vendor lock-in: Any orchestration framework could be rendered, not just Prefect
Basic Example
Here's a toy example of a pipeline called example with 3 tasks: A, B, C. We first define the graph of tasks, where A has no dependencies, B depends on A, and C depends on A and B.
graphs:
  example:
    tasks:
      A:
      B: A
      C: [A, B]
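To make the dependency notation concrete, here is a minimal Python sketch (not kptn's own code) that normalizes the three value forms used above (blank, a single task name, or a list) and derives a valid run order with the standard-library `graphlib` module. The `normalize` and `run_order` helpers are hypothetical names introduced for illustration.

```python
from graphlib import TopologicalSorter

# The example graph as it would look once the YAML is parsed:
# a blank value becomes None, a bare name a string, a list stays a list.
example = {"A": None, "B": "A", "C": ["A", "B"]}


def normalize(deps):
    """Turn None / "A" / ["A", "B"] into a list of dependency names."""
    if deps is None:
        return []
    if isinstance(deps, str):
        return [deps]
    return list(deps)


def run_order(graph):
    """Return task names ordered so every task follows its dependencies."""
    predecessors = {task: normalize(deps) for task, deps in graph.items()}
    return list(TopologicalSorter(predecessors).static_order())


print(run_order(example))  # ['A', 'B', 'C']
```

Because C depends on both A and B, and B depends on A, this graph has exactly one valid order.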
In the same YAML file, we can define the tasks (2 R tasks, 1 Python task):
tasks:
  A:
    file: "A/run.R"
    outputs: ["A.csv"]
  B:
    file: "B/run.R"
    outputs: ["B.csv"]
  C:
    file: C/run.py
    outputs: ["C.csv"]
The file field defines the location of the script to run. For a Python task, the file must contain a function with the same name as the task (here, C). The outputs field lists the files generated by the script.
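As a rough illustration, a Python task file such as C/run.py might look like the sketch below. Only the function name `C` and the output file `C.csv` come from the example; the zero-argument signature, writing relative to the working directory, and the CSV contents are assumptions, since the calling convention is not described here.

```python
import csv
from pathlib import Path


def C():
    """Task C: the function name matches the task ID in kptn.yaml."""
    # Assumption: outputs are written relative to the current working directory.
    output_path = Path("C.csv")
    with output_path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["task", "status"])  # placeholder columns
        writer.writerow(["C", "done"])
    return str(output_path)
```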
Configuration
There are two main configuration files: pyproject.toml and kptn.yaml
pyproject.toml
Your project is expected to use a pyproject.toml and contain a tool.kptn section.
| Field | Description | Required |
|---|---|---|
| flows_dir | The output directory where kptn will render your flow files to | Yes |
| py_tasks_dir | The directory (or list of directories) where Python task code exists | Yes |
| tasks_conf_path | The filepath to the kptn.yaml file defining your pipeline | Yes |
| docker_image | The name of the Docker image to build and push to Prefect | Yes |
| cache_namespace | Optional cache namespace shared across graphs/pipelines; defaults to the graph name | No |
Example:
[tool.kptn]
flows_dir = "py_src/flows"
py_tasks_dir = "py_src/tasks"
tasks_conf_path = "py_src/kptn.yaml"
docker_image = "nibrs-estimation-pipeline:latest"
kptn.yaml
Create a file, kptn.yaml, that contains definitions of the graphs of tasks and the tasks themselves.
| Field | Value Description | Required |
|---|---|---|
| graphs | A dictionary of graph IDs and graph objects | Yes |
| graphs.[id] | A dictionary representing a graph (nodes and edges) | Yes |
| graphs.[id].extends | Optional string or list of graph IDs (or objects with graph and optional per-task args) to inherit tasks from; the earliest occurrence of a task name wins (parents first, then child). If provided, `tasks` can be omitted to reuse a parent graph as-is. | No |
| graphs.[id].config | Graph-level config overrides merged on top of the root config when running this graph | No |
| graphs.[id].tasks | An ordered dictionary of task IDs (nodes) and their dependencies (edges) | Yes, unless `extends` is provided |
| graphs.[id].tasks.[task_id] | A list of dependency task IDs. If no dependencies, leave blank or use an empty list `[]`. | Yes |
| graphs.[id].tasks.[task_id].args | Static keyword args to supply to that task when run via this graph (overrides or supplements `tasks.[task_id].args`) | No |
| settings.logging.file | File path for kptn runtime logs. Relative paths are resolved from the directory containing kptn.yaml. This applies to kptn's own Python/runtime logs only, not task stdout/stderr. | No |
| tasks | A dictionary of task names and task objects | Yes |
| tasks.[task_id] | A dictionary representing a task | Yes |
| tasks.[task_id].file | A string with the filepath to the Python or R script | Yes |
| tasks.[task_id].args | Static keyword args for Python tasks; values are passed as-is | No |
| tasks.[task_id].prefix_args | A string that will be inserted before the `Rscript` command-line call | No |
| tasks.[task_id].cli_args | A string that will be inserted at the end of the `Rscript` command-line call | No |
| tasks.[task_id].cache_result | A boolean. If `true`, the Python script's return value is saved in the cache database (DynamoDB). If this value is a large list (e.g. 50k items), it is sharded across DynamoDB items for scalability. | No |
| tasks.[task_id].iterable_item | If `cache_result` is true and the result is a list, `iterable_item` is a string naming each item. The iterable item can also be a combination of values delimited by commas, e.g. US_STATE,ZIP_CODE | No |
| tasks.[task_id].map_over | A string that corresponds to an `iterable_item` in a dependency. Setting `map_over` will call this task for each item in the dependency result list. If this task is a Python task, the `iterable_item` will be passed as a function argument. If this task is an R task, the `iterable_item` will be passed as an environment variable to the R script. If the `iterable_item` is a comma-delimited combo, the values will be split and passed in separately. | No |
| tasks.[task_id].bundle_size | If `map_over` is set, an integer defining the number of subtasks sent to a Dask worker at one time. Default is 1. For a large number of subtasks, increasing this number can speed up the run. | No |
| tasks.[task_id].group_size | If `map_over` is set, an integer defining the number of subtasks sent to the Dask scheduler at one time. Default is infinity. For a large number of subtasks, capping this number can prevent the scheduler from being overwhelmed. | No |
| tasks.[task_id].outputs | A list of files that the script outputs | No |
| tasks.[task_id].compute | A dictionary that can contain two fields, cpu and memory, corresponding to the Fargate task definition values | No |
| tasks.[task_id].duckdb_checkpoint | Optional DuckDB file checkpointing for Python or DuckDB SQL tasks. If set to true, kptn saves a task-specific checkpoint after the task succeeds. At process startup, kptn restores the checkpoint from the task furthest along in the pipeline that has an existing checkpoint file. | No |
| tasks.[task_id].tags | A list of strings, which will be used as the Prefect task's tags; useful for concurrency limits | No |
Example:
graphs:
  my_graph:
    tasks:
      A:
      B: [A]
tasks:
  A:
    file: "A.py"
    cache_result: true
    iterable_item: US_STATE
    compute:
      cpu: 256
      memory: 512
  B:
    file: "B/run.R"
    map_over: US_STATE
    outputs:
      - "${US_STATE}.csv"
In this example, my_graph is the name of a graph (also known as a pipeline or a DAG). The graph object includes the tasks field, which is a dictionary of task names and a list of their corresponding dependencies (other tasks).
Task A will call the A() function in A.py and store the result in the cache database.
Task B will map over the result list, setting US_STATE=${US_STATE} as an environment variable for every call of the R script B/run.R.
For example, if A returned the list ['NC', 'SC'], B/run.R would be called twice, once with US_STATE=NC and once with US_STATE=SC.
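The fan-out described above can be pictured with a small Python sketch. It is an illustration rather than kptn's implementation: `plan_mapped_calls` is a hypothetical helper, it only plans the calls instead of launching `Rscript`, and it assumes that a combo item arrives as a comma-delimited string and that `${...}` placeholders in outputs are filled from the same values.

```python
from string import Template


def plan_mapped_calls(items, iterable_item, output_patterns):
    """Return one (env, outputs) pair per item in the upstream result list."""
    names = iterable_item.split(",")
    calls = []
    for item in items:
        # Assumption: a combo item such as "NC,27601" is comma-delimited.
        values = str(item).split(",") if len(names) > 1 else [str(item)]
        env = dict(zip(names, values))
        # Fill ${NAME} placeholders in each output pattern.
        outputs = [Template(p).substitute(env) for p in output_patterns]
        calls.append((env, outputs))
    return calls


for env, outputs in plan_mapped_calls(["NC", "SC"], "US_STATE", ["${US_STATE}.csv"]):
    print(env, outputs)
# {'US_STATE': 'NC'} ['NC.csv']
# {'US_STATE': 'SC'} ['SC.csv']
```

For an R task, each `env` dictionary would correspond to the environment variables set for one run of the script.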
To write kptn runtime logs to a file, configure:
settings:
  flows_dir: "."
  flow_type: vanilla
  logging:
    file: log/kptn.log
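Since relative log paths are resolved from the directory containing kptn.yaml, the effective location can be worked out as in this sketch. The helper name `resolve_log_path` is hypothetical, and the `py_src/kptn.yaml` location is borrowed from the pyproject.toml example purely for illustration.

```python
from pathlib import Path


def resolve_log_path(config_path, log_file):
    """Resolve a logging.file value relative to the folder holding kptn.yaml."""
    log_path = Path(log_file)
    if log_path.is_absolute():
        return log_path
    return Path(config_path).parent / log_path


print(resolve_log_path("py_src/kptn.yaml", "log/kptn.log").as_posix())
# py_src/log/kptn.log
```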
DuckDB Checkpoints
If config.duckdb resolves to a file-backed DuckDB connection, individual tasks can opt into saving checkpoints.
config:
  duckdb:
    function: src.utils:get_engine
tasks:
  init_database:
    file: src/init_database.py
    duckdb_checkpoint: true
Behavior:
- At process startup, kptn restores the checkpoint from the task furthest along in the pipeline that has an existing checkpoint file, but only when the checkpoint task does not already have a completed cached state in the database. This prevents the restore from discarding cached state for tasks that ran after the checkpoint was saved.
- `duckdb_checkpoint: true` only controls whether a task saves a checkpoint after it succeeds.
- After a checkpointed task succeeds and its final state (code hashes, status) has been written, kptn runs `checkpoint`, closes the DuckDB connection, and copies the refreshed database file to that task's default checkpoint path.
- The checkpoint path is task-specific and is written next to the DuckDB database file using the format `<db_stem>.<task_name>.backup<db_suffix>`; for example, `example.ddb` and task `basic` produce `example.basic.backup.ddb`.
- If the DuckDB connection is in-memory, checkpointing is skipped.
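The naming rule and the "furthest along" lookup can be sketched in a few lines of Python. This is an illustration under stated assumptions, not kptn's code: `checkpoint_path` and `latest_checkpoint` are hypothetical helpers, the task list is assumed to be given in pipeline order, and the check against cached state in the database is left out.

```python
from pathlib import Path


def checkpoint_path(db_path, task_name):
    """Build <db_stem>.<task_name>.backup<db_suffix> next to the database file."""
    db = Path(db_path)
    return db.with_name(f"{db.stem}.{task_name}.backup{db.suffix}")


def latest_checkpoint(db_path, tasks_in_order):
    """Return the existing checkpoint of the task furthest along, or None."""
    for task_name in reversed(tasks_in_order):
        candidate = checkpoint_path(db_path, task_name)
        if candidate.exists():
            return candidate
    return None


print(checkpoint_path("example.ddb", "basic").name)  # example.basic.backup.ddb
```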
Current scope:
- This startup restore behavior is intended for pipelines that do not use `map_over` on checkpointed tasks.
- The current implementation is designed for environments that intentionally share cache state via `settings.cache_namespace`.
This is intended to replace one-off task logic like “restore the latest existing checkpoint at startup, then save fresh checkpoints at selected tasks” with a declarative task setting in kptn.yaml.
Local runners for Step Functions projects
Projects that render AWS Step Functions flows still need a way to run pipelines locally. The codegen command emits a vanilla Python runner next to the Step Functions JSON files whenever the project’s flow_type is stepfunctions.
Frequently Asked Questions (FAQ)
Why not just use Prefect?
TL;DR: kptn simplifies pipeline configuration and improves on the base implementation.
As we tried to scale with Prefect, we ran into bugs and wondered whether Airflow or Dagster might work better, but we had already written our pipeline in formulaic Prefect code of the form "call task A, call task B". It was simple to pull that logic out into a YAML representation of a graph. This definition is portable, allowing us to render to any orchestrator's code.
A major goal was to test our pipeline. If we change the code for task A, we need to be able to run task A by itself and check if its outputs changed compared to its previous outputs. Prefect 2 didn't support running tasks by themselves, and its caching wasn't designed for this use case. Rendering Prefect code allows us to render runnable pieces of the pipeline and use a cache accessible for snapshot testing.
Another major goal is scaling to handle a large number of mapped tasks. Mapped tasks create a single subtask for each input. Prefect has two task runners that allow running subtasks in parallel: Dask and Ray. In our testing with Dask, we've found that we can speed up runtime by batching subtasks into bundles for Dask workers, and prevent the scheduler from crashing by batching subtasks into groups that are run sequentially. kptn offers both of these features via its API. Caching is particularly important for mapped tasks, because they can be very expensive to run and are more prone to crashing. If some subtasks fail, re-running the pipeline only re-runs the failed subtasks.
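The two levels of batching can be illustrated with plain Python lists. This sketch does not use Dask or kptn; `chunk` and `plan_batches` are hypothetical helpers that only show how subtasks would be split into sequential groups and, within each group, into bundles handed to workers. Both sizes are assumed to be counted in subtasks.

```python
def chunk(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def plan_batches(subtasks, bundle_size=1, group_size=None):
    """Return groups (run one after another), each a list of bundles."""
    # No group_size means a single group holding every subtask.
    groups = chunk(subtasks, group_size) if group_size else [subtasks]
    return [chunk(group, bundle_size) for group in groups]


plan = plan_batches(list(range(10)), bundle_size=2, group_size=4)
print(plan)
# [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9]]]
```

With ten subtasks, the scheduler sees at most four subtasks at a time, and each worker call handles two of them.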
Overall, kptn makes maintenance easier. It is designed for use cases such as developing and testing new features, copying production state to a local environment for debugging, and running subsets of subtasks.