A lightweight, high-performance Python library for building and executing DAG (Directed Acyclic Graph) and FSM (Finite State Machine) workflows

fast-dag

fast-dag is a lightweight, high-performance Python library for building and executing Directed Acyclic Graph (DAG) and Finite State Machine (FSM) workflows. It is designed for simplicity and speed, and is aimed at data pipelines, task orchestration, and complex workflow automation.

✨ Key Features

  • 🚀 Fast & Lightweight - Minimal dependencies, optimized for performance
  • 🔄 DAG & FSM Support - Build both directed acyclic graphs and state machines
  • 🎯 Type-Safe - Full type hints and mypy support
  • 🔌 Extensible - Easy to add custom nodes and behaviors
  • 📊 Visualization - Built-in Mermaid and Graphviz support
  • Multiple Execution Modes - Sequential, parallel, and async execution
  • 🛡️ Robust Error Handling - Comprehensive error strategies and retry logic
  • 🔍 Runtime Validation - Automatic cycle detection and dependency validation
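
To illustrate what cycle detection and dependency validation mean in practice, here is a minimal sketch that uses only the Python standard library (`graphlib`). It is not fast-dag's implementation; the function name `execution_order` and the dependency-dictionary layout are assumptions made for this example.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict[str, set[str]]) -> list[str]:
    """Return a valid run order, or raise ValueError if the graph has a cycle."""
    try:
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # exc.args[1] holds the list of nodes that form the cycle
        raise ValueError(f"cycle detected: {exc.args[1]}") from exc


# Each key maps a node to the set of nodes it depends on (hypothetical layout).
pipeline = {"process_data": {"fetch_data"}, "save_results": {"process_data"}}
order = execution_order(pipeline)
print(order)  # ['fetch_data', 'process_data', 'save_results']
```

A graph such as `{"a": {"b"}, "b": {"a"}}` would raise `ValueError` here, which is the kind of failure a validation step is meant to catch before any node runs.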

📦 Installation

Basic Installation

pip install fast-dag

With Visualization Support

pip install "fast-dag[viz]"

With All Features

pip install "fast-dag[all]"

Development Installation

git clone https://github.com/felixnext/fast-dag.git
cd fast-dag
pip install -e ".[dev]"

🚀 Quick Start

Simple DAG Example

from fast_dag import DAG

# Create a DAG
dag = DAG("data_pipeline")

# Define nodes using decorators
@dag.node
def fetch_data() -> dict:
    return {"data": [1, 2, 3, 4, 5]}

@dag.node
def process_data(data: dict) -> list:
    return [x * 2 for x in data["data"]]

@dag.node
def save_results(processed: list) -> str:
    return f"Saved {len(processed)} items"

# Connect nodes
dag.connect("fetch_data", "process_data", input="data")
dag.connect("process_data", "save_results", input="processed")

# Execute the DAG
result = dag.run()
print(result)  # "Saved 5 items"
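
The `input=` argument in the example above names the parameter that receives the upstream result. The following standard-library sketch shows one way such named wiring could work. It is an illustration under assumptions, not fast-dag's code: `run_pipeline` and the `(source, target, input_name)` edge tuples are invented for this example.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable


def run_pipeline(
    funcs: dict[str, Callable[..., Any]],
    edges: list[tuple[str, str, str]],
) -> dict[str, Any]:
    """Run functions in dependency order; each edge is (source, target, input_name)."""
    deps: dict[str, set[str]] = {name: set() for name in funcs}
    for source, target, _ in edges:
        deps[target].add(source)

    results: dict[str, Any] = {}
    for name in TopologicalSorter(deps).static_order():
        # Collect upstream results under the parameter names given on the edges.
        kwargs = {inp: results[src] for src, tgt, inp in edges if tgt == name}
        results[name] = funcs[name](**kwargs)
    return results


def fetch_data() -> dict:
    return {"data": [1, 2, 3, 4, 5]}


def process_data(data: dict) -> list:
    return [x * 2 for x in data["data"]]


def save_results(processed: list) -> str:
    return f"Saved {len(processed)} items"


results = run_pipeline(
    {"fetch_data": fetch_data, "process_data": process_data, "save_results": save_results},
    [("fetch_data", "process_data", "data"), ("process_data", "save_results", "processed")],
)
print(results["save_results"])  # Saved 5 items
```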

FSM Example

from fast_dag import FSM
from fast_dag.core.types import FSMReturn

# Create a finite state machine
fsm = FSM("traffic_light")

@fsm.state(initial=True)
def red() -> FSMReturn:
    print("Red light - STOP")
    return FSMReturn(next_state="green")

@fsm.state
def green() -> FSMReturn:
    print("Green light - GO")
    return FSMReturn(next_state="yellow")

@fsm.state
def yellow() -> FSMReturn:
    print("Yellow light - SLOW DOWN")
    return FSMReturn(next_state="red")

# Run for 5 cycles
fsm.max_cycles = 5
fsm.run()
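
The traffic light never reaches a terminal state, so a limit is what stops it. The sketch below shows the general idea of a bounded state loop in plain Python. It is not fast-dag's implementation, and `run_states` and `max_steps` are hypothetical names: here the limit counts individual state visits, whereas the README does not say exactly what `max_cycles` counts.

```python
from typing import Callable, Optional


def run_states(
    states: dict[str, Callable[[], Optional[str]]],
    initial: str,
    max_steps: int,
) -> list[str]:
    """Follow transitions from `initial` until a state returns None or max_steps is hit."""
    visited: list[str] = []
    current: Optional[str] = initial
    while current is not None and len(visited) < max_steps:
        visited.append(current)
        current = states[current]()  # each handler names the next state
    return visited


lights = {
    "red": lambda: "green",
    "green": lambda: "yellow",
    "yellow": lambda: "red",
}
history = run_states(lights, initial="red", max_steps=5)
print(history)  # ['red', 'green', 'yellow', 'red', 'green']
```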

🎯 Core Concepts

Nodes

Nodes are the basic units of work in fast-dag:

@dag.node
def my_task(x: int) -> int:
    return x * 2

# Or create nodes manually
from fast_dag.core.node import Node

node = Node(
    func=my_function,
    name="my_node",
    description="Process data"
)

Connections

Connect nodes to define data flow:

# Simple connection
dag.connect("source", "target")

# Specify input/output names
dag.connect("source", "target", output="result", input="data")

# Use operator syntax
dag.nodes["a"] >> dag.nodes["b"] >> dag.nodes["c"]
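
Chaining with `>>` relies on Python's `__rshift__` hook. The toy class below shows how such an operator can be made chainable. `Step` is a hypothetical stand-in and says nothing about how fast-dag's own node class is written.

```python
class Step:
    """Toy node that records downstream steps when chained with >>."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: list["Step"] = []

    def __rshift__(self, other: "Step") -> "Step":
        self.downstream.append(other)
        return other  # returning the right-hand side lets a >> b >> c chain


a, b, c = Step("a"), Step("b"), Step("c")
a >> b >> c

print([s.name for s in a.downstream])  # ['b']
print([s.name for s in b.downstream])  # ['c']
```

Returning the right-hand operand is the design choice that matters: `a >> b` evaluates to `b`, so the next `>> c` attaches `c` to `b` rather than to `a`.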

Conditional Flows

Build dynamic workflows with conditions:

@dag.condition
def check_threshold(value: int) -> bool:
    return value > 100

@dag.node
def process_high(value: int) -> str:
    return f"High value: {value}"

@dag.node
def process_low(value: int) -> str:
    return f"Low value: {value}"

# Connect conditional branches
dag.nodes["check_threshold"].on_true >> dag.nodes["process_high"]
dag.nodes["check_threshold"].on_false >> dag.nodes["process_low"]

Advanced Node Types

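# Assumption: SelectReturn lives next to FSMReturn in fast_dag.core.types
from fast_dag.core.types import SelectReturn

# Multi-input convergence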
@dag.any()  # Waits for ANY input to be ready
def merge_first(data: dict) -> str:
    return f"Received: {data}"

@dag.all()  # Waits for ALL inputs to be ready
def merge_all(data: dict) -> str:
    return f"All data: {data}"

# Multi-way branching
@dag.select
def router(request: dict) -> SelectReturn:
    category = request.get("category", "default")
    return SelectReturn(branch=category, value=request)
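
The difference between waiting for ANY input and waiting for ALL inputs can be seen with the standard library's `concurrent.futures.wait`. This sketch only illustrates the two waiting policies; it does not claim to show how `@dag.any()` or `@dag.all()` are implemented, and the `slow` helper is made up for the example.

```python
import time
from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED, ThreadPoolExecutor, wait


def slow(value: str, delay: float) -> str:
    """Simulate an upstream node that takes `delay` seconds."""
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(slow, "fast", 0.01), pool.submit(slow, "slow", 0.2)]

    # ANY: proceed as soon as at least one upstream result exists.
    done, pending = wait(futures, return_when=FIRST_COMPLETED)
    first_ready = {f.result() for f in done}

    # ALL: block until every upstream result exists.
    done, pending = wait(futures, return_when=ALL_COMPLETED)
    all_ready = {f.result() for f in done}

print(first_ready)
print(all_ready)
```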

🔧 Advanced Features

Execution Modes

# Sequential execution (default)
result = dag.run()

# Parallel execution
result = dag.run(mode="parallel")

# Async execution (await must be used inside an async function)
result = await dag.run_async()

# Custom runner configuration
from fast_dag.runner import DAGRunner

runner = DAGRunner(dag)
runner.configure(
    max_workers=4,
    timeout=300,
    error_strategy="continue"
)
result = runner.run()
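
Parallel execution of a DAG means running nodes concurrently once all of their dependencies have finished. The following sketch shows that idea with `graphlib` and a thread pool. It is a simplified illustration, not the `DAGRunner` implementation: `run_parallel` and the convention that each task receives a snapshot of earlier results are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Any, Callable


def run_parallel(
    tasks: dict[str, Callable[[dict[str, Any]], Any]],
    deps: dict[str, set[str]],
    max_workers: int = 4,
) -> dict[str, Any]:
    """Run tasks batch by batch; tasks within a batch have no mutual dependencies."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    results: dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = sorter.get_ready()
            # Each task gets a copy of the results computed so far.
            futures = {name: pool.submit(tasks[name], dict(results)) for name in ready}
            for name, future in futures.items():
                results[name] = future.result()
                sorter.done(name)
    return results


tasks = {
    "a": lambda r: 1,
    "b": lambda r: 2,
    "total": lambda r: r["a"] + r["b"],
}
deps = {"a": set(), "b": set(), "total": {"a", "b"}}
parallel_results = run_parallel(tasks, deps)
print(parallel_results["total"])  # 3
```

This version waits for a whole batch before starting the next one, which keeps the code short at the cost of some idle time when batch members finish at different speeds.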

Error Handling

# Retry with exponential backoff
@dag.node(retry=3, retry_delay=1.0)
def flaky_operation() -> dict:
    # Will retry up to 3 times with exponential backoff
    return fetch_external_data()  # placeholder for your own I/O call

# Error strategies
result = dag.run(error_strategy="continue")  # Continue on error
result = dag.run(error_strategy="stop")      # Stop on first error (default)
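
Retry with exponential backoff means doubling the wait between attempts. The sketch below is a generic standard-library version, not fast-dag's retry logic. `call_with_retry`, its parameters, and the doubling factor are assumptions; the README does not state the exact delay schedule or whether `retry=3` counts attempts or retries.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying up to `retries` times with delays base_delay * 2**attempt."""
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable")


attempts: list[int] = []
delays: list[float] = []


def flaky() -> str:
    """Fail twice, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("temporary failure")
    return "ok"


# Pass delays.append as the sleep function so the example runs instantly.
value = call_with_retry(flaky, retries=3, base_delay=1.0, sleep=delays.append)
print(value, delays)  # ok [1.0, 2.0]
```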

Node Lifecycle Hooks

def before_node(node, inputs):
    print(f"Starting {node.name}")

def after_node(node, inputs, result):
    print(f"Finished {node.name}: {result}")
    return result  # Can modify result

def on_error(node, inputs, error):
    print(f"Error in {node.name}: {error}")

dag.set_node_hooks(
    "my_node",
    pre_execute=before_node,
    post_execute=after_node,
    on_error=on_error
)
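
The three hooks above wrap a node's execution: one before, one after (which may replace the result), and one on failure. A minimal sketch of that calling order follows. It is an assumption about the general pattern rather than fast-dag's code, and it passes a plain name string where the real hooks receive a node object.

```python
from typing import Any, Callable, Optional


def run_with_hooks(
    name: str,
    func: Callable[..., Any],
    inputs: dict[str, Any],
    pre_execute: Optional[Callable] = None,
    post_execute: Optional[Callable] = None,
    on_error: Optional[Callable] = None,
) -> Any:
    """Run func(**inputs), calling the optional hooks around it."""
    if pre_execute is not None:
        pre_execute(name, inputs)
    try:
        result = func(**inputs)
    except Exception as error:
        if on_error is not None:
            on_error(name, inputs, error)
        raise  # the hook observes the error; it does not swallow it
    if post_execute is not None:
        result = post_execute(name, inputs, result)  # hook may replace the result
    return result


events: list[str] = []


def log_start(name, inputs):
    events.append(f"start {name}")


def log_finish(name, inputs, result):
    events.append(f"finish {name}")
    return result


doubled = run_with_hooks(
    "double", lambda x: x * 2, {"x": 21},
    pre_execute=log_start, post_execute=log_finish,
)
print(doubled, events)  # 42 ['start double', 'finish double']
```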

Visualization

# Generate Mermaid diagram
mermaid_code = dag.visualize(backend="mermaid")

# Generate Graphviz DOT
dot_code = dag.visualize(backend="graphviz")

# Visualize with execution results
dag.run()
from fast_dag.visualization import VisualizationOptions

options = VisualizationOptions(
    show_results=True,
    direction="LR",
    success_color="#90EE90",
    error_color="#FFB6C1"
)
dag.visualize(options=options, filename="pipeline", format="png")
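
A Mermaid diagram is plain text: a `graph` header with a direction, followed by one `source --> target` line per edge. The sketch below builds such text from an edge list. `to_mermaid` is a hypothetical helper, and the exact output of `dag.visualize(backend="mermaid")` may differ.

```python
def to_mermaid(edges: list[tuple[str, str]], direction: str = "LR") -> str:
    """Render (source, target) pairs as a Mermaid flowchart definition."""
    lines = [f"graph {direction}"]
    lines += [f"    {source} --> {target}" for source, target in edges]
    return "\n".join(lines)


diagram = to_mermaid([("fetch_data", "process_data"), ("process_data", "save_results")])
print(diagram)
# graph LR
#     fetch_data --> process_data
#     process_data --> save_results
```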

Context and Metrics

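import time  # needed for time.time() below

# Context must also be imported from fast-dag; its import path is not shown in this README
# Access context during execution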
@dag.node
def process_with_context(data: dict, context: Context) -> dict:
    # Access previous results
    previous = context.get("previous_node")
    
    # Store metadata
    context.metadata["process_time"] = time.time()
    
    return {"combined": [data, previous]}

# After execution, access metrics
dag.run()
print(dag.context.metrics["execution_order"])
print(dag.context.metrics["node_times"])
print(dag.context.metadata)
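
Metrics such as `execution_order` and `node_times` can be gathered by timing each node as it runs. The following sketch shows one way to collect them with `time.perf_counter`. The `run_with_metrics` function and the layout of the metrics dictionary are assumptions modeled on the keys printed above, not fast-dag internals.

```python
import time
from typing import Any, Callable


def run_with_metrics(
    steps: list[tuple[str, Callable[[], Any]]],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Run steps in order and record the order and wall-clock duration of each."""
    metrics: dict[str, Any] = {"execution_order": [], "node_times": {}}
    results: dict[str, Any] = {}
    for name, func in steps:
        start = time.perf_counter()
        results[name] = func()
        metrics["node_times"][name] = time.perf_counter() - start
        metrics["execution_order"].append(name)
    return results, metrics


step_results, metrics = run_with_metrics(
    [("load", lambda: [1, 2, 3]), ("count", lambda: 3)]
)
print(metrics["execution_order"])  # ['load', 'count']
```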

📋 More Examples

Data Processing Pipeline

import pandas as pd
from fast_dag import DAG

dag = DAG("etl_pipeline")

@dag.node
def extract_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@dag.node
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna().reset_index(drop=True)

@dag.node
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    assert len(df) > 0, "No data after transformation"
    return df

@dag.node
def load_data(df: pd.DataFrame) -> str:
    df.to_parquet("output.parquet")
    return f"Loaded {len(df)} rows"

# Connect the pipeline
(
    dag.nodes["extract_data"] 
    >> dag.nodes["transform_data"] 
    >> dag.nodes["validate_data"] 
    >> dag.nodes["load_data"]
)

result = dag.run()

State Machine with Conditions

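from fast_dag import FSM
from fast_dag.core.types import FSMReturn

# payment_successful, ship_order, notify_customer, and log_failure are
# placeholders for your own functions
fsm = FSM("order_processor")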

@fsm.state(initial=True)
def pending(order_id: str) -> FSMReturn:
    # Process payment
    if payment_successful():
        return FSMReturn(next_state="confirmed", value={"order_id": order_id})
    else:
        return FSMReturn(next_state="failed", value={"reason": "payment_failed"})

@fsm.state
def confirmed(data: dict) -> FSMReturn:
    # Ship order
    tracking = ship_order(data["order_id"])
    return FSMReturn(next_state="shipped", value={"tracking": tracking})

@fsm.state(terminal=True)
def shipped(data: dict) -> FSMReturn:
    notify_customer(data["tracking"])
    return FSMReturn(stop=True, value="Order delivered")

@fsm.state(terminal=True)
def failed(data: dict) -> FSMReturn:
    log_failure(data["reason"])
    return FSMReturn(stop=True, value="Order failed")

result = fsm.run(order_id="12345")

🛠️ Development

Running Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=fast_dag

# Run specific test file
uv run pytest tests/unit/test_dag.py

Code Quality

# Format code
uv run ruff format .

# Lint code
uv run ruff check . --fix

# Type checking
uv run mypy fast_dag

Pre-commit Hooks

# Install pre-commit hooks
uv run pre-commit install

# Run manually
uv run pre-commit run --all-files

Building and Publishing

# Build the package
uv run python -m build

# Check the distribution
uv run twine check dist/*

# Upload to TestPyPI (for testing)
uv run twine upload --repository testpypi dist/*

# Upload to PyPI
uv run twine upload dist/*

📚 Documentation

For more detailed documentation, examples, and API reference, visit our documentation site.

🤝 Contributing

We welcome contributions! Please see our Contributing Guide for details.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

  • Built with UV for fast, reliable Python package management
  • Inspired by Airflow, Prefect, and Dagster
  • Special thanks to all contributors and users
