A hierarchical, modular pipeline system for ML/AI workflows
Hierarchical, Modular Data Pipelines for AI/ML
Why HyperNodes?
Data pipelines are run more than written — you execute the same code on the same data hundreds of times during development. Every run costs time and money.
HyperNodes solves this with two core ideas:
| Idea | What it gives you |
|---|---|
| 🎯 Think Singular | Write logic for one item. No batch loops. Clean, testable code. |
| 📦 Smart Caching | Only compute what changed. Resume from where you left off. |
✨ The Three-Step Pattern
1️⃣ Think Singular
Write functions that process one item. No batch loops, no complexity — just clean, testable code.
from hypernodes import Pipeline, node
@node(output_name="cleaned")
def clean(text: str) -> str:
return text.strip().lower()
@node(output_name="word_count")
def count(cleaned: str) -> int:
return len(cleaned.split())
# Create a named pipeline
text_processor = Pipeline(nodes=[clean, count], name="text_processor")
result = text_processor.run(inputs={"text": " Hello World "})
# {'cleaned': 'hello world', 'word_count': 2}
The text_processor pipeline
2️⃣ Compose
Pipelines are nodes. Use text_processor inside a larger workflow — it appears as a single unit.
@node(output_name="report")
def summarize(word_count: int, cleaned: str) -> str:
return f"Processed: {word_count} words"
# text_processor becomes a node in the analysis pipeline
analysis = Pipeline(nodes=[text_processor.as_node(), summarize], name="analysis")
Collapsed view — text_processor as a single node
Expanded view — see inside text_processor
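Running the composed pipeline looks exactly like step 1. A minimal sketch (exactly which inner outputs appear in the result dict depends on how as_node exposes them):

result = analysis.run(inputs={"text": "  Hello World  "})
print(result["report"])  # Processed: 2 words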
3️⃣ Scale
Process many items with .map(). The same single-item logic runs over a collection.
run() vs map() at a glance
Single-item execution versus per-item fan-out.
| Pipeline.run() | Pipeline.map() |
|---|---|
| Accepts a single input item and executes the DAG once. | Accepts lists or iterables for the keys listed in map_over. |
| Returns a single dict of outputs. | Returns a list of dicts (one per item). |
| Cache signatures cover the full set of inputs for that run. | Cache signatures are computed per item, enabling incremental recompute. |
# Process multiple texts
results = text_processor.map(
inputs={"text": ["Hello", "World", "Foo Bar"]},
map_over="text"
)
# [{'cleaned': 'hello', 'word_count': 1},
# {'cleaned': 'world', 'word_count': 1},
# {'cleaned': 'foo bar', 'word_count': 2}]
Or wrap it as a batch processor in a larger pipeline:
batch_processor = text_processor.as_node(
name="batch_text_processor",
map_over="texts",
input_mapping={"texts": "text"},
output_mapping={"word_count": "word_counts"}
)
Batch processing with map_over
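Inside a parent pipeline, the mappings rename the boundary keys: the parent supplies texts, each item reaches the inner pipeline as text, and the per-item word_count values come back as word_counts. A minimal sketch (the total node is illustrative, not part of the library):

@node(output_name="total_words")
def total(word_counts: list[int]) -> int:
    return sum(word_counts)

corpus = Pipeline(nodes=[batch_processor, total], name="corpus")
corpus.run(inputs={"texts": ["Hello", "Foo Bar"]})
# word_counts == [1, 2], total_words == 3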
📦 Smart Caching
HyperNodes implements a content-addressable cache that keeps runs fast and correct.
| Scenario | Without Cache | Naive Cache | HyperNodes Cache |
|---|---|---|---|
| Process 10 items (first time) | 10s | 10s | 10s |
| Process 10 items (again) | 10s | Instant | Instant |
| Process 15 items (10 old + 5 new) | 15s | 15s (Full re-run) | 5s (Only new items) |
| Drop or edit 1 item | 14s | 14s (Full re-run) | 1s (Only changed item) |
How it works
For each node, the cache hashes:
- Function signature (source code, closures, defaults)
- Input values (per item in .map())
- Dependencies (upstream node signatures)
Only nodes whose hashes changed will execute.
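The signature scheme is easy to picture. A minimal sketch of the idea, not HyperNodes' actual implementation (node_signature and its field layout are illustrative):

import hashlib
import inspect

def node_signature(fn, inputs: dict, upstream_sigs: list[str]) -> str:
    """Illustrative content-addressable signature: source + inputs + upstream."""
    h = hashlib.sha256()
    h.update(inspect.getsource(fn).encode())         # function source code
    h.update(repr(sorted(inputs.items())).encode())  # input values
    for sig in upstream_sigs:                        # upstream node signatures
        h.update(sig.encode())
    return h.hexdigest()

If any ingredient changes, the digest changes, and the node is treated as a cache miss.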
Enable caching in 3 steps
1. Pick a backend – DiskCache ships by default; swap in Redis/S3 later.
2. Attach it to an engine – SeqEngine(cache=DiskCache(path=".cache")).
3. Run pipelines as usual – pipeline.run(...) or pipeline.map(...) automatically uses the cache.
from hypernodes import Pipeline, DiskCache, SeqEngine
engine = SeqEngine(cache=DiskCache(path=".cache"))
pipeline = Pipeline(nodes=[clean, count], engine=engine)
pipeline.run(inputs={"text": "hello"}) # warm cache
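Run it again and every node is served from disk:

pipeline.run(inputs={"text": "hello"})  # cache hit – nothing recomputes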
Incremental map runs (per-item signatures)
Caching shines when datasets change gradually. Each item inside .map() gets its own signature, so adding a single record does not blow away the cache for the entire batch.
from hypernodes import Pipeline, SeqEngine, DiskCache, node
@node(output_name="features")
def featurize(text: str) -> dict[str, int]:
print(f"computing -> {text}")
return {"len": len(text)}
engine = SeqEngine(cache=DiskCache(path=".cache"))
featurizer = Pipeline(nodes=[featurize], engine=engine)
# 1) Initial batch – every item is a miss
featurizer.map(inputs={"text": ["alpha", "beta"]}, map_over="text")
# 2) Append one more record – only "delta" recomputes
featurizer.map(inputs={"text": ["alpha", "beta", "delta"]}, map_over="text")
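With a cold cache, those two calls print something like:

# First map:   computing -> alpha
#              computing -> beta
# Second map:  computing -> delta    (alpha and beta are cache hits)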
Tip: Visualize before long runs. Cached (grey) vs running (colored) nodes stand out in the HTML/SVG artifact.
featurizer.visualize(
filename="outputs/cache_walkthrough.svg",
engine="graphviz",
separate_outputs=True,
depth=None,
)
Node-level reuse across complex DAGs
Because caching happens per node, you can pause/resume multi-stage DAGs without re-running upstream work. Here’s a more realistic extraction → embedding pipeline:
@node(output_name="cleaned")
def clean(text: str) -> str:
return text.strip().lower()
@node(output_name="embedding")
def embed(cleaned: str, model: str) -> list[float]:
return encoder[model](cleaned) # Expensive!
@node(output_name="doc")
def package(cleaned: str, embedding: list[float]) -> dict:
return {"text": cleaned, "vector": embedding}
pipe = Pipeline(nodes=[clean, embed, package], engine=SeqEngine(cache=DiskCache(".cache")))
pipe.run(inputs={"text": "Hello", "model": "v1"})
# Change only the final step; upstream nodes stay cached
@node(output_name="doc")
def package(cleaned: str, embedding: list[float]) -> dict:
return {"text": cleaned, "vector": embedding, "v": 2}
pipe.run(inputs={"text": "Hello", "model": "v1"})
# clean() and embed() hit cache, only package() reruns
| Change | Re-run cost without cache | With HyperNodes |
|---|---|---|
| Update final formatting node | All nodes re-execute | Only the last node runs |
| Swap to model="large" | All nodes | Only embed + dependents |
| Reuse clean output in another pipeline | Duplicate compute | Cache hit across pipelines (same node signature) |
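The last row deserves a concrete example: a second pipeline containing the same clean node (same code, same input, same cache backend) hits the shared cache. A minimal sketch:

audit = Pipeline(nodes=[clean], engine=SeqEngine(cache=DiskCache(".cache")))
audit.run(inputs={"text": "Hello"})  # cache hit – clean() was already computed by pipe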
Use pipeline.engine.cache.inspect(signature) (or custom cache utilities) if you ever want to programmatically verify whether a node is cached before triggering a heavy run.
Code-aware invalidations (no stale runs)
The cache is just as smart about code changes. HyperNodes hashes the function body and anything it touches (imports, closures, default arguments). If a helper changes, the dependent node automatically invalidates.
# helpers.py
def normalize(text):
return text.lower()
@node(output_name="clean")
def clean(text: str) -> str:
return normalize(text)
pipe = Pipeline(nodes=[clean], engine=SeqEngine(cache=DiskCache(".cache")))
pipe.run(inputs={"text": "Hello"})
# Later: edit normalize() in helpers.py
def normalize(text):
return text.lower().strip()
pipe.run(inputs={"text": "Hello"}) # cache miss – dependency changed!
Default arguments, environment variables, and closure state are also part of the hash.
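For instance, a default argument is part of the node's contract, so editing it invalidates the cache even though no call site changed. An illustrative sketch (flag_long and threshold are made up for this example):

@node(output_name="flag")
def flag_long(text: str, threshold: int = 10) -> bool:
    return len(text) > threshold

# Changing the default to threshold=20 changes the node's signature,
# so cached flag_long results are invalidated automatically.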
| Scenario | Traditional Cache | HyperNodes |
|---|---|---|
| Function body edited | Might stay cached → stale | Auto-invalidated |
| Helper/import updated | Usually ignored | Auto-invalidated |
| Default arg / env var changed | Often missed | Captured in signature |
| Closure state mutated | Rarely detected | Captured in signature |
Need to know why a node reran? Enable LOG_HYPERNODES_CACHE=1 (env var) to log cache hits/misses, or plug in a callback that records on_node_cached events for your observability system.
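You can also flip the flag from Python before building the engine (assuming the variable is read at engine construction time – if not, set it in your shell instead):

import os
os.environ["LOG_HYPERNODES_CACHE"] = "1"  # log cache hits/misses, per the note above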
🚀 Scale with Daft
When you're ready for distributed execution, plug in Daft — same pipeline code, now runs on a cluster.
from hypernodes.engines import DaftEngine
engine = DaftEngine(use_batch_udf=True)
pipeline = Pipeline(nodes=[clean, count], engine=engine)
texts = ["Hello", "World"]  # sample corpus

# Auto-batched, distributed, with per-item caching
results = pipeline.map(inputs={"text": texts * 10000}, map_over="text")
Daft provides:
- Distributed execution — laptop to cluster, no code changes
- Auto-batching — optimal batch sizes computed automatically
- Per-item caching — even in distributed mode, each item is cached independently
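To combine Daft with the disk cache from earlier, attach the backend to the engine. A hedged sketch: that DaftEngine accepts the same cache= argument as SeqEngine is an assumption here, so check the engine docs:

from hypernodes import DiskCache
from hypernodes.engines import DaftEngine

# Assumption: DaftEngine takes cache= like SeqEngine does
engine = DaftEngine(use_batch_udf=True, cache=DiskCache(path=".cache"))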
💡 Inspiration
HyperNodes stands on the shoulders of giants. It was inspired by and grew from working with:
- Pipefunc: For its elegant approach to function composition.
- Apache Hamilton: For its paradigm of defining dataflows using standard Python functions.
HyperNodes aims to bring native support for hierarchical pipelines and advanced caching to this ecosystem.
📚 Documentation
The full documentation is available in the docs/ directory:
- Introduction & Quick Start
- Core Concepts
- Data Processing
- Scaling & Optimization
- Observability
🚀 Installation
pip install hypernodes
# or with uv
uv add hypernodes
📄 License
MIT License - see LICENSE file for details.