# HyperNodes

*Build once, cache intelligently, run anywhere.*
HyperNodes is a hierarchical, modular pipeline system with intelligent caching designed for ML/AI development workflows. It treats caching as a first-class citizen, enabling developers to iterate rapidly without re-running expensive computations.
## Key Features

### Test with One, Scale to Many
Build and test your pipeline with a single input, then run it over thousands of inputs without changing a line of code. Keep your code simple, unit-testable, and debuggable while enabling production-scale batch processing with intelligent caching.
### Development-First Caching
During development, we run pipelines dozens of times with minor tweaks. HyperNodes automatically caches at node and example granularity and only re-runs what changed. When you scale to multiple inputs, each item benefits from the cache independently.
### Hierarchical Modularity

Functions are nodes. Pipelines are built from nodes, and pipelines are themselves nodes. Build complex workflows from simple, reusable pieces.
### Flexible Execution
Run pipelines with different execution strategies: sequential for debugging, async for I/O-bound workloads, threaded for mixed workloads, or parallel for CPU-intensive tasks. Choose the right executor for your use case.
### Observable by Default
Every node execution is tracked, visualized, and measurable. Progress bars, logs, and metrics are built-in, not bolted on.
### Intelligent Callback System
Powerful hooks into the execution lifecycle for observability, progress tracking, distributed tracing, and custom instrumentation. Callbacks are composable and don't require modifying pipeline code.
## Quick Start

### Installation

```bash
pip install hypernodes
```
### Basic Example

```python
from hypernodes import Pipeline, node

# Define functions as nodes
@node(output_name="cleaned_text")
def clean_text(passage: str) -> str:
    return passage.strip().lower()

@node(output_name="word_count")
def count_words(cleaned_text: str) -> int:
    return len(cleaned_text.split())

# Build pipeline - dependencies are automatically resolved
pipeline = Pipeline(nodes=[clean_text, count_words])

# Test with a single input
result = pipeline.run(inputs={"passage": "Hello World"})
print(result)  # {'cleaned_text': 'hello world', 'word_count': 2}

# Scale to many inputs - each item cached independently
results = pipeline.map(
    inputs={"passage": ["Hello", "World", "Foo", "Bar"]},
    map_over="passage",
)
```
### With Caching

```python
from hypernodes import Pipeline, DiskCache

# Enable caching
pipeline = Pipeline(
    nodes=[clean_text, count_words],
    cache=DiskCache(path=".cache"),
)

# First run: executes all nodes
result1 = pipeline.run(inputs={"passage": "Hello World"})

# Second run: instant cache hit
result2 = pipeline.run(inputs={"passage": "Hello World"})  # Cached!
```
### With Stateful Objects (Models, DB Connections)

```python
from hypernodes import stateful, node, Pipeline

# Mark expensive-to-initialize classes as stateful
@stateful
class ExpensiveModel:
    def __init__(self, model_path: str):
        self.model = load_model(model_path)  # Lazy init - only on first use

    def predict(self, text: str) -> str:
        return self.model(text)

@node(output_name="prediction")
def predict(text: str, model: ExpensiveModel) -> str:
    return model.predict(text)

# Create model (doesn't load yet - lazy!)
model = ExpensiveModel("./model.pkl")
pipeline = Pipeline(nodes=[predict])

# Model loads on the first item, then is reused for all 1000 items
results = pipeline.map(
    inputs={"text": texts_1000, "model": model},
    map_over="text",
)
```
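The initialize-once pattern behind `@stateful` can be approximated with a small wrapper. This is a sketch of the idea only; the decorator's actual mechanics are not shown in this README, and the `Lazy` class and `load_model` stand-in below are invented for illustration:

```python
class Lazy:
    """Defer construction until first use, then reuse the same instance."""

    def __init__(self, factory):
        self._factory = factory
        self._obj = None

    def get(self):
        if self._obj is None:           # first use: pay the init cost once
            self._obj = self._factory()
        return self._obj

inits = []

def load_model():
    inits.append(1)                      # stand-in for an expensive load
    return lambda text: text.upper()     # stand-in for model inference

model = Lazy(load_model)                 # nothing loaded yet
results = [model.get()(t) for t in ["a", "b", "c"]]
assert results == ["A", "B", "C"]
assert len(inits) == 1                   # loaded exactly once across all items
```

The point is that the expensive object is constructed lazily and shared across every mapped item, rather than rebuilt per item.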
### Nested Pipelines

```python
# Inner pipeline for text processing
text_pipeline = Pipeline(nodes=[clean_text, tokenize, normalize])

# Outer pipeline using the nested pipeline
main_pipeline = Pipeline(
    nodes=[load_data, text_pipeline, train_model],
)

result = main_pipeline.run(inputs={"data_path": "corpus.txt"})
```
### Parallel Execution with Dask

```python
from hypernodes import Pipeline
from hypernodes.engines import DaskEngine

# Parallel map for CPU/IO-bound workloads
parallel_pipeline = Pipeline(
    nodes=[process_data, transform_results],
    engine=DaskEngine(scheduler="threads"),  # or "processes" for CPU-bound
)

# Regular run (sequential, no overhead)
result = parallel_pipeline.run(inputs={"data": [1, 2, 3]})

# Map operation (parallel via Dask Bag)
results = parallel_pipeline.map(inputs={"data": [1, 2, 3, 4, 5]}, map_over="data")
```
## Core Concepts

### Functions → Nodes

Each function declares its dependencies implicitly through parameter names:

```python
@node(output_name="cleaned_text")
def clean_text(passage: str) -> str:
    return passage.strip().lower()

@node(output_name="word_count")
def count_words(cleaned_text: str) -> int:  # ← depends on cleaned_text
    return len(cleaned_text.split())
```
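To make the wiring concrete, here is a minimal stand-alone sketch of name-based dependency resolution using `inspect.signature`. The resolver and its `run` helper are illustrative only, not HyperNodes internals:

```python
import inspect

# Plain stand-ins for @node-decorated functions
def clean_text(passage: str) -> str:
    return passage.strip().lower()

def count_words(cleaned_text: str) -> int:
    return len(cleaned_text.split())

# Map each node's output name to the function that produces it
nodes = {"cleaned_text": clean_text, "word_count": count_words}

def run(nodes, inputs):
    """Naive resolver: run any node whose parameters are all available.
    Assumes the graph is satisfiable (a real system would detect cycles)."""
    values = dict(inputs)
    pending = dict(nodes)
    while pending:
        for name, fn in list(pending.items()):
            params = inspect.signature(fn).parameters
            if all(p in values for p in params):
                values[name] = fn(**{p: values[p] for p in params})
                del pending[name]
    return values

result = run(nodes, {"passage": "  Hello World  "})
# result["cleaned_text"] == "hello world", result["word_count"] == 2
```

Parameter names double as edges: `count_words` never mentions `clean_text` directly, yet the resolver feeds it `cleaned_text` because the names match.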
### Nodes → Pipelines

Pipelines are directed acyclic graphs (DAGs) of nodes:

```python
pipeline = Pipeline(nodes=[clean_text, count_words])

# Visualize the DAG
pipeline.visualize()

# Run with inputs
result = pipeline.run(inputs={"passage": "Hello World"})
```
### Pipelines → Nodes

Pipelines can contain other pipelines, enabling hierarchical composition:

```python
inner_pipeline = Pipeline(nodes=[step1, step2, step3])
outer_pipeline = Pipeline(nodes=[load, inner_pipeline, save])
```
## Intelligent Caching

HyperNodes uses computation signatures for content-addressed caching:

```text
sig(node) = hash(
    code_hash     # Function source code
  + env_hash      # Environment (library versions, config)
  + inputs_hash   # Direct input values
  + deps_hash     # Signatures of upstream nodes (recursive)
)
```
**Core guarantee:** if a node's code, direct inputs, and upstream dependencies haven't changed, its output is guaranteed to be identical, so HyperNodes skips execution and reuses the cached result.
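As a rough illustration of the scheme, the sketch below combines a bytecode hash, an environment string, the inputs, and upstream signatures with `hashlib`. All helper names here are hypothetical, and the real hashing surely differs in detail (e.g. hashing actual source and resolved library versions):

```python
import hashlib

def _h(*parts: str) -> str:
    """Hash a sequence of string parts into one hex digest."""
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

def signature(fn, inputs: dict, upstream_sigs: list) -> str:
    code_hash = _h(fn.__code__.co_code.hex())       # bytecode as a stand-in for source
    env_hash = _h("py3")                             # environment fingerprint (toy)
    inputs_hash = _h(repr(sorted(inputs.items())))   # direct input values
    deps_hash = _h(*upstream_sigs)                   # upstream signatures (recursive)
    return _h(code_hash, env_hash, inputs_hash, deps_hash)

def clean_text(passage: str) -> str:
    return passage.strip().lower()

s_a = signature(clean_text, {"passage": "Hello"}, upstream_sigs=[])
s_b = signature(clean_text, {"passage": "Hello"}, upstream_sigs=[])
s_c = signature(clean_text, {"passage": "hello!"}, upstream_sigs=[])
assert s_a == s_b   # nothing changed -> same signature -> cache hit
assert s_a != s_c   # input changed  -> new signature  -> re-run
```

Because upstream signatures feed into downstream ones, a change anywhere invalidates exactly the nodes downstream of it and nothing else.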
### Fine-Grained Invalidation

```python
pipeline = Pipeline(nodes=[load_data, preprocess, train_model])

# First run: all nodes execute
result1 = pipeline.run(inputs={"data_path": "data.csv", "learning_rate": 0.01})

# Change only learning_rate
result2 = pipeline.run(inputs={"data_path": "data.csv", "learning_rate": 0.001})
# ✅ load_data:   CACHED (unchanged)
# ✅ preprocess:  CACHED (unchanged)
# ❌ train_model: RE-RUN (learning_rate changed)
```
### Per-Item Caching with .map()

```python
# First run: process 100 items
results1 = pipeline.map(
    inputs={"passage": passages_100},
    map_over="passage",
)

# Add 50 new items
results2 = pipeline.map(
    inputs={"passage": passages_150},
    map_over="passage",
)
# ✅ First 100 items: CACHED
# ❌ 50 new items:    EXECUTE
```
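The per-item behavior can be modeled with a cache keyed per input item. This is a toy in-memory version; `map_cached` is hypothetical and stands in for the library's per-item signature lookup:

```python
import hashlib

cache: dict = {}
calls = []  # track which items actually executed

def count_words(passage: str) -> int:
    calls.append(passage)
    return len(passage.split())

def map_cached(fn, items):
    out = []
    for item in items:
        key = hashlib.sha256(item.encode()).hexdigest()
        if key not in cache:         # miss: execute and store
            cache[key] = fn(item)
        out.append(cache[key])       # hit: reuse the stored result
    return out

first = map_cached(count_words, ["a b", "c"])          # both items execute
second = map_cached(count_words, ["a b", "c", "d e"])  # only "d e" executes
assert first == [2, 1]
assert second == [2, 1, 2]
assert calls == ["a b", "c", "d e"]
```

Growing the input list is therefore cheap: only the items never seen before pay for execution.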
## Execution Engines

Engines determine how (execution strategy) and where (infrastructure) nodes execute.
### SequentialEngine (Default)

The default engine for simple, predictable execution:

```python
from hypernodes import Pipeline, SequentialEngine

# Sequential execution (default - no need to specify)
pipeline = Pipeline(nodes=[...])

# Or explicitly:
pipeline = Pipeline(
    nodes=[...],
    engine=SequentialEngine(),
)
```
Features:
- Simple topological execution
- No parallelism overhead
- Easy debugging
- Best for development and testing
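Under the hood, sequential execution amounts to a topological walk over the DAG. Here is a minimal stand-alone sketch using the standard library's `graphlib`; the node representation (output name mapped to a function and its input names) is invented for illustration:

```python
from graphlib import TopologicalSorter

# Each entry: output name -> (function, tuple of input names)
nodes = {
    "cleaned": (str.lower, ("passage",)),
    "n_words": (lambda s: len(s.split()), ("cleaned",)),
}

def run_sequential(nodes, inputs):
    # A node depends on whichever nodes produce its inputs
    graph = {name: {d for d in deps if d in nodes}
             for name, (_, deps) in nodes.items()}
    values = dict(inputs)
    for name in TopologicalSorter(graph).static_order():  # dependency order
        fn, deps = nodes[name]
        values[name] = fn(*(values[d] for d in deps))
    return values

out = run_sequential(nodes, {"passage": "Hello World"})
assert out["cleaned"] == "hello world"
assert out["n_words"] == 2
```

There is no parallelism and no scheduling overhead, which is exactly what makes stepping through a run with a debugger pleasant.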
### DaskEngine (Parallel Map Operations)

For parallel execution using Dask Bag:

```python
from hypernodes import Pipeline
from hypernodes.engines import DaskEngine

# Auto-optimized for your workload
engine = DaskEngine()
pipeline = Pipeline(nodes=[...], engine=engine)

# Regular run (sequential, no overhead)
result = pipeline.run(inputs={"x": 5})

# Map operation (parallel via Dask Bag)
results = pipeline.map(
    inputs={"x": [1, 2, 3, 4, 5]},
    map_over="x",
)

# Custom configuration for a CPU-bound workload
engine = DaskEngine(
    scheduler="processes",  # or "threads" (default)
    workload_type="cpu",    # or "io", "mixed" (default)
    num_workers=8,          # defaults to CPU count
)
```
Features:
- Automatic parallelism for map operations
- Configurable scheduler (threads, processes)
- Auto-optimized partitioning
- Zero overhead for non-map operations
### DaftEngine (Distributed DataFrames)

For distributed DataFrame-based execution:

```python
from hypernodes import Pipeline
from hypernodes.engines import DaftEngine

# Requires: pip install getdaft
engine = DaftEngine(use_batch_udf=True)
pipeline = Pipeline(nodes=[...], engine=engine)

# All operations are lazy (builds a computation graph)
result = pipeline.run(inputs={"x": 5})

# Map operations leverage Daft's distributed execution
results = pipeline.map(
    inputs={"x": [1, 2, 3, 4, 5]},
    map_over="x",
)
```
Features:
- Lazy DataFrame execution
- Batch UDF optimization
- Auto-tuned parallelism
- Best for large-scale distributed workloads
## Observability

### Progress Tracking

```python
from hypernodes import Pipeline
from hypernodes.telemetry import ProgressCallback

pipeline = Pipeline(
    nodes=[...],
    callbacks=[ProgressCallback()],
)

result = pipeline.run(inputs={"data": "..."})
```

Output:

```text
Processing Pipeline  ━━━━━━━━━━━━━━━━━━━━ 100% 0:00:42
├─ clean_text        ━━━━━━━━━━━━━━━━━━━━ 100% 0:00:05
├─ extract_features  ━━━━━━━━━━━━━━━━━━━━ 100% 0:00:20
└─ train_model       ━━━━━━━━━━━━━━━━━━━━ 100% 0:00:17
```
### Distributed Tracing

```python
from hypernodes import Pipeline
from hypernodes.telemetry import TelemetryCallback

pipeline = Pipeline(
    nodes=[...],
    callbacks=[TelemetryCallback()],
)

# Traces are automatically sent to OpenTelemetry-compatible systems
# (Jaeger, Zipkin, Logfire, etc.)
```
### Pipeline Visualization

```python
# Visualize the DAG
pipeline.visualize()

# Save to file
pipeline.visualize(filename="pipeline.svg")

# Control nested pipeline expansion
pipeline.visualize(depth=2)  # Show one level of nesting
```
## Advanced: Nested Pipelines

### Using .as_node() for Interface Adaptation

```python
# Inner pipeline processes ONE item
@node(output_name="embedding")
def encode_text(passage: str) -> Vector:
    return model.encode(passage)

single_encode = Pipeline(nodes=[clean_text, encode_text])

# Adapt the interface: map over the corpus, rename inputs/outputs
encode_corpus = single_encode.as_node(
    input_mapping={"corpus": "passage"},             # outer → inner
    output_mapping={"embedding": "encoded_corpus"},  # inner → outer
    map_over="corpus",                               # Map over the corpus list
)

# Use in an outer pipeline
index_pipeline = Pipeline(nodes=[encode_corpus, build_index])

# From the outer perspective: encode_corpus takes List[str], returns List[Vector]
result = index_pipeline.run(inputs={"corpus": ["Hello", "World", "Foo"]})
```
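Conceptually, `.as_node()` wraps the inner pipeline in an adapter that renames keys and maps over a list. A plain-Python sketch of that adaptation follows; the `as_node` function and `single_encode` stand-in below are illustrative, not the library's implementation:

```python
def as_node(inner, input_mapping, output_mapping, map_over):
    """Wrap `inner` (a dict -> dict function) to rename keys and map over a list."""
    def adapted(outer_inputs):
        inner_key = input_mapping[map_over]
        # Run the single-item pipeline once per list element
        inner_results = [inner({inner_key: item}) for item in outer_inputs[map_over]]
        # Rename the inner output for the outer pipeline and collect into a list
        inner_out, outer_out = next(iter(output_mapping.items()))
        return {outer_out: [r[inner_out] for r in inner_results]}
    return adapted

def single_encode(inputs):  # stand-in for the single-item inner pipeline
    return {"embedding": inputs["passage"].lower()}

encode_corpus = as_node(
    single_encode,
    input_mapping={"corpus": "passage"},
    output_mapping={"embedding": "encoded_corpus"},
    map_over="corpus",
)
out = encode_corpus({"corpus": ["Hello", "World"]})
assert out == {"encoded_corpus": ["hello", "world"]}
```

The inner pipeline stays single-item and unit-testable; only the adapter knows about lists and the outer names.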
### Hierarchical Configuration

Nested pipelines inherit configuration from their parents but can override it:

```python
from hypernodes import Pipeline, DiskCache
from hypernodes.engines import DaskEngine
from hypernodes.telemetry import ProgressCallback

# Child inherits all configuration from its parent
child_pipeline = Pipeline(
    nodes=[step1, step2],
    # Inherits: DaskEngine, DiskCache, ProgressCallback
)

# Grandchild overrides the engine only (e.g., for process-based parallelism)
grandchild_pipeline = Pipeline(
    nodes=[cpu_intensive_step],
    engine=DaskEngine(scheduler="processes"),  # Override for CPU-bound tasks
    # Inherits: DiskCache, ProgressCallback
)

# Parent defines the defaults
parent = Pipeline(
    nodes=[preprocess, child_pipeline, postprocess],
    engine=DaskEngine(scheduler="threads"),
    cache=DiskCache(path=".cache"),
    callbacks=[ProgressCallback()],
)
```
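The inheritance rule can be pictured as child settings falling back to the parent's when unset. A simplified model follows; the `Config` and `resolve` names are invented for illustration and are not HyperNodes API:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Config:
    engine: Optional[str] = None
    cache: Optional[str] = None
    callbacks: Optional[tuple] = None

def resolve(child: Config, parent: Config) -> Config:
    """Each unset child field inherits the parent's value; set fields override."""
    return Config(
        engine=child.engine or parent.engine,
        cache=child.cache or parent.cache,
        callbacks=child.callbacks or parent.callbacks,
    )

parent = Config(engine="dask-threads", cache="disk:.cache", callbacks=("progress",))
child = Config()                              # inherits everything
grandchild = Config(engine="dask-processes")  # overrides the engine only

assert resolve(child, parent).engine == "dask-threads"
g = resolve(grandchild, resolve(child, parent))
assert g.engine == "dask-processes"   # overridden
assert g.cache == "disk:.cache"       # inherited through two levels
```

Resolution composes down the nesting hierarchy, so an override at any level applies to that subtree only.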
## Testing

HyperNodes is designed to be easily testable:

```python
from hypernodes import DiskCache, Pipeline, node

@node(output_name="result")
def my_function(input: str) -> str:
    return input.upper()

def test_single_node():
    pipeline = Pipeline(nodes=[my_function])
    result = pipeline.run(inputs={"input": "hello"})
    assert result["result"] == "HELLO"

def test_with_cache():
    cache = DiskCache(path="/tmp/test_cache")
    pipeline = Pipeline(nodes=[my_function], cache=cache)

    # First run
    result1 = pipeline.run(inputs={"input": "hello"})
    # Second run should hit the cache
    result2 = pipeline.run(inputs={"input": "hello"})
    assert result1 == result2
```
## Design Principles

- **Simple by default, powerful when needed** - Start with basic pipelines, scale to complex workflows
- **Cache-first** - Treat caching as core functionality, not an afterthought
- **Test with one, scale to many** - Same code for single items and batch processing
- **Hierarchical everything** - Composition through nesting at all levels
- **Flexible execution** - Choose the right execution strategy for your workload
- **Observable** - Visibility into execution is built-in
## License

MIT License - see the LICENSE file for details.