A flexible data pipeline library for custom data processing workflows

PipelineHub

Python pipelines with automatic debugging built in.

pip install pipelinehub

The problem

Your pipeline fails at 2am. The stack trace points to step 9, but the actual problem happened in step 4: a column silently changed dtype, and nothing caught it until five steps later.

You spend the next four hours adding print statements, re-running on stale data, and guessing. I built this after doing exactly that, more times than I want to count.


What it does

pipelinehub snapshots your data at every step (shape, nulls, dtypes, statistics) without any configuration. When something breaks, you see what the data looked like going into the failing step, not just which line exploded.

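from pipelinehub import DataPipeline

# clean_data, feature_engineer and normalize are your own functions;
# df is your input data (for example a pandas DataFrame)
pipeline = DataPipeline(name="ml-pipeline")
pipeline.add_step(clean_data, "clean")
pipeline.add_step(feature_engineer, "feature_engineer")
pipeline.add_step(normalize, "normalize")

result = pipeline.execute(df)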

When a step fails:

PipelineStepError: Step "normalize" (step 3 of 3) failed

Data entering this step:
  type:    dataframe
  shape:   (10420, 8)
  nulls:   col_price: 142
  dtypes:  col_id: object  col_price: float64

Original error: TypeError: unsupported operand type(s) for +: 'float' and 'str'

To replay from this step:
  pipeline.replay_from("normalize", your_data)
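The mechanism behind a message like this can be sketched in plain Python: profile the data before each step runs and, if the step raises, attach that profile to the error. The names below (MiniPipeline, snapshot, StepFailure) are hypothetical, the input is assumed to be a list of dict rows, and this is not pipelinehub's implementation. The debug flag imitates the debug=False option described in Quick start by skipping the profiling.

```python
# Hypothetical sketch, not pipelinehub's code: MiniPipeline, snapshot() and
# StepFailure are illustrative names. Data is assumed to be a list of dict rows.
from __future__ import annotations

from typing import Any, Callable


def snapshot(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Profile dict rows: row count, null count per column, observed value types."""
    columns = sorted({key for row in rows for key in row})
    nulls = {c: sum(1 for r in rows if r.get(c) is None) for c in columns}
    types = {c: sorted({type(r[c]).__name__ for r in rows if r.get(c) is not None})
             for c in columns}
    return {"rows": len(rows), "nulls": nulls, "types": types}


class StepFailure(Exception):
    """Carries the original error plus a snapshot of the data entering the step."""

    def __init__(self, name: str, position: int, total: int,
                 before: dict[str, Any], error: Exception) -> None:
        super().__init__(
            f'Step "{name}" (step {position} of {total}) failed\n'
            f"Data entering this step: {before}\n"
            f"Original error: {type(error).__name__}: {error}"
        )
        self.before = before
        self.error = error


class MiniPipeline:
    def __init__(self) -> None:
        self.steps: list[tuple[str, Callable[[Any], Any]]] = []
        self.snapshots: list[tuple[str, dict[str, Any]]] = []

    def add_step(self, fn: Callable[[Any], Any], name: str) -> MiniPipeline:
        self.steps.append((name, fn))
        return self

    def execute(self, data: Any, debug: bool = True) -> Any:
        self.snapshots = []
        for position, (name, fn) in enumerate(self.steps, start=1):
            before = snapshot(data) if debug else {}  # debug=False skips profiling
            if debug:
                self.snapshots.append((name, before))
            try:
                data = fn(data)
            except Exception as exc:
                raise StepFailure(name, position, len(self.steps), before, exc) from exc
        return data


def add_tax(rows):
    # Breaks on rows whose price is None: None * 1.2 raises TypeError
    return [{**r, "price": r["price"] * 1.2} for r in rows]


rows = [{"id": "a", "price": 10.0}, {"id": "b", "price": None}]
try:
    MiniPipeline().add_step(add_tax, "add_tax").execute(rows)
except StepFailure as err:
    print(err)  # shows 1 null in "price" entering add_tax, plus the TypeError
```

Keeping the input snapshot on the exception means the caller sees the state that caused the failure, not just the line that raised.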

Why not an AI assistant?

An AI assistant needs you to notice something is wrong, copy the data, describe the problem, and ask. pipelinehub runs inside your pipeline, so it works at 3am on a schedule with no human in the loop. It also catches silent failures that never throw an exception: null counts that creep up, dtypes that silently change, row counts that drop 60% in one step.


Why not Kedro or Airflow?

Kedro requires a new project structure, a CLI, a data catalog, and YAML configs. Reasonable for a large team running production ML. Overkill if you're preprocessing data in a notebook or a script and just want visibility into what's happening at each step.

Airflow is an orchestrator. It schedules and monitors jobs — it doesn't help you understand what's wrong inside one.

pipelinehub is a library, not a framework. It adds one import and one method call to code you already have.


Quick start

from pipelinehub import DataPipeline

pipeline = (DataPipeline(name="my-pipeline")
    .add_step(lambda x: [i for i in x if i > 0], "filter_positive")
    .add_step(lambda x: [i**2 for i in x], "square")
    .add_step(lambda x: [i/max(x) for i in x], "normalize"))

result = pipeline.execute([-2, -1, 0, 1, 2, 3, 4, 5])

Pass debug=False to skip snapshotting entirely — same behaviour as v0.1, no overhead:

result = pipeline.execute(data, debug=False)

Features

  • Automatic snapshots: shape, dtypes, null counts, and numeric stats captured at every step, zero configuration
  • Rich failure context: PipelineStepError shows exactly what the data looked like entering the failing step
  • Anomaly detection: warns when null counts spike, dtypes change, rows drop more than 50%, or value distributions shift compared to the last run
  • Run history: every run stored locally in .pipelinehub/runs.db, no setup required
  • Run comparison: pipeline.compare_runs() diffs any two runs step by step
  • Replay from any step: re-run forward from any named step without re-running everything before it
  • ph CLI: inspect run history, diff runs, and watch live execution from the terminal
  • Fluent chaining: method chaining works if you prefer that style
  • No external dependencies: stdlib only; pandas, polars, and numpy are detected and profiled if installed
  • Full type hints: works with IDE autocomplete
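Profiling pandas, polars, and numpy objects without depending on them can be done with the standard library alone. In the sketch below, importlib.util.find_spec reports whether a package is installed without importing it, and type(data).__module__ names the library that defined an object's type, so no isinstance check against an optional import is needed. The function names are hypothetical, and this is one common approach, not necessarily how pipelinehub detects these libraries.

```python
# Hypothetical sketch: available_backends() and backend_of() are illustrative
# names, not pipelinehub functions.
from __future__ import annotations

import importlib.util
from typing import Any

OPTIONAL_BACKENDS = ("pandas", "polars", "numpy")


def available_backends() -> set[str]:
    """Optional libraries that are installed, checked without importing them."""
    return {name for name in OPTIONAL_BACKENDS
            if importlib.util.find_spec(name) is not None}


def backend_of(data: Any) -> str:
    """Name the library that defined data's type, e.g. 'pandas' for a DataFrame.

    Reading type(data).__module__ needs no import of pandas/polars/numpy.
    Anything else (lists, dicts, user classes) is reported as 'python'.
    """
    root = type(data).__module__.split(".")[0]
    return root if root in OPTIONAL_BACKENDS else "python"


print(available_backends())   # depends on what is installed
print(backend_of([1, 2, 3]))  # python
```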

Snapshot output

When anomalies are detected, pipelinehub prints a summary after execute():

Pipeline completed  ✓  (2.3s)

  Step              Rows              Nulls          Duration
  ──────────────────────────────────────────────────────────
  clean             10500→10420       0→0            0.4s
  feature_engineer  10420→10420       0→142 ⚠        1.1s
  normalize         10420→10420       142→0          0.8s

⚠  col_price nulls introduced in step "feature_engineer" (+142)
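A warning like this can come from comparing the snapshot a step received with the snapshot it produced. Below is a minimal sketch of three of the rules listed under Features (nulls introduced, dtype changed, rows dropping by more than 50%). The snapshot layout and find_anomalies() are assumptions, and the distribution-shift check against the previous run is left out.

```python
# Hypothetical sketch: the snapshot layout and find_anomalies() are
# illustrative; pipelinehub's actual rules and thresholds may differ.
from __future__ import annotations


def find_anomalies(step: str, before: dict, after: dict,
                   max_row_drop: float = 0.5) -> list[str]:
    """Compare a step's input and output snapshots.

    Each snapshot looks like {"rows": int, "nulls": {col: int}, "dtypes": {col: str}}.
    """
    warnings = []
    # 1. Nulls introduced (or increased) by this step
    for col, n_after in after["nulls"].items():
        n_before = before["nulls"].get(col, 0)
        if n_after > n_before:
            warnings.append(f'{col} nulls introduced in step "{step}" (+{n_after - n_before})')
    # 2. Columns whose dtype changed
    for col, dtype in after["dtypes"].items():
        old = before["dtypes"].get(col)
        if old is not None and old != dtype:
            warnings.append(f'{col} dtype changed in step "{step}" ({old} -> {dtype})')
    # 3. Row count dropping by more than max_row_drop (50% by default)
    rows_in, rows_out = before["rows"], after["rows"]
    if rows_in and (rows_in - rows_out) / rows_in > max_row_drop:
        warnings.append(f'rows dropped in step "{step}" ({rows_in} -> {rows_out})')
    return warnings


before = {"rows": 10420, "nulls": {"col_price": 0}, "dtypes": {"col_price": "float64"}}
after = {"rows": 10420, "nulls": {"col_price": 142}, "dtypes": {"col_price": "float64"}}
for warning in find_anomalies("feature_engineer", before, after):
    print(f"⚠  {warning}")  # ⚠  col_price nulls introduced in step "feature_engineer" (+142)
```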

Inspect run history

# Last run with all step snapshots
last = pipeline.last_run()

# Recent runs
pipeline.list_runs(limit=5)

# Compare last two runs
pipeline.compare_runs()

# Compare specific runs by ID
pipeline.compare_runs(run_id_a, run_id_b)
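Run history and step-by-step run comparison can be sketched with the standard library's sqlite3 and json modules. The two-table layout, the 8-character IDs, and the function names below are hypothetical and do not describe the real layout of .pipelinehub/runs.db. The demo uses an in-memory database; passing a file path to open_history() would persist runs instead.

```python
# Hypothetical sketch: the table layout, save_run() and compare_runs() here are
# illustrative and are not pipelinehub's actual runs.db schema or API.
from __future__ import annotations

import json
import sqlite3
import uuid


def open_history(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS runs (
            run_id TEXT PRIMARY KEY, pipeline TEXT, status TEXT,
            started TEXT DEFAULT CURRENT_TIMESTAMP);
        CREATE TABLE IF NOT EXISTS steps (
            run_id TEXT, position INTEGER, name TEXT, snapshot TEXT);
        """
    )
    return conn


def save_run(conn: sqlite3.Connection, pipeline: str, status: str,
             snapshots: list[tuple[str, dict]]) -> str:
    run_id = uuid.uuid4().hex[:8]
    with conn:  # commit the run row and its step rows together
        conn.execute("INSERT INTO runs (run_id, pipeline, status) VALUES (?, ?, ?)",
                     (run_id, pipeline, status))
        conn.executemany(
            "INSERT INTO steps VALUES (?, ?, ?, ?)",
            [(run_id, i, name, json.dumps(snap)) for i, (name, snap) in enumerate(snapshots)],
        )
    return run_id


def load_steps(conn: sqlite3.Connection, run_id: str) -> dict[str, dict]:
    rows = conn.execute(
        "SELECT name, snapshot FROM steps WHERE run_id = ? ORDER BY position", (run_id,))
    return {name: json.loads(snap) for name, snap in rows}


def compare_runs(conn: sqlite3.Connection, run_a: str, run_b: str) -> dict[str, tuple]:
    """Return {step: (snapshot_a, snapshot_b)} for every step whose snapshot differs."""
    a, b = load_steps(conn, run_a), load_steps(conn, run_b)
    return {s: (a.get(s), b.get(s)) for s in a.keys() | b.keys() if a.get(s) != b.get(s)}


conn = open_history()
r1 = save_run(conn, "ml-pipeline", "success", [("clean", {"rows": 10420}), ("normalize", {"rows": 10420})])
r2 = save_run(conn, "ml-pipeline", "success", [("clean", {"rows": 10420}), ("normalize", {"rows": 9800})])
print(compare_runs(conn, r1, r2))  # {'normalize': ({'rows': 10420}, {'rows': 9800})}
```

Storing each snapshot as a JSON string keeps the schema fixed even when different pipelines record different fields.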

Replay from a step

# Fix normalize(), replay forward — skips clean and feature_engineer
result = pipeline.replay_from("normalize", your_data)
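Replaying from a step amounts to locating the named step and running it, and every later step, on the data you supply. This sketch assumes that data is what the named step would normally receive (for example, saved output of the previous step); whether pipelinehub's replay_from expects exactly that is an assumption, and replay_from() below is a standalone hypothetical function.

```python
# Hypothetical sketch: replay_from() below is illustrative, not pipelinehub's
# implementation. Steps are (name, function) pairs in execution order.
from __future__ import annotations

from typing import Any, Callable


def replay_from(steps: list[tuple[str, Callable[[Any], Any]]], start: str, data: Any) -> Any:
    """Run data through the named step and every step after it, skipping earlier ones."""
    names = [name for name, _ in steps]
    if start not in names:
        raise KeyError(f"no step named {start!r}; available: {names}")
    for _, fn in steps[names.index(start):]:
        data = fn(data)
    return data


def normalize(xs):
    top = max(xs)  # compute the maximum once, not once per element
    return [x / top for x in xs]


steps = [
    ("clean", lambda xs: [x for x in xs if x is not None]),
    ("feature_engineer", lambda xs: [x * 10 for x in xs]),
    ("normalize", normalize),
]

# data must already look like normalize's input, e.g. saved output of feature_engineer
print(replay_from(steps, "normalize", [10, 20, 40]))  # [0.25, 0.5, 1.0]
```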

ph CLI

pip install pipelinehub also installs the ph command. It reads from .pipelinehub/runs.db in your project — no auth, no setup.

# List recent runs
ph runs list

# Show last completed run
ph runs last

# Step-by-step detail for a specific run
ph runs show <run_id>

# Diff two runs
ph runs diff <run_id_a> <run_id_b>

# Watch a pipeline execute in real time (polls every 1s)
ph runs watch

# Health summary across all pipelines
ph status

# Aggregate stats — success rate, failure count
ph stats

# Check setup
ph doctor

Example output for ph runs list:

  ID         Pipeline               Started                Status       Steps
  ──────────────────────────────────────────────────────────────────────────
  a3f2c1b0   ml-pipeline            2026-06-24 09:12:03    ✅ success    4
  7d91e4a2   ml-pipeline            2026-06-23 22:47:11    ❌ failed     3
  c58b0f31   etl-daily              2026-06-23 06:00:02    ✅ success    6
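Aggregates like those ph stats reports (success rate, failure count) reduce to a GROUP BY over a runs table. The sketch below loads the three example rows from the ph runs list output above into an in-memory SQLite table; the table layout and run_stats() are hypothetical, not the real runs.db schema.

```python
# Hypothetical sketch: the runs table mirrors the columns shown by
# `ph runs list`; it is not pipelinehub's schema, and run_stats() is illustrative.
from __future__ import annotations

import sqlite3


def run_stats(conn: sqlite3.Connection) -> dict[str, dict[str, float]]:
    """Per-pipeline run count, failure count and success rate."""
    query = """
        SELECT pipeline,
               COUNT(*) AS total,
               SUM(status = 'failed') AS failures  -- comparison yields 0 or 1 in SQLite
        FROM runs
        GROUP BY pipeline
        ORDER BY pipeline
    """
    return {
        pipeline: {"runs": total, "failures": failures,
                   "success_rate": (total - failures) / total}
        for pipeline, total, failures in conn.execute(query)
    }


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (run_id TEXT, pipeline TEXT, started TEXT, status TEXT, steps INTEGER)")
conn.executemany(
    "INSERT INTO runs VALUES (?, ?, ?, ?, ?)",
    [
        ("a3f2c1b0", "ml-pipeline", "2026-06-24 09:12:03", "success", 4),
        ("7d91e4a2", "ml-pipeline", "2026-06-23 22:47:11", "failed", 3),
        ("c58b0f31", "etl-daily", "2026-06-23 06:00:02", "success", 6),
    ],
)
for name, s in run_stats(conn).items():
    print(f"{name:<12} runs={s['runs']} failures={s['failures']} success={s['success_rate']:.0%}")
# etl-daily    runs=1 failures=0 success=100%
# ml-pipeline  runs=2 failures=1 success=50%
```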

Examples

Data cleaning

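import statistics
from pipelinehub import DataPipeline

def remove_outliers(x):
    # Keep values within 2.5 standard deviations of the mean (computed once)
    mean, std = statistics.fmean(x), statistics.pstdev(x)
    return [i for i in x if abs(i - mean) < 2.5 * std]

pipeline = (DataPipeline(name="cleaning")
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert")
    .add_step(remove_outliers, "remove_outliers")
    .add_step(lambda x: [(i - min(x)) / (max(x) - min(x)) for i in x], "normalize"))

# 100 is dropped as an outlier; the remaining values 1..9 are scaled to [0, 1]
result = pipeline.execute([1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9])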

Text processing

import re
from pipelinehub import DataPipeline

pipeline = (DataPipeline(name="text")
    .add_step(str.lower, "lowercase")
    .add_step(lambda t: re.sub(r'[^a-zA-Z0-9\s]', '', t), "clean")
    .add_step(str.split, "tokenize")
    .add_step(lambda words: sorted(set(w for w in words if len(w) >= 4)), "keywords"))

result = pipeline.execute("Hello World! This is a Sample Text for Processing...")

Pipeline management

from pipelinehub import DataPipeline

pipeline = DataPipeline(name="example")
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")

print(pipeline.get_steps())   # ['double', 'add_one']
print(len(pipeline))          # 2

pipeline.remove_step(0)       # remove the step at index 0 ("double")
pipeline.clear_steps()        # remove all remaining steps
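Chaining, as in Quick start, works when add_step returns the pipeline object itself. The hypothetical StepList class below sketches a step registry with the same method names used above; having remove_step and clear_steps also return self is an assumption of this sketch, not documented pipelinehub behaviour.

```python
# Hypothetical sketch of a fluent step registry; StepList is an illustrative
# name and makes no claim about how DataPipeline is implemented.
from __future__ import annotations

from typing import Any, Callable


class StepList:
    def __init__(self) -> None:
        self._steps: list[tuple[str, Callable[[Any], Any]]] = []

    def add_step(self, fn: Callable[[Any], Any], name: str) -> StepList:
        self._steps.append((name, fn))
        return self  # returning self is what lets .add_step(...).add_step(...) chain

    def get_steps(self) -> list[str]:
        return [name for name, _ in self._steps]

    def remove_step(self, index: int) -> StepList:
        del self._steps[index]  # raises IndexError for an out-of-range index
        return self

    def clear_steps(self) -> StepList:
        self._steps.clear()
        return self

    def __len__(self) -> int:
        return len(self._steps)

    def execute(self, data: Any) -> Any:
        for _, fn in self._steps:
            data = fn(data)
        return data


steps = (StepList()
         .add_step(lambda x: [i * 2 for i in x], "double")
         .add_step(lambda x: [i + 1 for i in x], "add_one"))
print(steps.get_steps(), len(steps))  # ['double', 'add_one'] 2
print(steps.execute([1, 2, 3]))       # [3, 5, 7]
steps.remove_step(0)
print(steps.get_steps())              # ['add_one']
```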

Roadmap

v0.1 ✅ — Fluent pipeline chaining, zero dependencies, verbose mode
v0.2 ✅ — Automatic snapshot engine, rich failure context, run comparison, anomaly detection, replay, ph CLI
v0.3 — Web dashboard for run history and team visibility


Contributing

git checkout -b feature/your-feature
pytest tests/
git add .
git commit -m 'Add your feature'
git push origin feature/your-feature
# open a pull request against main

Branch protection is enabled on main, so all changes go through a PR.


License

MIT — see LICENSE for details.


Built by Rahul Paul


Download files

Source Distribution

pipelinehub-0.1.22.tar.gz (43.4 kB)

Uploaded Source

Built Distribution

pipelinehub-0.1.22-py3-none-any.whl (30.4 kB)

Uploaded Python 3

File details

Details for the file pipelinehub-0.1.22.tar.gz.

File metadata

  • Download URL: pipelinehub-0.1.22.tar.gz
  • Upload date:
  • Size: 43.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pipelinehub-0.1.22.tar.gz
  Algorithm     Hash digest
  SHA256        3a0b35dd7f40987449fc58b53300d1d87b3302d61311286c0cd74b6b16ad26e6
  MD5           f22fca1b7312c67a60fec6ab1bf1b0ad
  BLAKE2b-256   8b5e68e83474f60e620b04a3e9ba81e93663692988b7df1c070765039717826f

Provenance

The following attestation bundles were made for pipelinehub-0.1.22.tar.gz:

Publisher: publish.yml on rahulxj100/pipelinehub

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pipelinehub-0.1.22-py3-none-any.whl.

File metadata

  • Download URL: pipelinehub-0.1.22-py3-none-any.whl
  • Upload date:
  • Size: 30.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pipelinehub-0.1.22-py3-none-any.whl
  Algorithm     Hash digest
  SHA256        0ff842d0d3ff678b415bcc1918004b295da3ea79ad61bf85872e0dd27a8eb0c7
  MD5           ab1decdb1e70985d6eb992bac7036409
  BLAKE2b-256   e8ee1846168dada88af50dd49c2190a34046b0f66ceaa73a19e45924e7179450

Provenance

The following attestation bundles were made for pipelinehub-0.1.22-py3-none-any.whl:

Publisher: publish.yml on rahulxj100/pipelinehub

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
