

hypernodes

Hierarchical, Modular Data Pipelines for AI/ML



Why HyperNodes?

Data pipelines are run more than written — you execute the same code on the same data hundreds of times during development. Every run costs time and money.

HyperNodes solves this with two core ideas:

  • 🎯 Think Singular: write logic for one item. No batch loops. Clean, testable code.
  • 📦 Smart Caching: only compute what changed. Resume from where you left off.

✨ The Three-Step Pattern

1️⃣ Think Singular

Write functions that process one item. No batch loops, no complexity — just clean, testable code.

from hypernodes import Pipeline, node

@node(output_name="cleaned")
def clean(text: str) -> str:
    return text.strip().lower()

@node(output_name="word_count")
def count(cleaned: str) -> int:
    return len(cleaned.split())

# Create a named pipeline
text_processor = Pipeline(nodes=[clean, count], name="text_processor")

result = text_processor.run(inputs={"text": "  Hello World  "})
# {'cleaned': 'hello world', 'word_count': 2}


[Diagram: the text_processor pipeline]


2️⃣ Compose

Pipelines are nodes. Use text_processor inside a larger workflow — it appears as a single unit.

@node(output_name="report")
def summarize(word_count: int, cleaned: str) -> str:
    return f"Processed: {word_count} words"

# text_processor becomes a node in the analysis pipeline
analysis = Pipeline(nodes=[text_processor.as_node(), summarize], name="analysis")


[Diagram: collapsed view — text_processor as a single node]

Expand to see the internals:

[Diagram: expanded view — see inside text_processor]
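
Running the composed pipeline works like any other run. A minimal sketch; since summarize consumes cleaned and word_count internally, only the final report key is read here (exactly which intermediate keys surface in the result may vary):

result = analysis.run(inputs={"text": "  Hello World  "})
print(result["report"])  # Processed: 2 words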


3️⃣ Scale

Process many items with .map(). The same single-item logic runs over a collection.

run() vs map() at a glance

Single-item execution versus per-item fan-out:

| Pipeline.run() | Pipeline.map() |
| --- | --- |
| Accepts a single input item and executes the DAG once. | Accepts lists or iterables for the keys listed in map_over. |
| Returns a single dict of outputs. | Returns a list of dicts (one per item). |
| Cache signatures cover the full set of inputs for that run. | Cache signatures are computed per item, enabling incremental recompute. |

# Process multiple texts
results = text_processor.map(
    inputs={"text": ["Hello", "World", "Foo Bar"]},
    map_over="text"
)
# [{'cleaned': 'hello', 'word_count': 1}, 
#  {'cleaned': 'world', 'word_count': 1},
#  {'cleaned': 'foo bar', 'word_count': 2}]

Or wrap it as a batch processor in a larger pipeline:

batch_processor = text_processor.as_node(
    name="batch_text_processor",
    map_over="texts",
    input_mapping={"texts": "text"},
    output_mapping={"word_count": "word_counts"}
)


[Diagram: batch processing with map_over]
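
Downstream nodes can then consume the renamed outputs. Here is a sketch with a hypothetical total node; it assumes the word_counts list produced by the batch node can be consumed like any other pipeline value:

@node(output_name="total_words")
def total(word_counts: list[int]) -> int:
    # Sum the per-item counts emitted by batch_text_processor
    return sum(word_counts)

corpus_stats = Pipeline(nodes=[batch_processor, total], name="corpus_stats")
result = corpus_stats.run(inputs={"texts": ["Hello", "Foo Bar"]})
# result["total_words"] == 3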

📦 Smart Caching

HyperNodes implements a content-addressable cache that keeps runs fast and correct.

| Scenario | Without Cache | Naive Cache | HyperNodes Cache |
| --- | --- | --- | --- |
| Process 10 items (first time) | 10s | 10s | 10s |
| Process 10 items (again) | 10s | Instant | Instant |
| Process 15 items (10 old + 5 new) | 15s | 15s (full re-run) | 5s (only new items) |
| Drop or edit 1 item | 14s | 14s (full re-run) | 1s (only changed item) |

How it works

For each node, the cache hashes:

  • Function signature (source code, closures, defaults)
  • Input values (per item in .map())
  • Dependencies (upstream node signatures)

Only nodes whose hashes changed will execute.
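
Conceptually, each signature behaves like the sketch below. This is an illustration of the idea, not HyperNodes' actual implementation; the node_signature helper and its arguments are made up for this example:

import hashlib
import inspect

def node_signature(func, input_values: dict, upstream_sigs: list[str]) -> str:
    # Illustration only: closures and environment state are omitted here,
    # though they are part of the real signature as described above.
    h = hashlib.sha256()
    h.update(inspect.getsource(func).encode())              # function source code
    h.update(repr(func.__defaults__).encode())              # default arguments
    h.update(repr(sorted(input_values.items())).encode())   # input values (per item in .map())
    for sig in sorted(upstream_sigs):                       # upstream node signatures
        h.update(sig.encode())
    return h.hexdigest()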

Enable caching in 3 steps

  1. Pick a backend: DiskCache ships by default; swap in Redis/S3 later.
  2. Attach it to an engine: SeqEngine(cache=DiskCache(path=".cache")).
  3. Run pipelines as usual: pipeline.run(...) and pipeline.map(...) automatically use the cache.

from hypernodes import Pipeline, DiskCache, SeqEngine

engine = SeqEngine(cache=DiskCache(path=".cache"))
pipeline = Pipeline(nodes=[clean, count], engine=engine)
pipeline.run(inputs={"text": "hello"})  # warm cache

Incremental map runs (per-item signatures)

Caching shines when datasets change gradually. Each item inside .map() gets its own signature, so adding a single record does not blow away the cache for the entire batch.

from hypernodes import Pipeline, SeqEngine, DiskCache, node

@node(output_name="features")
def featurize(text: str) -> dict[str, int]:
    print(f"computing -> {text}")
    return {"len": len(text)}

engine = SeqEngine(cache=DiskCache(path=".cache"))
featurizer = Pipeline(nodes=[featurize], engine=engine)

# 1) Initial batch – every item is a miss
featurizer.map(inputs={"text": ["alpha", "beta"]}, map_over="text")

# 2) Append one more record – only "delta" recomputes
featurizer.map(inputs={"text": ["alpha", "beta", "delta"]}, map_over="text")

Tip: Visualize before long runs. Cached (grey) vs running (colored) nodes stand out in the HTML/SVG artifact.

featurizer.visualize(
    filename="outputs/cache_walkthrough.svg",
    engine="graphviz",
    separate_outputs=True,
    depth=None,
)

Node-level reuse across complex DAGs

Because caching happens per node, you can pause/resume multi-stage DAGs without re-running upstream work. Here’s a more realistic extraction → embedding pipeline:

@node(output_name="cleaned")
def clean(text: str) -> str:
    return text.strip().lower()

@node(output_name="embedding")
def embed(cleaned: str, model: str) -> list[float]:
    return encoder[model](cleaned)  # Expensive!

@node(output_name="doc")
def package(cleaned: str, embedding: list[float]) -> dict:
    return {"text": cleaned, "vector": embedding}

pipe = Pipeline(nodes=[clean, embed, package], engine=SeqEngine(cache=DiskCache(".cache")))
pipe.run(inputs={"text": "Hello", "model": "v1"})

# Change only the final step; upstream nodes stay cached
@node(output_name="doc")
def package(cleaned: str, embedding: list[float]) -> dict:
    return {"text": cleaned, "vector": embedding, "v": 2}

pipe.run(inputs={"text": "Hello", "model": "v1"})
# clean() and embed() hit cache, only package() reruns

| Change | Re-run cost without cache | With HyperNodes |
| --- | --- | --- |
| Update the final formatting node | All nodes re-execute | Only the last node runs |
| Swap to model="v2" | All nodes | Only embed and its dependents |
| Reuse the clean output in another pipeline | Duplicate compute | Cache hit across pipelines (same node signature) |

Use pipeline.engine.cache.inspect(signature) (or custom cache utilities) if you ever want to programmatically verify whether a node is cached before triggering a heavy run.

Code-aware invalidations (no stale runs)

The cache is just as smart about code changes. HyperNodes hashes the function body and anything it touches (imports, closures, default arguments). If a helper changes, the dependent node automatically invalidates.

# helpers.py
def normalize(text):
    return text.lower()

@node(output_name="clean")
def clean(text: str) -> str:
    return normalize(text)

pipe = Pipeline(nodes=[clean], engine=SeqEngine(cache=DiskCache(".cache")))
pipe.run(inputs={"text": "Hello"})

# Later, edit normalize() in helpers.py:
def normalize(text):
    return text.lower().strip()

pipe.run(inputs={"text": "Hello"})  # cache miss – dependency changed!

Default arguments, environment variables, and closure state are also part of the hash.

| Scenario | Traditional Cache | HyperNodes |
| --- | --- | --- |
| Function body edited | Might stay cached → stale | Auto-invalidated |
| Helper/import updated | Usually ignored | Auto-invalidated |
| Default arg / env var changed | Often missed | Captured in signature |
| Closure state mutated | Rarely detected | Captured in signature |

Need to know why a node reran? Enable LOG_HYPERNODES_CACHE=1 (env var) to log cache hits/misses, or plug in a callback that records on_node_cached events for your observability system.
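
For example, set the flag before the run. A sketch; it assumes the variable is read when the pipeline executes:

import os

os.environ["LOG_HYPERNODES_CACHE"] = "1"  # assumption: read at run time
pipe.run(inputs={"text": "Hello"})        # cache hits/misses are now logged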


🚀 Scale with Daft

When you're ready for distributed execution, plug in Daft — same pipeline code, now runs on a cluster.

from hypernodes.engines import DaftEngine

engine = DaftEngine(use_batch_udf=True)
pipeline = Pipeline(nodes=[clean, count], engine=engine)

# Auto-batched, distributed, with per-item caching
texts = ["hello", "world"]  # example corpus; replicated to simulate scale
results = pipeline.map(inputs={"text": texts * 10000}, map_over="text")

Daft provides:

  • Distributed execution — laptop to cluster, no code changes
  • Auto-batching — optimal batch sizes computed automatically
  • Per-item caching — even in distributed mode, each item is cached independently

💡 Inspiration

HyperNodes stands on the shoulders of giants. It was inspired by and grew from working with:

  • Pipefunc: For its elegant approach to function composition.
  • Apache Hamilton: For its paradigm of defining dataflows using standard Python functions.

HyperNodes aims to bring native support for hierarchical pipelines and advanced caching to this ecosystem.


📚 Documentation

The full documentation is available in the docs/ directory of the repository.


🚀 Installation

pip install hypernodes
# or with uv
uv add hypernodes

📄 License

MIT License - see LICENSE file for details.
