Flux
Flux is a distributed workflow orchestration engine written in Python that enables building stateful and fault-tolerant workflows. It provides an intuitive async programming model for creating complex, reliable distributed applications with built-in support for state management, error handling, and execution control.
Current Version: 0.2.4
Key Features
Core Capabilities
- Stateful Execution: Full persistence of workflow state and execution history
- Distributed Architecture: Support for both local and distributed execution modes
- High Performance: Efficient parallel task execution and workflow processing
- Type Safety: Leverages Python type hints for safer workflow development
- API Integration: Built-in FastAPI server for HTTP-based workflow execution
Task Management
- Flexible Task Configuration:
@task.with_options(
    name="custom_task",           # Custom task name
    retry_max_attempts=3,         # Auto-retry failed tasks
    retry_delay=1,                # Initial delay between retries (seconds)
    retry_backoff=2,              # Exponential backoff multiplier for retries
    timeout=30,                   # Task execution timeout (seconds)
    fallback=fallback_func,       # Fallback handler for failures
    rollback=rollback_func,       # Rollback handler for cleanup
    secret_requests=['API_KEY'],  # Secure secrets management
    cache=True,                   # Enable task result caching
    metadata=True                 # Enable task metadata access
)
async def my_task():
    pass
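With `retry_delay=1` and `retry_backoff=2`, the wait between retries grows exponentially. A plain-Python sketch of the resulting schedule, assuming the conventional `delay * backoff**attempt` formula (the exact formula Flux uses may differ):

```python
def backoff_schedule(max_attempts: int, delay: float, backoff: float) -> list[float]:
    """Delay before each retry attempt, assuming delay * backoff**n growth."""
    return [delay * backoff**n for n in range(max_attempts)]

# retry_max_attempts=3, retry_delay=1, retry_backoff=2
print(backoff_schedule(3, 1, 2))  # [1, 2, 4]
```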
Workflow Patterns
- Task Parallelization: Execute multiple tasks concurrently
- Pipeline Processing: Chain tasks in sequential processing pipelines
- Subworkflows: Compose complex workflows from simpler ones
- Task Mapping: Apply tasks across collections of inputs
- Graph-based Workflows: Define workflows as directed acyclic graphs (DAGs)
- Dynamic Workflows: Modify workflow behavior based on runtime conditions
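To picture the graph-based pattern, here is a toy, framework-independent sketch of how a DAG determines execution order via topological sorting, using the standard library's `graphlib` (this illustrates the concept, not Flux's internal scheduler; task names are hypothetical):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency graph: each task maps to the tasks it depends on.
dag = {
    "load": set(),
    "clean": {"load"},
    "train": {"clean"},
    "report": {"train", "clean"},
}

# A valid execution order always places dependencies before dependents.
order = list(TopologicalSorter(dag).static_order())
print(order)
```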
Error Handling & Recovery
- Automatic Retries: Configurable retry policies with backoff
- Fallback Mechanisms: Define alternative execution paths
- Rollback Support: Clean up after failures
- Exception Handling: Comprehensive error management
- Timeout Management: Prevent hung tasks and workflows
State Management
- Execution Persistence: Durable storage of workflow state
- Pause & Resume: Control workflow execution flow
- Deterministic Replay: Automatic replay of workflow events to maintain consistency
- State Inspection: Monitor workflow progress and state
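Deterministic replay can be pictured as re-reading a recorded event log instead of re-executing side effects. A toy illustration of that idea (not Flux's actual event model):

```python
# Toy event log captured on a first run: (step_name, recorded_result) pairs.
event_log = [("fetch_user", {"id": 7}), ("charge_card", "ok")]

def run_step(name, live_fn, log, cursor):
    """Return the recorded result for a replayed step; only call live_fn for new steps."""
    if cursor < len(log) and log[cursor][0] == name:
        return log[cursor][1]  # replayed from the log: side effect NOT re-executed
    result = live_fn()         # genuinely new step: execute and record it
    log.append((name, result))
    return result

def must_not_run():
    raise AssertionError("side effect re-executed during replay")

# On replay both steps are served from the log, so must_not_run is never called.
user = run_step("fetch_user", must_not_run, event_log, 0)
status = run_step("charge_card", must_not_run, event_log, 1)
print(user, status)  # {'id': 7} ok
```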
Installation
pip install flux-core
Requirements:
- Python 3.12 or later
- Dependencies are managed through Poetry
Quick Start
1. Basic Workflow
Create a simple workflow that processes input:
from flux import task, workflow, ExecutionContext

@task
async def say_hello(name: str) -> str:
    return f"Hello, {name}"

@workflow
async def hello_world(ctx: ExecutionContext[str]):
    return await say_hello(ctx.input)

# Execute locally
result = hello_world.run("World")
print(result.output)  # "Hello, World"
2. Parallel Task Execution
Execute multiple tasks concurrently:
from flux import task, workflow, ExecutionContext
from flux.tasks import parallel

@task
async def say_hi(name: str):
    return f"Hi, {name}"

@task
async def say_hello(name: str):
    return f"Hello, {name}"

@task
async def say_hola(name: str):
    return f"Hola, {name}"

@workflow
async def parallel_workflow(ctx: ExecutionContext[str]):
    results = await parallel(
        say_hi(ctx.input),
        say_hello(ctx.input),
        say_hola(ctx.input),
    )
    return results
3. Pipeline Processing
Chain tasks in a processing pipeline:
from flux import task, workflow, ExecutionContext
from flux.tasks import pipeline

@task
async def multiply_by_two(x):
    return x * 2

@task
async def add_three(x):
    return x + 3

@task
async def square(x):
    return x * x

@workflow
async def pipeline_workflow(ctx: ExecutionContext[int]):
    result = await pipeline(
        multiply_by_two,
        add_three,
        square,
        input=ctx.input,
    )
    return result
4. Task Mapping
Apply a task across multiple inputs:
@task
async def process_item(item: str):
    return item.upper()

@workflow
async def map_workflow(ctx: ExecutionContext[list[str]]):
    results = await process_item.map(ctx.input)
    return results
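Conceptually, mapping a task over a collection awaits the task once per element, potentially concurrently. A plain-asyncio sketch of that idea (this illustrates the semantics, not Flux's `.map` implementation):

```python
import asyncio

async def process_item(item: str) -> str:
    return item.upper()

async def map_items(items: list[str]) -> list[str]:
    # Run one coroutine per input element concurrently, preserving input order.
    return await asyncio.gather(*(process_item(i) for i in items))

print(asyncio.run(map_items(["a", "b", "c"])))  # ['A', 'B', 'C']
```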
Advanced Usage
Workflow Control
State Management
# Resume an existing workflow execution by its execution ID
ctx = hello_world.run(execution_id="previous_execution_id")

# Check workflow state
print(f"Finished: {ctx.has_finished}")
print(f"Succeeded: {ctx.has_succeeded}")
print(f"Failed: {ctx.has_failed}")

# Inspect workflow events
for event in ctx.events:
    print(f"{event.type}: {event.value}")
Error Handling
@task.with_options(
    retry_max_attempts=3,
    retry_delay=1,
    retry_backoff=2,
    fallback=lambda: "fallback result",
    rollback=cleanup_function,  # cleanup_function: your own handler, defined elsewhere
)
async def risky_task():
    # Task implementation; failures trigger retries, then the fallback
    pass
Secret Management
from typing import Any

@task.with_options(secret_requests=["API_KEY"])
async def secure_task(secrets: dict[str, Any] = {}):  # Flux injects the requested secrets
    api_key = secrets["API_KEY"]
    # Use the API key securely
Task Caching
Enable task result caching to avoid re-execution:
@task.with_options(cache=True)
async def expensive_computation(input_data):
    # Results are cached, keyed on the input
    return complex_calculation(input_data)  # complex_calculation: your own function
Task Metadata
Access task metadata during execution:
from flux.decorators import TaskMetadata

@task.with_options(metadata=True)
async def metadata_aware_task(data, metadata: TaskMetadata = None):
    # Flux injects a TaskMetadata instance when metadata=True
    print(f"Task ID: {metadata.task_id}")
    print(f"Task Name: {metadata.task_name}")
    return process_data(data)  # process_data: your own processing function
Built-in Tasks
Flux provides several built-in tasks for common operations:
from flux.tasks import now, sleep, uuid4, choice, randint, pause

@workflow
async def built_in_tasks_example(ctx: ExecutionContext):
    # Time operations
    start_time = await now()
    await sleep(2.5)  # Sleep for 2.5 seconds

    # Random operations
    random_choice = await choice(['option1', 'option2', 'option3'])
    random_number = await randint(1, 100)

    # UUID generation
    unique_id = await uuid4()

    # Workflow pause point
    await pause("wait_for_approval")

    return {
        'start_time': start_time,
        'choice': random_choice,
        'number': random_number,
        'id': str(unique_id),
    }
Distributed Architecture
Flux supports distributed execution through a server and worker architecture:
Start Server
Start the server to coordinate workflow execution:
flux start server
You can specify custom host and port:
flux start server --host 0.0.0.0 --port 8080
Start Workers
Start worker nodes to execute tasks:
flux start worker
Workers automatically connect to the server and register themselves for task execution.
Execute Workflows via HTTP
Once the server is running, you can execute workflows via HTTP. The API provides several endpoints for workflow management:
Upload and Register Workflows
# Upload a Python file containing workflows
curl -X POST 'http://localhost:8000/workflows' \
-F 'file=@my_workflows.py'
List All Workflows
curl -X GET 'http://localhost:8000/workflows'
Get Workflow Details
curl -X GET 'http://localhost:8000/workflows/workflow_name'
Execute Workflows
Run workflows with different execution modes:
Synchronous execution (wait for completion):
curl -X POST 'http://localhost:8000/workflows/workflow_name/run/sync' \
-H 'Content-Type: application/json' \
-d '"input_data"'
Asynchronous execution (immediate response):
curl -X POST 'http://localhost:8000/workflows/workflow_name/run/async' \
-H 'Content-Type: application/json' \
-d '"input_data"'
Streaming execution (real-time updates):
curl -X POST 'http://localhost:8000/workflows/workflow_name/run/stream' \
-H 'Content-Type: application/json' \
-d '"input_data"'
Check Workflow Status
curl -X GET 'http://localhost:8000/workflows/workflow_name/status/execution_id'
For detailed execution information, add ?detailed=true:
curl -X GET 'http://localhost:8000/workflows/workflow_name/status/execution_id?detailed=true'
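The status endpoint can also be called from Python. A small client sketch using only the standard library; the URL layout follows the curl examples above, and the workflow name and execution ID are hypothetical (the actual request is left commented out so the sketch does not require a running server):

```python
import json
import urllib.request

def status_url(base: str, workflow: str, execution_id: str, detailed: bool = False) -> str:
    """Build the status URL shown in the curl examples."""
    url = f"{base}/workflows/{workflow}/status/{execution_id}"
    if detailed:
        url += "?detailed=true"
    return url

url = status_url("http://localhost:8000", "hello_world", "abc123", detailed=True)
print(url)  # http://localhost:8000/workflows/hello_world/status/abc123?detailed=true

# Uncomment to query a running server:
# with urllib.request.urlopen(url) as resp:
#     print(json.loads(resp.read()))
```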
API Documentation
The server provides interactive API documentation at:
- Swagger UI:
http://localhost:8000/docs
Development
Setup Development Environment
git clone https://github.com/edurdias/flux
cd flux
poetry install
Run Tests
poetry run pytest
Code Quality
The project uses several tools for code quality and development:
Linting & Formatting:
- Ruff - Fast Python linter and formatter (configured with 100-char line length)
- Pylint - Comprehensive code analysis
- Pyflakes - Fast Python source checker
- Bandit - Security vulnerability scanner
- Prospector - Meta-tool that runs multiple analysis tools
Type Checking:
- Pyright - Static type checker for Python
Testing:
- Pytest - Testing framework with coverage support
- pytest-cov - Coverage reporting
- pytest-mock - Mocking utilities
Development Tools:
- Pre-commit - Git hooks for automated code quality checks
- Poethepoet - Task runner for custom commands
- Radon - Code complexity analysis
Documentation:
- MkDocs with Material theme - Documentation generation
- MkDocstrings - Auto-generate API documentation
License
Apache License 2.0 - See LICENSE file for details.
Contributing
Contributions are welcome! Please feel free to submit pull requests. For major changes, please open an issue first to discuss what you would like to change.
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Documentation
For more details, please check our documentation.
Project details
Download files
Source Distribution
Built Distribution
File details
Details for the file flux_core-0.2.4.tar.gz.
File metadata
- Download URL: flux_core-0.2.4.tar.gz
- Upload date:
- Size: 40.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.12.10 Linux/6.11.0-1014-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0bed6a44f012af735bf84cd95e86f726db74389e3aa7c4c39e731620ac18a0e3 |
| MD5 | 63e5e6188b46fe5b3f461380ee977ece |
| BLAKE2b-256 | a54acbe22d7eef263340d9c231bc0fec2a02ae3bba3fa1bc72e159110add0457 |
File details
Details for the file flux_core-0.2.4-py3-none-any.whl.
File metadata
- Download URL: flux_core-0.2.4-py3-none-any.whl
- Upload date:
- Size: 45.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.7.1 CPython/3.12.10 Linux/6.11.0-1014-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | a87f032d9fa17726d30ee7b5177963a4aff67f82016128a7d5d0583eae51cc81 |
| MD5 | 7be82347dcf749a5b4375bb84aeb821b |
| BLAKE2b-256 | 4f1ad27cc408ccc2f4c55441fb36aa228e59247610a202911b1be42fdb9b5e68 |