
pipechain

Lightweight DAG-based data pipeline runner for local development



Define data pipelines as Python functions. Get dependency resolution, parallel execution, retries, and caching -- without Airflow overhead.


Why This Exists

Data pipelines are everywhere, but the tooling is either massive (Airflow, Prefect, Dagster) or nonexistent (raw scripts with nested try/except).

pipechain fills the gap: a zero-dependency, pure-Python pipeline runner that gives you DAG-based execution, parallel steps, retry policies, result caching, and lifecycle hooks -- all in a library you can pip install and use in 5 lines of code.

No scheduler. No database. No Docker. Just Python functions wired into a DAG.
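
As a rough sketch of what "a few lines" looks like in practice (using the same API as the Quick Start below; the step name and lambda are placeholders):

from pipechain import Pipeline, Step

pipe = Pipeline("hello")
pipe.add_step(Step("greet", lambda ctx: "hello, world"))  # a step is just a callable taking the context
result = pipe.run()
print(result.summary())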


Features

  • DAG-based execution -- automatic dependency resolution with cycle detection
  • Parallel execution -- independent steps run concurrently with configurable concurrency limits
  • Retry policies -- exponential backoff, linear, jitter, per-exception filtering
  • Result caching -- in-memory or file-based, with TTL support
  • Conditional steps -- skip steps based on runtime context
  • Lifecycle hooks -- before/after pipeline, before/after step, on error, on retry, on skip, on cache hit
  • Tag filtering -- run subsets of steps by tag
  • Dry-run mode -- validate the plan without executing
  • Shared context -- thread-safe data store with namespaces and snapshot/rollback
  • CLI interface -- run, validate, visualize, and plan pipelines from the command line
  • Type-safe -- full type annotations throughout

Installation

pip install pipechain

Quick Start

from pipechain import Pipeline, Step, Context

# Define steps as plain functions
def extract(ctx: Context):
    """Load raw data."""
    return {"users": [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]}

def transform(ctx: Context):
    """Filter and enrich data."""
    data = ctx.get("extract.output")
    return [u for u in data["users"] if u["age"] >= 28]

def load(ctx: Context):
    """Save results."""
    users = ctx.get("transform.output")
    ctx.set("loaded_count", len(users))
    return f"Loaded {len(users)} users"

# Build the pipeline
pipe = Pipeline("etl_example")
pipe.add_step(Step("extract", extract))
pipe.add_step(Step("transform", transform, depends_on=["extract"]))
pipe.add_step(Step("load", load, depends_on=["transform"]))

# Run it
result = pipe.run()
print(result.summary())

Using the Decorator API

from pipechain import Pipeline, step, Context
from pipechain.retry import RetryPolicy

pipe = Pipeline("decorated_etl")

@step(name="fetch", retry=RetryPolicy.exponential(max_retries=3, base_delay=0.5))
def fetch_data(ctx: Context):
    """Fetch data from API with automatic retries."""
    return {"items": list(range(100))}

@step(name="process", depends_on=["fetch"], tags=["cpu"])
def process_data(ctx: Context):
    """Process the fetched data."""
    items = ctx.get("fetch.output")["items"]
    return [x * 2 for x in items]

@step(name="save", depends_on=["process"], cache_key="save_v1", cache_ttl=3600)
def save_results(ctx: Context):
    """Save with 1-hour cache."""
    return len(ctx.get("process.output"))

pipe.add_step(fetch_data)
pipe.add_step(process_data)
pipe.add_step(save_results)

result = pipe.run()

Parallel Execution

Steps without dependencies on each other execute in parallel automatically:

pipe = Pipeline("parallel_demo", max_concurrency=4)

pipe.add_step(Step("fetch_users", fetch_users))
pipe.add_step(Step("fetch_orders", fetch_orders))
pipe.add_step(Step("fetch_products", fetch_products))
pipe.add_step(Step("merge", merge_all, depends_on=["fetch_users", "fetch_orders", "fetch_products"]))

# fetch_users, fetch_orders, fetch_products run in parallel
# merge waits for all three to complete
print(pipe.visualize())

Output:

Pipeline: parallel_demo

  Level 0:
    [fetch_orders]
    [fetch_products]
    [fetch_users]
      |
      v
  Level 1:
    [merge] <- [fetch_users, fetch_orders, fetch_products]

Conditional Steps

pipe.add_step(Step(
    "send_alerts",
    send_alerts,
    depends_on=["analyze"],
    condition=lambda ctx: ctx.get("analyze.output", {}).get("has_anomalies", False),
))

Lifecycle Hooks

from pipechain.hooks import HookType

pipe.on(HookType.BEFORE_PIPELINE, lambda **kw: print("Starting pipeline..."))
pipe.on(HookType.AFTER_STEP, lambda **kw: print(f"Completed: {kw['step'].name}"))
pipe.on(HookType.ON_STEP_ERROR, lambda **kw: log_error(kw['error']))
pipe.on(HookType.ON_STEP_RETRY, lambda **kw: print(f"Retrying {kw['step'].name}, attempt {kw['attempt']}"))

Caching

from pipechain.cache import MemoryCache, FileCache

# In-memory cache (fast, lost on restart)
pipe = Pipeline("cached", cache=MemoryCache())

# File-based cache (persists across runs)
pipe = Pipeline("cached", cache=FileCache(".pipeline_cache"))

# Per-step cache keys and TTL
pipe.add_step(Step("expensive", compute, cache_key="v1", cache_ttl=3600))

Context (Shared Data Store)

from pipechain import Context

ctx = Context({"env": "production"})

# Steps read/write through context
ctx.set("step1.output", {"data": [1, 2, 3]})
ctx.get("step1.output")  # {"data": [1, 2, 3]}

# Namespaces
ctx.namespace("step1")  # {"output": {"data": [1, 2, 3]}}

# Snapshot & rollback
snap = ctx.snapshot()
ctx.set("temp", "value")
ctx.rollback(snap)  # Restores previous state

CLI Usage

# Run a pipeline
pipechain run pipeline.py

# Dry run (validate without executing)
pipechain run pipeline.py --dry-run

# JSON output
pipechain run pipeline.py --json

# Filter by tags
pipechain run pipeline.py --tags etl critical

# Validate a pipeline file
pipechain validate pipeline.py

# Show execution plan
pipechain plan pipeline.py

# Visualize the DAG
pipechain visualize pipeline.py

Async Support

import asyncio
from pipechain import Pipeline, Step

async def async_fetch(ctx):
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"data": "fetched"}

pipe = Pipeline("async_demo")
pipe.add_step(Step("fetch", async_fetch))

# Sync API (creates event loop internally)
result = pipe.run()

# Or await it directly from within async code
result = await pipe.async_run()

Retry Policies

from pipechain.retry import RetryPolicy

# No retries
RetryPolicy.none()

# Linear (constant delay)
RetryPolicy.linear(max_retries=3, delay=1.0)

# Exponential backoff with jitter
RetryPolicy.exponential(max_retries=5, base_delay=0.5, max_delay=30.0)

# Custom: only retry on specific exceptions
RetryPolicy(
    max_retries=3,
    base_delay=1.0,
    retry_on=(ConnectionError, TimeoutError),
    no_retry_on=(ValueError,),
)

Architecture

pipechain/
  __init__.py       # Public API exports
  pipeline.py       # Pipeline orchestrator (DAG execution engine)
  step.py           # Step definition and @step decorator
  context.py        # Thread-safe shared data store
  dag.py            # DAG with topological sort and cycle detection
  result.py         # StepResult and PipelineResult types
  retry.py          # Retry policies (exponential, linear, custom)
  cache.py          # Cache backends (memory, file)
  hooks.py          # Lifecycle hook system
  cli.py            # Command-line interface

API Reference

Pipeline

Method                              Description
add_step(step)                      Add a Step or callable to the pipeline
remove_step(name)                   Remove a step by name
run(context, dry_run, tags)         Execute the pipeline synchronously
async_run(context, dry_run, tags)   Execute the pipeline asynchronously
validate()                          Check for dependency errors
execution_plan()                    Get parallel execution groups
visualize()                         ASCII DAG visualization
on(hook_type, callback)             Register a lifecycle hook
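
For illustration, a minimal sketch combining these methods. Keyword usage for run() (context, dry_run, tags) is assumed from the signature above; the step functions are placeholders:

from pipechain import Pipeline, Step, Context

def extract(ctx: Context):
    return {"rows": [1, 2, 3]}

def report(ctx: Context):
    return f"{len(ctx.get('extract.output')['rows'])} rows"

pipe = Pipeline("reporting")
pipe.add_step(Step("extract", extract, tags=["etl"]))
pipe.add_step(Step("report", report, depends_on=["extract"], tags=["reporting"]))

pipe.validate()               # check for dependency errors before running
print(pipe.execution_plan())  # parallel execution groups
print(pipe.visualize())       # ASCII DAG

pipe.run(dry_run=True)                            # validate the plan without executing
result = pipe.run(context=Context({"env": "dev"}), tags=["etl"])  # run only "etl" steps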

Step

Parameter      Type          Description
name           str           Unique step identifier
fn             Callable      Function to execute (sync or async)
depends_on     list[str]     Step dependencies
retry_policy   RetryPolicy   Retry configuration
cache_key      str           Cache key for result caching
cache_ttl      float         Cache TTL in seconds
condition      Callable      Conditional execution predicate
timeout        float         Execution timeout in seconds
tags           list[str]     Tags for filtering
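
A sketch of how these parameters fit together (keyword usage and defaults are assumed from the table above; the step and dependency names are placeholders):

from pipechain import Step, Context
from pipechain.retry import RetryPolicy

def pull_feed(ctx: Context):
    return {"entries": []}

step = Step(
    "pull_feed",
    pull_feed,
    depends_on=["configure"],
    retry_policy=RetryPolicy.exponential(max_retries=3, base_delay=0.5),
    cache_key="pull_feed_v1",
    cache_ttl=600.0,                                  # seconds
    condition=lambda ctx: ctx.get("env") != "test",   # skip during tests
    timeout=30.0,                                     # seconds
    tags=["io", "etl"],
)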

Context

Method               Description
get(key, default)    Get a value
set(key, value)      Set a value
has(key)             Check if key exists
delete(key)          Delete a key
namespace(prefix)    Get all values under a prefix
snapshot()           Take a snapshot
rollback(id)         Restore a snapshot
merge(dict)          Merge data into context
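
A short sketch of the methods not exercised in the Context example above (behavior assumed from the table):

from pipechain import Context

ctx = Context({"env": "dev"})

ctx.merge({"region": "us-east-1", "retries": 3})  # merge a dict into the context

if ctx.has("region"):
    print(ctx.get("region", "unknown"))           # "us-east-1"

ctx.delete("retries")
print(ctx.has("retries"))                         # False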

License

MIT
