
py-orchestrate


A Python workflow orchestrator with durable recovery and pluggable persistence backends, similar to Azure Durable Functions.

Features

  • Workflow Orchestration: Define workflows that orchestrate multiple activities
  • Persistent State: Persist workflow and activity state in SQLite or custom backends
  • Fault Tolerance: Workflows can be resumed after application restart
  • Activity Tracking: Track current activity execution and progress
  • Background Execution: Workflows run asynchronously in background threads
  • Simple API: Easy-to-use decorators for defining workflows and activities
  • Extensible Storage: Built-in SQLite manager and optional Azure Cosmos DB manager

Installation

From PyPI (Recommended)

# Install using pip
pip install py-orchestrate

# Optional Cosmos DB support
pip install py-orchestrate[cosmos]

# Or using uv
uv add py-orchestrate

From Source

# Clone the repository
git clone https://github.com/your-username/py-orchestrate.git
cd py-orchestrate

# Install using uv
uv pip install -e .

# Or using pip
pip install -e .

Quick Start

1. Define Activities

Activities are individual units of work that can be orchestrated by workflows:

from py_orchestrate import activity

@activity("fetch_data")
def fetch_data(source: str) -> dict:
    # Your activity logic here
    return {"data": f"data_from_{source}", "count": 100}

@activity("process_data")  
def process_data(data: dict) -> dict:
    processed_count = data.get("count", 0) * 2
    return {"processed_data": data["data"] + "_processed", "processed_count": processed_count}

2. Define Workflows

Workflows orchestrate multiple activities:

from py_orchestrate import workflow

@workflow("data_processing_workflow")
def data_processing_workflow(source: str, destination: str) -> dict:
    # Step 1: Fetch data
    raw_data = fetch_data(source)
    
    # Step 2: Process data  
    processed_data = process_data(raw_data)
    
    # Step 3: Return result
    return {
        "workflow_completed": True,
        "total_processed": processed_data["processed_count"]
    }

3. Run the Orchestrator

from py_orchestrate import Orchestrator

# Create and start orchestrator
orchestrator = Orchestrator()
orchestrator.start()

try:
    # Invoke a workflow
    workflow_id = orchestrator.invoke_workflow(
        "data_processing_workflow",
        source="database", 
        destination="warehouse"
    )
    
    # Monitor progress
    status = orchestrator.get_workflow_status(workflow_id)
    print(f"Status: {status['status']}")
    print(f"Current activity: {status['current_activity']}")
    
finally:
    orchestrator.stop()
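
Since workflows run in background threads, the status printed above is often still "processing" right after invocation. In practice you would poll inside the try block until the workflow reaches a terminal status. A minimal sketch (the terminal statuses come from the API reference below; the poll interval is arbitrary):

import time

# Poll until the workflow reaches a terminal status
while True:
    status = orchestrator.get_workflow_status(workflow_id)
    if status["status"] in ("done", "failed"):
        break
    time.sleep(0.5)  # arbitrary poll interval

print(f"Final status: {status['status']}")
print(f"Output: {status['output']}")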

4. Use a Different Database Manager

from azure.identity import DefaultAzureCredential
from py_orchestrate import CosmosDatabaseManager, Orchestrator

db_manager = CosmosDatabaseManager(
    endpoint="https://<account>.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_id="py-orchestrate",
    workflow_container_id="workflows",
    activity_container_id="activity-executions",
)

orchestrator = Orchestrator(db_manager=db_manager)
orchestrator.start()

Core Concepts

Workflows

  • Stateful orchestrators that coordinate multiple activities
  • Can have status: "processing", "done", or "failed"
  • Run asynchronously in the background
  • State is persisted and can resume after application restart

Activities

  • Individual units of work executed by workflows
  • Must use basic types (dict, list, str, int, etc.) for parameters and return values (see the sketch below)
  • Arbitrary Python objects cannot be passed between activities, since results must be persisted
  • Execution is tracked and persisted
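
A minimal sketch of the basic-types rule. The exact serialization mechanism is an internal detail; the safe assumption is that everything crossing an activity boundary must survive a round trip through the persistence backend:

from py_orchestrate import activity

@activity("summarize_orders")
def summarize_orders(orders: list) -> dict:
    # OK: parameters and return value are plain lists/dicts/numbers
    total = sum(order["amount"] for order in orders)
    return {"count": len(orders), "total": total}

# Avoid returning custom objects (hypothetical example):
#     return OrderSummary(...)  # would not survive persistence/recovery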

Orchestrator Engine

  • Manages workflow and activity execution
  • Persists state through a database manager interface
  • Provides APIs to invoke workflows and query status
  • Handles fault tolerance and recovery

Database Managers

  • SQLiteDatabaseManager: default file-based persistence (see the sketch below)
  • CosmosDatabaseManager: Azure Cosmos DB backend with automatic database/container creation
  • BaseDatabaseManager: interface for custom persistence implementations
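
A minimal sketch of injecting the SQLite manager explicitly. This assumes SQLiteDatabaseManager is exported from the package like CosmosDatabaseManager, and that its constructor takes a file path mirroring the Orchestrator's db_path parameter:

from py_orchestrate import Orchestrator, SQLiteDatabaseManager

# Assumed constructor argument: path to the SQLite file
db_manager = SQLiteDatabaseManager("workflows.db")

orchestrator = Orchestrator(db_manager=db_manager)
orchestrator.start()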

Activity Signature Changes & Best Practices

⚠️ Important: Changing activity function signatures can cause workflow failures. This section explains the behavior and provides best practices.

How Recovery Works

py-orchestrate uses activity-level recovery with intelligent caching:

  1. Workflow function restarts: The workflow function code begins executing from the top
  2. Completed activities use cache: Activities that finished successfully return cached results instantly
  3. Only incomplete work re-executes: Activities that were interrupted or failed run again
  4. No completed work is lost: All successful activity results are preserved in the database

Example Recovery Flow:

@workflow("data_processing")
def data_processing(source: str) -> dict:
    data = fetch_data(source)          # ← If completed: returns cached result instantly
    processed = process_data(data)     # ← If interrupted here: re-executes normally  
    result = save_results(processed)   # ← Continues from interruption point
    return result

This provides efficient recovery: the workflow "replays" its execution path but skips completed work.

Signature Change Behavior

Safe Changes (No Issues)

# ✅ Adding optional parameters (backward compatible)
@activity("process_data")
def process_data(data: str, mode: str = "default") -> dict:
    return {"processed": data, "mode": mode}

# ✅ Using different activity names (versioning)
@activity("process_data_v2")  
def process_data_v2(data: str, enhanced: bool = True) -> dict:
    return {"processed": data, "enhanced": enhanced}

Unsafe Changes (Will Cause Errors)

# ❌ Removing parameters
@activity("process_data")
def process_data(data: str) -> dict:  # Removed 'mode' parameter
    return {"processed": data}

# ❌ Changing parameter types
@activity("process_data") 
def process_data(data: dict) -> dict:  # Changed str -> dict
    return {"processed": data}

# ❌ Adding required parameters
@activity("process_data")
def process_data(data: str, required_param: str) -> dict:  # No default value
    return {"processed": data, "required": required_param}

Error Examples

When signature mismatches occur, you'll see clear error messages:

# Error: "too many positional arguments"
# Caused by: Workflow calls activity(param1, param2) but activity only accepts activity(param1)

# Error: "missing required positional argument"  
# Caused by: Workflow calls activity(param1) but activity requires activity(param1, param2)

# Error: "argument of type 'dict' is not iterable"
# Caused by: Type mismatch between expected and actual parameter types

Best Practices

1. Version Your Activities

# Good: Use versioned activity names
@activity("fetch_data_v1")
def fetch_data_v1(source: str) -> dict:
    return {"data": f"v1_{source}"}

@activity("fetch_data_v2") 
def fetch_data_v2(source: str, timeout: int = 30) -> dict:
    return {"data": f"v2_{source}", "timeout": timeout}

# Update workflows to use new versions
@workflow("data_processing_v2")
def data_processing_v2(source: str) -> dict:
    data = fetch_data_v2(source, timeout=60)  # Use new version
    return {"result": data}

2. Only Add Optional Parameters

# Good: Backward compatible changes
@activity("process_item")
def process_item(item: dict,
                 validation: bool = True,  # Added optional
                 timeout: int = 30) -> dict:  # Added optional
    result = {"processed": item}
    if validation:
        result["validated"] = True
    return result

3. Use Defensive Programming

# Good: Handle different signatures gracefully
@activity("flexible_processor")
def flexible_processor(*args, **kwargs) -> dict:
    # Handle both old and new calling patterns
    if len(args) == 1:
        # Old signature: flexible_processor(data)
        data = args[0]
        options = {}
    elif len(args) == 2:
        # New signature: flexible_processor(data, options)
        data, options = args
    else:
        # Keyword arguments
        data = kwargs.get('data')
        options = kwargs.get('options', {})
    
    return {"processed": data, "options": options}

4. Test Signature Changes

# Always test workflows after activity changes
def test_signature_compatibility():
    orchestrator = Orchestrator("test.db")
    orchestrator.start()
    
    try:
        # Test new signature
        workflow_id = orchestrator.invoke_workflow("my_workflow", input="test")
        status = orchestrator.get_workflow_status(workflow_id)
        
        if status['status'] == 'failed':
            print(f"Signature error: {status['error_message']}")
            
    finally:
        orchestrator.stop()

Activity Implementation Changes During Recovery

Important: py-orchestrate uses activity-level caching during recovery, which has significant implications when you change activities:

Safe Scenarios (No Issues)

  1. Activity logic changes after completion:

    # Original implementation (already completed and cached)
    @activity("process_data")
    def process_data(data: str) -> dict:
        return {"result": f"old_logic_{data}"}
    
    # New implementation (won't affect cached result)
    @activity("process_data")  
    def process_data(data: str) -> dict:
        return {"result": f"new_logic_{data}"}
    

    Result: Workflow uses cached result from original implementation

  2. Missing activities after completion:

    # If activity was deleted but already completed
    # Result: Workflow uses cached result, doesn't try to re-execute
    

⚠️ Risky Scenarios (Will Cause Failures)

  1. Missing activities for incomplete work:

    # If workflow was interrupted during this activity
    # and then activity is deleted/renamed
    # Result: "Activity 'process_data' not found" error
    
  2. Signature changes for incomplete work:

    # If workflow was interrupted during this activity
    # and then signature changes incompatibly  
    # Result: Signature mismatch errors (as documented above)
    

🔧 Best Practices for Activity Changes

  1. Wait for workflows to complete before making breaking changes
  2. Use activity versioning for major changes:
    @activity("process_data_v1")  # Keep old version
    @activity("process_data_v2")  # Add new version
    
  3. Check for running workflows before deployments:
    # List all workflows and filter for ones still processing
    all_workflows = orchestrator.list_workflows()
    active = [wf for wf in all_workflows if wf['status'] == 'processing']
    if active:
        print(f"Warning: {len(active)} workflows still processing")
    

Recovery from Signature Errors

If you encounter signature mismatch errors:

  1. Check the error message - it tells you exactly what's wrong
  2. Fix the activity signature - make it backward compatible
  3. Or create a new versioned activity - the safer approach (see Migration Strategy below)
  4. Test the fix - ensure existing workflows can complete

Migration Strategy

When you need to change activity signatures:

# Step 1: Create new versioned activity
@activity("process_data_v2")
def process_data_v2(data: str, new_param: str = "default") -> dict:
    return {"processed": data, "new_param": new_param}

# Step 2: Keep old activity for compatibility (optional)
@activity("process_data_v1") 
def process_data_v1(data: str) -> dict:
    # Delegate to new version with defaults
    return process_data_v2(data, "legacy_default")

# Step 3: Update new workflows to use v2
@workflow("new_workflow")
def new_workflow(input: str) -> dict:
    result = process_data_v2(input, "enhanced")  # Use new version
    return {"result": result}

# Step 4: Migrate existing workflows gradually

API Reference

Decorators

@workflow(name=None)

Marks a function as a workflow.

Parameters:

  • name (str, optional): Name for the workflow. Defaults to function name.

@activity(name=None)

Marks a function as an activity.

Parameters:

  • name (str, optional): Name for the activity. Defaults to the function name (see the sketch below).
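
Both decorators fall back to the function name when no explicit name is given. A minimal sketch (assuming the decorators are invoked with empty parentheses when no name is passed):

from py_orchestrate import activity, workflow

@activity()  # registered under its function name, "double"
def double(n: int) -> int:
    return n * 2

@workflow()  # registered under "doubling_workflow"
def doubling_workflow(n: int) -> dict:
    return {"result": double(n)}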

Orchestrator Class

Orchestrator(db_path="py_orchestrate.db", max_workers=5, db_manager=None)

Creates a new orchestrator instance.

Parameters:

  • db_path (str): Path to SQLite database file used when db_manager is not provided
  • max_workers (int): Maximum number of concurrent workflow threads
  • db_manager (BaseDatabaseManager, optional): Inject a custom database manager implementation

Methods

start()

Starts the orchestrator engine.

stop()

Stops the orchestrator engine and waits for workflows to complete.

invoke_workflow(name: str, **kwargs) -> str

Invokes a workflow by name with input parameters.

Returns: Workflow ID for tracking execution

get_workflow_status(workflow_id: str) -> dict

Gets the current status of a workflow.

Returns: Dictionary with workflow status information:

{
    "id": "workflow-id",
    "name": "workflow-name", 
    "status": "processing|done|failed",
    "current_activity": "activity-name or None",
    "error_message": "error message or None",
    "output": "workflow output or None",
    "created_at": "ISO timestamp",
    "updated_at": "ISO timestamp" 
}
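
A small example of branching on the returned fields:

status = orchestrator.get_workflow_status(workflow_id)
if status["status"] == "done":
    print(status["output"])
elif status["status"] == "failed":
    print(status["error_message"])
else:
    print(f"Still running: {status['current_activity']}")
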
list_workflows(name: str = None) -> List[dict]

Lists workflows, optionally filtered by name.
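
For example, to find failed runs of a given workflow (assuming each entry carries the same fields as get_workflow_status):

for wf in orchestrator.list_workflows("data_processing_workflow"):
    if wf["status"] == "failed":
        print(f"{wf['id']}: {wf['error_message']}")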

Database Schema

The built-in persistence layer stores two logical collections of data:

  • workflows: Workflow instances and their current state
  • activity_executions: Individual activity runs for each workflow

SQLite stores these as tables (a quick inspection sketch follows the list below). Cosmos DB stores them in two containers; configure separate container IDs so the two collections are partitioned independently:

  • workflow_container_id
  • activity_container_id
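
For debugging, the SQLite file can be inspected directly. A read-only sketch that just lists the tables (the exact column layout is an implementation detail, so avoid writing to the file):

import sqlite3

# Open read-only so a running orchestrator is not disturbed
conn = sqlite3.connect("file:py_orchestrate.db?mode=ro", uri=True)
for (name,) in conn.execute("SELECT name FROM sqlite_master WHERE type='table'"):
    print(name)  # expect the workflows and activity_executions tables
conn.close()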

Examples

See main.py for a complete working example.

Troubleshooting

Common Issues

Workflow Stuck in "processing" Status

# Check current activity and error message
status = orchestrator.get_workflow_status(workflow_id)
print(f"Status: {status['status']}")
print(f"Current Activity: {status.get('current_activity')}")
print(f"Error: {status.get('error_message')}")

Possible causes:

  • Activity is taking longer than expected
  • Activity crashed without proper error handling
  • Database connection issues

"too many positional arguments" Error

Cause: Activity signature was reduced (fewer parameters).
Solution: Add back the removed parameters, or create a new versioned activity.

"missing required positional argument" Error

Cause: Activity signature was expanded (more required parameters).
Solution: Make the new parameters optional by giving them default values.
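
For example, give the new parameter a default so replayed calls with the old arity still succeed:

from py_orchestrate import activity

# Before (breaks replay): def process_data(data: str, mode: str) -> dict
# After (safe): old calls like process_data(data) still work
@activity("process_data")
def process_data(data: str, mode: str = "default") -> dict:
    return {"processed": data, "mode": mode}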

Workflows Not Resuming After Restart

Possible causes:

  • Activity functions not registered (missing imports)
  • Activity names changed
  • Database file moved or deleted

Solution:

# Ensure all activities are imported and registered
from my_activities import fetch_data, process_data  # Import activities
from py_orchestrate import Orchestrator

orchestrator = Orchestrator()  
orchestrator.start()  # Recovery happens automatically

Database Corruption

Prevention:

  • Don't modify the SQLite database manually
  • Use proper shutdown: orchestrator.stop()
  • Backup database files in production

Recovery:

# Check database integrity
import sqlite3

conn = sqlite3.connect("my_workflows.db")
result = conn.execute("PRAGMA integrity_check;").fetchone()
print(result)  # ("ok",) means the database passed the check
conn.close()

Debugging Tips

Enable Detailed Logging

import logging
logging.basicConfig(level=logging.DEBUG)

# Now you'll see detailed orchestrator logs
orchestrator = Orchestrator()

Inspect Database State

# Check workflow status directly
workflows = orchestrator.list_workflows("my_workflow_name")
for wf in workflows:
    print(f"ID: {wf['id']}, Status: {wf['status']}")

Test Activities in Isolation

# Test activity functions directly before using in workflows
@activity("test_activity")
def test_activity(param: str) -> dict:
    return {"result": param}

# Test directly
result = test_activity("test_input")
print(result)  # Should work before putting in workflow

Requirements

  • Python 3.12+
  • SQLite (included with Python)
  • azure-cosmos (only when using CosmosDatabaseManager)

Development

Setting up for Development

# Clone the repository
git clone https://github.com/your-username/py-orchestrate.git
cd py-orchestrate

# Install development dependencies
uv sync --dev

# Run the example
uv run python py_orchestrate/example.py

# Run automated tests
uv run pytest tests

# Run type checking
uv run mypy py_orchestrate --ignore-missing-imports

# Format code
uv run black py_orchestrate

# Build package
uv run python -m build

Release Process

This project uses GitHub Actions for automated building and publishing to PyPI.

License

MIT License
