# pipechain

Lightweight DAG-based data pipeline runner for local development
Define data pipelines as Python functions. Get dependency resolution, parallel execution, retries, and caching -- without Airflow overhead.
## Why This Exists
Data pipelines are everywhere, but the tooling is either massive (Airflow, Prefect, Dagster) or nonexistent (raw scripts with nested try/except).
pipechain fills the gap: a zero-dependency, pure-Python pipeline runner that gives you DAG-based execution, parallel steps, retry policies, result caching, and lifecycle hooks -- all in a library you can pip install and use in 5 lines of code.
No scheduler. No database. No Docker. Just Python functions wired into a DAG.
## Features
- DAG-based execution -- automatic dependency resolution with cycle detection
- Parallel execution -- independent steps run concurrently with configurable concurrency limits
- Retry policies -- exponential backoff, linear, jitter, per-exception filtering
- Result caching -- in-memory or file-based, with TTL support
- Conditional steps -- skip steps based on runtime context
- Lifecycle hooks -- before/after pipeline, before/after step, on error, on retry, on skip, on cache hit
- Tag filtering -- run subsets of steps by tag
- Dry-run mode -- validate the plan without executing
- Shared context -- thread-safe data store with namespaces and snapshot/rollback
- CLI interface -- run, validate, visualize, and plan pipelines from the command line
- Type-safe -- full type annotations throughout
## Installation

```bash
pip install pipechain
```
## Quick Start

```python
from pipechain import Pipeline, Step, Context

# Define steps as plain functions
def extract(ctx: Context):
    """Load raw data."""
    return {"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}

def transform(ctx: Context):
    """Filter and enrich data."""
    data = ctx.get("extract.output")
    return [u for u in data["users"] if u["age"] >= 28]

def load(ctx: Context):
    """Save results."""
    users = ctx.get("transform.output")
    ctx.set("loaded_count", len(users))
    return f"Loaded {len(users)} users"

# Build the pipeline
pipe = Pipeline("etl_example")
pipe.add_step(Step("extract", extract))
pipe.add_step(Step("transform", transform, depends_on=["extract"]))
pipe.add_step(Step("load", load, depends_on=["transform"]))

# Run it
result = pipe.run()
print(result.summary())
```
## Using the Decorator API

```python
from pipechain import Pipeline, step, Context
from pipechain.retry import RetryPolicy

pipe = Pipeline("decorated_etl")

@step(name="fetch", retry=RetryPolicy.exponential(max_retries=3, base_delay=0.5))
def fetch_data(ctx: Context):
    """Fetch data from API with automatic retries."""
    return {"items": list(range(100))}

@step(name="process", depends_on=["fetch"], tags=["cpu"])
def process_data(ctx: Context):
    """Process the fetched data."""
    items = ctx.get("fetch.output")["items"]
    return [x * 2 for x in items]

@step(name="save", depends_on=["process"], cache_key="save_v1", cache_ttl=3600)
def save_results(ctx: Context):
    """Save with 1-hour cache."""
    return len(ctx.get("process.output"))

pipe.add_step(fetch_data)
pipe.add_step(process_data)
pipe.add_step(save_results)

result = pipe.run()
```
## Parallel Execution

Steps without dependencies on each other execute in parallel automatically:

```python
# fetch_users, fetch_orders, fetch_products, and merge_all are
# ordinary step functions defined elsewhere.
pipe = Pipeline("parallel_demo", max_concurrency=4)
pipe.add_step(Step("fetch_users", fetch_users))
pipe.add_step(Step("fetch_orders", fetch_orders))
pipe.add_step(Step("fetch_products", fetch_products))
pipe.add_step(Step("merge", merge_all, depends_on=["fetch_users", "fetch_orders", "fetch_products"]))

# fetch_users, fetch_orders, fetch_products run in parallel
# merge waits for all three to complete
print(pipe.visualize())
```
Output:

```
Pipeline: parallel_demo
Level 0:
  [fetch_orders]
  [fetch_products]
  [fetch_users]
    |
    v
Level 1:
  [merge] <- [fetch_users, fetch_orders, fetch_products]
```
## Conditional Steps

Gate a step on runtime data with a `condition` predicate; when it evaluates falsy, the step is skipped:

```python
pipe.add_step(Step(
    "send_alerts",
    send_alerts,
    depends_on=["analyze"],
    condition=lambda ctx: ctx.get("analyze.output", {}).get("has_anomalies", False),
))
```
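For context, a minimal `analyze` step that could drive the condition above might look like this sketch (the anomaly check itself is illustrative, not part of pipechain):

```python
def analyze(ctx: Context):
    """Illustrative anomaly check over the transform step's output."""
    values = ctx.get("transform.output", [])
    return {"has_anomalies": len(values) == 0}

pipe.add_step(Step("analyze", analyze, depends_on=["transform"]))
# send_alerts now runs only when the condition evaluates to True;
# otherwise the step is skipped (and the on-skip hook fires, see below).
```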
## Lifecycle Hooks

```python
from pipechain.hooks import HookType

pipe.on(HookType.BEFORE_PIPELINE, lambda **kw: print("Starting pipeline..."))
pipe.on(HookType.AFTER_STEP, lambda **kw: print(f"Completed: {kw['step'].name}"))
pipe.on(HookType.ON_STEP_ERROR, lambda **kw: log_error(kw["error"]))  # log_error: your own logging helper
pipe.on(HookType.ON_STEP_RETRY, lambda **kw: print(f"Retrying {kw['step'].name}, attempt {kw['attempt']}"))
```
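As a sketch of how hooks compose, the pair below times each step. It assumes `HookType.BEFORE_STEP` exists to match the "before/after step" hooks in the feature list, and that callbacks receive the `step` keyword as in the examples above:

```python
import time

# Illustrative timing hooks; HookType.BEFORE_STEP is assumed to exist
# alongside AFTER_STEP, matching the "before/after step" hooks above.
_started = {}

def _on_start(**kw):
    _started[kw["step"].name] = time.perf_counter()

def _on_finish(**kw):
    elapsed = time.perf_counter() - _started.pop(kw["step"].name, time.perf_counter())
    print(f"{kw['step'].name} took {elapsed:.3f}s")

pipe.on(HookType.BEFORE_STEP, _on_start)
pipe.on(HookType.AFTER_STEP, _on_finish)
```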
## Caching

```python
from pipechain.cache import MemoryCache, FileCache

# In-memory cache (fast, lost on restart)
pipe = Pipeline("cached", cache=MemoryCache())

# File-based cache (persists across runs)
pipe = Pipeline("cached", cache=FileCache(".pipeline_cache"))

# Per-step cache keys and TTL
pipe.add_step(Step("expensive", compute, cache_key="v1", cache_ttl=3600))
```
## Context (Shared Data Store)

```python
from pipechain import Context

ctx = Context({"env": "production"})

# Steps read/write through context
ctx.set("step1.output", {"data": [1, 2, 3]})
ctx.get("step1.output")  # {"data": [1, 2, 3]}

# Namespaces
ctx.namespace("step1")  # {"output": {"data": [1, 2, 3]}}

# Snapshot & rollback
snap = ctx.snapshot()
ctx.set("temp", "value")
ctx.rollback(snap)  # Restores previous state
```
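You can also seed a context before running a pipeline; the `context` keyword here follows the `run(context, dry_run, tags)` signature in the API reference below:

```python
# Seed configuration that every step can read via ctx.get("env")
ctx = Context({"env": "staging", "batch_size": 500})
result = pipe.run(context=ctx)
```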
## CLI Usage

```bash
# Run a pipeline
pipechain run pipeline.py

# Dry run (validate without executing)
pipechain run pipeline.py --dry-run

# JSON output
pipechain run pipeline.py --json

# Filter by tags
pipechain run pipeline.py --tags etl critical

# Validate a pipeline file
pipechain validate pipeline.py

# Show execution plan
pipechain plan pipeline.py

# Visualize the DAG
pipechain visualize pipeline.py
```
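The commands above point at a `pipeline.py` file. This README doesn't spell out how the CLI locates the pipeline object, so the sketch below assumes it discovers a module-level `Pipeline` instance; the name `pipe` is an assumption, not a documented convention:

```python
# pipeline.py -- a file the CLI commands above could target
from pipechain import Pipeline, Step

def hello(ctx):
    return "hello"

# Assumption: the CLI picks up a module-level Pipeline instance.
pipe = Pipeline("cli_demo")
pipe.add_step(Step("hello", hello, tags=["etl"]))
```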
## Async Support

```python
import asyncio

from pipechain import Pipeline, Step

async def async_fetch(ctx):
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"data": "fetched"}

pipe = Pipeline("async_demo")
pipe.add_step(Step("fetch", async_fetch))

# Sync API (creates event loop internally)
result = pipe.run()

# Or use async directly (from within a coroutine)
result = await pipe.async_run()
```
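Because `await` is only valid inside a coroutine, a complete async entry point looks like this:

```python
async def main():
    result = await pipe.async_run()
    print(result.summary())

asyncio.run(main())
```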
## Retry Policies

```python
from pipechain.retry import RetryPolicy

# No retries
RetryPolicy.none()

# Linear (constant delay)
RetryPolicy.linear(max_retries=3, delay=1.0)

# Exponential backoff with jitter
RetryPolicy.exponential(max_retries=5, base_delay=0.5, max_delay=30.0)

# Custom: only retry on specific exceptions
RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    retry_on=(ConnectionError, TimeoutError),
    no_retry_on=(ValueError,),
)
```
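Policies attach per step. With the `Step` constructor this goes through the `retry_policy` parameter listed in the API reference below (the decorator API above spells it `retry=`):

```python
# fetch_remote is any callable that may raise transient errors
flaky = RetryPolicy.exponential(max_retries=5, base_delay=0.5, max_delay=30.0)
pipe.add_step(Step("fetch_remote", fetch_remote, retry_policy=flaky))
```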
## Architecture

```
pipechain/
    __init__.py   # Public API exports
    pipeline.py   # Pipeline orchestrator (DAG execution engine)
    step.py       # Step definition and @step decorator
    context.py    # Thread-safe shared data store
    dag.py        # DAG with topological sort and cycle detection
    result.py     # StepResult and PipelineResult types
    retry.py      # Retry policies (exponential, linear, custom)
    cache.py      # Cache backends (memory, file)
    hooks.py      # Lifecycle hook system
    cli.py        # Command-line interface
```
## API Reference

### Pipeline

| Method | Description |
|---|---|
| `add_step(step)` | Add a Step or callable to the pipeline |
| `remove_step(name)` | Remove a step by name |
| `run(context, dry_run, tags)` | Execute the pipeline synchronously |
| `async_run(context, dry_run, tags)` | Execute the pipeline asynchronously |
| `validate()` | Check for dependency errors |
| `execution_plan()` | Get parallel execution groups |
| `visualize()` | ASCII DAG visualization |
| `on(hook_type, callback)` | Register a lifecycle hook |
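Putting a few of these together; `validate()`, `execution_plan()`, and the `dry_run` keyword come from the table above, though the return shapes shown in the comments are assumptions:

```python
pipe.validate()                  # check for missing dependencies and cycles
for group in pipe.execution_plan():
    print(group)                 # steps in one group can run in parallel (assumed iterable)
result = pipe.run(dry_run=True)  # walk the plan without executing any steps
```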
### Step

| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Unique step identifier |
| `fn` | `Callable` | Function to execute (sync or async) |
| `depends_on` | `list[str]` | Step dependencies |
| `retry_policy` | `RetryPolicy` | Retry configuration |
| `cache_key` | `str` | Cache key for result caching |
| `cache_ttl` | `float` | Cache TTL in seconds |
| `condition` | `Callable` | Conditional execution predicate |
| `timeout` | `float` | Execution timeout in seconds |
| `tags` | `list[str]` | Tags for filtering |
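For illustration, a step exercising most of these parameters at once (`build_report` is a hypothetical callable):

```python
pipe.add_step(Step(
    "nightly_report",
    build_report,  # hypothetical sync or async callable
    depends_on=["load"],
    retry_policy=RetryPolicy.linear(max_retries=3, delay=1.0),
    cache_key="report_v1",
    cache_ttl=3600,
    condition=lambda ctx: ctx.get("env") == "production",
    timeout=120.0,
    tags=["reporting"],
))
```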
### Context

| Method | Description |
|---|---|
| `get(key, default)` | Get a value |
| `set(key, value)` | Set a value |
| `has(key)` | Check if key exists |
| `delete(key)` | Delete a key |
| `namespace(prefix)` | Get all values under a prefix |
| `snapshot()` | Take a snapshot |
| `rollback(id)` | Restore a snapshot |
| `merge(dict)` | Merge data into context |
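The methods not shown in the Context section above, in one short sketch:

```python
ctx.merge({"region": "eu", "retries": 0})
ctx.has("region")     # True
ctx.delete("retries")
ctx.has("retries")    # False
```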
## License
MIT