Library for creating workflows in Python
Introduction
The workflow library offers a framework for defining and executing asynchronous workflows. It simplifies chaining multiple tasks, handling errors, and managing shared context across tasks. This library is especially useful for designing complex workflows with multiple steps and dependencies.
Features
- Asynchronous Execution: Define and execute tasks asynchronously.
- Scopes: Steps for normal execution, error handling, and exit scenarios.
- Execution Modes: Different modes like threads or processes.
- Context Sharing: Shared context (ctx) across all steps.
- Retry Mechanism: Retry steps upon failure.
- Timeouts: Set timeout limits.
- Parallel Execution: Concurrent execution of steps.
- Cancellation: Cancel workflows midway.
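The source does not show how the library exposes cancellation, so the following is only a sketch of the underlying idea in plain asyncio. `long_running_workflow` is a hypothetical stand-in for a workflow run, not part of st-workflow: it is wrapped in a task, cancelled partway through, and records that it was interrupted.

```python
import asyncio

async def long_running_workflow(ctx):
    # Hypothetical stand-in for a workflow run; not the st-workflow API.
    try:
        ctx["status"] = "running"
        await asyncio.sleep(10)  # simulate slow work
        ctx["status"] = "finished"
    except asyncio.CancelledError:
        # Record the interruption, then re-raise so the task ends as cancelled.
        ctx["status"] = "cancelled"
        raise

async def main():
    ctx = {}
    task = asyncio.create_task(long_running_workflow(ctx))
    await asyncio.sleep(0.01)  # give the task a chance to start
    task.cancel()              # request cancellation midway
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return ctx

ctx = asyncio.run(main())
print(ctx["status"])
```

Re-raising `CancelledError` after cleanup keeps the task in a cancelled state, which is the behavior asyncio expects from well-behaved coroutines.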
Installation
pip install st-workflow
Quick Start
Use asyncio.run to execute the workflow:
import asyncio
from workflow import Workflow, Scope

# Define your steps
async def step1(ctx):
    # Your code here
    pass

async def error_step(ctx):
    # Your code here
    pass

# Create a workflow with a context
wf = Workflow(ctx={'initial_data': 'data_value'})

# Add steps to the workflow
wf.add_step(step1)
wf.add_error_step(error_step)

# Run the workflow using asyncio.run
asyncio.run(wf.run())
Real-World Examples
1. Web Scraping Workflow
Imagine you want to scrape data from a website, process it, store it in a database, and then notify someone about the update.
import asyncio
from workflow import Workflow, Scope

async def fetch_data(ctx):
    # Use an HTTP library to fetch data from a website
    data = "fetched data"
    return data

async def process_data(ctx):
    # Process the fetched data
    processed_data = ctx["fetch_data"] + " processed"
    return processed_data

async def store_in_db(ctx):
    # Store the processed data in a database
    print(f"Stored: {ctx['process_data']}")

async def notify_someone(ctx):
    # Notify someone about the update
    print("Notification sent!")

# Error handler
async def handle_error(ctx):
    print(f"Error in step {ctx['normal_error']['step']}: {ctx['normal_error']['error']}")

wf = Workflow(ctx={})
wf.add_step(fetch_data)
wf.add_step(process_data)
wf.add_step(store_in_db)
wf.add_step(notify_someone)
wf.add_error_step(handle_error)

asyncio.run(wf.run())
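In the example above, process_data reads ctx["fetch_data"], which suggests that each step's return value is stored in the context under the step's function name. The source does not state this rule explicitly, so the sketch below only illustrates that assumed convention with plain asyncio. `run_steps` is a hypothetical helper, not part of st-workflow.

```python
import asyncio

async def run_steps(steps, ctx):
    # Hypothetical runner: store each step's return value under its function name.
    for step in steps:
        ctx[step.__name__] = await step(ctx)
    return ctx

async def fetch_data(ctx):
    return "fetched data"

async def process_data(ctx):
    # Reads the previous step's result by that step's name.
    return ctx["fetch_data"] + " processed"

result = asyncio.run(run_steps([fetch_data, process_data], {}))
print(result["process_data"])  # prints: fetched data processed
```

Under this convention, renaming a step function also changes the key that later steps must read, so step names act as part of the workflow's interface.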
2. Data Processing Pipeline
Imagine you're working with a large dataset. You need to load it, clean it, transform it, and then generate a report.
import asyncio
from workflow import Workflow, Scope

async def load_data(ctx):
    # Load data from a source
    data = "raw data"
    return data

async def clean_data(ctx):
    # Clean the loaded data
    cleaned_data = ctx["load_data"].replace("raw", "cleaned")
    return cleaned_data

async def transform_data(ctx):
    # Transform the cleaned data
    transformed_data = ctx["clean_data"] + " transformed"
    return transformed_data

async def generate_report(ctx):
    # Generate a report based on the transformed data
    print(f"Report for {ctx['transform_data']} generated!")

# Error handler
async def handle_error(ctx):
    print(f"Error in step {ctx['normal_error']['step']}: {ctx['normal_error']['error']}")

wf = Workflow(ctx={})
wf.add_step(load_data)
wf.add_step(clean_data)
wf.add_step(transform_data)
wf.add_step(generate_report)
wf.add_error_step(handle_error)

asyncio.run(wf.run())
3. Infrastructure Management Workflow
Imagine you're managing a cloud infrastructure. You need to create a virtual machine, configure it, deploy an application, and monitor its status.
import asyncio
from workflow import Workflow, Scope

async def create_vm(ctx):
    # Create a virtual machine
    vm_id = "vm123"
    return vm_id

async def configure_vm(ctx):
    # Configure the virtual machine
    print(f"Configured VM with ID: {ctx['create_vm']}")

async def deploy_app(ctx):
    # Deploy an application on the VM
    print(f"Deployed app on VM with ID: {ctx['create_vm']}")

async def monitor_status(ctx):
    # Monitor the status of the deployed app
    print("Monitoring app status...")

# Error handler
async def handle_error(ctx):
    print(f"Error in step {ctx['normal_error']['step']}: {ctx['normal_error']['error']}")

wf = Workflow(ctx={})
wf.add_step(create_vm)
wf.add_step(configure_vm)
wf.add_step(deploy_app)
wf.add_step(monitor_status)
wf.add_error_step(handle_error)

asyncio.run(wf.run())
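The error handlers in these examples read ctx['normal_error']['step'] and ctx['normal_error']['error'], and the Features list names three scopes: normal, error, and exit. One way to picture how such scopes could fit together is a try/except/finally structure. The sketch below is an assumption-laden illustration in plain asyncio, not the library's implementation: `run_scoped` is hypothetical, and it assumes that remaining normal steps are skipped after a failure and that exit steps always run.

```python
import asyncio

async def run_scoped(steps, error_steps, exit_steps, ctx):
    # Hypothetical runner illustrating normal, error, and exit scopes.
    try:
        for step in steps:
            try:
                ctx[step.__name__] = await step(ctx)
            except Exception as exc:
                # Record which step failed, mirroring the shape the examples read.
                ctx["normal_error"] = {"step": step.__name__, "error": exc}
                for handler in error_steps:
                    await handler(ctx)
                break  # assumption: stop running normal steps after a failure
    finally:
        # Exit scope: runs whether or not a step failed.
        for step in exit_steps:
            await step(ctx)
    return ctx

events = []

async def ok(ctx):
    events.append("ok")
    return 1

async def boom(ctx):
    raise ValueError("bad input")

async def skipped(ctx):
    events.append("skipped")

async def handle_error(ctx):
    events.append(f"error in {ctx['normal_error']['step']}")

async def cleanup(ctx):
    events.append("cleanup")

ctx = asyncio.run(run_scoped([ok, boom, skipped], [handle_error], [cleanup], {}))
print(events)  # prints: ['ok', 'error in boom', 'cleanup']
```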
Advanced Real-World Example: E-commerce Order Processing System
Imagine an e-commerce system where a user places an order. The system needs to:
- Validate the order.
- Charge the user's credit card.
- Notify the warehouse for packaging.
- Notify the shipping company for delivery.
- Notify the user about the order status.
- If any step fails, it should handle the error appropriately.
For our example, we'll also incorporate features like retries, timeouts, parallel execution, and shared context.
import asyncio
from workflow import Workflow, Scope

# Steps
async def validate_order(ctx):
    # Validate the order details
    print("Order validated!")
    return "Validated order details"

async def charge_credit_card(ctx):
    # Simulate credit card charge
    print("Credit card charged!")
    return "Transaction ID: 123456"

async def notify_warehouse(ctx):
    # Notify warehouse for packaging
    print("Warehouse notified for packaging!")

async def notify_shipping(ctx):
    # Notify shipping company for delivery
    print("Shipping company notified for delivery!")

# Parallel steps
async def notify_user(ctx):
    # Notify user about order status in parallel
    print("User notified about order status!")

async def send_email(ctx):
    # Send an email confirmation to the user in parallel
    print("Email confirmation sent to user!")

# Error handler
async def handle_error(ctx):
    print(f"Error in step {ctx['normal_error']['step']}: {ctx['normal_error']['error']}")

# Exit steps
async def log_transaction(ctx):
    # Log the transaction details
    print(f"Logged transaction with ID: {ctx['charge_credit_card']}")

async def cleanup(ctx):
    # Cleanup any resources or temporary files
    print("Resources cleaned up!")

# Create the workflow with initial context
wf = Workflow(ctx={"order_id": "order_001"})

# Add steps to the workflow
wf.add_step(validate_order, retries=2)  # Retry twice if validation fails
wf.add_step(charge_credit_card, timeout=10)  # Timeout after 10 seconds
wf.add_step(notify_warehouse)
wf.add_step(notify_shipping)
wf.add_parallel_steps([notify_user, send_email], name="notifications")  # Parallel steps
wf.add_error_step(handle_error)
wf.add_exit_step(log_transaction)
wf.add_exit_step(cleanup)

# Run the workflow using asyncio.run
asyncio.run(wf.run())
In this scenario:
- We use retries for the order validation step. If there's a temporary issue with validation, it'll retry twice before failing.
- The credit card charge has a timeout to ensure the user isn't left waiting indefinitely.
- The user notification and email confirmation steps run in parallel to speed up the process.
- If any step fails, the error handler will provide information on the failed step.
- Exit steps ensure that the transaction is logged and any used resources are cleaned up, regardless of whether the workflow completed successfully or encountered an error.
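The retry, timeout, and parallel behaviors described in this list can be approximated with plain asyncio. The following is a minimal sketch, not st-workflow's implementation: `run_with_retries` and the demo steps are hypothetical names, and it assumes that retries=2 means up to three attempts in total.

```python
import asyncio

async def run_with_retries(step, ctx, retries=0, timeout=None):
    # Hypothetical helper: one initial attempt plus up to `retries` more.
    attempts = retries + 1
    for attempt in range(1, attempts + 1):
        try:
            # wait_for raises asyncio.TimeoutError if the step exceeds `timeout`.
            return await asyncio.wait_for(step(ctx), timeout)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: propagate the last error

calls = {"validate": 0}

async def flaky_validate(ctx):
    # Fails on the first two calls, succeeds on the third.
    calls["validate"] += 1
    if calls["validate"] < 3:
        raise RuntimeError("temporary validation failure")
    return "validated"

async def slow_charge(ctx):
    await asyncio.sleep(1)  # slower than the timeout used below
    return "charged"

async def notify_user(ctx):
    return "user notified"

async def send_email(ctx):
    return "email sent"

async def main():
    ctx = {}
    validated = await run_with_retries(flaky_validate, ctx, retries=2)
    try:
        await run_with_retries(slow_charge, ctx, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    # gather runs both notifications concurrently and keeps argument order.
    notifications = await asyncio.gather(notify_user(ctx), send_email(ctx))
    return validated, timed_out, notifications

validated, timed_out, notifications = asyncio.run(main())
print(validated, timed_out, notifications)
```

A real payment step would usually need more care than a blind retry, since repeating a charge that actually succeeded could bill the user twice; the example above applies retries only to validation for that reason.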
This example showcases a combination of the features provided by the Workflow library in a real-world e-commerce order processing scenario.
Organizing Multiple Workflows in a Project
Maintain a directory structure to keep workflows organized:
project_root/
|-- workflows/
|   |-- ecommerce/
|   |   |-- order_processing.py
|   |   |-- returns.py
|-- utilities/
|   |-- db_connector.py
|   |-- notifier.py
|-- main.py
- Shared Utilities: Place utilities or helper functions used across workflows in a utilities directory.
- Workflow Dependencies: Trigger or check the status of dependent workflows using shared context, callbacks, or database flags.
- Configuration: Use a configuration system for environment variables.
- Logging: Introduce logging for tracking and debugging.
- Documentation: Include docstrings outlining the purpose, prerequisites, input/output, and error handling for each workflow.
- Testing: Create test files for each workflow.
- Versioning: Implement versioning if workflows are subject to frequent changes.
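Because the steps shown in this document are plain async functions that take a context, one way to follow the testing advice is to call a step directly with a hand-built dict, without running a whole workflow. The sketch below uses the standard unittest module; the test class and its method names are illustrative, and passing a plain dict as ctx is an assumption based on how the examples index into it.

```python
import asyncio
import unittest

async def clean_data(ctx):
    # Same logic as the clean_data step in the data processing example.
    return ctx["load_data"].replace("raw", "cleaned")

class CleanDataTest(unittest.TestCase):
    def test_replaces_raw_marker(self):
        ctx = {"load_data": "raw data"}
        self.assertEqual(asyncio.run(clean_data(ctx)), "cleaned data")

    def test_missing_input_raises(self):
        # The step depends on an earlier result; without it, a KeyError is raised.
        with self.assertRaises(KeyError):
            asyncio.run(clean_data({}))

# Run the tests programmatically; in a project, a test runner would discover them.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(CleanDataTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```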
Dependency Injection in Workflows
- Define Interfaces for Dependencies: These interfaces act as contracts.
from abc import ABC, abstractmethod

class PaymentGateway(ABC):
    @abstractmethod
    async def charge(self, amount):
        pass

    @abstractmethod
    async def refund(self, amount):
        pass
- Provide Implementations: Offer mock and real-world implementations.
class MockPaymentGateway(PaymentGateway):
    async def charge(self, amount):
        print(f"Mock charged {amount}")
        return "Mock Transaction ID: 123456"

    async def refund(self, amount):
        print(f"Mock refunded {amount}")
        return "Mock Refund ID: 654321"

class RealPaymentGateway(PaymentGateway):
    async def charge(self, amount):
        # Real charge logic here
        pass

    async def refund(self, amount):
        # Real refund logic here
        pass
- Modify the Workflow to Accept Dependencies: Accept dependencies as parameters during workflow initialization.
import asyncio
from workflow import Workflow

class EcommerceWorkflow:
    def __init__(self, payment_gateway: PaymentGateway):
        # The injected gateway travels with the shared context
        self.wf = Workflow(ctx={"payment_gateway": payment_gateway, "order_id": "order_001"})

    async def charge_credit_card(self, ctx):
        transaction_id = await ctx["payment_gateway"].charge(100)
        print(f"Transaction ID: {transaction_id}")
        return transaction_id

    # ... other steps ...

    def run(self):
        # Add steps to the workflow...
        asyncio.run(self.wf.run())
- Inject the Dependencies: Provide the required dependencies when initializing the workflow.
# For testing
workflow = EcommerceWorkflow(payment_gateway=MockPaymentGateway())
workflow.run()
# For production
workflow = EcommerceWorkflow(payment_gateway=RealPaymentGateway())
workflow.run()
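A benefit of injecting the gateway through the context is that a step can be checked against a fake that records its calls. The sketch below is self-contained and does not use st-workflow: `PaymentGateway` is trimmed to charge only, and `RecordingGateway`, the "amount" context key, and the fake transaction IDs are hypothetical names introduced for illustration.

```python
import asyncio
from abc import ABC, abstractmethod

class PaymentGateway(ABC):
    # Trimmed version of the interface above: only charge is needed here.
    @abstractmethod
    async def charge(self, amount):
        ...

class RecordingGateway(PaymentGateway):
    # Hypothetical test double that remembers every amount it was asked to charge.
    def __init__(self):
        self.charges = []

    async def charge(self, amount):
        self.charges.append(amount)
        return f"fake-txn-{len(self.charges)}"

async def charge_credit_card(ctx):
    # The step depends only on the interface found in the context.
    return await ctx["payment_gateway"].charge(ctx["amount"])

gateway = RecordingGateway()
txn = asyncio.run(charge_credit_card({"payment_gateway": gateway, "amount": 100}))
print(txn, gateway.charges)  # prints: fake-txn-1 [100]
```

Because the step never names a concrete gateway class, the same function can run against the recording fake in tests and a real implementation in production.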
Feedback and Contributions
Open issues or PRs on our GitHub repository. We value feedback and contributions!