Flux is a distributed workflow orchestration engine to build stateful and fault-tolerant workflows.

Flux

Flux is a distributed workflow orchestration engine written in Python that enables building stateful and fault-tolerant workflows. It provides an intuitive programming model for creating complex, reliable distributed applications with built-in support for state management, error handling, and execution control.

Key Features

Core Capabilities

  • Stateful Execution: Full persistence of workflow state and execution history
  • Distributed Architecture: Support for both local and distributed execution modes
  • High Performance: Efficient parallel task execution and workflow processing
  • Type Safety: Leverages Python type hints for safer workflow development
  • API Integration: Built-in FastAPI server for HTTP-based workflow execution

Task Management

  • Flexible Task Configuration:
    @task.with_options(
        retry_max_attempts=3,        # Auto-retry failed tasks
        retry_delay=1,              # Initial delay between retries
        retry_backoff=2,            # Exponential backoff for retries
        timeout=30,                 # Task execution timeout
        fallback=fallback_func,     # Fallback handler for failures
        rollback=rollback_func,     # Rollback handler for cleanup
        secret_requests=['API_KEY'] # Secure secrets management
    )
    

Workflow Patterns

  • Task Parallelization: Execute multiple tasks concurrently
  • Pipeline Processing: Chain tasks in sequential processing pipelines
  • Subworkflows: Compose complex workflows from simpler ones
  • Task Mapping: Apply tasks across collections of inputs
  • Graph-based Workflows: Define workflows as directed acyclic graphs (DAGs)
  • Dynamic Workflows: Modify workflow behavior based on runtime conditions
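To make the graph-based pattern concrete, here is a minimal sketch in plain Python. It is not Flux's graph API: it only uses the standard library's graphlib module to derive an execution order in which every task runs after its dependencies. The task names (extract, clean, enrich, load) are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each key depends on the tasks in its set.
dependencies = {
    "extract": set(),
    "clean": {"extract"},
    "enrich": {"extract"},
    "load": {"clean", "enrich"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which every task follows its dependencies."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(dependencies)
print(order)
```

Because clean and enrich depend only on extract, an engine is free to run them concurrently; the sorter guarantees only that extract comes first and load comes last.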

Error Handling & Recovery

  • Automatic Retries: Configurable retry policies with backoff
  • Fallback Mechanisms: Define alternative execution paths
  • Rollback Support: Clean up after failures
  • Exception Handling: Comprehensive error management
  • Timeout Management: Prevent hung tasks and workflows

State Management

  • Execution Persistence: Durable storage of workflow state
  • Pause & Resume: Control workflow execution flow
  • Deterministic Replay: Automatic replay of workflow events to maintain consistency
  • State Inspection: Monitor workflow progress and state
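The idea behind deterministic replay can be sketched in plain Python. This is an illustration of the general technique, not Flux's implementation: the ReplayLog class and its methods are hypothetical. Each step's result is recorded as an event, and on a later pass the recorded value is returned instead of running the step again.

```python
from typing import Any, Callable


class ReplayLog:
    """Hypothetical event log: records step results so a re-run can reuse them."""

    def __init__(self) -> None:
        self.events: list[tuple[str, Any]] = []
        self._cursor = 0

    def run_step(self, name: str, fn: Callable[[], Any]) -> Any:
        # During replay, return the recorded value instead of calling fn again.
        if self._cursor < len(self.events):
            recorded_name, value = self.events[self._cursor]
            if recorded_name != name:
                raise RuntimeError(
                    f"non-deterministic replay: expected {recorded_name}, got {name}"
                )
            self._cursor += 1
            return value
        # First execution of this step: run it and record the result.
        value = fn()
        self.events.append((name, value))
        self._cursor += 1
        return value

    def rewind(self) -> None:
        # Start a new pass over the recorded history.
        self._cursor = 0


calls: list[str] = []


def fetch() -> int:
    calls.append("fetch")  # side effect we do not want to repeat on replay
    return 21


def workflow_body(log: ReplayLog) -> int:
    value = log.run_step("fetch", fetch)
    return log.run_step("double", lambda: value * 2)


log = ReplayLog()
first = workflow_body(log)   # executes both steps and records them
log.rewind()
second = workflow_body(log)  # replays recorded results; fetch is not called again
```

The name check in run_step is why workflow code has to be deterministic under this scheme: if a replay requests steps in a different order than the recorded history, the log can no longer be trusted.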

Installation

pip install flux-core

Requirements:

  • Python 3.12 or later
  • Dependencies are managed through Poetry

Quick Start

1. Basic Workflow

Create a simple workflow that processes input:

from flux import task, workflow, WorkflowExecutionContext

@task
def say_hello(name: str) -> str:
    return f"Hello, {name}"

@workflow
def hello_world(ctx: WorkflowExecutionContext[str]):
    return (yield say_hello(ctx.input))

# Execute locally
result = hello_world.run("World")
print(result.output)  # "Hello, World"
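The workflow above is a generator: it yields a unit of work and receives the result back. The following sketch shows how such a generator can be driven in general, using only plain Python. The drive function and the convention of yielding zero-argument callables are assumptions made for illustration; Flux's actual engine may work differently.

```python
from typing import Any, Callable, Generator


def drive(gen: Generator[Callable[[], Any], Any, Any]) -> Any:
    """Run a generator that yields zero-argument callables and gets results back."""
    try:
        step = next(gen)             # advance to the first yield
        while True:
            step = gen.send(step())  # execute the step, resume with its result
    except StopIteration as stop:
        return stop.value            # the generator's return value


def hello_flow(name: str):
    greeting = yield (lambda: f"Hello, {name}")
    shout = yield (lambda: greeting.upper())
    return shout


output = drive(hello_flow("World"))
print(output)
```

Since the driver sits between every yield and its result, it is a natural place to persist state, retry a step, or pause execution without the workflow code having to know.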

2. Parallel Task Execution

Execute multiple tasks concurrently:

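from flux import task, workflow, WorkflowExecutionContext
from flux.tasks import parallel

# task1, task2 and task3 are assumed to be @task-decorated functions defined elsewhere.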
@workflow
def parallel_workflow(ctx: WorkflowExecutionContext[str]):
    results = yield parallel(
        task1(ctx.input),
        task2(ctx.input),
        task3(ctx.input)
    )
    return results
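For comparison, the same fan-out shape can be sketched with the standard library alone. This is not how Flux schedules tasks; it only illustrates running several independent functions on one input and collecting the results in submission order. The three function bodies are hypothetical stand-ins.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical stand-ins for task1, task2 and task3.
def task1(text: str) -> str:
    return text.upper()


def task2(text: str) -> str:
    return text[::-1]


def task3(text: str) -> int:
    return len(text)


def run_in_parallel(value: str) -> list:
    """Submit all three functions at once and collect results in submission order."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(fn, value) for fn in (task1, task2, task3)]
        return [future.result() for future in futures]


results = run_in_parallel("flux")
print(results)
```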

3. Pipeline Processing

Chain tasks in a processing pipeline:

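from flux import workflow, WorkflowExecutionContext
from flux.tasks import pipeline

# multiply_by_two, add_three and square are assumed to be @task-decorated
# functions that each take and return an int.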
@workflow
def pipeline_workflow(ctx: WorkflowExecutionContext[int]):
    result = yield pipeline(
        multiply_by_two,
        add_three,
        square,
        input=ctx.input
    )
    return result
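As a rough mental model, a pipeline is a left-to-right fold over its steps. The sketch below uses plain functions and functools.reduce; it assumes the steps are applied in the order listed, and run_pipeline is a hypothetical helper, not part of Flux.

```python
from functools import reduce
from typing import Any, Callable


def multiply_by_two(x: int) -> int:
    return x * 2


def add_three(x: int) -> int:
    return x + 3


def square(x: int) -> int:
    return x * x


def run_pipeline(*steps: Callable[[Any], Any], input: Any) -> Any:
    """Feed `input` through each step in order, passing each result to the next."""
    return reduce(lambda value, step: step(value), steps, input)


# 5 -> 10 -> 13 -> 169
result = run_pipeline(multiply_by_two, add_three, square, input=5)
print(result)
```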

4. Task Mapping

Apply a task across multiple inputs:

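from flux import workflow, WorkflowExecutionContext

# process_item is assumed to be a @task-decorated function that handles one item.
@workflow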
def map_workflow(ctx: WorkflowExecutionContext[list[str]]):
    results = yield process_item.map(ctx.input)
    return results

Advanced Usage

Workflow Control

State Management

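# Resume an existing workflow execution by calling run() on a workflow
# function (here hello_world from the Quick Start), not on the decorator
ctx = hello_world.run(execution_id="previous_execution_id")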

# Check workflow state
print(f"Finished: {ctx.finished}")
print(f"Succeeded: {ctx.succeeded}")
print(f"Failed: {ctx.failed}")

# Inspect workflow events
for event in ctx.events:
    print(f"{event.type}: {event.value}")

Error Handling

@task.with_options(
    retry_max_attempts=3,
    retry_delay=1,
    retry_backoff=2,
    fallback=lambda: "fallback result",
    rollback=cleanup_function
)
def risky_task():
    # Task implementation with comprehensive error handling
    pass
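How these options might interact can be sketched in plain Python. Everything here is an assumption made for illustration: that retry_max_attempts counts retries after the initial call, that each delay is multiplied by retry_backoff, and that rollback runs before fallback. The run_with_retries helper is hypothetical and is not Flux's implementation.

```python
import time
from typing import Any, Callable, Optional


def run_with_retries(
    fn: Callable[[], Any],
    retry_max_attempts: int = 3,
    retry_delay: float = 1.0,
    retry_backoff: float = 2.0,
    fallback: Optional[Callable[[], Any]] = None,
    rollback: Optional[Callable[[], None]] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call fn once, then retry with multiplicatively growing delays."""
    last_error: Optional[Exception] = None
    delay = retry_delay
    for attempt in range(retry_max_attempts + 1):
        if attempt > 0:
            sleep(delay)            # wait before each retry
            delay *= retry_backoff  # grow the delay for the next retry
        try:
            return fn()
        except Exception as error:
            last_error = error
    # All attempts failed: clean up, then use the fallback if one was given.
    if rollback is not None:
        rollback()
    if fallback is not None:
        return fallback()
    raise last_error


delays: list[float] = []
cleaned_up: list[str] = []


def always_fails() -> str:
    raise ConnectionError("service unavailable")


outcome = run_with_retries(
    always_fails,
    fallback=lambda: "fallback result",
    rollback=lambda: cleaned_up.append("rolled back"),
    sleep=delays.append,  # record delays instead of actually sleeping
)
print(outcome, delays, cleaned_up)
```

With retry_delay=1 and retry_backoff=2, the three retries in this sketch wait 1, 2 and 4 seconds. Passing sleep as a parameter keeps the example fast and makes the schedule easy to inspect.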

Secret Management

from typing import Any

from flux import task

@task.with_options(secret_requests=["API_KEY"])
def secure_task(secrets: dict[str, Any] = {}):
    api_key = secrets["API_KEY"]
    # Use API key securely

API Server

Start the API server for HTTP-based workflow execution:

flux start myworkflows

Execute workflows via HTTP:

curl -X POST 'http://localhost:8000/workflow_name' \
     -H 'Content-Type: application/json' \
     -d '"input_data"'
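The same request can be built from Python with the standard library. This sketch mirrors the curl call above and assumes the same host, port and workflow name; build_workflow_request is a hypothetical helper. The request is only constructed here, because sending it requires a running server.

```python
import json
import urllib.request


def build_workflow_request(
    workflow_name: str,
    payload,
    base_url: str = "http://localhost:8000",
) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying the JSON-encoded input."""
    return urllib.request.Request(
        url=f"{base_url}/{workflow_name}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_workflow_request("workflow_name", "input_data")

# Sending is left commented out because it needs the API server to be running:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

Note that json.dumps("input_data") produces the quoted JSON string "input_data", which matches the body passed to curl with -d.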

Development

Setup Development Environment

git clone https://github.com/edurdias/flux
cd flux
poetry install

Run Tests

poetry run pytest

Code Quality

The project uses several tools for code quality:

  • Ruff for linting and formatting
  • MyPy for type checking
  • Pytest for testing
  • Pre-commit hooks for code quality checks

License

Apache License 2.0 - See LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit pull requests. For major changes, please open an issue first to discuss what you would like to change.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Documentation

For more details, please check our documentation.
