
Graph-Based Task Orchestrator

A simple yet powerful task orchestration library for Python based on graph data structures.

Features

  • Linear Graph Execution: Create simple pipelines where each node passes data to the next
  • Parallel Paths: Split execution into multiple parallel paths and merge results
  • Map-Reduce Pattern: Dynamically create parallel execution paths for iterable data
  • Sub-Graphs: Use entire graphs as nodes within larger graphs for modular design
  • Graph Visualization: Visualize your graphs in the browser with interactive diagrams
  • Graph Simplification: At each execution step, the graph is simplified by moving the START node forward
  • Execution History: Full execution history for debugging, retrying, and replaying
  • Data Validation: Type hints and Pydantic model validation for inputs and outputs

Installation

Using pip (run from the root of a local clone of the repository):

pip install -e .

Using uv (recommended):

uv pip install -e .

Either command installs the package in editable mode and makes the graph-validate command available in the active Python environment.

Quick Start

1. Linear Graph

from graph_orchestrator import Graph, START, END, Node

class MultiplierByTwo(Node):
    def execute(self, input_data):
        return input_data * 2

# Create graph
start = START(s=10)
end = END()
g = Graph(start=start)

# Build pipeline
multiplier = MultiplierByTwo()
g.add_edge(start, multiplier)
g.add_edge(multiplier, end)

# Run
result = g.run()  # Returns 20

2. Parallel Paths

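# Create parallel paths that merge. Reuses g, start and end from the linear
# example; node1, node2, node3 and merger stand for your own Node instances.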
g.add_edge(start, node1)
g.add_edge(node1, node2)  # Path 1
g.add_edge(node1, node3)  # Path 2
g.add_edge(node2, merger)
g.add_edge(node3, merger)
g.add_edge(merger, end)
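The README does not describe how the library schedules parallel paths. As a rough mental model only, the fan-out/fan-in shape above behaves like the following plain-Python sketch, which uses `concurrent.futures` rather than `graph_orchestrator`. The functions `node1`, `node2`, `node3` and `merger` are hypothetical stand-ins, and handing the merger a list of branch results is an assumption, not the library's documented merge contract.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for the nodes in the snippet above.
def node1(x: int) -> int:
    return x + 1


def node2(x: int) -> int:  # Path 1
    return x * 10


def node3(x: int) -> int:  # Path 2
    return x - 1


def merger(results: list[int]) -> int:
    # Assumption: the merge step receives one result per incoming path.
    return sum(results)


def run_parallel(start_value: int) -> int:
    shared = node1(start_value)
    # Fan out: both branches receive node1's output and run concurrently.
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(branch, shared) for branch in (node2, node3)]
        branch_results = [f.result() for f in futures]
    # Fan in: the merger runs only after every branch has finished.
    return merger(branch_results)


print(run_parallel(10))  # 120
```

The key property is that the merge step waits for all of its incoming paths before it runs.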

3. Map-Reduce

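# Map-reduce pattern: list_generator emits an iterable, mapper runs once per
# item on its own path, and reducer combines the mapped results.
# (list_generator, mapper and reducer stand for your own Node instances.)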
g.add_edge(start, list_generator)
g.add_map_reduce(list_generator, mapper, reducer)
g.add_edge(reducer, end)
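As a conceptual sketch only (plain Python, not the library's implementation), `add_map_reduce` describes the following flow. The source step emits an iterable, the mapper runs once per item, and the reducer combines the per-item results. `list_generator`, `mapper` and `reducer` below are hypothetical functions, and collecting the mapper outputs into a list in input order is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def list_generator(n: int) -> list[int]:
    # Source step: produces the iterable that gets fanned out.
    return list(range(1, n + 1))


def mapper(item: int) -> int:
    # Runs once per item, each call on its own worker thread.
    return item * item


def reducer(mapped: list[int]) -> int:
    # Combines all per-item results into a single value.
    return sum(mapped)


def run_map_reduce(n: int) -> int:
    items = list_generator(n)
    with ThreadPoolExecutor() as pool:
        # pool.map yields results in input order, regardless of completion order.
        mapped = list(pool.map(mapper, items))
    return reducer(mapped)


print(run_map_reduce(4))  # 1 + 4 + 9 + 16 = 30
```

The number of mapper calls is only known once `list_generator` has produced its output. This is the sense in which the feature list says parallel paths are created dynamically.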

4. Sub-Graphs (Graphs as Nodes)

# Create a reusable sub-graph
# (DataProcessor and DataValidator stand for your own Node subclasses)
def create_processing_subgraph():
    start = START()
    processor = DataProcessor()
    validator = DataValidator()
    end = END()
    
    subgraph = Graph(start)
    subgraph.add_edge(start, processor)
    subgraph.add_edge(processor, validator)
    subgraph.add_edge(validator, end)
    
    return subgraph

# Use the sub-graph in a larger graph
# (data_source, result_handler, data_generator and aggregator are your own nodes)
processing_subgraph = create_processing_subgraph()

# Keep references to START/END so every edge uses the same instances
main_start = START()
main_end = END()

main_graph = Graph(main_start)
main_graph.add_edge(main_start, data_source)
main_graph.add_edge(data_source, processing_subgraph)  # Graph used as node!
main_graph.add_edge(processing_subgraph, result_handler)
main_graph.add_edge(result_handler, main_end)

# Alternatively, use the sub-graph as the mapper of a map-reduce step
main_graph.add_map_reduce(
    source_node=data_generator,
    mapper_node=processing_subgraph,  # Each item processed by entire sub-graph
    reducer_node=aggregator
)
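A graph can stand in for a node because, from the outside, both follow the same "take an input, return an output" contract. The plain-Python sketch below illustrates this composite idea with hypothetical `Step` and `Pipeline` classes. It is not how `graph_orchestrator.Graph` is implemented, and it only models a linear chain, not branching.

```python
class Step:
    """Hypothetical minimal node: one input in, one output out."""

    def execute(self, input_data):
        raise NotImplementedError


class AddOne(Step):
    def execute(self, input_data):
        return input_data + 1


class Double(Step):
    def execute(self, input_data):
        return input_data * 2


class Pipeline(Step):
    """A chain of steps that is itself a Step, so it can be nested."""

    def __init__(self, *steps: Step):
        self.steps = steps

    def execute(self, input_data):
        # Feed each step's output into the next one.
        for step in self.steps:
            input_data = step.execute(input_data)
        return input_data


inner = Pipeline(AddOne(), Double())          # reusable "sub-graph"
outer = Pipeline(AddOne(), inner, AddOne())   # sub-graph used as a single step
print(outer.execute(1))  # 7
```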

See SUBGRAPHS.md for detailed documentation on using graphs as nodes.

Graph Visualization

Visualize your graphs directly in the browser:

# Quick visualization
graph-validate visualize examples/linear_graph.py

# Save to file
graph-validate visualize examples/map_reduce_graph.py -o output.html

See VISUALIZATION.md for detailed documentation on graph visualization.

Creating Custom Nodes

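Subclass Node and implement its execute method. execute receives the output of the upstream node, and its return value is passed to the next node:

from graph_orchestrator import Node

class MyCustomNode(Node):
    def execute(self, input_data):
        # Your logic here: transform input_data into this node's output
        processed_data = input_data  # placeholder; replace with real processing
        return processed_data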

With Type Validation

Use type hints and Pydantic models for automatic validation:

from pydantic import BaseModel
from graph_orchestrator import Node

class InputModel(BaseModel):
    name: str
    age: int

class OutputModel(BaseModel):
    message: str
    
class ValidatedNode(Node):
    def execute(self, input_data: InputModel) -> OutputModel:
        return OutputModel(
            message=f"Hello {input_data.name}, you are {input_data.age} years old"
        )

Static Validation

The library includes a static validation tool that analyzes the Python files defining your graphs before they run, and checks for:

  • Type compatibility between connected nodes
  • Graph cycles
  • Unreachable nodes
  • Pydantic model field compatibility
  • Map-reduce pattern correctness
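Two of these checks, cycles and unreachable nodes, are standard graph algorithms. The sketch below runs both over a plain edge list, such as the one the `add_edge(a, b)` calls describe. It is a generic illustration, not the tool's actual code. `find_problems` is a hypothetical function, and nodes are represented by their names as strings.

```python
from collections import defaultdict


def find_problems(edges: list[tuple[str, str]], start: str = "START"):
    """Return (has_cycle, unreachable_nodes) for a directed edge list."""
    graph: dict[str, list[str]] = defaultdict(list)
    nodes: set[str] = set()
    for src, dst in edges:
        graph[src].append(dst)
        nodes.update((src, dst))

    # Reachability: iterative depth-first search from the start node.
    reachable: set[str] = set()
    stack = [start]
    while stack:
        node = stack.pop()
        if node in reachable:
            continue
        reachable.add(node)
        stack.extend(graph[node])

    # Cycle detection: DFS with white/grey/black colouring; reaching a grey
    # (still in progress) node means a back edge, i.e. a cycle.
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {n: WHITE for n in nodes}

    def visit(node: str) -> bool:
        colour[node] = GREY
        for nxt in graph[node]:
            if colour[nxt] == GREY:
                return True
            if colour[nxt] == WHITE and visit(nxt):
                return True
        colour[node] = BLACK
        return False

    has_cycle = any(colour[n] == WHITE and visit(n) for n in sorted(nodes))
    return has_cycle, sorted(nodes - reachable)


edges = [("START", "a"), ("a", "b"), ("b", "END"), ("orphan_node", "END")]
print(find_problems(edges))  # (False, ['orphan_node'])
```

The recursive `visit` is fine for graphs of a few hundred nodes. Very deep graphs would need an iterative version to stay under Python's recursion limit.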

Using the Validation Tool

Method 1: Using the installed command (after uv pip install -e .):

# Basic validation
graph-validate validate examples/validation_example.py

# Enhanced validation with deep type checking
graph-validate validate --enhanced examples/validation_example.py

# Validate directory
graph-validate validate --directory examples/

# Verbose output
graph-validate validate -v examples/map_reduce_graph.py

Method 2: Using the convenience script:

# Basic validation
./scripts/validate.sh basic examples/validation_example.py

# Enhanced validation
./scripts/validate.sh enhanced examples/

# Validate all examples
./scripts/validate.sh examples

# Run validation demo
./scripts/validate.sh demo

Method 3: Direct Python scripts:

# Simple validation script
python validate.py examples/validation_example.py

# With enhanced type checking
python validate.py examples/validation_example.py --enhanced

# Using the module directly
python -m graph_orchestrator.cli validate examples/validation_example.py

Validation Features

  1. AST Analysis: Parses Python files to extract graph construction patterns
  2. Type Checking: Validates type compatibility between node connections
  3. Structural Validation: Checks for cycles, unreachable nodes, and missing connections
  4. Pydantic Support: Deep inspection of Pydantic model compatibility
  5. Map-Reduce Validation: Ensures correct types in map-reduce patterns
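AST analysis (item 1) is built on Python's standard `ast` module. The following is a minimal sketch of the idea. It assumes edges are declared as `<graph>.add_edge(<name>, <name>)` calls with plain variable names, and it collects them without running the file. `extract_edges` is a hypothetical helper, and the sketch deliberately ignores other patterns such as `add_map_reduce` or inline calls like `START()`.

```python
import ast

SOURCE = """
g = Graph(start=start)
g.add_edge(start, multiplier)
g.add_edge(multiplier, end)
"""


def extract_edges(source: str) -> list[tuple[str, str, str]]:
    """Return (graph_name, src, dst) for every `x.add_edge(a, b)` call."""
    edges = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "add_edge"
            and isinstance(node.func.value, ast.Name)
            and len(node.args) == 2
            and all(isinstance(arg, ast.Name) for arg in node.args)
        ):
            src, dst = (arg.id for arg in node.args)
            edges.append((node.func.value.id, src, dst))
    return edges


print(extract_edges(SOURCE))
# [('g', 'start', 'multiplier'), ('g', 'multiplier', 'end')]
```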

Example Validation Output

============================================================
File: examples/invalid_graph.py
============================================================

❌ Errors (2):
  • Type incompatibility: node_a (NodeA) -> node_b (NodeB): Type mismatch: OutputModel is not compatible with int
  • Graph 'cyclic_graph' contains a cycle

⚠️  Warnings (1):
  • Graph 'g' has unreachable nodes: orphan_node

❌ Validation failed!
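The first error above compares what one node returns with what the next node accepts. Assuming the comparison is based on the `execute` annotations, a simplified stdlib version of the check could look like this. `check_edge` is hypothetical, the message format only loosely mirrors the output above, and `OutputModel` here is a plain class rather than a Pydantic model. Pydantic-aware checking, as described in the feature list, would additionally compare model fields.

```python
import typing


class OutputModel:
    """Plain stand-in for the Pydantic model named in the output above."""


class NodeA:
    def execute(self, input_data: str) -> OutputModel:
        return OutputModel()


class NodeB:
    def execute(self, input_data: int) -> int:
        return input_data + 1


def _name(hint) -> str:
    return getattr(hint, "__name__", repr(hint))


def _is_plain_class(hint) -> bool:
    return typing.get_origin(hint) is None and isinstance(hint, type)


def check_edge(upstream, downstream) -> typing.Optional[str]:
    """Compare upstream's return hint with downstream's input hint."""
    produced = typing.get_type_hints(upstream.execute).get("return")
    expected = typing.get_type_hints(downstream.execute).get("input_data")
    if produced is None or expected is None:
        return None  # one side is unannotated: nothing to compare
    if _is_plain_class(produced) and _is_plain_class(expected):
        compatible = issubclass(produced, expected)
    else:
        compatible = produced == expected  # generics: exact match only
    if compatible:
        return None
    return (
        f"{type(upstream).__name__} -> {type(downstream).__name__}: "
        f"Type mismatch: {_name(produced)} is not compatible with {_name(expected)}"
    )


print(check_edge(NodeA(), NodeB()))
# NodeA -> NodeB: Type mismatch: OutputModel is not compatible with int
print(check_edge(NodeB(), NodeB()))  # None
```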

Examples

See the examples/ directory for complete working examples:

  • linear_graph.py - Simple linear pipeline
  • parallel_graph.py - Parallel execution paths with merging
  • map_reduce_graph.py - Map-reduce pattern
  • validation_example.py - Data validation with Pydantic
  • debug_example.py - Execution history and debugging
  • subgraph_example.py - Using graphs as nodes for modular design
  • visualization_demo.py - Demonstration of visualization features

Run all examples:

python test_all_features.py

Run validation examples:

python validate_graphs.py

Quick Validation Commands

Using Make (recommended for development):

make validate-examples        # Validate all examples
make validate FILE=myfile.py  # Validate specific file
make validate-demo           # Run validation demo

Using the shell script:

./scripts/validate.sh examples  # Validate all examples
./scripts/validate.sh help      # Show all available commands

Using the installed command:

graph-validate validate --help  # Show validation options

Download files


Source Distribution

graph_based_task_orchestrator-0.2.2.tar.gz (22.2 kB)

Built Distribution

graph_based_task_orchestrator-0.2.2-py3-none-any.whl (23.4 kB, Python 3)

File details

Hashes for graph_based_task_orchestrator-0.2.2.tar.gz
Algorithm Hash digest
SHA256 bbd5bb66d41f2f8d9c214f35172a7d739f9662586c56bbf446fbc970d354ae56
MD5 1124bafe7ede6155839b89b61d1ff93c
BLAKE2b-256 a332c0464b9210c9209479822ec00ef4e7a1d7cfd51db805d29d7e4dbc048374


Hashes for graph_based_task_orchestrator-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 fd7a6e52430ff9185c78dee820f3cc863413c0419e6c79dc5304889233d928c6
MD5 f7bbb1ea6acf621ddbbb51f2ab4dd64f
BLAKE2b-256 3cb75ca6d1f010265989b4600446c8992ca9e30e4d3221420372580178c97c7b
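Python's standard `hashlib` module is enough to check a downloaded file against the SHA256 digests listed above. The sketch below reads the file in chunks so large files do not need to fit in memory. `sha256_of` and `verify` are hypothetical helper names, and the path you pass is wherever you saved the download.

```python
import hashlib
from pathlib import Path

# SHA256 digests copied from the tables above.
EXPECTED_SHA256 = {
    "graph_based_task_orchestrator-0.2.2.tar.gz":
        "bbd5bb66d41f2f8d9c214f35172a7d739f9662586c56bbf446fbc970d354ae56",
    "graph_based_task_orchestrator-0.2.2-py3-none-any.whl":
        "fd7a6e52430ff9185c78dee820f3cc863413c0419e6c79dc5304889233d928c6",
}


def sha256_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Hash a file incrementally and return the hex digest."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path) -> bool:
    """True only if the file name is known and its digest matches."""
    expected = EXPECTED_SHA256.get(path.name)
    return expected is not None and sha256_of(path) == expected
```

For example, `verify(Path("graph_based_task_orchestrator-0.2.2-py3-none-any.whl"))` should return True for an intact download. pip can also enforce digests itself through its hash-checking mode, using `--hash=sha256:...` entries in a requirements file.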

