
A flexible data pipeline library for custom data processing workflows

PipelineHub

Python pipelines with automatic debugging built in.

pip install pipelinehub

The problem

Your pipeline fails at 2am. The stack trace points to step 9. The actual problem happened in step 4: a column silently changed dtype, and nothing caught it until five steps later.

You spend the next four hours adding print statements, re-running on stale data, and guessing. I built this after doing exactly that, more times than I want to count.


What it does

pipelinehub captures your data at every step — shape, nulls, dtypes, statistics — without any configuration. When something breaks, you see what the data looked like going into the failing step, not just which line exploded.

from pipelinehub import DataPipeline

pipeline = DataPipeline(name="ml-pipeline")
pipeline.add_step(clean_data, "clean")
pipeline.add_step(feature_engineer, "features")
pipeline.add_step(normalize, "normalize")

result = pipeline.execute(df)

When a step fails:

PipelineStepError: Step "normalize" (step 3 of 3) failed

Data entering this step:
  type:    dataframe
  shape:   (10420, 8)
  nulls:   col_price: 142
  dtypes:  col_id: object  col_price: float64

Original error: TypeError: unsupported operand type(s) for +: 'float' and 'str'

To replay from this step:
  pipeline.replay_from("normalize", your_data)
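
To make the idea concrete, here is a minimal stdlib-only sketch of snapshotting a step's input and attaching it to the failure. It is not pipelinehub's implementation: StepError, snapshot, and run_steps are hypothetical names, and the data is a plain list of dicts rather than a DataFrame.

```python
from collections import Counter


class StepError(Exception):
    """Hypothetical error type that carries the failing step's input snapshot."""

    def __init__(self, step_name, snapshot, original):
        super().__init__(f'Step "{step_name}" failed: {original!r}')
        self.step_name = step_name
        self.snapshot = snapshot
        self.original = original


def snapshot(rows):
    """Summarise a list of dict rows: shape, null counts, value types."""
    columns = sorted({key for row in rows for key in row})
    nulls = Counter()
    types = {}
    for row in rows:
        for col in columns:
            value = row.get(col)
            if value is None:
                nulls[col] += 1
            else:
                types.setdefault(col, set()).add(type(value).__name__)
    return {
        "shape": (len(rows), len(columns)),
        "nulls": dict(nulls),
        "types": {col: sorted(names) for col, names in types.items()},
    }


def run_steps(steps, data):
    """Run (name, func) pairs in order, snapshotting the input of each step."""
    for name, func in steps:
        before = snapshot(data)  # captured before the step has a chance to fail
        try:
            data = func(data)
        except Exception as exc:
            raise StepError(name, before, exc) from exc
    return data


def add_tax(data):
    """Example step that assumes col_price is always numeric."""
    return [
        {**row, "col_price": row["col_price"] + 1.0}
        for row in data
        if row["col_price"] is not None
    ]


rows = [
    {"col_id": "a1", "col_price": 9.5},
    {"col_id": "a2", "col_price": None},
    {"col_id": "a3", "col_price": "12"},  # a string slipped in upstream
]

try:
    run_steps([("add_tax", add_tax)], rows)
except StepError as err:
    failure = err
    print(failure.snapshot)
```

The snapshot is taken before the step runs, so the mixed float/str column is visible in the error even though the step itself only raised a TypeError.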

Why not an AI assistant?

An AI assistant needs you to notice something is wrong, copy the data, describe the problem, and ask. pipelinehub runs at 3am on a schedule — no human in the loop. It catches silent failures that never throw an exception: null counts that creep up, dtypes that silently change, row counts that drop 60% in one step.


Why not Kedro or Airflow?

Kedro requires a new project structure, a CLI, a data catalog, and YAML configs. Reasonable for a large team running production ML. Overkill if you're preprocessing data in a notebook or a script and just want visibility into what's happening at each step.

Airflow is an orchestrator. It schedules and monitors jobs — it doesn't help you understand what's wrong inside one.

pipelinehub is a library, not a framework. It adds one import and one method call to code you already have.


Quick start

from pipelinehub import DataPipeline

pipeline = (DataPipeline(name="my-pipeline")
    .add_step(lambda x: [i for i in x if i > 0], "filter_positive")
    .add_step(lambda x: [i**2 for i in x], "square")
    .add_step(lambda x: [i/max(x) for i in x], "normalize"))

result = pipeline.execute([-2, -1, 0, 1, 2, 3, 4, 5])
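
For reference, this is what the three steps compute when chained by hand in plain Python. It is only a sketch of the data flow; the run helper is a hypothetical stand-in and does none of the snapshotting that pipelinehub adds.

```python
from functools import reduce

# The same three steps as (name, function) pairs.
steps = [
    ("filter_positive", lambda x: [i for i in x if i > 0]),
    ("square", lambda x: [i**2 for i in x]),
    ("normalize", lambda x: [i / max(x) for i in x]),
]


def run(steps, data):
    """Feed the output of each step into the next one."""
    return reduce(lambda acc, step: step[1](acc), steps, data)


result = run(steps, [-2, -1, 0, 1, 2, 3, 4, 5])
print(result)  # [0.04, 0.16, 0.36, 0.64, 1.0]
```

The filter keeps [1, 2, 3, 4, 5], squaring gives [1, 4, 9, 16, 25], and dividing by the maximum (25) gives the final list.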

Pass debug=False to skip snapshotting entirely — same behaviour as v0.1, no overhead:

result = pipeline.execute(data, debug=False)

Features

  • Automatic snapshots — shape, dtypes, null counts, and numeric stats captured at every step, zero configuration
  • Rich failure context — PipelineStepError shows exactly what the data looked like entering the failing step
  • Anomaly detection — warns when null counts spike, dtypes change, rows drop more than 50%, or value distributions shift compared to the last run
  • Run history — every run stored locally in .pipelinehub/runs.db, no setup required
  • Run comparison — pipeline.compare_runs() diffs any two runs step by step
  • Replay from any step — re-run forward from any named step without re-running everything before it
  • ph CLI — inspect run history, diff runs, and watch live execution from the terminal
  • Fluent chaining — method chaining works if you prefer that style
  • No external dependencies — stdlib only; pandas, polars, and numpy are detected and profiled if installed
  • Full type hints — works with IDE autocomplete
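
The anomaly checks listed above can be pictured as a comparison between two snapshots. The sketch below is an assumption about how such checks could work, not the library's code: detect_anomalies and the snapshot layout are hypothetical, any increase in nulls is flagged (the real threshold for a "spike" is not documented here), and distribution-shift checks are left out.

```python
def detect_anomalies(before, after, row_drop_threshold=0.5):
    """Compare two snapshot dicts and return human-readable warnings.

    Each snapshot is assumed (hypothetically) to look like:
    {"rows": int, "nulls": {col: int}, "dtypes": {col: str}}
    """
    warnings = []

    # Row count: flag a drop larger than the threshold fraction.
    if before["rows"] > 0:
        dropped = (before["rows"] - after["rows"]) / before["rows"]
        if dropped > row_drop_threshold:
            warnings.append(f"rows dropped {dropped:.0%}")

    # Null counts: flag any column whose null count went up.
    for col, count in after["nulls"].items():
        previous = before["nulls"].get(col, 0)
        if count > previous:
            warnings.append(f"{col} nulls introduced (+{count - previous})")

    # Dtypes: flag columns present on both sides whose dtype differs.
    for col, dtype in after["dtypes"].items():
        old = before["dtypes"].get(col)
        if old is not None and old != dtype:
            warnings.append(f"{col} dtype changed {old} -> {dtype}")

    return warnings


before = {
    "rows": 10420,
    "nulls": {"col_price": 0},
    "dtypes": {"col_id": "int64", "col_price": "float64"},
}
after = {
    "rows": 10420,
    "nulls": {"col_price": 142},
    "dtypes": {"col_id": "object", "col_price": "float64"},
}

found = detect_anomalies(before, after)
for line in found:
    print(line)
```

None of these conditions raises an exception on its own, which is why they have to be checked explicitly after each step.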

Snapshot output

When anomalies are detected, pipelinehub prints a summary after execute():

Pipeline completed  ✓  (2.3s)

  Step              Rows              Nulls          Duration
  ──────────────────────────────────────────────────────────
  clean             10500→10420       0→0            0.4s
  feature_engineer  10420→10420       0→142 ⚠        1.1s
  normalize         10420→10420       142→0          0.8s

⚠  col_price nulls introduced in step "feature_engineer" (+142)

Inspect run history

# Last run with all step snapshots
last = pipeline.last_run()

# Recent runs
pipeline.list_runs(limit=5)

# Compare last two runs
pipeline.compare_runs()

# Compare specific runs by ID
pipeline.compare_runs(run_id_a, run_id_b)
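
Conceptually, a step-by-step comparison is a diff over per-step snapshots. The following is a rough sketch under that assumption; diff_runs and the run layout (step name mapped to a dict of metrics) are hypothetical and say nothing about what compare_runs() actually returns.

```python
def diff_runs(run_a, run_b):
    """Diff two runs step by step.

    Each run is assumed (hypothetically) to map a step name to that step's
    output snapshot, e.g. {"clean": {"rows": 10420, "nulls": 0}}.
    """
    changes = {}
    for step in sorted(run_a.keys() | run_b.keys()):
        snap_a = run_a.get(step)
        snap_b = run_b.get(step)
        if snap_a is None:
            changes[step] = "only in run B"
            continue
        if snap_b is None:
            changes[step] = "only in run A"
            continue
        # Keep each metric whose value differs, as an (A, B) pair.
        delta = {
            key: (snap_a.get(key), snap_b.get(key))
            for key in sorted(snap_a.keys() | snap_b.keys())
            if snap_a.get(key) != snap_b.get(key)
        }
        if delta:
            changes[step] = delta
    return changes


yesterday = {
    "clean": {"rows": 10420, "nulls": 0},
    "features": {"rows": 10420, "nulls": 0},
}
today = {
    "clean": {"rows": 10420, "nulls": 0},
    "features": {"rows": 10420, "nulls": 142},
}

changes = diff_runs(yesterday, today)
print(changes)  # {'features': {'nulls': (0, 142)}}
```

Steps that look identical in both runs are omitted, so the output points straight at the step where the two runs diverge.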

Replay from a step

# Fix normalize(), replay forward — skips clean and feature_engineer
result = pipeline.replay_from("normalize", your_data)
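
The idea behind replay can be sketched in a few lines of plain Python. This standalone replay_from function is a hypothetical illustration, and it assumes the data you pass in is the input the named step would normally receive; check the library's behaviour before relying on that.

```python
def replay_from(steps, start_name, data):
    """Run the pipeline from the named step onward, skipping earlier steps.

    `steps` is an ordered list of (name, func) pairs.
    """
    names = [name for name, _ in steps]
    if start_name not in names:
        raise KeyError(f"unknown step: {start_name!r}")
    for _, func in steps[names.index(start_name):]:
        data = func(data)
    return data


calls = []


def traced(name, func):
    """Wrap a step so each call is recorded in `calls`."""
    def wrapper(data):
        calls.append(name)
        return func(data)
    return wrapper


steps = [
    ("clean", traced("clean", lambda x: [i for i in x if i is not None])),
    ("double", traced("double", lambda x: [i * 2 for i in x])),
    ("total", traced("total", sum)),
]

result = replay_from(steps, "double", [1, 2, 3])
print(result)  # 12
print(calls)   # ['double', 'total']
```

The "clean" step never runs, which is the point: after fixing a late step you do not pay for the expensive early ones again.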

ph CLI

pip install pipelinehub also installs the ph command. It reads from .pipelinehub/runs.db in your project — no auth, no setup.

# List recent runs
ph runs list

# Show last completed run
ph runs last

# Step-by-step detail for a specific run
ph runs show <run_id>

# Diff two runs
ph runs diff <run_id_a> <run_id_b>

# Watch a pipeline execute in real time (polls every 1s)
ph runs watch

# Health summary across all pipelines
ph status

# Aggregate stats — success rate, failure count
ph stats

# Check setup
ph doctor

Example output for ph runs list:

  ID         Pipeline               Started                Status       Steps
  ──────────────────────────────────────────────────────────────────────────
  a3f2c1b0   ml-pipeline            2026-06-24 09:12:03    ✅ success    4
  7d91e4a2   ml-pipeline            2026-06-23 22:47:11    ❌ failed     3
  c58b0f31   etl-daily              2026-06-23 06:00:02    ✅ success    6

Examples

Data cleaning

import statistics
from pipelinehub import DataPipeline

pipeline = (DataPipeline(name="cleaning")
    .add_step(lambda x: [float(i) for i in x if i is not None], "convert")
    # Drop values more than 2.5 standard deviations from the mean
    .add_step(lambda x: [i for i in x if abs(i - statistics.mean(x)) < 2.5 * statistics.pstdev(x)], "remove_outliers")
    .add_step(lambda x: [(i - min(x)) / (max(x) - min(x)) for i in x], "normalize"))

result = pipeline.execute([1, 2, 3, None, 100, 4, 5, 6, 7, 8, 9])

Text processing

import re
from pipelinehub import DataPipeline

pipeline = (DataPipeline(name="text")
    .add_step(str.lower, "lowercase")
    .add_step(lambda t: re.sub(r'[^a-zA-Z0-9\s]', '', t), "clean")
    .add_step(str.split, "tokenize")
    .add_step(lambda words: sorted(set(w for w in words if len(w) >= 4)), "keywords"))

result = pipeline.execute("Hello World! This is a Sample Text for Processing...")

Pipeline management

pipeline = DataPipeline(name="example")
pipeline.add_step(lambda x: [i*2 for i in x], "double")
pipeline.add_step(lambda x: [i+1 for i in x], "add_one")

print(pipeline.get_steps())   # ['double', 'add_one']
print(len(pipeline))          # 2

pipeline.remove_step(0)
pipeline.clear_steps()

Roadmap

v0.1 ✅ — Fluent pipeline chaining, zero dependencies, verbose mode
v0.2 ✅ — Automatic snapshot engine, rich failure context, run comparison, anomaly detection, replay, ph CLI
v0.3 — Web dashboard for run history and team visibility


Contributing

git checkout -b feature/your-feature
pytest tests/
git commit -m 'Add your feature'
# open a pull request against main

Branch protection is on main — all changes go through a PR.


License

MIT — see LICENSE for details.


Built by Rahul Paul
