A powerful, lightweight and flexible workflow management framework for Python
Zuma - Workflow Manager and Executor
Zuma is a powerful, asynchronous workflow management and execution system built in Python. It provides a flexible framework for defining, managing, and executing complex workflows with parallel processing capabilities.
Objective
The main objective of Zuma is to provide a robust framework for:
- Defining and executing complex workflows with multiple steps
- Managing parallel task execution with concurrency control
- Handling dependencies between workflow components
- Providing detailed execution tracking and error handling
- Supporting conditional workflow paths
Features
1. Core Components
- ZumaWorkflow: Main container for organizing workflow steps
  - Supports sequential and parallel execution
  - Handles step dependencies and execution order
  - Configurable failure behavior with continue_on_failure
- ZumaActionStep: Basic unit of work execution
  - Configurable retries and timeouts
  - Custom execution logic through overridable methods
  - Automatic context and dependency injection
- ZumaParallelAction: Handles concurrent execution of multiple steps
  - Configurable concurrency limits with max_concurrency
  - Fail-fast option for error handling
  - Automatic resource management
- ZumaConditionalStep: Supports branching logic in workflows
  - Dynamic path selection based on context
  - Optional else-branch handling
  - Context-aware condition evaluation
- ZumaRunner: Orchestrates workflow execution
  - Manages workflow lifecycle
  - Handles execution context
  - Provides execution summaries and results
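To make the "configurable timeouts" idea concrete, here is a minimal standard-library sketch of how a per-step timeout can be enforced with asyncio.wait_for. It is not Zuma's implementation: run_with_timeout, fast_step and slow_step are hypothetical names used only for illustration, and the result dictionary shape is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical step signature: takes the context, returns a dict of outputs.
StepFn = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_with_timeout(step: StepFn, context: Dict[str, Any], timeout: float) -> Dict[str, Any]:
    """Run one step and report FAILED if it takes longer than `timeout` seconds."""
    try:
        output = await asyncio.wait_for(step(context), timeout=timeout)
        return {"status": "SUCCESS", "output": output}
    except asyncio.TimeoutError:
        # wait_for cancels the step before raising, so no work is left running.
        return {"status": "FAILED", "error": f"timed out after {timeout}s"}


async def fast_step(context: Dict[str, Any]) -> Dict[str, Any]:
    return {"done": True}


async def slow_step(context: Dict[str, Any]) -> Dict[str, Any]:
    await asyncio.sleep(0.2)  # Longer than the timeout used in the demo
    return {"done": True}


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(fast_step, {}, timeout=0.1)))
    print(asyncio.run(run_with_timeout(slow_step, {}, timeout=0.01)))
```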
2. Execution Management
- Asynchronous execution using Python's asyncio
- Non-blocking operation execution
- Efficient resource utilization
- Concurrent task handling
- Configurable concurrency limits for parallel actions
- Execution context management and logging
- Progress tracking and status reporting
- Detailed execution results and metrics
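A concurrency limit of this kind is commonly built on asyncio.Semaphore. The sketch below shows that general technique using only the standard library; whether Zuma uses a semaphore internally is an assumption, and run_bounded and ConcurrencyProbe are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_bounded(jobs: List[Callable[[], Awaitable[str]]], max_concurrency: int) -> List[str]:
    """Run all jobs concurrently, but never more than `max_concurrency` at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(job: Callable[[], Awaitable[str]]) -> str:
        async with semaphore:  # Blocks while max_concurrency jobs are active
            return await job()

    # gather preserves the order of the inputs in its result list.
    return await asyncio.gather(*(guarded(job) for job in jobs))


class ConcurrencyProbe:
    """Records the highest number of jobs that were active at the same time."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def job(self, label: str) -> str:
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # Simulate work
        self.active -= 1
        return label


if __name__ == "__main__":
    probe = ConcurrencyProbe()
    jobs = [lambda label=label: probe.job(label) for label in ("csv", "json", "xml")]
    print(asyncio.run(run_bounded(jobs, max_concurrency=2)), "peak:", probe.peak)
```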
3. Error Handling
- Comprehensive error tracking and reporting
- Detailed error messages with stack traces
- Component-specific error context
- Error categorization
- Configurable retry mechanisms
- Per-step retry configuration
- Exponential backoff support
- Maximum retry limits
- Fail-fast and continue-on-failure options
- Validation of workflow configurations
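The retry behavior listed above can be pictured with a small standard-library sketch of retrying with exponential backoff. This is an illustration of the general pattern, not Zuma's code: retry_with_backoff, FlakyCall, and the max_attempts, base_delay and factor parameters are all hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def retry_with_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,
    base_delay: float = 0.01,
    factor: float = 2.0,
) -> Any:
    """Call `operation` up to `max_attempts` times, waiting longer after each failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except Exception:
            if attempt == max_attempts:
                raise  # Retry limit reached: surface the last error
            # Delays grow as base_delay, base_delay * factor, base_delay * factor**2, ...
            await asyncio.sleep(base_delay * factor ** (attempt - 1))


class FlakyCall:
    """Fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0

    async def __call__(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("transient failure")
        return "ok"


if __name__ == "__main__":
    flaky = FlakyCall(failures=2)
    print(asyncio.run(retry_with_backoff(flaky)), "after", flaky.calls, "calls")
```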
4. Monitoring and Reporting
- Detailed execution summaries
- Duration tracking for steps
- Status tracking (PENDING, RUNNING, SUCCESS, FAILED, etc.)
- JSON-formatted execution results
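As a rough picture of what status tracking, duration tracking and JSON output involve, the following standard-library sketch records one step's status and duration and serializes the record. The Status enum mirrors only the status names listed above; StepRecord, run_tracked and to_json are hypothetical and do not describe Zuma's actual result classes.

```python
import asyncio
import json
import time
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Awaitable, Callable, Optional


class Status(str, Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"


@dataclass
class StepRecord:
    name: str
    status: Status = Status.PENDING
    duration: Optional[float] = None
    error: Optional[str] = None


async def run_tracked(name: str, step: Callable[[], Awaitable[None]]) -> StepRecord:
    """Run a step, recording its final status, duration and any error message."""
    record = StepRecord(name=name)
    record.status = Status.RUNNING
    started = time.perf_counter()
    try:
        await step()
        record.status = Status.SUCCESS
    except Exception as exc:
        record.status = Status.FAILED
        record.error = str(exc)
    finally:
        record.duration = time.perf_counter() - started
    return record


def to_json(record: StepRecord) -> str:
    data = asdict(record)
    data["status"] = record.status.value  # Store the plain string, not the enum
    return json.dumps(data)


async def ok_step() -> None:
    await asyncio.sleep(0.01)


async def failing_step() -> None:
    raise ValueError("boom")


if __name__ == "__main__":
    print(to_json(asyncio.run(run_tracked("ok", ok_step))))
    print(to_json(asyncio.run(run_tracked("bad", failing_step))))
```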
5. Flexibility
- Plugin system for extending functionality
- Custom action step implementation
- Configurable execution parameters
- Support for complex workflow patterns
6. Workflow Visualization
- Mermaid diagram generation for workflow visualization
- Clear visualization of workflow steps and their relationships
- Support for retry mechanisms visualization
- Visual representation of parallel processing
- Automatic diagram generation during workflow execution
- Dark theme support for better readability
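For readers unfamiliar with Mermaid, the sketch below shows how a list of steps can be turned into Mermaid flowchart text, with a nested list standing for steps that run in parallel. It is a simplified illustration, not Zuma's diagram generator: to_mermaid and its input format are hypothetical, and labels containing double quotes are not escaped.

```python
from typing import List, Union

# A step name, or a list of step names that run in parallel.
Node = Union[str, List[str]]


def to_mermaid(workflow_name: str, steps: List[Node]) -> str:
    """Return Mermaid flowchart text for a linear workflow with parallel groups."""
    lines = ["flowchart TD", f'    start(["{workflow_name}"])']
    previous = ["start"]  # Node ids that feed into the next step
    counter = 0
    for step in steps:
        group = step if isinstance(step, list) else [step]
        current = []
        for label in group:
            node_id = f"s{counter}"
            counter += 1
            lines.append(f'    {node_id}["{label}"]')
            # Every node of the previous stage connects to every node of this one.
            for parent in previous:
                lines.append(f"    {parent} --> {node_id}")
            current.append(node_id)
        previous = current
    return "\n".join(lines)


if __name__ == "__main__":
    print(to_mermaid("Demo", ["Fetch", ["Process CSV", "Process JSON"], "Save"]))
```

The output can be pasted into any Mermaid renderer to view the graph.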
Usage Documentation
Basic Usage
1. Simple Sequential Workflow
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner
from typing import Dict, Any
import asyncio


class DataFetchStep(ZumaActionStep):
    """Step that simulates fetching data from a source"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        print(f"[{self.name}] Fetching data...")
        await asyncio.sleep(1)  # Simulate network delay
        return {"data": "fetched_data_123"}


class ProcessingStep(ZumaActionStep):
    """Step that processes the fetched data"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Processing data: {data}")
        await asyncio.sleep(0.5)  # Simulate processing
        return {"processed_data": f"processed_{data}"}


async def run_simple_workflow():
    workflow = ZumaWorkflow(
        "Simple Sequential Workflow",
        steps=[
            DataFetchStep("Fetch Data"),
            ProcessingStep("Process Data")
        ]
    )
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    runner.print_execution_summary(result)
    return result


if __name__ == "__main__":
    asyncio.run(run_simple_workflow())
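The example above relies on ProcessingStep reading the "data" key that DataFetchStep returned, which suggests that each step's returned dictionary is merged into a shared context. The standard-library sketch below illustrates that pattern under this assumption; run_sequence, fetch and process are hypothetical names and this is not Zuma's runner.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

Step = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_sequence(steps: List[Step], context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Run steps in order, merging each step's output into the shared context."""
    context = dict(context or {})  # Copy so the caller's dict is not mutated
    for step in steps:
        output = await step(context)
        context.update(output or {})
    return context


async def fetch(context: Dict[str, Any]) -> Dict[str, Any]:
    return {"data": "fetched_data_123"}


async def process(context: Dict[str, Any]) -> Dict[str, Any]:
    # Sees "data" because the previous step's output was merged into the context.
    return {"processed_data": f"processed_{context['data']}"}


if __name__ == "__main__":
    print(asyncio.run(run_sequence([fetch, process])))
```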
2. Parallel Processing
from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio


class DataProcessStep(ZumaActionStep):
    """Processes a single file type"""

    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]
        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name,
                "status": "completed"
            })
        return {"processed_files": len(results), "results": results}


async def run_parallel_workflow():
    # Define parallel data processing
    parallel_processing = ZumaParallelAction(
        "Parallel Processing",
        steps=[
            DataProcessStep("Process CSV", ".csv"),
            DataProcessStep("Process JSON", ".json"),
            DataProcessStep("Process XML", ".xml")
        ],
        max_concurrency=2  # Process 2 file types at a time
    )
    # Create workflow
    workflow = ZumaWorkflow(
        "Parallel Processing Workflow",
        steps=[parallel_processing]
    )
    # Run workflow with sample data
    initial_context = {
        "valid_files": [
            "data1.csv", "data2.json", "data3.xml",
            "data4.csv", "data5.json"
        ]
    }
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result


if __name__ == "__main__":
    asyncio.run(run_parallel_workflow())
3. Error Handling
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio


class ValidatingStep(ZumaActionStep):
    """Step that validates input data"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data")
        print(f"[{self.name}] Validating data: {data}")
        if not self.validate_data(data):
            raise ZumaExecutionError("Data validation failed")
        return {"validated": True, "data": data}

    def validate_data(self, data):
        return isinstance(data, str) and len(data) > 0


async def run_error_workflow():
    workflow = ZumaWorkflow(
        "Error Handling Workflow",
        steps=[
            ValidatingStep(
                "Validate Data",
                description="Validates input data format",
                retries=3
            )
        ],
        continue_on_failure=True
    )
    # Run workflow with sample data
    initial_context = {
        "data": "sample_data_123"  # Valid data
        # "data": None  # Invalid data to trigger validation error
    }
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result


if __name__ == "__main__":
    asyncio.run(run_error_workflow())
4. Conditional Workflow
from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep, ZumaRunner
from typing import Dict, Any
import asyncio


def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000


class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100
        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])
        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
        }


class SimpleProcessingStep(ZumaActionStep):
    """Handles small datasets in one pass (minimal placeholder; not defined in the original example)"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        processed = [f"processed_{item}" for item in data]
        return {
            "processed_items": len(processed),
            "processing_type": "simple",
            "results": processed
        }


async def run_conditional_workflow():
    workflow = ZumaWorkflow(
        "Conditional Processing Workflow",
        steps=[
            ZumaConditionalStep(
                "Processing Path Decision",
                condition=check_data_size,
                true_component=BatchProcessingStep("Batch Process"),
                false_component=SimpleProcessingStep("Simple Process")
            )
        ]
    )
    # Run workflow with sample data
    initial_context = {
        "data": list(range(2000)),  # Large dataset to trigger batch processing
        "data_size": 2000
    }
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow, context=initial_context)
    runner.print_execution_summary(result)
    return result


if __name__ == "__main__":
    asyncio.run(run_conditional_workflow())
Complex Scenarios
1. Parallel Processing with Dependencies
from zuma import ZumaWorkflow, ZumaActionStep, ZumaParallelAction, ZumaRunner
from typing import Dict, Any
import asyncio


class DataProcessStep(ZumaActionStep):
    """Processes the files of a single type"""

    def __init__(self, name: str, file_type: str):
        super().__init__(name)
        self.file_type = file_type

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        valid_files = context.get("valid_files", [])
        our_files = [f for f in valid_files if f.endswith(self.file_type)]
        results = []
        for file in our_files:
            print(f"[{self.name}] Processing {file}...")
            await asyncio.sleep(0.5)  # Simulate processing
            results.append({
                "file": file,
                "processor": self.name
            })
        return {
            "processed_files": len(results),
            "results": results
        }


# Define parallel data processing
parallel_processing = ZumaParallelAction(
    "Parallel Processing",
    steps=[
        DataProcessStep("Process CSV", file_type=".csv"),
        DataProcessStep("Process JSON", file_type=".json"),
        DataProcessStep("Process XML", file_type=".xml")
    ],
    max_concurrency=2  # Process 2 file types at a time
)
2. Conditional Branching with State Management
from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep
from typing import Dict, Any
import asyncio


def check_data_size(context: Dict[str, Any]) -> bool:
    """Determines processing path based on data size"""
    return context.get("data_size", 0) > 1000


class BatchProcessingStep(ZumaActionStep):
    """Handles large dataset processing in batches"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        data = context.get("data", [])
        batch_size = 100
        print(f"[{self.name}] Processing {len(data)} items in batches...")
        processed = []
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            await asyncio.sleep(0.1)  # Simulate batch processing
            processed.extend([f"processed_{item}" for item in batch])
        return {
            "processed_items": len(processed),
            "processing_type": "batch",
            "results": processed
        }


# Create workflow with conditional branching.
# DataLoadStep, SimpleProcessingStep and ResultSaveStep are not defined in this
# snippet; they stand for user-defined ZumaActionStep subclasses.
workflow = ZumaWorkflow(
    "Conditional Processing Workflow",
    steps=[
        DataLoadStep("Load Data"),
        ZumaConditionalStep(
            "Processing Path Decision",
            condition=check_data_size,
            true_component=BatchProcessingStep("Batch Process"),
            false_component=SimpleProcessingStep("Simple Process")
        ),
        ResultSaveStep("Save Results")
    ]
)
3. Dynamic Workflow Generation
from zuma import ZumaWorkflow, ZumaParallelAction
from typing import Dict, Any

# ValidationStep and ProcessingStep are not defined in this snippet; they stand
# for user-defined ZumaActionStep subclasses whose constructors accept the
# keyword arguments used below (process_type, worker_id).


def create_dynamic_workflow(config: Dict[str, Any]) -> ZumaWorkflow:
    steps = []
    # Add input validation
    if config.get("validate_input"):
        steps.append(ValidationStep("Input Validation"))
    # Add processing steps based on config
    for process in config.get("processes", []):
        steps.append(ProcessingStep(f"Process {process}", process_type=process))
    # Add parallel processing if needed
    if config.get("parallel_processing"):
        parallel_steps = [
            ProcessingStep(f"Parallel {i}", worker_id=i)
            for i in range(config["worker_count"])
        ]
        steps.append(ZumaParallelAction("Parallel Processing", steps=parallel_steps))
    return ZumaWorkflow("Dynamic Workflow", steps=steps)
Customizations
1. Custom Action Step with Progress Tracking
from typing import Dict, Any

from loguru import logger
from zuma import ZumaActionStep


class ProgressTrackingStep(ZumaActionStep):
    def __init__(self, name: str, total_items: int):
        super().__init__(name)
        self.total_items = total_items
        self.processed = 0

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        for i in range(self.total_items):
            await self.process_item(i)
            self.processed += 1
            self.report_progress()
        return {"processed_count": self.processed}

    def report_progress(self):
        percentage = (self.processed / self.total_items) * 100
        logger.info(f"{self.name}: {percentage:.2f}% complete")

    async def process_item(self, item):
        # Implementation specific processing
        pass
2. Custom Result Handler
# Assumes ZumaResult is importable from the zuma package
from zuma import ZumaResult, ZumaRunner


class CustomResultHandler:
    def __init__(self, workflow_name: str):
        self.workflow_name = workflow_name
        self.results = []

    def handle_result(self, result: ZumaResult):
        # Process the result
        self.results.append({
            "step": result.name,
            "status": result.status.value,
            "duration": result.duration,
            "error": result.error
        })

    def generate_report(self):
        return {
            "workflow_name": self.workflow_name,
            "total_steps": len(self.results),
            "successful_steps": sum(1 for r in self.results if r["status"] == "SUCCESS"),
            "failed_steps": sum(1 for r in self.results if r["status"] == "FAILED"),
            "total_duration": sum(r["duration"] for r in self.results if r["duration"]),
            "step_results": self.results
        }


# Usage with custom handler
async def run_with_custom_handler():
    workflow = create_workflow()  # create_workflow() is your own workflow factory
    runner = ZumaRunner()
    handler = CustomResultHandler(workflow.name)
    result = await runner.run_workflow(workflow)
    handler.handle_result(result)
    report = handler.generate_report()
    return report
3. Custom Context Manager
from typing import Any

from zuma import ZumaRunner


class WorkflowContext:
    def __init__(self):
        self.start_time = None
        self.metrics = {}
        self.state = {}

    def track_metric(self, name: str, value: float):
        if name not in self.metrics:
            self.metrics[name] = []
        self.metrics[name].append(value)

    def get_average_metric(self, name: str) -> float:
        values = self.metrics.get(name, [])
        return sum(values) / len(values) if values else 0

    def update_state(self, key: str, value: Any):
        self.state[key] = value

    def get_state(self, key: str, default: Any = None) -> Any:
        return self.state.get(key, default)


# Usage with custom context
async def run_with_custom_context():
    context = WorkflowContext()
    workflow = create_workflow()  # create_workflow() is your own workflow factory
    runner = ZumaRunner()
    result = await runner.run_workflow(
        workflow,
        context={"custom_context": context}
    )
    return result
Advanced Usage Examples
1. Parallel Execution with Error Handling
from zuma import ZumaWorkflow, ZumaParallelAction, ZumaActionStep

# Create parallel workflow with error handling
workflow = ZumaWorkflow(
    "Parallel Processing",
    steps=[
        ZumaParallelAction(
            "Data Processing",
            steps=[
                ZumaActionStep("Process1", retries=3),
                ZumaActionStep("Process2", timeout=30.0),
                ZumaActionStep("Process3"),
            ],
            fail_fast=True,
            max_concurrency=2
        )
    ],
    continue_on_failure=False
)
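To show what a fail-fast option typically means for concurrent tasks, here is a standard-library sketch that either stops at the first failure and cancels the remaining tasks, or lets everything finish and collects the errors. It is an illustration of the general idea only; run_parallel, failing and make_slow are hypothetical names and Zuma's own behavior may differ in detail.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_parallel(jobs: List[Callable[[], Awaitable[Any]]], fail_fast: bool) -> List[Any]:
    """Run jobs concurrently; with fail_fast, cancel the rest on the first error."""
    tasks = [asyncio.create_task(job()) for job in jobs]
    if not fail_fast:
        # Let every job finish; exceptions are returned in place of results.
        return await asyncio.gather(*tasks, return_exceptions=True)

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.wait(pending)  # Give cancelled tasks a chance to clean up
    for task in tasks:
        if task in done and task.exception() is not None:
            raise task.exception()
    return [task.result() for task in tasks]


async def failing() -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("step failed")


def make_slow(log: List[str]) -> Callable[[], Awaitable[str]]:
    async def slow() -> str:
        await asyncio.sleep(0.05)
        log.append("slow finished")
        return "slow ok"
    return slow


if __name__ == "__main__":
    log: List[str] = []
    print(asyncio.run(run_parallel([failing, make_slow(log)], fail_fast=False)), log)
```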
2. Conditional Workflow
from zuma import ZumaWorkflow, ZumaActionStep, ZumaConditionalStep


def check_condition(context):
    return context.get('value', 0) > 100


workflow = ZumaWorkflow(
    "Conditional Flow",
    steps=[
        ZumaConditionalStep(
            "Value Check",
            condition=check_condition,
            true_component=ZumaActionStep("High Value Process"),
            false_component=ZumaActionStep("Low Value Process")
        )
    ]
)
3. Custom Action Step
from typing import Dict, Any

from zuma import ZumaActionStep, ZumaExecutionError


class DataValidationStep(ZumaActionStep):
    async def execute(self, context: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        data = context.get('input_data')
        if not self.validate_data(data):
            raise ZumaExecutionError("Invalid data format")
        return {"validation_passed": True}

    def validate_data(self, data):
        # Custom validation logic
        return True
Technical Architecture
Component Hierarchy
ZumaComponent (Abstract Base)
├── ZumaWorkflow
├── ZumaActionStep
├── ZumaParallelAction
└── ZumaConditionalStep
Execution Flow
- Initialization
  - Workflow and step validation
  - Context preparation
  - Dependency injection
- Execution
  - Async task creation
  - Parallel execution management
  - Status tracking
  - Error handling
- Completion
  - Result aggregation
  - Cleanup
  - Summary generation
Current Limitations
- Persistence
  - No built-in persistence for workflow state
  - Execution history is not stored between runs
  - No recovery mechanism for failed workflows
- Monitoring
  - Limited real-time monitoring capabilities
  - No built-in visualization tools
  - Basic console-based progress reporting
- Scalability
  - Limited to single process execution
  - No distributed execution support
  - Memory constraints for large workflows
- Error Recovery
  - Basic retry mechanism
  - No sophisticated failure recovery strategies
  - Limited rollback capabilities
Room for Improvement
1. Architecture Enhancements
- Implement persistent storage for workflow state
- Add support for distributed execution
- Develop a proper plugin architecture
- Implement workflow versioning
2. Execution Features
- Add support for workflow checkpointing
- Implement sophisticated retry strategies
- Add transaction support for atomic operations
- Develop workflow templating system
3. Monitoring and Observability
- Add real-time monitoring dashboard
- Implement metrics collection
- Add workflow visualization tools
- Enhance logging and debugging capabilities
4. Error Handling
- Implement advanced error recovery strategies
- Add support for compensating transactions
- Develop workflow replay capabilities
- Add debugging and troubleshooting tools
5. Integration and Extensions
- Add REST API interface
- Develop language-agnostic workflow definitions
- Add support for external task queues
- Implement event-driven workflow triggers
6. Documentation and Testing
- Expand test coverage
- Add performance benchmarks
- Improve documentation with more examples
- Create user guides and tutorials
Getting Started
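import asyncio

from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner

# Define a simple workflow
workflow = ZumaWorkflow(
    "Sample Workflow",
    steps=[
        ZumaActionStep("Step1"),
        ZumaActionStep("Step2"),
    ]
)


async def main():
    # Create a runner and execute
    runner = ZumaRunner()
    result = await runner.run_workflow(workflow)
    return result


if __name__ == "__main__":
    # await is only valid inside a coroutine, so drive it with asyncio.run
    asyncio.run(main())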
Examples
The examples/ directory contains various examples demonstrating Zuma's features:
- simple.py - Basic sequential workflow demonstration
- retry_mechanism.py - Advanced retry handling with visualization
- parallel_processing.py - Concurrent task execution
- workflow_in_workflow.py - Nested workflow composition
- conditional_workflow.py - Dynamic branching based on conditions
- error_handling.py - Comprehensive error handling patterns
- dynamic_workflow.py - Runtime workflow modification
- custom_actions.py - Creating custom action steps
- workflow_composition.py - Complex workflow composition
Visualization Example
from zuma import ZumaWorkflow, ZumaActionStep, ZumaRunner, ZumaExecutionError
from typing import Dict, Any
import asyncio
import random


class UnreliableAPIStep(ZumaActionStep):
    def __init__(self, name: str):
        super().__init__(
            name=name,
            retries=3,  # Try up to 3 times
            retry_delay=1.0  # Wait 1 second between retries
        )

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        # Simulated API call that might fail
        if random.random() < 0.7:  # 70% chance of failure
            raise ZumaExecutionError("API request failed")
        return {"api_response": "success"}


async def run_retry_example():
    workflow = ZumaWorkflow(
        "Retry Mechanism Demo",
        steps=[UnreliableAPIStep("API Call")]
    )
    runner = ZumaRunner()
    # Generate visualization diagram
    result = await runner.run_workflow(
        workflow,
        generate_diagram=True,
        diagram_output="retry_mechanism"
    )
    return result


if __name__ == "__main__":
    asyncio.run(run_retry_example())
The generated diagram will show:
- Main workflow path
- Retry attempts with failure paths
- Success paths back to main flow
- Error handling mechanism
Installation
pip install zuma-workflow
Requirements
- Python 3.8+
- asyncio
- typing-extensions
Documentation
For detailed documentation, examples, and API reference, visit: https://zuma.codejunction.dev
Contributing
[Add Contributing Guidelines]
Project Status
Zuma is currently in active development. While it's stable for production use, some advanced features are still being developed. Contributions and feedback are welcome!
File details
Details for the file zuma_workflow-0.0.1b1.tar.gz.
File metadata
- Download URL: zuma_workflow-0.0.1b1.tar.gz
- Upload date:
- Size: 145.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f06d6bc4d60eb3220e40d0ebf86d7494101b519605f16b77be90961522aabe2c |
| MD5 | 7e1895f1e4c67ee6c6b9eb0d2d230818 |
| BLAKE2b-256 | cb2eb1da8e9d284b2f75effabc9291ccfec511ec69af3e9dc1ecc7bc6b1fbdde |
File details
Details for the file zuma_workflow-0.0.1b1-py3-none-any.whl.
File metadata
- Download URL: zuma_workflow-0.0.1b1-py3-none-any.whl
- Upload date:
- Size: 29.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7e148cd23fde812418e51833ad1c1b54c3c5fcc248aebfe4b2287a8f2aa0d02d |
| MD5 | ee65844a31e8063ff67f2da3641d7947 |
| BLAKE2b-256 | 4de5b63cdcb5fd29285d96938d1d121769efc72c863eb91e4b9a61a207c41bfa |