A flexible data pipeline library for custom data processing workflows
PipelineHub
Python pipelines with automatic debugging built in.
pip install pipelinehub
The problem
Your pipeline fails at 2am. The stack trace points to step 9. The actual problem happened in step 4 — a column silently changed dtype and nobody caught it until three steps later.
You spend the next four hours adding print statements, re-running on stale data, and guessing. I built this after doing exactly that, more times than I want to count.
What it does
pipelinehub captures your data at every step — shape, nulls, dtypes, statistics — without any configuration. When something breaks, you see what the data looked like going into the failing step, not just which line exploded.
from pipelinehub import DataPipeline
pipeline = DataPipeline(name="ml-pipeline")
pipeline.add_step(clean_data, "clean")
pipeline.add_step(feature_engineer, "feature_engineer")
pipeline.add_step(normalize, "normalize")
result = pipeline.execute(df)
When a step fails:
PipelineStepError: Step "normalize" (step 3 of 3) failed
Data entering this step:
type: dataframe
shape: (10420, 8)
nulls: col_price: 142
dtypes: col_id: object col_price: float64
Original error: TypeError: unsupported operand type(s) for +: 'float' and 'str'
To replay from this step:
pipeline.replay_from("normalize", your_data)
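The capture-and-report behaviour above can be approximated in standard-library Python. This is a minimal sketch, not pipelinehub's implementation. `snapshot`, `StepFailure`, and `run_steps` are hypothetical names, steps are assumed to be (function, name) pairs, and profiling is limited to plain lists (length, None count, element types, numeric min/max) rather than dataframes.

```python
from typing import Any, Callable, Sequence

Step = tuple[Callable[[Any], Any], str]  # hypothetical (function, name) pair


def snapshot(data: Any) -> dict:
    """Summarise the data entering a step (hypothetical; profiles plain lists only)."""
    info: dict = {"type": type(data).__name__}
    if isinstance(data, list):
        numbers = [v for v in data if isinstance(v, (int, float)) and not isinstance(v, bool)]
        info["rows"] = len(data)
        info["nulls"] = sum(v is None for v in data)
        info["types"] = sorted({type(v).__name__ for v in data})
        if numbers:
            info["min"], info["max"] = min(numbers), max(numbers)
    return info


class StepFailure(Exception):
    """Hypothetical error carrying the snapshot of the data entering the failed step."""

    def __init__(self, name: str, index: int, total: int, entering: dict, cause: Exception):
        self.name, self.entering, self.cause = name, entering, cause
        super().__init__(
            f'Step "{name}" (step {index} of {total}) failed\n'
            f"Data entering this step: {entering}\n"
            f"Original error: {type(cause).__name__}: {cause}"
        )


def run_steps(steps: Sequence[Step], data: Any) -> tuple[Any, list[dict]]:
    """Run steps in order, snapshotting each step's input and re-raising failures with context."""
    snapshots: list[dict] = []
    for index, (func, name) in enumerate(steps, start=1):
        entering = snapshot(data)
        snapshots.append({"step": name, **entering})
        try:
            data = func(data)
        except Exception as exc:
            raise StepFailure(name, index, len(steps), entering, exc) from exc
    return data, snapshots


# Usage: the second step fails on the string "3", and the error shows what entered it.
steps: list[Step] = [
    (lambda x: [v for v in x if v is not None], "drop_nulls"),
    (lambda x: [v + 1 for v in x], "increment"),
]
try:
    run_steps(steps, [1, None, "3"])
except StepFailure as err:
    print(err)
```

Snapshotting the input (not the output) of each step is what lets the error describe the data the failing function actually received.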
Why not an AI assistant?
An AI assistant needs you to notice something is wrong, copy the data, describe the problem, and ask. pipelinehub runs inside your scheduled job at 3am, with no human in the loop. It catches silent failures that never raise an exception: null counts that creep up, dtypes that silently change, row counts that drop 60% in one step.
Why not Kedro or Airflow?
Kedro requires a new project structure, a CLI, a data catalog, and YAML configs. Reasonable for a large team running production ML. Overkill if you're preprocessing data in a notebook or a script and just want visibility into what's happening at each step.
Airflow is an orchestrator. It schedules and monitors jobs — it doesn't help you understand what's wrong inside one.
pipelinehub is a library, not a framework. It adds one import and one method call to code you already have.
Quick start
from pipelinehub import DataPipeline
pipeline = (DataPipeline(name="my-pipeline")
    .add_step(lambda x: [i for i in x if i > 0], "filter_positive")
    .add_step(lambda x: [i**2 for i in x], "square")
    .add_step(lambda x: [i/max(x) for i in x], "normalize"))
result = pipeline.execute([-2, -1, 0, 1, 2, 3, 4, 5])
Pass debug=False to skip snapshotting entirely — same behaviour as v0.1, no overhead:
result = pipeline.execute(data, debug=False)
Features
- Automatic snapshots — shape, dtypes, null counts, and numeric stats captured at every step, zero configuration
- Rich failure context — PipelineStepError shows exactly what the data looked like entering the failing step
- Anomaly detection — warns when null counts spike, dtypes change, rows drop more than 50%, or value distributions shift compared to the last run
- Run history — every run stored locally in .pipelinehub/runs.db, no setup required
- Run comparison — pipeline.compare_runs() diffs any two runs step by step
- Replay from any step — re-run forward from any named step without re-running everything before it
- ph CLI — inspect run history, diff runs, and watch live execution from the terminal
- Fluent chaining — method chaining works if you prefer that style
- No external dependencies — stdlib only; pandas, polars, and numpy are detected and profiled if installed
- Full type hints — works with IDE autocomplete
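The anomaly rules in the list above amount to comparing two snapshots field by field. A minimal sketch, assuming each snapshot is a plain dict with hypothetical `rows`, `nulls` (per column), and `dtypes` (per column) keys. The 50% row-drop threshold comes from the feature list; treating any null increase as a spike is an assumption, and the distribution-shift check is left out.

```python
def detect_anomalies(before: dict, after: dict, max_row_drop: float = 0.5) -> list[str]:
    """Compare two snapshots (hypothetical format) and return human-readable warnings."""
    warnings = []
    # Nulls: flag any column whose null count went up (a simple stand-in for "spike").
    for col, count in after.get("nulls", {}).items():
        prev = before.get("nulls", {}).get(col, 0)
        if count > prev:
            warnings.append(f"{col} nulls increased ({prev} -> {count})")
    # Dtypes: flag columns present in both snapshots whose type changed.
    for col, dtype in after.get("dtypes", {}).items():
        prev = before.get("dtypes", {}).get(col)
        if prev is not None and prev != dtype:
            warnings.append(f"{col} dtype changed ({prev} -> {dtype})")
    # Rows: flag drops strictly larger than the threshold (50% by default).
    rows_before, rows_after = before.get("rows", 0), after.get("rows", 0)
    if rows_before and (rows_before - rows_after) / rows_before > max_row_drop:
        warnings.append(f"rows dropped {1 - rows_after / rows_before:.0%} ({rows_before} -> {rows_after})")
    return warnings


# Illustrative snapshots only; these values are made up for the example.
before = {"rows": 10420, "nulls": {"col_price": 0}, "dtypes": {"col_id": "int64", "col_price": "float64"}}
after = {"rows": 4000, "nulls": {"col_price": 142}, "dtypes": {"col_id": "object", "col_price": "float64"}}
for warning in detect_anomalies(before, after):
    print("⚠", warning)
```

None of these checks needs an exception to fire, which is what makes them useful for the silent failures described earlier.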
Snapshot output
When anomalies are detected, pipelinehub prints a summary after execute():
Pipeline completed ✓ (2.3s)
Step              Rows            Nulls       Duration
──────────────────────────────────────────────────────────
clean             10500→10420     0→0         0.4s
feature_engineer  10420→10420     0→142 ⚠     1.1s
normalize         10420→10420     142→0       0.8s
⚠ col_price nulls introduced in step "feature_engineer" (+142)
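A summary in this shape is mostly fixed-width formatting over per-step before/after counts. A sketch, assuming hypothetical step records with `name`, `rows` and `nulls` as (in, out) pairs, and `seconds`. Here ⚠ simply marks any step whose null count rose, which is a simplification of the anomaly rules.

```python
def format_summary(steps: list[dict]) -> str:
    """Render per-step row and null transitions as a fixed-width table (hypothetical records)."""
    lines = [f"{'Step':<18}{'Rows':<16}{'Nulls':<12}Duration", "─" * 54]
    for s in steps:
        (rows_in, rows_out), (nulls_in, nulls_out) = s["rows"], s["nulls"]
        rows = f"{rows_in}→{rows_out}"
        # Mark steps that introduced nulls.
        nulls = f"{nulls_in}→{nulls_out}" + (" ⚠" if nulls_out > nulls_in else "")
        lines.append(f"{s['name']:<18}{rows:<16}{nulls:<12}{s['seconds']:.1f}s")
    return "\n".join(lines)


print(format_summary([
    {"name": "clean", "rows": (10500, 10420), "nulls": (0, 0), "seconds": 0.4},
    {"name": "feature_engineer", "rows": (10420, 10420), "nulls": (0, 142), "seconds": 1.1},
    {"name": "normalize", "rows": (10420, 10420), "nulls": (142, 0), "seconds": 0.8},
]))
```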
Inspect run history
# Last run with all step snapshots
last = pipeline.last_run()
# Recent runs
pipeline.list_runs(limit=5)
# Compare last two runs
pipeline.compare_runs()
# Compare specific runs by ID
pipeline.compare_runs(run_id_a, run_id_b)
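Comparing two runs step by step comes down to pairing steps by name and reporting the snapshot fields that differ. A sketch, assuming each run is a hypothetical dict mapping step name to a snapshot dict; it does not reproduce the output format of pipeline.compare_runs().

```python
def diff_runs(run_a: dict[str, dict], run_b: dict[str, dict]) -> dict[str, dict]:
    """Return, per step, the snapshot fields whose values differ between two runs."""
    diffs: dict[str, dict] = {}
    # Visit run_a's steps in order, then any steps that only exist in run_b.
    for step in list(run_a) + [s for s in run_b if s not in run_a]:
        a, b = run_a.get(step), run_b.get(step)
        if a is None or b is None:
            diffs[step] = {"present": (a is not None, b is not None)}
            continue
        changed = {k: (a.get(k), b.get(k)) for k in sorted(set(a) | set(b)) if a.get(k) != b.get(k)}
        if changed:
            diffs[step] = changed
    return diffs


# Illustrative runs: the second one introduces nulls in feature_engineer.
run_a = {"clean": {"rows": 10420, "nulls": 0}, "feature_engineer": {"rows": 10420, "nulls": 0}}
run_b = {"clean": {"rows": 10420, "nulls": 0}, "feature_engineer": {"rows": 10420, "nulls": 142}}
print(diff_runs(run_a, run_b))
```

Pairing by step name rather than position keeps the diff readable when a step is added or removed between runs.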
Replay from a step
# Fix normalize(), replay forward — skips clean and feature_engineer
result = pipeline.replay_from("normalize", your_data)
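Replaying forward only needs the ordered steps and the data that should enter the chosen step. A sketch, assuming steps are held as (function, name) pairs and that the caller supplies the input for the start step (for example, data saved from an earlier run). It illustrates the idea and is not pipelinehub's replay_from.

```python
from typing import Any, Callable

Step = tuple[Callable[[Any], Any], str]  # hypothetical (function, name) pair


def replay_from(steps: list[Step], start: str, data: Any) -> Any:
    """Run every step from the one named `start` onward, skipping the steps before it."""
    names = [name for _, name in steps]
    if start not in names:
        raise KeyError(f"no step named {start!r}; known steps: {names}")
    for func, _ in steps[names.index(start):]:
        data = func(data)
    return data


steps: list[Step] = [
    (lambda x: [i for i in x if i > 0], "filter_positive"),
    (lambda x: [i**2 for i in x], "square"),
    (lambda x: [i / max(x) for i in x], "normalize"),
]
# Feed already-squared values straight into "normalize" without re-running earlier steps.
print(replay_from(steps, "normalize", [1, 4, 9, 16, 25]))
```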
ph CLI
pip install pipelinehub also installs the ph command. It reads from .pipelinehub/runs.db in your project — no auth, no setup.
# List recent runs
ph runs list
# Show last completed run
ph runs last
# Step-by-step detail for a specific run
ph runs show <run_id>
# Diff two runs
ph runs diff <run_id_a> <run_id_b>
# Watch a pipeline execute in real time (polls every 1s)
ph runs watch
# Health summary across all pipelines
ph status
# Aggregate stats — success rate, failure count
ph stats
# Check setup
ph doctor
Example output for ph runs list:
ID Pipeline Started Status Steps
──────────────────────────────────────────────────────────────────────────
a3f2c1b0 ml-pipeline 2026-06-24 09:12:03 ✅ success 4
7d91e4a2 ml-pipeline 2026-06-23 22:47:11 ❌ failed 3
c58b0f31 etl-daily 2026-06-23 06:00:02 ✅ success 6
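Because run history lives in a local SQLite file, a listing like the one above (and a success rate like ph stats reports) needs nothing beyond the standard library. The sketch below uses a hypothetical runs table with id, pipeline, started, status, and steps columns; pipelinehub's actual schema in .pipelinehub/runs.db may differ, so treat the column names as assumptions. It uses an in-memory database so it runs anywhere.

```python
import sqlite3

# Hypothetical schema, not pipelinehub's.
SCHEMA = """
CREATE TABLE IF NOT EXISTS runs (
    id TEXT PRIMARY KEY,
    pipeline TEXT NOT NULL,
    started TEXT NOT NULL,  -- 'YYYY-MM-DD HH:MM:SS' text sorts chronologically
    status TEXT NOT NULL,   -- 'success' or 'failed'
    steps INTEGER NOT NULL
)
"""


def record_run(conn: sqlite3.Connection, run: tuple[str, str, str, str, int]) -> None:
    """Insert one run row; the connection context manager commits on success."""
    with conn:
        conn.execute("INSERT INTO runs VALUES (?, ?, ?, ?, ?)", run)


def list_runs(conn: sqlite3.Connection, limit: int = 5) -> list[tuple]:
    """Most recent runs first."""
    return conn.execute(
        "SELECT id, pipeline, started, status, steps FROM runs ORDER BY started DESC LIMIT ?",
        (limit,),
    ).fetchall()


def success_rate(conn: sqlite3.Connection) -> float:
    """Fraction of recorded runs that succeeded (0.0 when there are none)."""
    total, ok = conn.execute("SELECT COUNT(*), SUM(status = 'success') FROM runs").fetchone()
    return (ok or 0) / total if total else 0.0


conn = sqlite3.connect(":memory:")  # a real tool would open a file path instead
conn.execute(SCHEMA)
for run in [
    ("a3f2c1b0", "ml-pipeline", "2026-06-24 09:12:03", "success", 4),
    ("7d91e4a2", "ml-pipeline", "2026-06-23 22:47:11", "failed", 3),
    ("c58b0f31", "etl-daily", "2026-06-23 06:00:02", "success", 6),
]:
    record_run(conn, run)
for row in list_runs(conn):
    print(*row, sep="  ")
print(f"success rate: {success_rate(conn):.0%}")
```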
Examples
Data cleaning
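import statistics
from pipelinehub import DataPipeline
def remove_outliers(x):
    # Keep values within 2.5 population standard deviations of the mean
    mean, sd = statistics.mean(x), statistics.pstdev(x)
    return [i for i in x if abs(i - mean) < 2.5 * sd]
pipeline = (DataPipeline(name="cleaning")
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert")
    .add_step(remove_outliers, "remove_outliers")
    .add_step(lambda x: [(i - min(x)) / (max(x) - min(x)) for i in x], "normalize"))
result = pipeline.execute([1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9])  # 100 is dropped, the rest scaled to [0, 1]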
Text processing
import re
from pipelinehub import DataPipeline
pipeline = (DataPipeline(name="text")
    .add_step(str.lower, "lowercase")
    .add_step(lambda t: re.sub(r'[^a-zA-Z0-9\s]', '', t), "clean")
    .add_step(str.split, "tokenize")
    .add_step(lambda words: sorted(set(w for w in words if len(w) >= 4)), "keywords"))
result = pipeline.execute("Hello World! This is a Sample Text for Processing...")
Pipeline management
from pipelinehub import DataPipeline
pipeline = DataPipeline(name="example")
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")
print(pipeline.get_steps()) # ['double', 'add_one']
print(len(pipeline)) # 2
pipeline.remove_step(0)
pipeline.clear_steps()
Roadmap
v0.1 ✅ — Fluent pipeline chaining, zero dependencies, verbose mode
v0.2 ✅ — Automatic snapshot engine, rich failure context, run comparison, anomaly detection, replay, ph CLI
v0.3 — Web dashboard for run history and team visibility
Contributing
git checkout -b feature/your-feature
pytest tests/
git commit -m 'Add your feature'
git push origin feature/your-feature
# open a pull request against main
Branch protection is on main — all changes go through a PR.
License
MIT — see LICENSE for details.
Built by Rahul Paul
Download files
File details
Details for the file pipelinehub-0.1.11.tar.gz.
File metadata
- Download URL: pipelinehub-0.1.11.tar.gz
- Upload date:
- Size: 33.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e8305c308ff2276dbcdb7ba545e6e45b90067c263a65d566a574be4ac4a2a0f7 |
| MD5 | cb6bb3a162ea6ca9e6f086d7fd749327 |
| BLAKE2b-256 | d210f4e130bd0d0a8725ed03e75eaca54a5390fc5b93c249e7194524e097af8e |
Provenance
The following attestation bundles were made for pipelinehub-0.1.11.tar.gz:
Publisher: publish.yml on rahulxj100/pipelinehub
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pipelinehub-0.1.11.tar.gz
- Subject digest: e8305c308ff2276dbcdb7ba545e6e45b90067c263a65d566a574be4ac4a2a0f7
- Sigstore transparency entry: 1952667751
- Sigstore integration time:
- Permalink: rahulxj100/pipelinehub@15a34423cf3633060cf1c041686c1e63f02e4537
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rahulxj100
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@15a34423cf3633060cf1c041686c1e63f02e4537
- Trigger Event: push
File details
Details for the file pipelinehub-0.1.11-py3-none-any.whl.
File metadata
- Download URL: pipelinehub-0.1.11-py3-none-any.whl
- Upload date:
- Size: 23.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e78897d3f27dc348ac2ddd6d61883cae1cd9886fd204d060544fab1fdad81885 |
| MD5 | d0c1c2a35295ecb943d7b9b6267871be |
| BLAKE2b-256 | 417a39e17acf0224074cb4d9b91e60c911ca470060409e08f3263ba89d3988c5 |
Provenance
The following attestation bundles were made for pipelinehub-0.1.11-py3-none-any.whl:
Publisher: publish.yml on rahulxj100/pipelinehub
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pipelinehub-0.1.11-py3-none-any.whl
- Subject digest: e78897d3f27dc348ac2ddd6d61883cae1cd9886fd204d060544fab1fdad81885
- Sigstore transparency entry: 1952668166
- Sigstore integration time:
- Permalink: rahulxj100/pipelinehub@15a34423cf3633060cf1c041686c1e63f02e4537
- Branch / Tag: refs/heads/main
- Owner: https://github.com/rahulxj100
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@15a34423cf3633060cf1c041686c1e63f02e4537
- Trigger Event: push