A dead simple, type-safe DAG (Directed Acyclic Graph) library with runtime validation, caching, and async support
DAG Simple
A dead simple, type-safe DAG (Directed Acyclic Graph) library for Python with runtime validation, caching, and full async support.
Perfect for building data pipelines, ML workflows, and computation graphs without the complexity of larger frameworks.
Features
- Type Safe - Full generic type support with runtime validation
- Async/Await - First-class async support with concurrent execution
- Cycle Detection - Automatic detection at construction time
- Result Caching - Memoization to avoid redundant computations
- Topological Sorting - Efficient execution using Kahn's algorithm
- Great Error Messages - Clear, actionable error reporting
- Visualization - Tree view and Mermaid diagram generation
- Zero Dependencies - Pure Python implementation
- Simple API - Decorator-based interface
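The cycle detection and Kahn's-algorithm ordering listed above can be illustrated with a small standalone sketch. This is not the library's implementation: `kahn_order` and the `deps` mapping (node name to the names it depends on) are hypothetical and use only the standard library.

```python
from collections import deque


def kahn_order(deps: dict[str, list[str]]) -> list[str]:
    """Return node names so that every dependency precedes its dependents.

    `deps` maps each node name to the names it depends on (assumed layout).
    Raises ValueError if the graph contains a cycle.
    """
    # Number of unmet dependencies for each node.
    pending = {name: len(parents) for name, parents in deps.items()}
    # Reverse edges: dependency -> nodes waiting on it.
    dependents: dict[str, list[str]] = {name: [] for name in deps}
    for name, parents in deps.items():
        for parent in parents:
            dependents[parent].append(name)

    ready = deque(name for name, count in pending.items() if count == 0)
    order: list[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in dependents[current]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    # Any node never emitted sits on, or behind, a cycle.
    if len(order) != len(deps):
        raise ValueError("cycle detected")
    return order


pipeline = {"load_data": [], "process": ["load_data"], "save": ["process"]}
print(kahn_order(pipeline))  # ['load_data', 'process', 'save']
```

The same pass handles both features: a graph with a cycle never drains its pending counts, so the length check at the end doubles as cycle detection.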
Installation
pip install dag-simple
Or using uv:
uv add dag-simple
Quick Start
from dag_simple import node

# Define your computation nodes
@node()
def load_data(source: str) -> dict[str, list[int]]:
    return {"data": [1, 2, 3, 4, 5]}

@node(deps=[load_data])
def process(load_data: dict[str, list[int]]) -> dict[str, list[int]]:
    return {"processed": [x * 2 for x in load_data["data"]]}

@node(deps=[process])
def save(process: dict[str, list[int]]) -> str:
    return f"Saved {len(process['processed'])} items"

# Execute the pipeline
result = save.run(source="database")
print(result)  # "Saved 5 items"
assert result == "Saved 5 items"

# Visualize the DAG
save.visualize()
# Output (tree view):
# save
#   process
#     load_data
When to Use What?
Use node.run() for Simple Pipelines
Perfect for straightforward, linear workflows:
from dag_simple import node

@node()
def step1(x: int) -> int:
    return x + 1

@node(deps=[step1])
def step2(step1: int) -> int:
    return step1 * 2

# Simple execution
result = step2.run(x=5)  # 12
assert result == 12
When to use:
- Simple, linear pipeline (A → B → C)
- Single target to execute
- Small projects (<10 nodes)
- Quick prototypes
Use DAG Class for Complex Projects
The DAG class excels when you need:
1. Multiple Independent Workflows
from dag_simple import DAG, node

dag = DAG(name="analytics")

# User analytics workflow
@node()
def load_users() -> list[dict[str, int]]:
    return [{"id": 1}, {"id": 2}]

@node(deps=[load_users])
def analyze_users(load_users: list[dict[str, int]]) -> dict[str, int]:
    return {"count": len(load_users)}

# Sales analytics workflow
@node()
def load_sales() -> list[dict[str, int]]:
    return [{"amount": 100}, {"amount": 200}]

@node(deps=[load_sales])
def analyze_sales(load_sales: list[dict[str, int]]) -> dict[str, int]:
    return {"total": sum(s["amount"] for s in load_sales)}

# Add to DAG
dag.add_nodes(load_users, analyze_users, load_sales, analyze_sales)

# Execute specific workflow
user_stats = dag.execute("analyze_users")
assert user_stats == {"count": 2}

# Or execute ALL workflows at once
all_results = dag.execute_all()
# {'analyze_users': {...}, 'analyze_sales': {...}}
assert all_results["analyze_users"] == {"count": 2}
assert all_results["analyze_sales"] == {"total": 300}
2. Concurrent Async Execution
import asyncio
from dag_simple import DAG, node

dag = DAG("data_pipeline")

@node()
async def fetch_api_1() -> dict[str, list[int]]:
    await asyncio.sleep(0.2)
    return {"data": [1, 2, 3]}

@node()
async def fetch_api_2() -> dict[str, list[int]]:
    await asyncio.sleep(0.2)
    return {"data": [4, 5, 6]}

dag.add_nodes(fetch_api_1, fetch_api_2)

# Execute both APIs concurrently! (~0.2s, not 0.4s)
async def main():
    results = await dag.execute_all_async()
    assert results["fetch_api_1"] == {"data": [1, 2, 3]}
    assert results["fetch_api_2"] == {"data": [4, 5, 6]}
    return results

# For testing purposes, we'll just create the DAG
# In real usage, you'd call: asyncio.run(main())
3. Namespace Management
from dag_simple import DAG, node

# Separate DAGs for different domains
user_service = DAG("user_service")
order_service = DAG("order_service")

# Same node name in different DAGs - no conflicts!
@node()
def get_data():
    return "user data"

user_service.add_node(get_data)

@node()
def get_data():  # Different implementation
    return "order data"

order_service.add_node(get_data)
When to use DAG class:
- Multiple independent workflows
- Need to execute multiple targets at once
- Large projects (10+ nodes)
- Want automatic concurrent async execution
- Need namespace separation
Run DAGs in Parallel Processes
run_sync_in_process and run_async_in_process wrap a DAG evaluation in a
ProcessPoolExecutor. They are useful when you want the work to run on a separate
CPU core or need to isolate long-running jobs from interactive code.
Use a process pool when you need to:
- Run CPU-bound workloads without blocking the main thread or event loop.
- Execute multiple DAGs side-by-side.
- Keep expensive computations isolated from user-facing code.
Remember:
- Functions, their inputs, and their outputs must be picklable so they can cross process boundaries.
- Each worker has its own ExecutionContext, so cached values are not shared across processes unless you provide external storage.
- Spawning processes is expensive. Reuse a ProcessPoolExecutor to amortize the startup cost.
- run_sync_in_process only accepts branches made entirely of synchronous nodes.
import asyncio
from concurrent.futures import ProcessPoolExecutor
from dag_simple import node, run_async_in_process, run_sync_in_process

@node()
def make_numbers(seed: int) -> list[int]:
    return [seed + i for i in range(5)]

@node(deps=[make_numbers])
def total_energy(make_numbers: list[int]) -> int:
    return sum(value * value for value in make_numbers)

@node()
async def fetch_multiplier() -> int:
    await asyncio.sleep(0.1)
    return 2

@node(deps=[total_energy, fetch_multiplier])
def scaled(total_energy: int, fetch_multiplier: int) -> int:
    return total_energy * fetch_multiplier

@node()
def explode() -> None:
    raise ValueError("boom")

def main() -> None:
    energy = run_sync_in_process(total_energy, seed=10)
    assert energy == 730

    with ProcessPoolExecutor() as pool:
        parallel = [
            run_sync_in_process(total_energy, executor=pool, seed=seed)
            for seed in range(3)
        ]
        assert parallel == [30, 55, 90]

        scaled_result = run_async_in_process(scaled, executor=pool, seed=5)
        assert scaled_result == 510

    try:
        run_sync_in_process(explode)
    except ValueError as exc:
        assert str(exc) == "boom"

if __name__ == "__main__":
    main()
The exceptions raised inside a worker are re-raised in the calling process when
you access the result, so normal error handling applies. See
examples/process_pool_example.py for a
complete runnable walkthrough, including sharing a pool across many DAG
invocations.
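The picklability requirement mentioned earlier in this section can be checked before any work is submitted to a pool. The helper below is a hypothetical standalone sketch using only the standard library; `is_picklable` is not part of dag_simple.

```python
import pickle


def is_picklable(value: object) -> bool:
    """Return True if `value` can be serialized with pickle.dumps,
    which is how objects are sent to a worker process."""
    try:
        pickle.dumps(value)
    except Exception:  # PicklingError, AttributeError, or TypeError, depending on the object
        return False
    return True


print(is_picklable({"seed": 10}))       # True
print(is_picklable(lambda seed: seed))  # False
```

Plain data such as dicts and lists passes, while lambdas fail, which is one reason to keep node functions at module level when they will run in a separate process.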
Core Concepts
Nodes
Nodes are the building blocks of your DAG. Each node wraps a function and can depend on other nodes.
from dag_simple import node

@node()
def add(x: int, y: int) -> int:
    return x + y

@node()
def multiply(x: int, y: int) -> int:
    return x * y

@node(deps=[add, multiply])
def combine(add: int, multiply: int) -> int:
    return add + multiply

result = combine.run(x=2, y=3)  # Returns 11: (2+3) + (2*3)
assert result == 11
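In the example above, each dependency's result arrives in the parameter that shares the dependency's name, and the remaining parameters come from the keyword arguments passed to run(). A rough standalone sketch of that name-matching idea follows; it is an assumption about the behavior inferred from the examples, not the library's code, and `run_by_name` and `graph` are hypothetical.

```python
import inspect
from typing import Any, Callable


def run_by_name(
    target: Callable[..., Any],
    deps: dict[Callable[..., Any], list[Callable[..., Any]]],
    inputs: dict[str, Any],
) -> Any:
    """Evaluate `target`, filling each parameter from a dependency's result
    (matched by function name) or, failing that, from `inputs`."""
    results = {
        dep.__name__: run_by_name(dep, deps, inputs) for dep in deps.get(target, [])
    }
    kwargs = {}
    for param in inspect.signature(target).parameters:
        kwargs[param] = results[param] if param in results else inputs[param]
    return target(**kwargs)


def add(x: int, y: int) -> int:
    return x + y


def multiply(x: int, y: int) -> int:
    return x * y


def combine(add: int, multiply: int) -> int:
    return add + multiply


graph = {combine: [add, multiply]}  # target -> its dependencies
print(run_by_name(combine, graph, {"x": 2, "y": 3}))  # 11
```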
Async Support
First-class support for async/await:
import asyncio
from dag_simple import node

@node()
async def fetch_data(url: str) -> dict[str, list[int]]:
    # Async I/O operations
    await asyncio.sleep(0.1)
    return {"data": [1, 2, 3]}

@node(deps=[fetch_data])
async def process(fetch_data: dict[str, list[int]]) -> list[int]:
    return [x * 2 for x in fetch_data["data"]]

# Execute async DAG
async def main():
    result = await process.run_async(url="https://api.example.com")
    assert result == [2, 4, 6]
    return result

# For testing purposes, we'll just create the nodes
# In real usage, you'd call: asyncio.run(main())
Concurrent execution is automatic:
import asyncio
from dag_simple import node

@node()
async def fetch1() -> dict[str, list[int]]:
    await asyncio.sleep(0.2)  # 200ms
    return {"data": [1, 2, 3]}

@node()
async def fetch2() -> dict[str, list[int]]:
    await asyncio.sleep(0.2)  # 200ms
    return {"data": [4, 5, 6]}

@node(deps=[fetch1, fetch2])
async def combine(fetch1: dict[str, list[int]], fetch2: dict[str, list[int]]) -> dict[str, list[int]]:
    return {"merged": fetch1["data"] + fetch2["data"]}

# fetch1 and fetch2 run CONCURRENTLY! (~200ms total, not 400ms)
async def main():
    result = await combine.run_async()
    assert result == {"merged": [1, 2, 3, 4, 5, 6]}
    return result

# For testing purposes, we'll just create the nodes
# In real usage, you'd call: asyncio.run(main())
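The "~200ms, not 400ms" figure follows from overlapping the two waits. The standalone sketch below shows the same effect with plain asyncio.gather; it makes no claim about how dag_simple schedules nodes internally, and `slow_fetch` and `fetch_both` are hypothetical names.

```python
import asyncio
import time


async def slow_fetch(label: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stands in for network I/O
    return label


async def fetch_both() -> tuple[list[str], float]:
    start = time.perf_counter()
    # gather schedules both coroutines at once, so the two waits overlap.
    labels = await asyncio.gather(slow_fetch("a", 0.2), slow_fetch("b", 0.2))
    return labels, time.perf_counter() - start


labels, elapsed = asyncio.run(fetch_both())
print(labels, f"{elapsed:.2f}s")
```

On an idle machine the elapsed time should be close to a single 0.2 s delay rather than the 0.4 s a sequential run would take.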
Type Validation
Enable runtime type checking to catch errors early:
from dag_simple import node

@node(validate_types=True)
def typed_function(x: int, y: str) -> str:
    return f"{y}: {x}"

# This works
result = typed_function.run(x=42, y="Answer")
assert result == "Answer: 42"

# This raises NodeExecutionError (wrapping ValidationError)
try:
    result = typed_function.run(x="wrong", y="Answer")
except Exception as e:
    print(f"Expected error: {e}")
    assert "NodeExecutionError" in str(type(e).__name__)
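For intuition about what runtime type checking involves, here is a simplified standalone sketch that compares inputs against a function's annotations. It is not the library's validator: `check_inputs` is hypothetical, raises a plain TypeError, and only handles plain classes such as int and str.

```python
from typing import Any, Callable, get_origin, get_type_hints


def check_inputs(func: Callable[..., Any], **inputs: Any) -> None:
    """Raise TypeError if an input does not match a plain-class annotation."""
    hints = get_type_hints(func)
    for name, value in inputs.items():
        expected = hints.get(name)
        # Parameterized generics such as list[int] have an origin and are
        # skipped here; only plain classes are checked with isinstance.
        if get_origin(expected) is not None or not isinstance(expected, type):
            continue
        if not isinstance(value, expected):
            raise TypeError(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )


def typed_function(x: int, y: str) -> str:
    return f"{y}: {x}"


check_inputs(typed_function, x=42, y="Answer")  # passes silently
try:
    check_inputs(typed_function, x="wrong", y="Answer")
except TypeError as exc:
    print(exc)  # x: expected int, got str
```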
Error Handling
All execution errors are wrapped with comprehensive context information for easier debugging:
from dag_simple import node, NodeExecutionError

@node()
def load_data(source: str) -> dict[str, int]:
    return {"count": 100, "threshold": 50}

@node(deps=[load_data])
def process_data(load_data: dict[str, int], multiplier: int) -> int:
    return load_data["count"] * multiplier

@node(deps=[process_data])
def validate_result(process_data: int) -> int:
    if process_data > 1000:
        raise ValueError(f"Result too large: {process_data}")
    return process_data

@node(deps=[validate_result])
def save_result(validate_result: int) -> str:
    return f"Saved {validate_result}"

try:
    # This will fail at validate_result because 100 * 20 = 2000 > 1000
    save_result.run(source="database", multiplier=20)
except NodeExecutionError as e:
    print(e)
Error Output:
================================================================================
Node Execution Failed: 'validate_result'
================================================================================
Execution Path:
load_data -> process_data -> validate_result
Inputs to 'validate_result':
process_data: 2000
Original Error:
ValueError: Result too large: 2000
================================================================================
Key Features:
- Execution Path - Shows which nodes executed before the error occurred
- Input Context - All inputs passed to the failed node are captured
- Original Exception - Full error details are preserved and displayed
- Clear Formatting - Well-structured, easy-to-read error messages
- Async Support - Works seamlessly with both sync and async execution
- Multiprocessing Ready - Exceptions are picklable for use with ProcessPoolExecutor
- Exception Chaining - Access e.original_exception, e.execution_path, and e.node_inputs for programmatic handling
See examples/error_handling_example.py for more detailed examples.
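The wrapping pattern described above (original exception, execution path, and inputs carried on one error object) can be sketched without the library. `StepError` and `run_step` below are hypothetical stand-ins and do not reproduce NodeExecutionError's actual fields or formatting.

```python
class StepError(Exception):
    """Hypothetical wrapper that records where a pipeline step failed."""

    def __init__(self, step: str, path: list[str], inputs: dict, original: Exception) -> None:
        # Forward every argument to Exception so self.args can rebuild the object.
        super().__init__(step, path, inputs, original)
        self.step = step
        self.path = path
        self.inputs = inputs
        self.original = original

    def __str__(self) -> str:
        return (
            f"step {self.step!r} failed after {' -> '.join(self.path)}: "
            f"{type(self.original).__name__}: {self.original}"
        )


def validate_result(value: int) -> int:
    if value > 1000:
        raise ValueError(f"Result too large: {value}")
    return value


def run_step(value: int) -> int:
    try:
        return validate_result(value)
    except Exception as exc:
        path = ["load_data", "process_data", "validate_result"]
        # "from exc" keeps the original traceback reachable via __cause__.
        raise StepError("validate_result", path, {"process_data": value}, exc) from exc


try:
    run_step(2000)
except StepError as err:
    print(err)
    print(err.inputs)  # {'process_data': 2000}
```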
Result Caching
Cache expensive computations that are used multiple times:
from dag_simple import node

@node(cache_result=True)
def expensive_computation(x: int) -> int:
    # This only runs once even if multiple nodes depend on it
    return x ** 2

@node(deps=[expensive_computation])
def use_result_1(expensive_computation: int) -> int:
    return expensive_computation + 1

@node(deps=[expensive_computation])
def use_result_2(expensive_computation: int) -> int:
    return expensive_computation + 2

@node(deps=[use_result_1, use_result_2])
def final(use_result_1: int, use_result_2: int) -> int:
    return use_result_1 + use_result_2

result = final.run(x=5)  # expensive_computation runs only once
# expensive_computation(5) = 25
# use_result_1(25) = 25 + 1 = 26
# use_result_2(25) = 25 + 2 = 27
# final(26, 27) = 26 + 27 = 53
assert result == 53
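The "runs only once" behavior in a diamond-shaped graph is ordinary memoization. The standalone sketch below counts real evaluations to make that visible; the `cache` dict and `cached` helper are hypothetical and say nothing about how dag_simple stores results.

```python
from typing import Callable

calls = {"expensive": 0}
# Keyed by name only, which is enough for a single run with fixed inputs.
cache: dict[str, int] = {}


def cached(name: str, compute: Callable[[], int]) -> int:
    """Return the stored value for `name`, computing it on first use only."""
    if name not in cache:
        cache[name] = compute()
    return cache[name]


def expensive(x: int) -> int:
    calls["expensive"] += 1  # count real evaluations
    return x**2


def use_result_1(x: int) -> int:
    return cached("expensive", lambda: expensive(x)) + 1


def use_result_2(x: int) -> int:
    return cached("expensive", lambda: expensive(x)) + 2


total = use_result_1(5) + use_result_2(5)
print(total, calls["expensive"])  # 53 1
```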
Examples
Data Pipeline (ETL)
from dag_simple import node

@node(cache_result=True, validate_types=True)
def extract(source: str) -> list[dict[str, int]]:
    """Extract data from source."""
    return [
        {"id": 1, "value": 100},
        {"id": 2, "value": 200},
        {"id": 3, "value": 150},
    ]

@node(deps=[extract], validate_types=True)
def transform(extract: list[dict[str, int]], min_value: int) -> list[dict[str, int]]:
    """Filter and transform data."""
    return [item for item in extract if item["value"] >= min_value]

@node(deps=[transform], validate_types=True)
def load(transform: list[dict[str, int]]) -> str:
    """Load transformed data."""
    total = sum(item["value"] for item in transform)
    return f"Loaded {len(transform)} items, total value: {total}"

# Execute the pipeline
result = load.run(source="database", min_value=150)
print(result)  # "Loaded 2 items, total value: 350"
assert result == "Loaded 2 items, total value: 350"
Async Web Scraping
import asyncio
from dag_simple import node

@node()
async def fetch_page(url: str) -> str:
    # Simulated fetch
    await asyncio.sleep(0.1)
    return "<html>...</html>"

@node(deps=[fetch_page])
async def extract_links(fetch_page: str) -> list[str]:
    # Parse HTML
    return ["link1", "link2", "link3"]

@node(deps=[extract_links])
async def fetch_all_links(extract_links: list[str]) -> list[str]:
    # Fetch all links concurrently
    async def fetch_one(link: str) -> str:
        await asyncio.sleep(0.1)
        return f"content_{link}"

    return await asyncio.gather(*[fetch_one(link) for link in extract_links])

async def main():
    result = await fetch_all_links.run_async(url="https://example.com")
    assert result == ["content_link1", "content_link2", "content_link3"]
    return result

# For testing purposes, we'll just create the nodes
# In real usage, you'd call: asyncio.run(main())
Machine Learning Pipeline with DAG Class
from dag_simple import DAG, node
import asyncio

ml_pipeline = DAG("ml_pipeline")

@node()
async def load_data(path: str) -> dict[str, list]:
    await asyncio.sleep(0.1)
    return {"X": [[1, 2]], "y": [0]}

@node(deps=[load_data])
async def preprocess(load_data: dict[str, list]) -> dict[str, list]:
    await asyncio.sleep(0.1)
    return {"X_processed": load_data["X"], "y": load_data["y"]}

@node(deps=[preprocess], cache_result=True)
async def train_model(preprocess: dict[str, list]) -> dict[str, str | float]:
    await asyncio.sleep(0.2)
    return {"model": "trained", "accuracy": 0.95}

@node(deps=[preprocess, train_model])
async def evaluate(preprocess: dict[str, list], train_model: dict[str, str | float]) -> float:
    await asyncio.sleep(0.1)
    return 0.93

@node(deps=[train_model, evaluate])
async def save_if_good(train_model: dict[str, str | float], evaluate: float) -> str:
    if evaluate > 0.9:
        return f"Saved {train_model['model']}"
    return "Model not good enough"

ml_pipeline.add_nodes(load_data, preprocess, train_model, evaluate, save_if_good)

# Visualize
ml_pipeline.visualize_all()

# Execute
async def main():
    result = await ml_pipeline.execute_async("save_if_good", path="data.csv")
    assert result == "Saved trained"
    return result

# For testing purposes, we'll just create the DAG
# In real usage, you'd call: asyncio.run(main())
Introspection & Debugging
Visualize DAG Structure
from dag_simple import node

# Create a simple node for demonstration
@node()
def example_node(x: int) -> int:
    return x * 2

# Tree view
example_node.visualize()

# Get topological order
order = example_node.topological_sort()
print(" -> ".join(order))
assert order == ["example_node"]

# Get dependency graph
graph = example_node.graph_dict()
print(graph)
assert graph == {"example_node": []}

# Generate Mermaid diagram
mermaid = example_node.to_mermaid()
print(mermaid)
assert "example_node" in mermaid
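To show what Mermaid generation amounts to, the sketch below turns a dependency dictionary (the same shape as the graph_dict() result above) into Mermaid flowchart text. The exact text produced by to_mermaid() may differ; `to_mermaid_sketch` is a hypothetical helper.

```python
def to_mermaid_sketch(deps: dict[str, list[str]]) -> str:
    """Render `deps` (node -> its dependencies) as Mermaid flowchart text."""
    lines = ["graph TD"]
    for name, parents in deps.items():
        if not parents:
            lines.append(f"    {name}")  # node without dependencies
        for parent in parents:
            lines.append(f"    {parent} --> {name}")  # edge: dependency to dependent
    return "\n".join(lines)


print(to_mermaid_sketch({"load_data": [], "process": ["load_data"], "save": ["process"]}))
```

The printed text starts with `graph TD` and lists one edge per dependency, so it can be pasted into any Mermaid renderer.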
DAG Class Methods
from dag_simple import DAG, node

# Create a simple DAG for demonstration
dag = DAG("my_pipeline")

@node()
def example_node(x: int) -> int:
    return x * 2

dag.add_node(example_node)

# Execute specific target (sync)
result = dag.execute("example_node", x=10)
assert result == 20

# Execute specific target (async)
async def async_example():
    result = await dag.execute_async("example_node", x=10)
    assert result == 20
    return result

# Execute ALL leaf nodes (sync)
results = dag.execute_all(x=10)
assert results["example_node"] == 20

# Execute ALL leaf nodes (async, concurrently!)
async def async_all_example():
    results = await dag.execute_all_async(x=10)
    assert results["example_node"] == 20
    return results

# Visualization
dag.visualize_all()

# Get execution order
order = dag.get_execution_order()
assert order == ["example_node"]
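The comments above say execute_all() runs all leaf nodes. Assuming a leaf is a node that nothing else depends on, the selection could be sketched as follows; `leaf_nodes` and the `analytics` mapping are hypothetical and only mirror the names used in the earlier analytics example.

```python
def leaf_nodes(deps: dict[str, list[str]]) -> list[str]:
    """Return nodes that no other node depends on, in insertion order."""
    depended_on = {parent for parents in deps.values() for parent in parents}
    return [name for name in deps if name not in depended_on]


analytics = {
    "load_users": [],
    "analyze_users": ["load_users"],
    "load_sales": [],
    "analyze_sales": ["load_sales"],
}
print(leaf_nodes(analytics))  # ['analyze_users', 'analyze_sales']
```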
API Reference
@node Decorator
@node(
    deps: list[Node] | None = None,  # Dependencies
    name: str | None = None,         # Custom name
    validate_types: bool = True,     # Enable type validation
    cache_result: bool = False       # Enable result caching
)
Node.run() and Node.run_async()
# Synchronous execution
result = node.run(
    enable_cache: bool = True,  # Enable/disable caching
    **inputs                    # Input values
)
# Asynchronous execution
result = await node.run_async(
    enable_cache: bool = True,  # Enable/disable caching
    **inputs                    # Input values
)
Introspection Methods
- node.visualize() - Print tree visualization
- node.topological_sort() - Get execution order
- node.graph_dict() - Get dependency dictionary
- node.get_all_dependencies() - Get all transitive dependencies
- node.to_mermaid() - Generate Mermaid diagram
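"Transitive dependencies" means everything a node needs directly or indirectly. As an illustration only, the sketch below walks a dependency dictionary with an iterative depth-first search; `all_dependencies` is hypothetical and the return type of get_all_dependencies() is not assumed here.

```python
def all_dependencies(name: str, deps: dict[str, list[str]]) -> set[str]:
    """Collect direct and indirect dependencies of `name` with an iterative DFS."""
    seen: set[str] = set()
    stack = list(deps[name])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(deps[current])
    return seen


diamond = {
    "expensive_computation": [],
    "use_result_1": ["expensive_computation"],
    "use_result_2": ["expensive_computation"],
    "final": ["use_result_1", "use_result_2"],
}
print(sorted(all_dependencies("final", diamond)))
# ['expensive_computation', 'use_result_1', 'use_result_2']
```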
Contributing
We welcome contributions! Here's how to get started:
Development Setup
1. Clone the repository:
   git clone https://github.com/yourusername/dag-simple.git
   cd dag-simple
2. Install uv (recommended):
   curl -LsSf https://astral.sh/uv/install.sh | sh
3. Install dependencies:
   uv sync --dev
4. Run tests (includes documentation code blocks):
   uv run pytest
5. Run type checking:
   uv run pyright
6. Format code:
   uv run ruff format .
7. Lint code:
   uv run ruff check .
See CONTRIBUTING.md for detailed guidelines.
Testing Documentation
This project uses pytest-codeblocks to ensure all code examples in the README.md continue to work correctly. This helps catch issues when:
- API changes break documentation examples
- Dependencies change behavior
- Code refactoring affects examples
Code blocks testing is automatically enabled - when you run uv run pytest, it will automatically test all Python code blocks found in README.md and CONTRIBUTING.md.
The configuration in pyproject.toml includes:
- --codeblocks flag in pytest addopts for automatic execution
- testpaths = ["tests", "**/*.md"] to discover markdown files
Note: Some code blocks may fail tests because they:
- Are missing imports (e.g., from dag_simple import node)
- Have await outside async functions (documentation examples)
- Are syntax examples rather than executable code
- Are incomplete code snippets
This is expected behavior - the important thing is that pytest-codeblocks will catch real issues when your API changes or when code examples become outdated.
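The idea behind testing documentation can be shown in a few lines: pull the fenced Python blocks out of a Markdown string and execute each one. This is a simplified illustration, not how pytest-codeblocks is implemented; `python_blocks` and the sample `readme` text are hypothetical.

```python
import re

FENCE = "`" * 3  # built indirectly so this example can itself sit inside a fenced block


def python_blocks(markdown: str) -> list[str]:
    """Return the bodies of fenced blocks tagged python in `markdown`."""
    pattern = re.compile(
        rf"^{FENCE}python[ \t]*\n(.*?)^{FENCE}[ \t]*$",
        re.DOTALL | re.MULTILINE,
    )
    return pattern.findall(markdown)


readme = f"Intro\n\n{FENCE}python\nresult = 1 + 1\nassert result == 2\n{FENCE}\n\nOutro\n"
for source in python_blocks(readme):
    # Each block runs in a fresh namespace; a failing assert would flag a stale example.
    exec(source, {})
print(len(python_blocks(readme)))  # 1
```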
Requirements
- Python 3.10+
- No runtime dependencies!
License
MIT License - see LICENSE file for details.
Acknowledgments
Inspired by modern workflow orchestration tools but designed to be simpler and more lightweight.
Related Projects
- Airflow - Production-grade workflow orchestration
- Prefect - Modern workflow orchestration
- Dask - Parallel computing library
- Luigi - Python workflow management
DAG Simple is perfect when you need DAG functionality without the overhead of these larger frameworks.
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: README
Made with ❤️ by the DAG Simple contributors