# HyperNodes

> Build once, cache intelligently, run anywhere.
HyperNodes is a hierarchical, modular pipeline system with intelligent caching designed for ML/AI development workflows. It treats caching as a first-class citizen, enabling developers to iterate rapidly without re-running expensive computations.
## ✨ Key Features

### 🧪 Test with One, Scale to Many
Build and test your pipeline with a single input, then run it over thousands of inputs without changing a line of code. Keep your code simple, unit-testable, and debuggable while enabling production-scale batch processing with intelligent caching.
### 💾 Development-First Caching
During development, we run pipelines dozens of times with minor tweaks. HyperNodes automatically caches at node and example granularity and only re-runs what changed. When you scale to multiple inputs, each item benefits from the cache independently.
### 🪆 Hierarchical Modularity

Functions are nodes. Pipelines are built from nodes, and pipelines are themselves nodes. Build complex workflows from simple, reusable pieces.
### ⚡ Flexible Execution
Run pipelines with different execution strategies: sequential for debugging, async for I/O-bound workloads, or distributed parallel execution with Daft for high-performance data processing.
### 🚀 First-Class Daft Support
HyperNodes integrates deeply with Daft, a distributed query engine for Python.
- Lazy Execution: Builds optimal computation graphs before running.
- Auto-Batching: Automatically batches Python functions for vectorization.
- Distributed: Scales from your laptop to a cluster without code changes.
- Stateful: Efficiently handles heavy resources like ML models and DB connections.
### 📊 Observable by Default
Every node execution is tracked, visualized, and measurable. Progress bars, logs, and metrics are built-in, not bolted on.
### 🪝 Intelligent Callback System
Powerful hooks into the execution lifecycle for observability, progress tracking, distributed tracing, and custom instrumentation. Callbacks are composable and don't require modifying pipeline code.
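As an illustration, a composable callback could be a plain object with lifecycle hooks. The hook names below (`on_node_start`, `on_node_end`) are hypothetical, not the actual HyperNodes API:

```python
import time

# Hypothetical callback sketch: the real HyperNodes hook names may differ.
class TimingCallback:
    """Records the wall-clock duration of each node execution."""

    def __init__(self):
        self.durations = {}
        self._starts = {}

    def on_node_start(self, node_name: str) -> None:
        self._starts[node_name] = time.perf_counter()

    def on_node_end(self, node_name: str) -> None:
        self.durations[node_name] = time.perf_counter() - self._starts.pop(node_name)

# An engine would invoke these hooks around each node it runs.
cb = TimingCallback()
cb.on_node_start("clean_text")
cb.on_node_end("clean_text")
assert cb.durations["clean_text"] >= 0
```

Because the callback holds its own state and touches no pipeline internals, several such callbacks can be stacked in a list without modifying node code.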
## 🚀 Quick Start

### Installation

```bash
pip install hypernodes
```

### Basic Example
```python
from hypernodes import Pipeline, node

# Define functions as nodes
@node(output_name="cleaned_text")
def clean_text(passage: str) -> str:
    return passage.strip().lower()

@node(output_name="word_count")
def count_words(cleaned_text: str) -> int:
    return len(cleaned_text.split())

# Build the pipeline - dependencies are automatically resolved
pipeline = Pipeline(nodes=[clean_text, count_words])

# Test with a single input
result = pipeline.run(inputs={"passage": "Hello World"})
print(result)  # {'cleaned_text': 'hello world', 'word_count': 2}

# Scale to many inputs - each item cached independently
results = pipeline.map(
    inputs={"passage": ["Hello", "World", "Foo", "Bar"]},
    map_over="passage",
)
```
### With Caching

```python
from hypernodes import Pipeline, SeqEngine, DiskCache

# Enable caching at the engine level
engine = SeqEngine(cache=DiskCache(path=".cache"))
pipeline = Pipeline(nodes=[clean_text, count_words], engine=engine)

# First run: executes all nodes
result1 = pipeline.run(inputs={"passage": "Hello World"})

# Second run: instant cache hit
result2 = pipeline.run(inputs={"passage": "Hello World"})  # Cached!
```
### With Stateful Objects (Models, DB Connections)

```python
from hypernodes import stateful, node, Pipeline

# Mark expensive-to-initialize classes as stateful
@stateful
class ExpensiveModel:
    def __init__(self, model_path: str):
        self.model = load_model(model_path)  # Lazy init - only on first use

    def predict(self, text: str) -> str:
        return self.model(text)

@node(output_name="prediction")
def predict(text: str, model: ExpensiveModel) -> str:
    return model.predict(text)

# Create the model (doesn't load yet - lazy!)
model = ExpensiveModel("./model.pkl")
pipeline = Pipeline(nodes=[predict])

# The model loads on the first item and is reused for all 1000 items
results = pipeline.map(
    inputs={"text": texts_1000, "model": model},
    map_over="text"
)
```
### Nested Pipelines

```python
# Inner pipeline for text processing
text_pipeline = Pipeline(nodes=[clean_text, tokenize, normalize])

# Outer pipeline using the nested pipeline as a node
main_pipeline = Pipeline(
    nodes=[load_data, text_pipeline, train_model],
)

result = main_pipeline.run(inputs={"data_path": "corpus.txt"})
```
### High-Performance Execution with Daft

```python
from hypernodes import Pipeline
from hypernodes.engines import DaftEngine

# Distributed execution using Daft
# Requires: pip install getdaft
engine = DaftEngine(use_batch_udf=True)
pipeline = Pipeline(nodes=[clean_text, count_words], engine=engine)

# Auto-batches and executes in parallel;
# each item is cached independently
results = pipeline.map(
    inputs={"passage": ["Hello", "World"] * 1000},
    map_over="passage"
)
```
## 📚 Core Concepts

### Functions → Nodes
Each function declares its dependencies implicitly through parameter names:
```python
@node(output_name="cleaned_text")
def clean_text(passage: str) -> str:
    return passage.strip().lower()

@node(output_name="word_count")
def count_words(cleaned_text: str) -> int:  # ← depends on cleaned_text
    return len(cleaned_text.split())
```
### Nodes → Pipelines

Pipelines are directed acyclic graphs (DAGs) of nodes:

```python
pipeline = Pipeline(nodes=[clean_text, count_words])

# Visualize the DAG
pipeline.visualize()

# Run with inputs
result = pipeline.run(inputs={"passage": "Hello World"})
```
### Pipelines → Nodes

Pipelines can contain other pipelines, enabling hierarchical composition:

```python
inner_pipeline = Pipeline(nodes=[step1, step2, step3])
outer_pipeline = Pipeline(nodes=[load, inner_pipeline, save])
```
## 💾 Intelligent Caching

HyperNodes uses computation signatures for content-addressed caching:

```
sig(node) = hash(
    code_hash     # Function source code
  + env_hash      # Environment (library versions, config)
  + inputs_hash   # Direct input values
  + deps_hash     # Signatures of upstream nodes (recursive)
)
```
**Core guarantee:** if a node's code, direct inputs, and upstream dependencies haven't changed, its output is guaranteed to be identical, so execution is skipped and the cached result reused.
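A simplified sketch of that recursive signature (purely illustrative; HyperNodes' real hashing scheme surely differs in detail, and bytecode is used here as a stand-in for source):

```python
import hashlib

def node_signature(fn, inputs, upstream_sigs, env="py311"):
    """Illustrative content-addressed signature, not HyperNodes' actual scheme."""
    h = hashlib.sha256()
    h.update(fn.__code__.co_code)                    # code hash (bytecode stand-in)
    h.update(env.encode())                           # environment hash
    h.update(repr(sorted(inputs.items())).encode())  # direct-inputs hash
    for sig in sorted(upstream_sigs):                # upstream signatures (recursive)
        h.update(sig.encode())
    return h.hexdigest()

def clean(passage):
    return passage.lower()

s1 = node_signature(clean, {"passage": "Hello"}, [])
s2 = node_signature(clean, {"passage": "Hello"}, [])
s3 = node_signature(clean, {"passage": "hello!"}, [])
assert s1 == s2  # same code + inputs -> same signature -> cache hit
assert s1 != s3  # changed input -> different signature -> re-run
```

Because each signature folds in the signatures of its upstream nodes, a change anywhere in the graph invalidates exactly the downstream nodes that depend on it, and nothing else.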
### Fine-Grained Invalidation

```python
pipeline = Pipeline(nodes=[load_data, preprocess, train_model])

# First run: all nodes execute
result1 = pipeline.run(inputs={"data_path": "data.csv", "learning_rate": 0.01})

# Change only learning_rate
result2 = pipeline.run(inputs={"data_path": "data.csv", "learning_rate": 0.001})
# ✅ load_data:   CACHED (unchanged)
# ✅ preprocess:  CACHED (unchanged)
# ❌ train_model: RE-RUN (learning_rate changed)
```
### Per-Item Caching with `.map()`

```python
# First run: process 100 items
results1 = pipeline.map(
    inputs={"passage": passages_100},
    map_over="passage",
)

# Add 50 new items
results2 = pipeline.map(
    inputs={"passage": passages_150},
    map_over="passage",
)
# ✅ First 100 items: CACHED
# ❌ 50 new items:    EXECUTE
```
## 🖥️ Execution Engines
Engines determine how (execution strategy) and where (infrastructure) nodes execute.
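Conceptually, an engine is anything that can execute a DAG of nodes against some inputs. A hypothetical interface sketch (these are not the actual HyperNodes classes or signatures):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Engine(Protocol):
    """Hypothetical engine interface: 'how' and 'where' nodes run."""

    def run(self, nodes: list, inputs: dict) -> dict:
        """Execute the DAG once against a single set of inputs."""
        ...

    def map(self, nodes: list, inputs: dict, map_over: str) -> list:
        """Execute the DAG once per item of inputs[map_over]."""
        ...

class EchoEngine:
    """Trivial conforming engine: ignores nodes and echoes its inputs back."""

    def run(self, nodes, inputs):
        return inputs

    def map(self, nodes, inputs, map_over):
        return [{map_over: v} for v in inputs[map_over]]

assert isinstance(EchoEngine(), Engine)  # structural check via runtime_checkable
```

Any object exposing `run` and `map` could then be swapped in, which is what lets one pipeline definition move between sequential, Dask, and Daft execution.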
### DaftEngine (Distributed DataFrames) - 🚀 Recommended
For high-performance distributed execution using Daft:
```python
from hypernodes import Pipeline, DiskCache
from hypernodes.engines import DaftEngine

# Requires: pip install getdaft
# Auto-optimizes batch sizes and parallelism
engine = DaftEngine(
    use_batch_udf=True,
    cache=DiskCache(path=".cache")
)
pipeline = Pipeline(nodes=[...], engine=engine)

# All operations are lazy (builds a computation graph)
result = pipeline.run(inputs={"x": 5})

# Map operations leverage Daft's distributed execution;
# data is automatically batched and run in parallel
results = pipeline.map(
    inputs={"x": [1, 2, 3, 4, 5]},
    map_over="x"
)
```
Key Features:
- Lazy DataFrame execution: Optimizes query plan before execution
- Smart Batching: Auto-calculates optimal batch sizes (64-1024) for vectorization
- Resource Management: Efficiently handles GPU resources and memory
- Per-Item Caching: Even in batch mode, individual items are cached and reused
- Seamless Scaling: Works on your laptop or a Ray/K8s cluster
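The 64-1024 batch-size clamp mentioned above could look roughly like this; the actual DaftEngine heuristic is not documented here, so the partition count and formula are purely illustrative:

```python
def pick_batch_size(n_items: int, n_partitions: int = 8, lo: int = 64, hi: int = 1024) -> int:
    """Illustrative heuristic: spread items across partitions, clamped to [lo, hi]."""
    target = max(1, n_items // n_partitions)  # items per partition
    return max(lo, min(hi, target))           # clamp into the vectorization sweet spot

assert pick_batch_size(100) == 64          # small inputs hit the floor
assert pick_batch_size(1_000_000) == 1024  # large inputs hit the ceiling
```

The idea behind a clamp like this: batches below ~64 items waste vectorization, while batches above ~1024 risk memory pressure on heavy rows.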
### SeqEngine (Default)

The default engine for simple, predictable execution:

```python
from hypernodes import Pipeline, SeqEngine, DiskCache
from hypernodes.telemetry import ProgressCallback

# Configure the engine with a cache and callbacks
engine = SeqEngine(
    cache=DiskCache(path=".cache"),
    callbacks=[ProgressCallback()]
)
pipeline = Pipeline(nodes=[...], engine=engine)

# Or use the default (no cache, no callbacks)
pipeline = Pipeline(nodes=[...])  # Uses SeqEngine() by default
```
Features:
- Simple topological execution
- No parallelism overhead
- Easy debugging
- Best for development and testing
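"Simple topological execution" means nodes run one at a time in dependency order; Python's standard library can express that ordering directly (the node names below are just the earlier example):

```python
from graphlib import TopologicalSorter

# Each key maps to the set of values it depends on
deps = {
    "cleaned_text": {"passage"},
    "word_count": {"cleaned_text"},
}

order = list(TopologicalSorter(deps).static_order())
# Dependencies always precede dependents
assert order.index("cleaned_text") < order.index("word_count")
```

Running nodes in this order, one after another, is exactly what makes the sequential engine easy to step through in a debugger.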
### DaskEngine (Parallel Map Operations)

For parallel execution using Dask Bag:

```python
from hypernodes import Pipeline
from hypernodes.engines import DaskEngine

# Configure for threads (I/O-bound) or processes (CPU-bound)
engine = DaskEngine(scheduler="threads")
pipeline = Pipeline(nodes=[...], engine=engine)

# Parallel map execution
results = pipeline.map(
    inputs={"x": [1, 2, 3, 4, 5]},
    map_over="x"
)
```
Features:
- Automatic parallelism for map operations
- Configurable scheduler (threads, processes)
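The threads-vs-processes trade-off mirrors Python's own executors; as a stdlib analogy (not DaskEngine's actual code), a threaded map looks like this:

```python
from concurrent.futures import ThreadPoolExecutor

def count_words(passage: str) -> int:
    return len(passage.split())

# "threads" scheduler analogue: suited to I/O-bound nodes,
# where workers spend most of their time waiting rather than computing
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(count_words, ["Hello World", "Foo", "a b c"]))
# results == [2, 1, 3]
```

For CPU-bound nodes, a process pool (the `"processes"` scheduler) sidesteps the GIL at the cost of serializing inputs between processes.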
## 📊 Observability

### Progress Tracking

```python
from hypernodes import Pipeline, SeqEngine
from hypernodes.telemetry import ProgressCallback

# Configure the engine with callbacks
engine = SeqEngine(callbacks=[ProgressCallback()])
pipeline = Pipeline(nodes=[...], engine=engine)

result = pipeline.run(inputs={"data": "..."})
```
Output:

```
Processing Pipeline ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:42
├─ clean_text       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:05
├─ extract_features ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:20
└─ train_model      ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:17
```
### Distributed Tracing

```python
from hypernodes import Pipeline, SeqEngine
from hypernodes.telemetry import TelemetryCallback

# Configure the engine with telemetry callbacks
engine = SeqEngine(callbacks=[TelemetryCallback()])
pipeline = Pipeline(nodes=[...], engine=engine)

# Traces are automatically sent to OpenTelemetry-compatible systems
# (Jaeger, Zipkin, Logfire, etc.)
```
### Pipeline Visualization

```python
# Visualize the DAG
pipeline.visualize()

# Save to file
pipeline.visualize(filename="pipeline.svg")

# Control nested pipeline expansion
pipeline.visualize(depth=2)  # Expand nested pipelines two levels deep
```
## 🪆 Advanced: Nested Pipelines

### Using `.as_node()` for Interface Adaptation
```python
# Inner pipeline processes ONE item
@node(output_name="embedding")
def encode_text(passage: str) -> Vector:
    return model.encode(passage)

single_encode = Pipeline(nodes=[clean_text, encode_text])

# Adapt the interface: map over the corpus, rename inputs/outputs
encode_corpus = single_encode.as_node(
    input_mapping={"corpus": "passage"},             # outer → inner
    output_mapping={"embedding": "encoded_corpus"},  # inner → outer
    map_over="corpus",                               # Map over the corpus list
)

# Use in an outer pipeline
index_pipeline = Pipeline(nodes=[encode_corpus, build_index])

# From the outer perspective: encode_corpus takes List[str], returns List[Vector]
result = index_pipeline.run(inputs={"corpus": ["Hello", "World", "Foo"]})
```
### Hierarchical Configuration
Nested pipelines inherit configuration from parents but can override:
```python
from hypernodes import Pipeline, DiskCache
from hypernodes.engines import DaskEngine
from hypernodes.telemetry import ProgressCallback

# The child inherits the parent's engine configuration through nesting
child_pipeline = Pipeline(nodes=[step1, step2])

# The parent defines the defaults
parent_engine = DaskEngine(
    scheduler="threads",
    cache=DiskCache(path=".cache"),
    callbacks=[ProgressCallback()]
)
parent = Pipeline(
    nodes=[preprocess, child_pipeline, postprocess],
    engine=parent_engine
)

# A nested pipeline can override with its own engine for specialized tasks
grandchild_engine = DaskEngine(scheduler="processes")  # CPU-bound tasks
grandchild_pipeline = Pipeline(
    nodes=[cpu_intensive_step],
    engine=grandchild_engine
)
```
## 🧪 Testing

HyperNodes is designed to be easily testable:

```python
import pytest
from hypernodes import Pipeline, node

@node(output_name="result")
def my_function(input: str) -> str:
    return input.upper()

def test_single_node():
    pipeline = Pipeline(nodes=[my_function])
    result = pipeline.run(inputs={"input": "hello"})
    assert result["result"] == "HELLO"

def test_with_cache():
    from hypernodes import SeqEngine, DiskCache

    engine = SeqEngine(cache=DiskCache(path="/tmp/test_cache"))
    pipeline = Pipeline(nodes=[my_function], engine=engine)

    # First run
    result1 = pipeline.run(inputs={"input": "hello"})
    # Second run should hit the cache
    result2 = pipeline.run(inputs={"input": "hello"})
    assert result1 == result2
```
## 🎯 Design Principles

### Engine-Centric Execution
Execution configuration (cache, callbacks, execution strategy) lives at the engine level, not the pipeline:
```python
from hypernodes import Pipeline, SeqEngine, DiskCache
from hypernodes.telemetry import ProgressCallback

# Configure the execution runtime
engine = SeqEngine(
    cache=DiskCache(path=".cache"),
    callbacks=[ProgressCallback()]
)

# The pipeline focuses on DAG definition
pipeline = Pipeline(nodes=[node1, node2], engine=engine)
```
Benefits:
- Separation of Concerns: Pipeline defines "what", Engine defines "how"
- Reusability: Same pipeline can run with different engines/configurations
- Extensibility: New engines reuse shared orchestration logic
- Type Safety: Callbacks can declare engine compatibility
### Guiding Principles

- **Simple by default, powerful when needed**: start with basic pipelines, scale to complex workflows
- **Cache-first**: treat caching as core functionality, not an afterthought
- **Test with one, scale to many**: same code for single items and batch processing
- **Hierarchical everything**: composition through nesting at all levels
- **Flexible execution**: choose the right execution strategy for your workload
- **Observable**: visibility into execution is built-in
## 📄 License
MIT License - see LICENSE file for details.