fast-dag
fast-dag is a lightweight, high-performance Python library for building and executing Directed Acyclic Graph (DAG) and Finite State Machine (FSM) workflows. It's designed with simplicity and speed in mind, making it perfect for data pipelines, task orchestration, and complex workflow automation.
✨ Key Features
- 🚀 Fast & Lightweight - Minimal dependencies, optimized for performance
- 🔄 DAG & FSM Support - Build both directed acyclic graphs and state machines
- 🎯 Type-Safe - Full type hints and mypy support
- 🔌 Extensible - Easy to add custom nodes and behaviors
- 📊 Visualization - Built-in Mermaid and Graphviz support
- ⚡ Multiple Execution Modes - Sequential, parallel, and async execution
- 🛡️ Robust Error Handling - Comprehensive error strategies and retry logic
- 🔍 Runtime Validation - Automatic cycle detection and dependency validation
📦 Installation
Basic Installation
pip install fast-dag
With Visualization Support
pip install fast-dag[viz]
With All Features
pip install fast-dag[all]
Development Installation
git clone https://github.com/felixnext/fast-dag.git
cd fast-dag
pip install -e ".[dev]"
🚀 Quick Start
Simple DAG Example
from fast_dag import DAG

# Create a DAG
dag = DAG("data_pipeline")

# Define nodes using decorators
@dag.node
def fetch_data() -> dict:
    return {"data": [1, 2, 3, 4, 5]}

@dag.node
def process_data(data: dict) -> list:
    return [x * 2 for x in data["data"]]

@dag.node
def save_results(processed: list) -> str:
    return f"Saved {len(processed)} items"

# Connect nodes
dag.connect("fetch_data", "process_data", input="data")
dag.connect("process_data", "save_results", input="processed")

# Execute the DAG
result = dag.run()
print(result)  # "Saved 5 items"
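To make the data flow in this example concrete, here is a minimal standard-library sketch of how a runner could order nodes and pass each result to a named input. It is not fast-dag's implementation; `run_graph` and its `(source, target, input_name)` edge tuples are hypothetical names chosen for illustration.

```python
from graphlib import TopologicalSorter


def run_graph(funcs, edges):
    """Run callables in dependency order.

    funcs: node name -> callable
    edges: list of (source, target, input_name) tuples (hypothetical format)
    """
    # Map each node to the set of nodes it depends on.
    deps = {name: set() for name in funcs}
    for source, target, _ in edges:
        deps[target].add(source)

    results = {}
    # static_order() yields every node after all of its dependencies.
    for name in TopologicalSorter(deps).static_order():
        # Build keyword arguments from the results of upstream nodes.
        kwargs = {inp: results[src] for src, tgt, inp in edges if tgt == name}
        results[name] = funcs[name](**kwargs)
    return results


def fetch_data():
    return {"data": [1, 2, 3, 4, 5]}


def process_data(data):
    return [x * 2 for x in data["data"]]


def save_results(processed):
    return f"Saved {len(processed)} items"


results = run_graph(
    {
        "fetch_data": fetch_data,
        "process_data": process_data,
        "save_results": save_results,
    },
    [
        ("fetch_data", "process_data", "data"),
        ("process_data", "save_results", "processed"),
    ],
)
print(results["save_results"])  # Saved 5 items
```

The sketch keeps every intermediate result in a dictionary, which is one simple way to let downstream nodes look up what they need.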
FSM Example
from fast_dag import FSM
from fast_dag.core.types import FSMReturn

# Create a finite state machine
fsm = FSM("traffic_light")

@fsm.state(initial=True)
def red() -> FSMReturn:
    print("Red light - STOP")
    return FSMReturn(next_state="green")

@fsm.state
def green() -> FSMReturn:
    print("Green light - GO")
    return FSMReturn(next_state="yellow")

@fsm.state
def yellow() -> FSMReturn:
    print("Yellow light - SLOW DOWN")
    return FSMReturn(next_state="red")

# Run for 5 cycles
fsm.max_cycles = 5
fsm.run()
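The loop behind a state machine like this can be sketched in plain Python. This is an illustration only: `Step` is a hypothetical stand-in for `FSMReturn`, and the sketch assumes that one cycle means one state execution, which may differ from how fast-dag counts cycles.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Step:
    """Hypothetical stand-in for FSMReturn."""
    next_state: Optional[str] = None
    value: Any = None
    stop: bool = False


def run_fsm(states, initial, max_cycles):
    """Execute states until one asks to stop or max_cycles is reached."""
    visited = []
    current = initial
    for _ in range(max_cycles):
        visited.append(current)
        step = states[current]()
        # Stop when the state requests it or names no successor.
        if step.stop or step.next_state is None:
            break
        current = step.next_state
    return visited


states = {
    "red": lambda: Step(next_state="green"),
    "green": lambda: Step(next_state="yellow"),
    "yellow": lambda: Step(next_state="red"),
}

history = run_fsm(states, "red", max_cycles=5)
print(history)  # ['red', 'green', 'yellow', 'red', 'green']
```

Bounding the loop is what keeps a cyclic machine such as the traffic light from running forever.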
🎯 Core Concepts
Nodes
Nodes are the basic units of work in fast-dag:
@dag.node
def my_task(x: int) -> int:
    return x * 2

# Or create nodes manually
from fast_dag.core.node import Node

# my_function is a placeholder for any callable you have defined
node = Node(
    func=my_function,
    name="my_node",
    description="Process data",
)
Connections
Connect nodes to define data flow:
# Simple connection
dag.connect("source", "target")
# Specify input/output names
dag.connect("source", "target", output="result", input="data")
# Use operator syntax
dag.nodes["a"] >> dag.nodes["b"] >> dag.nodes["c"]
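Connections only form a valid DAG if they contain no cycle. As a rough sketch of how such a check can work (using the standard library's `graphlib`, not fast-dag's own validator), the hypothetical helper `find_cycle` below returns the offending path or `None`.

```python
from graphlib import CycleError, TopologicalSorter


def find_cycle(edges):
    """Return a list of nodes forming a cycle, or None if the graph is acyclic.

    edges: iterable of (source, target) pairs.
    """
    deps = {}
    for source, target in edges:
        deps.setdefault(source, set())
        deps.setdefault(target, set()).add(source)
    try:
        # prepare() raises CycleError if the graph contains a cycle.
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        # The second element of args holds the cycle; its first and
        # last nodes are the same.
        return exc.args[1]
    return None


print(find_cycle([("a", "b"), ("b", "c")]))  # None
cycle = find_cycle([("a", "b"), ("b", "c"), ("c", "a")])
print(cycle is not None)  # True
```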
Conditional Flows
Build dynamic workflows with conditions:
@dag.condition
def check_threshold(value: int) -> bool:
    return value > 100

@dag.node
def process_high(value: int) -> str:
    return f"High value: {value}"

@dag.node
def process_low(value: int) -> str:
    return f"Low value: {value}"

# Connect conditional branches
dag.nodes["check_threshold"].on_true >> dag.nodes["process_high"]
dag.nodes["check_threshold"].on_false >> dag.nodes["process_low"]
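The branching idea can be shown without the library. In this sketch the hypothetical helper `run_branch` evaluates a condition and forwards the same value to exactly one branch; how fast-dag routes the value internally is an assumption here.

```python
def run_branch(condition, on_true, on_false, value):
    """Evaluate the condition, then run only the matching branch."""
    branch = on_true if condition(value) else on_false
    return branch(value)


def check_threshold(value: int) -> bool:
    return value > 100


def process_high(value: int) -> str:
    return f"High value: {value}"


def process_low(value: int) -> str:
    return f"Low value: {value}"


high = run_branch(check_threshold, process_high, process_low, 150)
low = run_branch(check_threshold, process_high, process_low, 42)
print(high)  # High value: 150
print(low)   # Low value: 42
```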
Advanced Node Types
# Multi-input convergence
@dag.any()  # Waits for ANY input to be ready
def merge_first(data: dict) -> str:
    return f"Received: {data}"

@dag.all()  # Waits for ALL inputs to be ready
def merge_all(data: dict) -> str:
    return f"All data: {data}"

# Multi-way branching
# SelectReturn must be imported before use.
@dag.select
def router(request: dict) -> SelectReturn:
    category = request.get("category", "default")
    return SelectReturn(branch=category, value=request)
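The difference between ANY and ALL convergence comes down to when a node is considered ready. The following sketch expresses that rule with a hypothetical `is_ready` helper; it illustrates the semantics described in the comments above and is not taken from fast-dag.

```python
def is_ready(mode, expected, arrived):
    """Decide whether a convergence node may run.

    mode: "any" or "all"
    expected: names of the upstream inputs the node is connected to
    arrived: names of the inputs that have produced a result so far
    """
    expected, arrived = set(expected), set(arrived)
    if mode == "all":
        # Every expected input must be present.
        return expected <= arrived
    if mode == "any":
        # A single expected input is enough.
        return bool(expected & arrived)
    raise ValueError(f"unknown mode: {mode!r}")


print(is_ready("any", ["a", "b"], ["a"]))       # True
print(is_ready("all", ["a", "b"], ["a"]))       # False
print(is_ready("all", ["a", "b"], ["a", "b"]))  # True
```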
🔧 Advanced Features
Execution Modes
# Sequential execution (default)
result = dag.run()

# Parallel execution
result = dag.run(mode="parallel")

# Async execution (must be awaited inside an async function)
result = await dag.run_async()

# Custom runner configuration
from fast_dag.runner import DAGRunner

runner = DAGRunner(dag)
runner.configure(
    max_workers=4,
    timeout=300,
    error_strategy="continue",
)
result = runner.run()
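Parallel execution generally means running nodes whose dependencies are already satisfied at the same time. Below is a minimal sketch of that idea with `concurrent.futures` and `graphlib`; `run_parallel` is a hypothetical helper, and fast-dag's parallel mode may schedule work differently.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def run_parallel(funcs, deps, max_workers=4):
    """Run independent nodes concurrently, batch by batch.

    funcs: node name -> callable taking its dependencies' results positionally
    deps: node name -> tuple of dependency names (order defines argument order)
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # get_ready() returns nodes whose dependencies are all done.
            batch = sorter.get_ready()
            futures = {
                name: pool.submit(funcs[name], *(results[d] for d in deps[name]))
                for name in batch
            }
            for name, future in futures.items():
                results[name] = future.result()
                sorter.done(name)
    return results


funcs = {
    "a": lambda: 1,
    "b": lambda: 2,
    "c": lambda x, y: x + y,
}
deps = {"a": (), "b": (), "c": ("a", "b")}

parallel_results = run_parallel(funcs, deps)
print(parallel_results["c"])  # 3
```

Here `a` and `b` run in the same batch, and `c` starts only after both have finished.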
Error Handling
# Retry with exponential backoff
@dag.node(retry=3, retry_delay=1.0)
def flaky_operation() -> dict:
    # Will retry up to 3 times with exponential backoff
    # (fetch_external_data is a placeholder for your own function)
    return fetch_external_data()

# Error strategies
result = dag.run(error_strategy="continue")  # Continue on error
result = dag.run(error_strategy="stop")  # Stop on first error (default)
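Retrying with exponential backoff can be sketched in a few lines. The helper `retry` below is hypothetical and assumes the delay doubles after every failed attempt; the exact backoff factor fast-dag uses is not stated here. The `sleep` parameter exists only so the example can record the waits instead of actually sleeping.

```python
import time


def retry(func, retries=3, delay=1.0, sleep=time.sleep):
    """Call func, retrying up to `retries` times with a doubling delay."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            if attempt == retries:
                # Out of retries: propagate the last error.
                raise
            sleep(delay * 2 ** attempt)
            attempt += 1


calls = {"count": 0}


def flaky():
    """Fail twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits = []
outcome = retry(flaky, retries=3, delay=1.0, sleep=waits.append)
print(outcome)  # ok
print(waits)    # [1.0, 2.0]
```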
Node Lifecycle Hooks
def before_node(node, inputs):
    print(f"Starting {node.name}")

def after_node(node, inputs, result):
    print(f"Finished {node.name}: {result}")
    return result  # Can modify result

def on_error(node, inputs, error):
    print(f"Error in {node.name}: {error}")

dag.set_node_hooks(
    "my_node",
    pre_execute=before_node,
    post_execute=after_node,
    on_error=on_error,
)
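The order in which such hooks fire can be illustrated with a small wrapper. This is a sketch under assumptions: `run_with_hooks` is a hypothetical helper, it passes the node's name rather than a node object, and it treats the post-execute hook's return value as the final result, in line with the "Can modify result" comment above.

```python
def run_with_hooks(name, func, inputs,
                   pre_execute=None, post_execute=None, on_error=None):
    """Run func(**inputs), calling the optional hooks around it."""
    if pre_execute is not None:
        pre_execute(name, inputs)
    try:
        result = func(**inputs)
    except Exception as error:
        if on_error is not None:
            on_error(name, inputs, error)
        raise
    if post_execute is not None:
        # The post hook may replace the result.
        result = post_execute(name, inputs, result)
    return result


events = []


def before(name, inputs):
    events.append(("pre", name))


def after(name, inputs, result):
    events.append(("post", name))
    return result + 1


hooked = run_with_hooks(
    "double", lambda x: x * 2, {"x": 5},
    pre_execute=before, post_execute=after,
)
print(hooked)  # 11
print(events)  # [('pre', 'double'), ('post', 'double')]
```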
Visualization
# Generate Mermaid diagram
mermaid_code = dag.visualize(backend="mermaid")

# Generate Graphviz DOT
dot_code = dag.visualize(backend="graphviz")

# Visualize with execution results
dag.run()

from fast_dag.visualization import VisualizationOptions

options = VisualizationOptions(
    show_results=True,
    direction="LR",
    success_color="#90EE90",
    error_color="#FFB6C1",
)
dag.visualize(options=options, filename="pipeline", format="png")
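For readers unfamiliar with Mermaid, the text a Mermaid backend produces is a short flowchart description. The sketch below builds one from a list of edges; `to_mermaid` is a hypothetical helper, and the exact output of `dag.visualize` may look different.

```python
def to_mermaid(edges, direction="LR"):
    """Render (source, target) pairs as a Mermaid flowchart definition."""
    lines = [f"graph {direction}"]
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


diagram = to_mermaid(
    [("fetch_data", "process_data"), ("process_data", "save_results")]
)
print(diagram)
# graph LR
#     fetch_data --> process_data
#     process_data --> save_results
```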
Context and Metrics
import time

# Access context during execution
# (Context must be imported before it is used as a type hint)
@dag.node
def process_with_context(data: dict, context: Context) -> dict:
    # Access previous results
    previous = context.get("previous_node")
    # Store metadata
    context.metadata["process_time"] = time.time()
    return {"combined": [data, previous]}

# After execution, access metrics
dag.run()
print(dag.context.metrics["execution_order"])
print(dag.context.metrics["node_times"])
print(dag.context.metadata)
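Metrics such as execution order and per-node timing can be collected with a simple wrapper around each call. The sketch below mirrors the `execution_order` and `node_times` keys shown above, but `run_with_metrics` itself is a hypothetical helper and not part of fast-dag.

```python
import time


def run_with_metrics(steps):
    """Run (name, callable) pairs in order and record basic metrics."""
    metrics = {"execution_order": [], "node_times": {}}
    for name, func in steps:
        start = time.perf_counter()
        func()
        # Elapsed wall-clock time for this node, in seconds.
        metrics["node_times"][name] = time.perf_counter() - start
        metrics["execution_order"].append(name)
    return metrics


metrics = run_with_metrics([
    ("extract", lambda: sum(range(1000))),
    ("transform", lambda: sorted([3, 1, 2])),
])
print(metrics["execution_order"])  # ['extract', 'transform']
```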
📋 More Examples
Data Processing Pipeline
import pandas as pd
from fast_dag import DAG

dag = DAG("etl_pipeline")

@dag.node
def extract_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@dag.node
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna().reset_index(drop=True)

@dag.node
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    assert len(df) > 0, "No data after transformation"
    return df

@dag.node
def load_data(df: pd.DataFrame) -> str:
    df.to_parquet("output.parquet")
    return f"Loaded {len(df)} rows"

# Connect the pipeline
(
    dag.nodes["extract_data"]
    >> dag.nodes["transform_data"]
    >> dag.nodes["validate_data"]
    >> dag.nodes["load_data"]
)

result = dag.run()
State Machine with Conditions
from fast_dag import FSM
from fast_dag.core.types import FSMReturn

# payment_successful, ship_order, notify_customer, and log_failure
# are placeholders for your own functions.
fsm = FSM("order_processor")

@fsm.state(initial=True)
def pending(order_id: str) -> FSMReturn:
    # Process payment
    if payment_successful():
        return FSMReturn(next_state="confirmed", value={"order_id": order_id})
    else:
        return FSMReturn(next_state="failed", value={"reason": "payment_failed"})

@fsm.state
def confirmed(data: dict) -> FSMReturn:
    # Ship order
    tracking = ship_order(data["order_id"])
    return FSMReturn(next_state="shipped", value={"tracking": tracking})

@fsm.state(terminal=True)
def shipped(data: dict) -> FSMReturn:
    notify_customer(data["tracking"])
    return FSMReturn(stop=True, value="Order delivered")

@fsm.state(terminal=True)
def failed(data: dict) -> FSMReturn:
    log_failure(data["reason"])
    return FSMReturn(stop=True, value="Order failed")

result = fsm.run(order_id="12345")
🛠️ Development
Running Tests
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=fast_dag
# Run specific test file
uv run pytest tests/unit/test_dag.py
Code Quality
# Format code
uv run ruff format .
# Lint code
uv run ruff check . --fix
# Type checking
uv run mypy fast_dag
Pre-commit Hooks
# Install pre-commit hooks
uv run pre-commit install
# Run manually
uv run pre-commit run --all-files
Building and Publishing
# Build the package
uv run python -m build
# Check the distribution
uv run twine check dist/*
# Upload to TestPyPI (for testing)
uv run twine upload --repository testpypi dist/*
# Upload to PyPI
uv run twine upload dist/*
📚 Documentation
For more detailed documentation, examples, and API reference, visit our documentation site.
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
🙏 Acknowledgments
- Built with UV for fast, reliable Python package management
- Inspired by Airflow, Prefect, and Dagster
- Special thanks to all contributors and users
📧 Contact
- Author: Felix Geilert
- GitHub: @felixnext
- PyPI: fast-dag
File details
Details for the file fast_dag-0.2.0.tar.gz.
File metadata
- Download URL: fast_dag-0.2.0.tar.gz
- Upload date:
- Size: 289.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d8df175eabafa06d59495898c203adb230f31dcd4af1f91e3915257be33d72a7 |
| MD5 | 8670722ea6aab6accb7764bc0b5300de |
| BLAKE2b-256 | 178dfb6857fef661b0daa361e72cfcf89fb6c38af54f07c0389eef796e2584a5 |
File details
Details for the file fast_dag-0.2.0-py3-none-any.whl.
File metadata
- Download URL: fast_dag-0.2.0-py3-none-any.whl
- Upload date:
- Size: 77.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3b8ccd8011ae54a08d15e92efab5d3edbcffe4bd7bb43d9ae11c7844cb4d326d |
| MD5 | 2c630c669e5f5f56424044c080c5cfc1 |
| BLAKE2b-256 | 44381ff82c5bb3b89c24313dfedeaa65e05a3876e9761231536f68a1750dd1a8 |