A powerful, lightweight and flexible workflow management framework for Python

Zuma - Workflow Manager and Executor

Zuma is a powerful, asynchronous workflow management and execution system built in Python. It provides a flexible framework for defining, managing, and executing complex workflows with parallel processing capabilities.

Objective

The main objective of Zuma is to provide a robust framework for:

  • Defining and executing complex workflows with multiple steps
  • Managing parallel task execution with concurrency control
  • Handling dependencies between workflow components
  • Providing detailed execution tracking and error handling
  • Supporting conditional workflow paths

Features

1. Core Components

  • ZumaWorkflow: Main container for organizing workflow steps
    • Supports sequential and parallel execution
    • Handles step dependencies and execution order
    • Configurable failure behavior with continue_on_failure
  • ZumaActionStep: Basic unit of work execution
    • Configurable retries and timeouts
    • Custom execution logic through overridable methods
    • Automatic context and dependency injection
  • ZumaParallelAction: Handles concurrent execution of multiple steps
    • Configurable concurrency limits with max_concurrency
    • Fail-fast option for error handling
    • Automatic resource management
  • ZumaConditionalStep: Supports branching logic in workflows
    • Dynamic path selection based on context
    • Optional else-branch handling
    • Context-aware condition evaluation
  • ZumaRunner: Orchestrates workflow execution
    • Manages workflow lifecycle
    • Handles execution context
    • Provides execution summaries and results
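The fail-fast option listed for ZumaParallelAction can be illustrated with plain asyncio: wait until the first task raises, cancel the tasks that are still running, and propagate the error. The following is a standalone sketch of that general technique, not Zuma's implementation; `run_fail_fast` and `worker` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, List

async def run_fail_fast(coros: List[Awaitable[Any]]) -> List[Any]:
    """Run coroutines concurrently; on the first failure, cancel the rest and re-raise."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()  # stop work whose result is no longer needed
    # Let cancelled tasks finish unwinding; return_exceptions keeps this from raising
    await asyncio.gather(*pending, return_exceptions=True)
    failures = [exc for exc in (t.exception() for t in done) if exc is not None]
    if failures:
        raise failures[0]
    # No failures: every task completed, results are in input order
    return [t.result() for t in tasks]

log: List[str] = []

async def worker(name: str, delay: float, fail: bool = False) -> str:
    try:
        await asyncio.sleep(delay)  # simulate work
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(f"{name} finished")
        return name
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise

async def main() -> None:
    try:
        await run_fail_fast([worker("fast-fail", 0.01, fail=True), worker("slow", 1.0)])
    except RuntimeError as exc:
        print("caught:", exc)  # caught: fast-fail failed
    print(log)  # ['slow cancelled']

asyncio.run(main())
```

The slow worker never reaches its "finished" line: it is cancelled as soon as the fast worker fails, which is the point of failing fast.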

2. Execution Management

  • Asynchronous execution using Python's asyncio
    • Non-blocking operation execution
    • Efficient resource utilization
    • Concurrent task handling
  • Configurable concurrency limits for parallel actions
  • Execution context management and logging
  • Progress tracking and status reporting
  • Detailed execution results and metrics
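A common way to enforce a concurrency limit such as `max_concurrency` is an `asyncio.Semaphore`: every task must acquire a slot before it starts and releases it when done. This standalone sketch shows the technique only and does not claim to mirror Zuma's internals; `run_bounded` and `make_task` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

async def run_bounded(
    tasks: List[Callable[[], Awaitable[Any]]], max_concurrency: int
) -> List[Any]:
    """Run task factories concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(factory: Callable[[], Awaitable[Any]]) -> Any:
        # Wait for a free slot, run the task, then release the slot
        async with semaphore:
            return await factory()

    # gather returns results in the same order as the input list
    return await asyncio.gather(*(guarded(t) for t in tasks))

# Counters used to observe the peak number of simultaneously running tasks
running = 0
peak = 0

def make_task(value: int) -> Callable[[], Awaitable[int]]:
    async def task() -> int:
        global running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # simulate I/O
        running -= 1
        return value * 2
    return task

results = asyncio.run(run_bounded([make_task(i) for i in range(5)], max_concurrency=2))
print(results, peak)  # [0, 2, 4, 6, 8] 2
```

Passing factories instead of coroutine objects means no work is created for a task until a slot is free.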

3. Error Handling

  • Comprehensive error tracking and reporting
    • Detailed error messages with stack traces
    • Component-specific error context
    • Error categorization
  • Configurable retry mechanisms
    • Per-step retry configuration
    • Exponential backoff support
    • Maximum retry limits
  • Fail-fast and continue-on-failure options
  • Validation of workflow configurations

4. Monitoring and Reporting

  • Detailed execution summaries
  • Duration tracking for steps
  • Status tracking (PENDING, RUNNING, SUCCESS, FAILED, etc.)
  • JSON-formatted execution results
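One way to implement this kind of status and duration tracking is to wrap each step in a small recorder that moves it through the status values and times it. The sketch below is standalone and hypothetical: `Status`, `StepResult` and `run_tracked` are not Zuma names, although the status values mirror those listed above.

```python
import asyncio
import json
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, List, Optional

class Status(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"

@dataclass
class StepResult:
    name: str
    status: Status = Status.PENDING
    duration: Optional[float] = None
    error: Optional[str] = None

async def run_tracked(name: str, func: Callable[[], Awaitable[Any]]) -> StepResult:
    """Run one step, recording status transitions and wall-clock duration."""
    result = StepResult(name)
    result.status = Status.RUNNING
    start = time.perf_counter()
    try:
        await func()
        result.status = Status.SUCCESS
    except Exception as exc:
        result.status = Status.FAILED
        result.error = str(exc)
    finally:
        result.duration = time.perf_counter() - start
    return result

async def ok() -> None:
    await asyncio.sleep(0.01)

async def boom() -> None:
    raise ValueError("bad input")

async def main() -> List[StepResult]:
    return [await run_tracked("ok", ok), await run_tracked("boom", boom)]

results = asyncio.run(main())
# Status subclasses str, so json.dumps writes it as its plain value
print(json.dumps([asdict(r) for r in results], indent=2))
```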

5. Flexibility

  • Plugin system for extending functionality
  • Custom action step implementation
  • Configurable execution parameters
  • Support for complex workflow patterns

6. Workflow Visualization

  • Mermaid diagram generation for workflow visualization
    • Clear visualization of workflow steps and their relationships
    • Support for retry mechanisms visualization
    • Visual representation of parallel processing
    • Automatic diagram generation during workflow execution
    • Dark theme support for better readability
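Mermaid flowcharts are plain text, so generating one amounts to emitting node and edge lines. The sketch below is a standalone illustration, not Zuma's diagram generator; the list-based workflow model and the `to_mermaid` function are hypothetical. Parallel steps fan out from the previous stage and fan back in to the next one.

```python
from typing import List, Union

# Hypothetical model: each stage is a step name (str) or a list of names run in parallel
Stage = Union[str, List[str]]

def to_mermaid(stages: List[Stage]) -> str:
    """Render sequential and parallel stages as a Mermaid flowchart."""
    lines = ["flowchart TD", "    start([Start])"]
    previous = ["start"]  # node ids that feed into the next stage
    counter = 0
    for stage in stages:
        names = stage if isinstance(stage, list) else [stage]
        current = []
        for name in names:
            counter += 1
            node_id = f"s{counter}"
            lines.append(f'    {node_id}["{name}"]')
            # Connect every node of the previous stage to this node (fan-out / fan-in)
            for prev in previous:
                lines.append(f"    {prev} --> {node_id}")
            current.append(node_id)
        previous = current
    # "end" is a reserved word in Mermaid, so the final node is called "finish"
    lines.append("    finish([End])")
    for prev in previous:
        lines.append(f"    {prev} --> finish")
    return "\n".join(lines)

print(to_mermaid(["Fetch Data", ["Process CSV", "Process JSON"], "Save Results"]))
```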

Usage Documentation

Basic Usage

1. Simple Sequential Workflow

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner
from typing import Dict, Any
import asyncio

class DataFetchStep(ZumaActionStep):
    """Step that simulates fetching data from a source"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        print(f"[{self.name}] Fetching data...")
        await asyncio.sleep(1)  # Simulate network delay
        return {"data": "fetched_data_123"}

class ProcessingStep(ZumaActionStep):
    """Step that processes the fetched data"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Processing data: {data}")
        await asyncio.sleep(0.5)  # Simulate processing
        return {"processed_data": f"processed_{data}"}

async def run_simple_workflow():
    workflow = ZumaWorkflow(
        "Simple Sequential Workflow",
        steps=[
            DataFetchStep("Fetch Data"),
            ProcessingStep("Process Data")
        ]
    )

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_simple_workflow())

2. Parallel Processing

from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio

class DataProcessStep(ZumaActionStep):
    """Processes a single file type"""
    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]

        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name,
                "status": "completed"
            })

        return {"processed_files": len(results), "results": results}

async def run_parallel_workflow():
    # Define parallel data processing
    parallel_processing = ZumaParallelAction(
        "Parallel Processing",
        steps=[
            DataProcessStep("Process CSV", ".csv"),
            DataProcessStep("Process JSON", ".json"),
            DataProcessStep("Process XML", ".xml")
        ],
        max_concurrency=2  # Process 2 file types at a time
    )

    # Create workflow
    workflow = ZumaWorkflow(
        "Parallel Processing Workflow",
        steps=[parallel_processing]
    )

    # Run workflow with sample data
    initial_context = {
        "valid_files": [
            "data1.csv", "data2.json", "data3.xml",
            "data4.csv", "data5.json"
        ]
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_parallel_workflow())

3. Error Handling

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio

class ValidatingStep(ZumaActionStep):
    """Step that validates input data"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Validating data: {data}")

        if not self.validate_data(data):
            raise ZumaExecutionError("Data validation failed")

        return {"validated": True, "data": data}

    def validate_data(self, data):
        return isinstance(data, str) and len(data) > 0

async def run_error_workflow():
    workflow = ZumaWorkflow(
        "Error Handling Workflow",
        steps=[
            ValidatingStep(
                "Validate Data",
                description="Validates input data format",
                retries=3
            )
        ],
        continue_on_failure=True
    )

    # Run workflow with sample data
    initial_context = {
        "data": "sample_data_123"  # Valid data
        # "data": None  # Invalid data to trigger validation error
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_error_workflow())

4. Conditional Workflow

from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep, ZumaRunner
from typing import Dict, Any
import asyncio

def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000

class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100

        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []

        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])

        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
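        }

class SimpleProcessingStep(ZumaActionStep):
    """Handles small datasets in a single pass"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        print(f"[{self.name}] Processing {len(data)} items in one pass...")
        processed = [f"processed_{item}" for item in data]
        return {
            "processed_items": len(processed),
            "processing_type": "simple",
            "results": processed
        }

async def run_conditional_workflow():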
    workflow = ZumaWorkflow(
        "Conditional Processing Workflow",
        steps=[
            ZumaConditionalStep(
                "Processing Path Decision",
                condition=check_data_size,
                true_component=BatchProcessingStep("Batch Process"),
                false_component=SimpleProcessingStep("Simple Process")
            )
        ]
    )

    # Run workflow with sample data
    initial_context = {
        "data": list(range(2000)),  # Large dataset to trigger batch processing
        "data_size": 2000
    }

    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result

if __name__ == "__main__":
    asyncio.run(run_conditional_workflow())

Complex Scenarios

1. Parallel Processing with Dependencies

from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio

class DataProcessStep(ZumaActionStep):
    """Processes a single file"""
    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]

        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name
            })

        return {
            "processed_files": len(results),
            "results": results
        }

# Define parallel data processing
parallel_processing = ZumaParallelAction(
    "Parallel Processing",
    steps=[
        DataProcessStep("Process CSV", file_type=".csv"),
        DataProcessStep("Process JSON", file_type=".json"),
        DataProcessStep("Process XML", file_type=".xml")
    ],
    max_concurrency=2  # Process 2 file types at a time
)
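The snippet above configures only a concurrency limit, while the heading refers to dependencies between steps. A common way to combine the two is to start each step as soon as all of its upstream steps have finished, so independent steps still run concurrently. The following standalone sketch shows that technique with plain asyncio; `run_with_dependencies` and the dict-based step and dependency model are hypothetical and not part of Zuma's documented API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

StepFn = Callable[[Dict[str, Any]], Awaitable[Any]]

async def run_with_dependencies(
    steps: Dict[str, StepFn], deps: Dict[str, List[str]]
) -> Dict[str, Any]:
    """Start each step as soon as all of its dependencies have finished.

    Independent steps run concurrently. The dependency graph must be acyclic;
    a cycle would make the affected steps wait forever.
    """
    results: Dict[str, Any] = {}
    tasks: Dict[str, "asyncio.Task[Any]"] = {}

    async def run(name: str) -> Any:
        # Wait for every upstream step (re-raises an upstream error, if any)
        await asyncio.gather(*(tasks[d] for d in deps.get(name, [])))
        results[name] = await steps[name](results)
        return results[name]

    # Schedule every step up front; tasks only start once the loop regains
    # control, by which point every entry already exists in `tasks`
    for name in steps:
        tasks[name] = asyncio.create_task(run(name))
    await asyncio.gather(*tasks.values())
    return results

order: List[str] = []

def make_step(name: str, delay: float) -> StepFn:
    async def step(upstream: Dict[str, Any]) -> str:
        order.append(f"start:{name}")
        await asyncio.sleep(delay)  # simulate work
        order.append(f"end:{name}")
        return f"{name} done"
    return step

steps = {
    "validate": make_step("validate", 0.01),
    "csv": make_step("csv", 0.02),
    "json": make_step("json", 0.02),
    "merge": make_step("merge", 0.01),
}
deps = {"csv": ["validate"], "json": ["validate"], "merge": ["csv", "json"]}

results = asyncio.run(run_with_dependencies(steps, deps))
print(results["merge"])  # merge done
```

Here "csv" and "json" both wait for "validate" and then overlap, while "merge" starts only after both have finished.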

2. Conditional Branching with State Management

from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep
from typing import Dict, Any
import asyncio

def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000

class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100

        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []

        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])

        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
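        }

class SimpleProcessingStep(ZumaActionStep):
    """Handles small datasets in a single pass"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        processed = [f"processed_{item}" for item in data]
        return {"processed_items": len(processed), "processing_type": "simple", "results": processed}

class DataLoadStep(ZumaActionStep):
    """Loads the dataset and records its size (placeholder data for illustration)"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data") or list(range(2000))
        return {"data": data, "data_size": len(data)}

class ResultSaveStep(ZumaActionStep):
    """Saves results (placeholder: prints a count instead of writing to storage)"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        results = context.get("results", [])
        print(f"[{self.name}] Saving {len(results)} results...")
        return {"saved_count": len(results)}

# Create workflow with conditional branching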
workflow = ZumaWorkflow(
    "Conditional Processing Workflow",
    steps=[
        DataLoadStep("Load Data"),
        ZumaConditionalStep(
            "Processing Path Decision",
            condition=check_data_size,
            true_component=BatchProcessingStep("Batch Process"),
            false_component=SimpleProcessingStep("Simple Process")
        ),
        ResultSaveStep("Save Results")
    ]
)

3. Dynamic Workflow Generation

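from typing import Any, Dict, Optional

from zuma import ZumaActionStep, ZumaParallelAction, ZumaWorkflow

class ValidationStep(ZumaActionStep):
    """Illustrative validation step: checks that the context carries input data"""
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        return {"input_valid": context.get("input") is not None}

class ProcessingStep(ZumaActionStep):
    """Illustrative step parameterized by a process type or a worker id"""
    def __init__(self, name: str, process_type: Optional[str] = None,
                 worker_id: Optional[int] = None):
        super().__init__(name)
        self.process_type = process_type
        self.worker_id = worker_id

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Key the output by step name so parallel workers don't overwrite each other
        return {f"{self.name} done": True}

def create_dynamic_workflow(config: Dict[str, Any]) -> ZumaWorkflow:
    steps = []

    # Add input validation
    if config.get("validate_input"):
        steps.append(ValidationStep("Input Validation"))

    # Add one sequential processing step per configured process
    for process in config.get("processes", []):
        steps.append(ProcessingStep(f"Process {process}", process_type=process))

    # Add parallel processing if needed (defaults to a single worker)
    if config.get("parallel_processing"):
        parallel_steps = [
            ProcessingStep(f"Parallel {i}", worker_id=i)
            for i in range(config.get("worker_count", 1))
        ]
        steps.append(ZumaParallelAction("Parallel Processing", steps=parallel_steps))

    return ZumaWorkflow("Dynamic Workflow", steps=steps)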

Customizations

1. Custom Action Step with Progress Tracking

from typing import Any, Dict

from loguru import logger
from zuma import ZumaActionStep

class ProgressTrackingStep(ZumaActionStep):
    def __init__(self, name: str, total_items: int):
        super().__init__(name)
        self.total_items = total_items
        self.processed = 0

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Reset the counter so a retried execution starts from zero
        self.processed = 0
        for i in range(self.total_items):
            await self.process_item(i)
            self.processed += 1
            self.report_progress()
        return {"processed_count": self.processed}

    def report_progress(self):
        # Only called after at least one item, so total_items > 0 here
        percentage = (self.processed / self.total_items) * 100
        logger.info(f"{self.name}: {percentage:.2f}% complete")

    async def process_item(self, item):
        # Replace with implementation-specific processing
        pass

2. Custom Result Handler

from typing import Any, Dict, List

from zuma import ZumaRunner

class CustomResultHandler:
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.results: List[Dict[str, Any]] = []

    def handle_result(self, result: "ZumaResult"):
        # Record one result; assumes it exposes name, status (an enum),
        # duration and error attributes
        self.results.append({
            "step": result.name,
            "status": result.status.value,
            "duration": result.duration,
            "error": result.error
        })

    def generate_report(self):
        # Status values are assumed to be the upper-case names used in
        # execution summaries (SUCCESS, FAILED, ...)
        return {
            "workflow_name": self.workflow_name,
            "total_steps": len(self.results),
            "successful_steps": sum(1 for r in self.results if r["status"] == "SUCCESS"),
            "failed_steps": sum(1 for r in self.results if r["status"] == "FAILED"),
            "total_duration": sum(r["duration"] for r in self.results if r["duration"]),
            "step_results": self.results
        }

# Usage with custom handler.
# create_workflow() stands for any function that returns a ZumaWorkflow.
# Only the top-level result is recorded here; call handle_result once per
# step result to build a per-step report.
async def run_with_custom_handler():
    workflow = create_workflow()
    runner = ZumaRunner()
    handler = CustomResultHandler(workflow.name)

    result = await runner.run_workflow(workflow)
    handler.handle_result(result)
    report = handler.generate_report()
    return report

3. Custom Context Object

from typing import Any

from zuma import ZumaRunner

class WorkflowContext:
    def __init__(self):
        self.start_time = None
        self.metrics = {}
        self.state = {}

    def track_metric(self, name: str, value: float):
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def get_average_metric(self, name: str) -> float:
        values = self.metrics.get(name, [])
        return sum(values) / len(values) if values else 0

    def update_state(self, key: str, value: Any):
        self.state[key] = value

    def get_state(self, key: str, default: Any = None) -> Any:
        return self.state.get(key, default)

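# Usage with custom context
# create_workflow() stands for any function that returns a ZumaWorkflow
async def run_with_custom_context():
    context = WorkflowContext()
    workflow = create_workflow()
    runner = ZumaRunner()

    # Steps can read the shared object via context["custom_context"]
    result = await runner.run_workflow(
        workflow,
        context={"custom_context": context}
    )
    return result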

Advanced Usage Examples

1. Parallel Execution with Error Handling

from zuma import ZumaWorkflow, ZumaParallelAction, ZumaActionStep

# Create parallel workflow with error handling
workflow = ZumaWorkflow(
    "Parallel Processing",
    steps=[
        ZumaParallelAction(
            "Data Processing",
            steps=[
                ZumaActionStep("Process1", retries=3),
                ZumaActionStep("Process2", timeout=30.0),
                ZumaActionStep("Process3"),
            ],
            fail_fast=True,
            max_concurrency=2
        )
    ],
    continue_on_failure=False
)

2. Conditional Workflow

from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep

def check_condition(context):
    return context.get('value', 0) > 100

workflow = ZumaWorkflow(
    "Conditional Flow",
    steps=[
        ZumaConditionalStep(
            "Value Check",
            condition=check_condition,
            true_component=ZumaActionStep("High Value Process"),
            false_component=ZumaActionStep("Low Value Process")
        )
    ]
)

3. Custom Action Step

from typing import Any, Dict

from zuma import ZumaActionStep, ZumaExecutionError

class DataValidationStep(ZumaActionStep):
    async def execute(self, context: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        data = context.get('input_data')
        if not self.validate_data(data):
            raise ZumaExecutionError("Invalid data format")
        return {"validation_passed": True}

    def validate_data(self, data):
        # Custom validation logic
        return True

Technical Architecture

Component Hierarchy

ZumaComponent (Abstract Base)
├── ZumaWorkflow
├── ZumaActionStep
├── ZumaParallelAction
└── ZumaConditionalStep
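This hierarchy is an instance of the composite pattern: workflows, parallel actions and conditional steps are components that contain other components, so any of them can be nested inside another. The sketch below is a minimal, hypothetical re-implementation of that idea (the `Mini*` classes are not Zuma classes). It assumes, as the sequential examples above suggest, that each step's output is merged into the context passed to the next step.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional

Context = Dict[str, Any]

class MiniComponent(ABC):
    """Abstract base: every node in the tree runs against a context dict."""
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def run(self, context: Context) -> Context:
        """Return the outputs this component contributes to the context."""

class MiniAction(MiniComponent):
    """Leaf node wrapping a plain function."""
    def __init__(self, name: str, fn: Callable[[Context], Context]):
        super().__init__(name)
        self.fn = fn

    async def run(self, context: Context) -> Context:
        return self.fn(context)

class MiniWorkflow(MiniComponent):
    """Runs children in order, merging each child's output into the context."""
    def __init__(self, name: str, steps: List[MiniComponent]):
        super().__init__(name)
        self.steps = steps

    async def run(self, context: Context) -> Context:
        ctx = dict(context)
        for step in self.steps:
            ctx.update(await step.run(ctx))
        return ctx

class MiniParallel(MiniComponent):
    """Runs children concurrently on copies of the same input context."""
    def __init__(self, name: str, steps: List[MiniComponent]):
        super().__init__(name)
        self.steps = steps

    async def run(self, context: Context) -> Context:
        outputs = await asyncio.gather(*(s.run(dict(context)) for s in self.steps))
        merged: Context = {}
        for out in outputs:
            merged.update(out)
        return merged

class MiniConditional(MiniComponent):
    """Chooses one child based on a predicate; the else-branch is optional."""
    def __init__(self, name: str, condition: Callable[[Context], bool],
                 if_true: MiniComponent, if_false: Optional[MiniComponent] = None):
        super().__init__(name)
        self.condition = condition
        self.if_true = if_true
        self.if_false = if_false

    async def run(self, context: Context) -> Context:
        branch = self.if_true if self.condition(context) else self.if_false
        return await branch.run(context) if branch is not None else {}

workflow = MiniWorkflow("Workflow", [
    MiniAction("Fetch", lambda ctx: {"data": [1, 2, 3]}),
    MiniParallel("Stats", [
        MiniAction("Sum", lambda ctx: {"total": sum(ctx["data"])}),
        MiniAction("Count", lambda ctx: {"count": len(ctx["data"])}),
    ]),
    MiniConditional("Size Check", lambda ctx: ctx["count"] > 2,
                    MiniAction("Large", lambda ctx: {"label": "large"}),
                    MiniAction("Small", lambda ctx: {"label": "small"})),
])

print(asyncio.run(workflow.run({})))
# {'data': [1, 2, 3], 'total': 6, 'count': 3, 'label': 'large'}
```

Because every node shares the same `run(context)` interface, a runner only needs to call `run` on the root component.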

Execution Flow

  1. Initialization

    • Workflow and step validation
    • Context preparation
    • Dependency injection
  2. Execution

    • Async task creation
    • Parallel execution management
    • Status tracking
    • Error handling
  3. Completion

    • Result aggregation
    • Cleanup
    • Summary generation

Current Limitations

  1. Persistence

    • No built-in persistence for workflow state
    • Execution history is not stored between runs
    • No recovery mechanism for failed workflows
  2. Monitoring

    • Limited real-time monitoring capabilities
    • No visualization beyond the generated Mermaid diagrams
    • Basic console-based progress reporting
  3. Scalability

    • Limited to single process execution
    • No distributed execution support
    • Memory constraints for large workflows
  4. Error Recovery

    • Basic retry mechanism
    • No sophisticated failure recovery strategies
    • Limited rollback capabilities

Room for Improvement

1. Architecture Enhancements

  • Implement persistent storage for workflow state
  • Add support for distributed execution
  • Develop a proper plugin architecture
  • Implement workflow versioning

2. Execution Features

  • Add support for workflow checkpointing
  • Implement sophisticated retry strategies
  • Add transaction support for atomic operations
  • Develop workflow templating system

3. Monitoring and Observability

  • Add real-time monitoring dashboard
  • Implement metrics collection
  • Extend workflow visualization beyond Mermaid diagrams
  • Enhance logging and debugging capabilities

4. Error Handling

  • Implement advanced error recovery strategies
  • Add support for compensating transactions
  • Develop workflow replay capabilities
  • Add debugging and troubleshooting tools

5. Integration and Extensions

  • Add REST API interface
  • Develop language-agnostic workflow definitions
  • Add support for external task queues
  • Implement event-driven workflow triggers

6. Documentation and Testing

  • Expand test coverage
  • Add performance benchmarks
  • Improve documentation with more examples
  • Create user guides and tutorials

Getting Started

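import asyncio

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner

# Define a simple workflow
workflow = ZumaWorkflow(
    "Sample Workflow",
    steps=[
        ZumaActionStep("Step1"),
        ZumaActionStep("Step2"),
    ]
)

async def main():
    # Create a runner and execute (await is only valid inside a coroutine)
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)

if __name__ == "__main__":
    asyncio.run(main())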

Examples

The examples/ directory contains various examples demonstrating Zuma's features:

  1. simple.py - Basic sequential workflow demonstration
  2. retry_mechanism.py - Advanced retry handling with visualization
  3. parallel_processing.py - Concurrent task execution
  4. workflow_in_workflow.py - Nested workflow composition
  5. conditional_workflow.py - Dynamic branching based on conditions
  6. error_handling.py - Comprehensive error handling patterns
  7. dynamic_workflow.py - Runtime workflow modification
  8. custom_actions.py - Creating custom action steps
  9. workflow_composition.py - Complex workflow composition

Visualization Example

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio
import random

class UnreliableAPIStep(ZumaActionStep):
    def __init__(self, name: str):
        super().__init__(
            name=name,
            retries=3,  # Allow up to 3 retries
            retry_delay=1.0  # Wait 1 second between retries
        )

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Simulated API call that might fail
        if random.random() < 0.7:  # 70% chance of failure
            raise ZumaExecutionError("API request failed")
        return {"api_response": "success"}

async def run_retry_example():
    workflow = ZumaWorkflow(
        "Retry Mechanism Demo",
        steps=[UnreliableAPIStep("API Call")]
    )

    runner = ZumaRunner()
    # Generate visualization diagram
    result = await runner.run_workflow(
        workflow,
        generate_diagram=True,
        diagram_output="retry_mechanism"
    )
    return result

if __name__ == "__main__":
    asyncio.run(run_retry_example())

The generated diagram will show:

  • Main workflow path
  • Retry attempts with failure paths
  • Success paths back to main flow
  • Error handling mechanism

Installation

pip install zuma-workflow

Requirements

  • Python 3.8+
  • asyncio (part of the Python standard library)
  • typing-extensions

Documentation

For detailed documentation, examples, and API reference, visit: https://zuma.codejunction.dev

Contributing

[Add Contributing Guidelines]

Project Status

Zuma is currently in active development. While it's stable for production use, some advanced features are still being developed. Contributions and feedback are welcome!

Download files

Source Distribution

zuma_workflow-0.0.1b1.tar.gz (145.9 kB)

Uploaded Source

Built Distribution

zuma_workflow-0.0.1b1-py3-none-any.whl (29.2 kB)

Uploaded Python 3

File details

Details for the file zuma_workflow-0.0.1b1.tar.gz.

File metadata

  • Download URL: zuma_workflow-0.0.1b1.tar.gz
  • Size: 145.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.7.9

File hashes

Hashes for zuma_workflow-0.0.1b1.tar.gz
Algorithm Hash digest
SHA256 f06d6bc4d60eb3220e40d0ebf86d7494101b519605f16b77be90961522aabe2c
MD5 7e1895f1e4c67ee6c6b9eb0d2d230818
BLAKE2b-256 cb2eb1da8e9d284b2f75effabc9291ccfec511ec69af3e9dc1ecc7bc6b1fbdde

File details

Details for the file zuma_workflow-0.0.1b1-py3-none-any.whl.

File hashes

Hashes for zuma_workflow-0.0.1b1-py3-none-any.whl
Algorithm Hash digest
SHA256 7e148cd23fde812418e51833ad1c1b54c3c5fcc248aebfe4b2287a8f2aa0d02d
MD5 ee65844a31e8063ff67f2da3641d7947
BLAKE2b-256 4de5b63cdcb5fd29285d96938d1d121769efc72c863eb91e4b9a61a207c41bfa
