
Lightweight YAML-driven workflow engine for Python


FlowEngine

Lightweight YAML-driven state machine for Python

FlowEngine enables developers to define execution flows declaratively in YAML, build pluggable component systems, and execute conditional branching based on runtime state.

Features

  • YAML-Driven Configuration — Define flows in human-readable YAML files
  • Component-Based Architecture — Build reusable, testable processing units
  • Graph-Based DAG Execution — Define flows as directed acyclic graphs with topological ordering
  • Port-Based Output Routing — Components route execution through named output ports
  • Conditional Execution — Execute steps based on runtime context state
  • Async Component Support — Native async processing with automatic sync fallback
  • Execution Checkpoints — Suspend and resume flows mid-execution with serializable checkpoints
  • Step Lifecycle Hooks — Observe flow execution with pluggable hook callbacks
  • Safe Expression Evaluation — Condition expressions are validated against an AST allowlist
  • Full Type Hints — Compatible with mypy strict mode
  • Execution Metadata — Track timing, errors, and skipped components with step-level detail
  • Cooperative Timeout — Protect against runaway flows with deadline-based timeouts
  • Component Registry — Auto-instantiate components from type paths or validate types at runtime
  • Round-Trip Serialization — Fully serialize and restore context state for replay/debugging
  • Minimal Dependencies — Only requires pyyaml and pydantic

Installation

pip install flowengine

For HTTP component support:

pip install "flowengine[http]"

For development:

pip install "flowengine[dev]"

Quick Start

1. Define a Component

from flowengine import BaseComponent, FlowContext

class GreetComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.greeting = config.get("greeting", "Hello")

    def process(self, context: FlowContext) -> FlowContext:
        name = context.get("name", "World")
        context.set("message", f"{self.greeting}, {name}!")
        return context

2. Create a Flow Configuration

# flow.yaml
name: "Greeting Flow"
version: "1.0"

components:
  - name: greeter
    type: myapp.GreetComponent
    config:
      greeting: "Hello"

flow:
  steps:
    - component: greeter
      description: "Generate greeting"

3. Execute the Flow

from flowengine import ConfigLoader, FlowEngine, FlowContext
from myapp import GreetComponent  # the component from step 1 (module path per flow.yaml)

# Load configuration
config = ConfigLoader.load("flow.yaml")

# Create components
components = {"greeter": GreetComponent("greeter")}

# Create engine and execute
engine = FlowEngine(config, components)
context = FlowContext()
context.set("name", "FlowEngine")

result = engine.execute(context)
print(result.data.message)  # "Hello, FlowEngine!"

Core Concepts

Components

Components are the building blocks of flows. Each component has a lifecycle:

  1. __init__(name) — Instance creation
  2. init(config) — One-time configuration (called once)
  3. setup(context) — Pre-processing (called each run)
  4. process(context) — Main logic (called each run) [required]
  5. teardown(context) — Cleanup (called each run)

from flowengine import BaseComponent, FlowContext

class DatabaseComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.connection_string = config["connection_string"]
        self._conn = None

    def setup(self, context: FlowContext) -> None:
        # create_connection is a placeholder for your database driver's connect function
        self._conn = create_connection(self.connection_string)

    def process(self, context: FlowContext) -> FlowContext:
        data = self._conn.query("SELECT * FROM users")
        context.set("users", data)
        return context

    def teardown(self, context: FlowContext) -> None:
        if self._conn:
            self._conn.close()

    def validate_config(self) -> list[str]:
        errors = []
        if not self.config.get("connection_string"):
            errors.append("connection_string is required")
        return errors
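
The lifecycle ordering above can be illustrated without FlowEngine itself. In this sketch, `ToyComponent` and `run_component` are hypothetical stand-ins, and running `teardown` in a `finally` block (so it runs even when `process` raises) is an assumption of the sketch rather than documented engine behavior.

```python
from typing import Any


class ToyComponent:
    """Hypothetical stand-in for BaseComponent that records lifecycle calls."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.config: dict[str, Any] = {}
        self.calls: list[str] = ["__init__"]

    def init(self, config: dict[str, Any]) -> None:
        self.config = config
        self.calls.append("init")

    def setup(self, context: dict[str, Any]) -> None:
        self.calls.append("setup")

    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        self.calls.append("process")
        context[self.name] = "done"
        return context

    def teardown(self, context: dict[str, Any]) -> None:
        self.calls.append("teardown")


def run_component(component: ToyComponent, context: dict[str, Any]) -> dict[str, Any]:
    """One run: setup, process, then teardown (assumed to run even on error)."""
    component.setup(context)
    try:
        return component.process(context)
    finally:
        component.teardown(context)


comp = ToyComponent("greeter")
comp.init({"greeting": "Hello"})  # configured once
for _ in range(2):  # setup/process/teardown repeat on every run
    run_component(comp, {})
print(comp.calls)
```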

Context

The FlowContext carries data through the flow and tracks execution metadata:

from flowengine import FlowContext

context = FlowContext()

# Set values
context.set("user", {"name": "Alice", "age": 30})

# Get values with dot notation
print(context.data.user.name)  # "Alice"

# Check for values
print(context.has("user"))  # True
print(context.get("missing", "default"))  # "default"

# Access metadata
print(context.metadata.flow_id)
print(context.metadata.component_timings)

# Serialize
print(context.to_json())
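
Attribute-style access such as `context.data.user.name` can be approximated with a small `dict` subclass. `AttrDict` below is a hypothetical sketch of the idea behind `DotDict`, not the library's implementation; it assumes nested dicts are wrapped on read and that missing keys raise `AttributeError`.

```python
from typing import Any


class AttrDict(dict):
    """Dict that also exposes its keys as attributes (illustrative sketch)."""

    def __getattr__(self, key: str) -> Any:
        try:
            value = self[key]
        except KeyError as exc:
            # AttributeError keeps hasattr() and getattr(obj, key, default) working.
            raise AttributeError(key) from exc
        # Wrap nested plain dicts so chained access (a.b.c) works.
        # Note: the wrapper is a shallow copy, so attribute writes on it
        # do not propagate back to the parent dict.
        if isinstance(value, dict) and not isinstance(value, AttrDict):
            value = AttrDict(value)
        return value

    def __setattr__(self, key: str, value: Any) -> None:
        self[key] = value


data = AttrDict()
data["user"] = {"name": "Alice", "age": 30}
print(data.user.name)
print(hasattr(data, "missing"))
```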

Flow Configuration

name: "My Flow"
version: "1.0"
description: "Optional description"

components:
  - name: component_name
    type: module.path.ComponentClass
    config:
      key: value

flow:
  type: sequential  # sequential (default), conditional, or graph

  settings:
    fail_fast: true            # Stop on first error
    timeout_seconds: 300       # Max execution time (cooperative)
    on_condition_error: fail   # fail, skip, or warn

  steps:
    - component: component_name
      description: "What this step does"
      condition: "context.data.ready == True"
      on_error: fail  # fail, skip, or continue

Settings Reference

| Setting | Default | Description |
|---|---|---|
| fail_fast | true | Stop on first component error |
| timeout_seconds | 300 | Maximum flow execution time in seconds |
| timeout_mode | cooperative | Timeout enforcement: cooperative, hard_async, hard_process |
| require_deadline_check | false | Require components to call check_deadline() in cooperative mode |
| on_condition_error | fail | How to handle invalid conditions: fail (raise exception), skip (skip step), warn (log and skip) |

Flow Types

FlowEngine supports three flow execution types:

Sequential (Default)

Evaluates every step in order. Each step's condition decides whether that individual step runs.

flow:
  type: sequential  # default
  steps:
    - component: fetch_data      # Always runs
    - component: transform_data  # Runs if condition is True
      condition: "context.data.fetch_result.status == 'success'"
    - component: save_data       # Runs if condition is True
      condition: "context.data.transformed is not None"
    - component: notify_error    # Runs if condition is True
      condition: "context.data.fetch_result.status == 'error'"

All four steps are evaluated. Multiple steps can execute if their conditions match.
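
A minimal sketch of these sequential semantics: every step is visited, and each one whose condition passes runs. Steps are modeled as hypothetical `(name, condition, action)` tuples, with plain Python callables standing in for the YAML expression strings FlowEngine evaluates.

```python
from typing import Any, Callable, Optional

Step = tuple[str, Optional[Callable[[dict], bool]], Callable[[dict], None]]


def run_sequential(steps: list[Step], data: dict[str, Any]) -> list[str]:
    """Visit every step in order; run each one whose condition holds."""
    ran = []
    for name, condition, action in steps:
        if condition is None or condition(data):  # no condition = always run
            action(data)
            ran.append(name)
    return ran


steps: list[Step] = [
    ("fetch_data", None, lambda d: d.update(fetch_result={"status": "success"})),
    ("transform_data",
     lambda d: d["fetch_result"]["status"] == "success",
     lambda d: d.update(transformed=[1, 2, 3])),
    ("save_data", lambda d: d.get("transformed") is not None, lambda d: None),
    ("notify_error", lambda d: d["fetch_result"]["status"] == "error", lambda d: None),
]
print(run_sequential(steps, {}))
```

Because conditions are checked against data written by earlier steps, `save_data` sees the output of `transform_data`.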

Conditional (First-Match Branching)

Works like a switch/case statement: execution stops after the first step whose condition is True.

flow:
  type: conditional  # first-match branching
  steps:
    - component: handle_user
      condition: "context.data.request_type == 'user'"
    - component: handle_order
      condition: "context.data.request_type == 'order'"
    - component: handle_admin
      condition: "context.data.request_type == 'admin'"
    - component: handle_unknown  # No condition = default case

Only one step executes. Once a condition matches, remaining steps are skipped.
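
For comparison, a sketch of first-match semantics: return at the first step whose condition holds, with an unconditioned step acting as the default case. Steps are hypothetical `(name, condition)` pairs with callable conditions; this is not FlowEngine's executor.

```python
from typing import Callable, Optional

Route = tuple[str, Optional[Callable[[dict], bool]]]


def first_match(steps: list[Route], data: dict) -> Optional[str]:
    """Return the first step whose condition holds (None if nothing matches)."""
    for name, condition in steps:
        if condition is None or condition(data):  # no condition = default case
            return name  # remaining steps are skipped
    return None


routes: list[Route] = [
    ("handle_user", lambda d: d["request_type"] == "user"),
    ("handle_order", lambda d: d["request_type"] == "order"),
    ("handle_admin", lambda d: d["request_type"] == "admin"),
    ("handle_unknown", None),
]
print(first_match(routes, {"request_type": "order"}))
print(first_match(routes, {"request_type": "refund"}))
```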

Graph (DAG Execution)

Define flows as directed acyclic graphs with topological ordering. Supports port-based routing for conditional branching.

flow:
  type: graph
  nodes:
    - id: fetch
      component: fetch_data
    - id: validate
      component: validator
    - id: process_valid
      component: processor
    - id: handle_invalid
      component: error_handler
  edges:
    - source: fetch
      target: validate
    - source: validate
      target: process_valid
      port: "valid"              # Only activates when port == "valid"
    - source: validate
      target: handle_invalid
      port: "invalid"            # Only activates when port == "invalid"

Nodes execute in topological order. Port-based edges enable conditional routing — components call set_output_port(context, "valid") to choose a branch.
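
The ordering and routing rules can be modeled with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). This is an illustrative sketch, not `GraphExecutor`. It assumes that a node runs only if it has no incoming edges or at least one active incoming edge, that an edge with a `port` is active only when its source emitted that port, and that each node is a plain function returning its chosen port (or None).

```python
from graphlib import TopologicalSorter
from typing import Callable, Optional

Node = Callable[[dict], Optional[str]]  # returns the chosen output port, if any


def run_graph(nodes: dict[str, Node], edges: list[dict], data: dict) -> list[str]:
    """Run nodes in topological order, following only edges whose port matches."""
    deps: dict[str, set[str]] = {node_id: set() for node_id in nodes}
    for edge in edges:
        deps[edge["target"]].add(edge["source"])

    ports: dict[str, Optional[str]] = {}  # only nodes that actually ran
    executed: list[str] = []
    # static_order() raises graphlib.CycleError if the graph is not a DAG.
    for node_id in TopologicalSorter(deps).static_order():
        incoming = [e for e in edges if e["target"] == node_id]
        active = not incoming or any(
            e["source"] in ports and e.get("port") in (None, ports[e["source"]])
            for e in incoming
        )
        if active:
            ports[node_id] = nodes[node_id](data)
            executed.append(node_id)
    return executed


nodes: dict[str, Node] = {
    "fetch": lambda d: d.update(record={"email": "a@example.com"}),
    "validate": lambda d: "valid" if "@" in d["record"]["email"] else "invalid",
    "process_valid": lambda d: None,
    "handle_invalid": lambda d: None,
}
edges = [
    {"source": "fetch", "target": "validate"},
    {"source": "validate", "target": "process_valid", "port": "valid"},
    {"source": "validate", "target": "handle_invalid", "port": "invalid"},
]
print(run_graph(nodes, edges, {}))
```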

| Flow Type | Behavior | Use Case |
|---|---|---|
| sequential | All matching steps run | Data pipelines, multi-step processing |
| conditional | First match wins, then stop | Request routing, dispatch, mutually exclusive branches |
| graph | DAG with port-based routing | Complex workflows, agent orchestration, approval flows |

Conditional Step Execution

Steps can have conditions that are evaluated at runtime:

steps:
  - component: fetch_data

  - component: process_data
    condition: "context.data.fetch_data.status == 'success'"

  - component: save_data
    condition: "context.data.process_data is not None"

  - component: notify_error
    condition: "context.data.fetch_data.status == 'error'"

Allowed Expressions

Conditions support safe Python expressions:

| Category | Allowed |
|---|---|
| Comparisons | <, >, <=, >=, ==, != |
| Logical | and, or, not |
| Identity | is, is not |
| Membership | in, not in |
| Attributes | context.data.user.name |
| Subscripts | context.data["key"] |
| Constants | True, False, None, numbers, strings |

Disallowed for security:

  • Function calls (len(), print(), etc.)
  • Imports
  • Lambda expressions
  • List comprehensions
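
Validating a condition against an AST allowlist can be sketched with the standard `ast` module. The node list mirrors the table above, plus `ast.USub` so negative number literals parse. Restricting names to `context` and rejecting underscore-prefixed attributes (to block things like `__class__`) are extra assumptions of this sketch; it is not FlowEngine's actual validator.

```python
import ast
from types import SimpleNamespace
from typing import Any

ALLOWED_NODES = (
    ast.Expression, ast.Compare, ast.BoolOp, ast.UnaryOp,
    ast.And, ast.Or, ast.Not, ast.USub,  # USub allows literals like -1
    ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
    ast.Is, ast.IsNot, ast.In, ast.NotIn,
    ast.Attribute, ast.Subscript, ast.Name, ast.Constant, ast.Load,
)


def safe_eval(expression: str, context: Any) -> Any:
    """Parse, validate every node against the allowlist, then evaluate."""
    tree = ast.parse(expression, mode="eval")
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            raise ValueError(f"disallowed syntax: {type(node).__name__}")
        if isinstance(node, ast.Name) and node.id != "context":
            raise ValueError(f"unknown name: {node.id}")
        if isinstance(node, ast.Attribute) and node.attr.startswith("_"):
            raise ValueError(f"private attribute not allowed: {node.attr}")
    code = compile(tree, "<condition>", "eval")
    # No builtins are exposed; `context` is the only name in scope.
    return eval(code, {"__builtins__": {}}, {"context": context})


ctx = SimpleNamespace(data=SimpleNamespace(ready=True, request_type="user"))
print(safe_eval("context.data.ready == True and context.data.request_type == 'user'", ctx))
for bad in ("len(context.data)", "context.__class__", "lambda: 1"):
    try:
        safe_eval(bad, ctx)
    except ValueError as exc:
        print(f"rejected {bad!r}: {exc}")
```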

Async Components

Components can implement native async processing:

from flowengine import BaseComponent, FlowContext

class AsyncFetchComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        # Sync fallback
        return context

    async def process_async(self, context: FlowContext) -> FlowContext:
        data = await fetch_data_async()
        context.set("data", data)
        return context

The is_async property detects whether a component overrides process_async:

comp = AsyncFetchComponent("fetch")
print(comp.is_async)  # True
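
One way an engine could use `is_async` is to await `process_async` when a subclass overrides it and call `process` otherwise. This is a hypothetical model of that dispatch with its own minimal base class (`SketchComponent`); it does not reproduce FlowEngine's internals.

```python
import asyncio
from typing import Any


class SketchComponent:
    """Hypothetical base: process_async falls back to the sync process()."""

    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        return context

    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        return self.process(context)

    @property
    def is_async(self) -> bool:
        # True only when a subclass overrides process_async.
        return type(self).process_async is not SketchComponent.process_async


class SyncOnly(SketchComponent):
    def process(self, context: dict[str, Any]) -> dict[str, Any]:
        context["sync"] = True
        return context


class AsyncFetch(SketchComponent):
    async def process_async(self, context: dict[str, Any]) -> dict[str, Any]:
        await asyncio.sleep(0)  # stands in for real async I/O
        context["fetched"] = True
        return context


async def run(components: list[SketchComponent]) -> dict[str, Any]:
    context: dict[str, Any] = {}
    for comp in components:
        if comp.is_async:
            context = await comp.process_async(context)
        else:
            context = comp.process(context)
    return context


print(AsyncFetch().is_async, SyncOnly().is_async)
print(asyncio.run(run([SyncOnly(), AsyncFetch()])))
```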

Execution Checkpoints (Suspend/Resume)

Flows can be suspended mid-execution and resumed later — useful for human-in-the-loop workflows:

# Component suspends the flow
class ApprovalComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        if not context.has("resume_data"):
            context.suspend(self.name, reason="Needs human approval")
        else:
            decision = context.get("resume_data")
            context.set("approved", decision.get("approved", False))
        return context

from flowengine.core.checkpoint import InMemoryCheckpointStore

store = InMemoryCheckpointStore()
engine = FlowEngine(config, components, checkpoint_store=store)

# Execute — flow suspends at approval node
result = engine.execute()
checkpoint_id = result.get("checkpoint_id")

# Later, resume with data
resumed = engine.resume(checkpoint_id, resume_data={"approved": True})
print(resumed.get("approved"))  # True
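
Suspend/resume comes down to saving enough state to restart at the suspended step. The sketch below models that with a dict-backed store and a list of step functions. `SketchCheckpointStore`, `run_steps`, and the `"__suspend__"` marker are hypothetical names for illustration, not FlowEngine's `Checkpoint` or `InMemoryCheckpointStore` API.

```python
import copy
import uuid
from typing import Any, Callable, Optional

Data = dict[str, Any]
Step = Callable[[Data], Data]


class SketchCheckpointStore:
    """Hypothetical in-memory store keyed by a random checkpoint id."""

    def __init__(self) -> None:
        self._items: dict[str, tuple[int, Data]] = {}

    def save(self, step_index: int, data: Data) -> str:
        checkpoint_id = str(uuid.uuid4())
        # Deep copy so later mutations don't alter the saved snapshot.
        self._items[checkpoint_id] = (step_index, copy.deepcopy(data))
        return checkpoint_id

    def load(self, checkpoint_id: str) -> tuple[int, Data]:
        step_index, data = self._items[checkpoint_id]
        return step_index, copy.deepcopy(data)


def run_steps(
    steps: list[Step], data: Data, store: SketchCheckpointStore, start: int = 0
) -> tuple[Data, Optional[str]]:
    """Run steps from `start`; return (data, checkpoint_id or None)."""
    for index in range(start, len(steps)):
        data = steps[index](data)
        if data.pop("__suspend__", False):
            # Save the suspending step's index so it re-runs with resume_data.
            return data, store.save(index, data)
    return data, None


def approval(data: Data) -> Data:
    if "resume_data" not in data:
        data["__suspend__"] = True  # hypothetical suspend marker
    else:
        data["approved"] = data["resume_data"].get("approved", False)
    return data


def finalize(data: Data) -> Data:
    data["done"] = True
    return data


store = SketchCheckpointStore()
steps = [approval, finalize]
data, checkpoint_id = run_steps(steps, {}, store)  # suspends at index 0
assert checkpoint_id is not None
start, saved = store.load(checkpoint_id)
saved["resume_data"] = {"approved": True}
resumed, _ = run_steps(steps, saved, store, start=start)
print(resumed["approved"], resumed["done"])
```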

Step Lifecycle Hooks

Observe flow execution with hooks:

class LoggingHook:
    def on_node_start(self, node_id, component_name, context):
        print(f"Starting: {node_id}")

    def on_node_complete(self, node_id, component_name, context, duration):
        print(f"Completed: {node_id} in {duration:.3f}s")

    def on_node_error(self, node_id, component_name, error, context):
        print(f"Error in {node_id}: {error}")

    def on_node_skipped(self, node_id, component_name, reason):
        print(f"Skipped: {node_id} ({reason})")

    def on_flow_suspended(self, node_id, reason, checkpoint_id):
        print(f"Suspended at {node_id}: {reason}")

engine = FlowEngine(config, components, hooks=[LoggingHook()])

Hooks are fault-tolerant — a broken hook never interrupts flow execution.
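
Fault tolerance of this kind usually means wrapping each hook call so an exception is logged and swallowed. A minimal sketch, assuming hooks may implement only some of the callback methods (missing ones are skipped); `emit` is a hypothetical helper, not part of FlowEngine's API.

```python
import logging
from typing import Any

logger = logging.getLogger("hooks_sketch")


def emit(hooks: list[Any], event: str, *args: Any) -> None:
    """Call hook.<event>(*args) on every hook; never let a hook error escape."""
    for hook in hooks:
        method = getattr(hook, event, None)
        if method is None:
            continue  # this hook does not implement the event
        try:
            method(*args)
        except Exception:
            logger.exception("hook %r failed on %s", hook, event)


class BrokenHook:
    def on_node_start(self, node_id: str, component_name: str, context: Any) -> None:
        raise RuntimeError("boom")


class RecordingHook:
    def __init__(self) -> None:
        self.seen: list[str] = []

    def on_node_start(self, node_id: str, component_name: str, context: Any) -> None:
        self.seen.append(node_id)


recorder = RecordingHook()
emit([BrokenHook(), recorder], "on_node_start", "fetch", "fetch_data", {})
emit([BrokenHook(), recorder], "on_node_complete", "fetch", "fetch_data", {}, 0.01)
print(recorder.seen)
```

The broken hook's traceback is logged, and the recording hook still receives the event.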

Error Handling

Configure error behavior per step:

steps:
  - component: risky_operation
    on_error: continue  # Options: fail, skip, continue

  - component: cleanup
    # Always runs even if previous step failed (with on_error: continue)

Use fail_fast: false in settings to allow continuing after errors:

flow:
  settings:
    fail_fast: false
  steps:
    - component: step1
      on_error: continue  # Log error, continue to next step
    - component: step2
      on_error: skip      # Log error, mark as skipped
    - component: step3
      on_error: fail      # Stop execution (default)
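
The three `on_error` policies can be modeled in a few lines. This sketch assumes the semantics given in the YAML comments above: `fail` re-raises, `skip` records the error and marks the step skipped, and `continue` records the error and moves on. The error records reuse the `component` and `message` keys this README reads from `result.metadata.errors`, but `run_with_policies` itself is hypothetical and ignores `fail_fast`.

```python
from typing import Any, Callable

PolicyStep = tuple[str, Callable[[dict], None], str]


def run_with_policies(steps: list[PolicyStep], data: dict[str, Any]) -> dict[str, list]:
    errors: list[dict[str, str]] = []
    skipped: list[str] = []
    completed: list[str] = []
    for name, action, on_error in steps:
        try:
            action(data)
            completed.append(name)
        except Exception as exc:
            if on_error == "fail":
                raise  # stop execution
            errors.append({"component": name, "message": str(exc)})
            if on_error == "skip":
                skipped.append(name)
            # on_error == "continue": error recorded, move on to the next step
    return {"completed": completed, "skipped": skipped, "errors": errors}


def boom(data: dict) -> None:
    raise ValueError("bad input")


report = run_with_policies(
    [("step1", boom, "continue"), ("step2", boom, "skip"), ("step3", lambda d: None, "fail")],
    {},
)
print(report)
```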

Errors are recorded in the result's execution metadata:

result = engine.execute(context)

if result.metadata.has_errors:
    for error in result.metadata.errors:
        print(f"{error['component']}: {error['message']}")

Timeout Handling

Flows can have a maximum execution time:

flow:
  settings:
    timeout_seconds: 60  # 60 second limit
    timeout_mode: cooperative  # cooperative (default), hard_async, or hard_process

Timeout Modes

FlowEngine supports three timeout enforcement modes:

| Mode | Enforcement | Use Case |
|---|---|---|
| cooperative | Components call check_deadline() | Default, safest for complex components |
| hard_async | Uses asyncio.wait_for | I/O-bound components, async-friendly code |
| hard_process | Runs in separate process | CPU-bound components, guaranteed termination |

Cooperative Mode (Default)

The engine sets a deadline before each step and checks between steps. Components cooperate by calling check_deadline():

class LongRunningComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        for item in large_dataset:
            self.check_deadline(context)  # Check periodically
            process_item(item)
        return context
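
Cooperative checking amounts to comparing a monotonic clock against a deadline at safe points. A minimal sketch, assuming a plain `Deadline` object in place of whatever FlowEngine stores on the context; `DeadlineExceeded` is a hypothetical stand-in for `FlowTimeoutError`.

```python
import time


class DeadlineExceeded(Exception):
    """Hypothetical stand-in for FlowTimeoutError."""


class Deadline:
    def __init__(self, timeout_seconds: float) -> None:
        # time.monotonic() is unaffected by wall-clock adjustments.
        self.expires_at = time.monotonic() + timeout_seconds

    def check(self) -> None:
        if time.monotonic() >= self.expires_at:
            raise DeadlineExceeded("deadline passed")


def process_items(items: list[int], deadline: Deadline) -> int:
    processed = 0
    for _item in items:
        deadline.check()  # safe point: stop before starting the next item
        time.sleep(0.01)  # stands in for real per-item work
        processed += 1
    return processed


try:
    process_items(list(range(100)), Deadline(timeout_seconds=0.05))
except DeadlineExceeded as exc:
    print("stopped:", exc)
```

The check only fires between items, so a single item that blocks for a long time still overruns the deadline, which is the limitation the hard modes address.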

Strict Enforcement: Enable require_deadline_check: true to raise an error when long-running components don't call check_deadline():

flow:
  settings:
    timeout_seconds: 60
    timeout_mode: cooperative
    require_deadline_check: true  # Raise error instead of warning

Hard Async Mode

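Uses asyncio.wait_for to enforce timeouts. Components run in a thread executor, so the engine can stop waiting for a step once the deadline passes. Python cannot forcibly stop a thread, so a blocked component may keep running in the background after the timeout fires: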

flow:
  settings:
    timeout_seconds: 10
    timeout_mode: hard_async

Guarantees:

  • Timeout is enforced even if the component doesn't call check_deadline()
  • Teardown always runs (in main thread)
  • Best for I/O-bound operations
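
The `asyncio.wait_for` plus thread-executor pattern looks roughly like this standalone sketch, which uses `asyncio.to_thread` (Python 3.9+) rather than FlowEngine's own code. `blocking_component` and `run_step` are hypothetical names; the `finally` block marks where teardown would run on the event-loop thread.

```python
import asyncio
import time


def blocking_component() -> str:
    time.sleep(0.3)  # never calls check_deadline()
    return "finished"


async def run_step(timeout: float, log: list[str]) -> str:
    try:
        return await asyncio.wait_for(asyncio.to_thread(blocking_component), timeout)
    except asyncio.TimeoutError:
        return "timed out"
    finally:
        log.append("teardown")  # runs on the event-loop (main) thread either way


log: list[str] = []
print(asyncio.run(run_step(timeout=0.05, log=log)), log)
print(asyncio.run(run_step(timeout=1.0, log=[])))
```

`wait_for` returns control at the deadline, but the worker thread keeps sleeping. Because `asyncio.run()` waits for the default executor's threads when it shuts down, the timed-out call still takes about 0.3 s overall.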

Hard Process Mode

Runs each step in a separate process with a hard kill on timeout:

flow:
  settings:
    timeout_seconds: 30
    timeout_mode: hard_process

Guarantees:

  • Component is forcibly terminated on timeout
  • Teardown always runs in main process
  • Context is serialized/deserialized across process boundary
  • Best for CPU-bound operations that may hang

Requirements:

  • Components must be picklable (standard Python classes)
  • Context data must be JSON-serializable
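
Because context data crosses the process boundary as JSON in this mode, it can help to check serializability before running a flow. A small sketch using only the standard `json` module; `non_json_keys` is a hypothetical helper and only inspects top-level keys.

```python
import json
from datetime import datetime
from typing import Any


def non_json_keys(data: dict[str, Any]) -> list[str]:
    """Return top-level keys whose values json.dumps cannot encode."""
    bad = []
    for key, value in data.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):  # ValueError covers circular references
            bad.append(key)
    return bad


sample = {
    "user": {"name": "Alice"},
    "ids": [1, 2],
    "when": datetime(2024, 1, 1),  # datetime is not JSON-serializable
    "tags": {"a", "b"},  # neither is set
}
print(non_json_keys(sample))
```

Passing this check does not guarantee an identical round trip: tuples, for example, come back as lists.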

Timeout Guarantees by Mode

| Scenario | Cooperative | Hard Async | Hard Process |
|---|---|---|---|
| Between steps | ✅ Always | ✅ Always | ✅ Always |
| Component calls check_deadline() | ✅ Yes | ✅ Yes | ✅ Yes |
| Component blocks without checking | ❌ Runs until it returns | ✅ Cancelled | ✅ Killed |
| Teardown runs on timeout | ✅ Yes | ✅ Yes | ✅ Yes |

Choosing a Timeout Mode

┌─────────────────────────────────────────────────────────────┐
│                    Choose Timeout Mode                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Components call check_deadline()?                          │
│    └── YES → Use cooperative (default, safest)              │
│    └── NO  → Components do I/O operations?                  │
│                └── YES → Use hard_async                     │
│                └── NO  → Components are CPU-bound?          │
│                            └── YES → Use hard_process       │
│                            └── NO  → Use cooperative        │
│                                                             │
└─────────────────────────────────────────────────────────────┘
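
The same decision tree can be written as a function. `choose_timeout_mode` is a hypothetical helper for illustration, not part of FlowEngine; it returns the `timeout_mode` value to put in the flow settings.

```python
def choose_timeout_mode(calls_check_deadline: bool, does_io: bool, cpu_bound: bool) -> str:
    """Walk the decision tree and return a timeout_mode setting value."""
    if calls_check_deadline:
        return "cooperative"
    if does_io:
        return "hard_async"
    if cpu_bound:
        return "hard_process"
    return "cooperative"


print(choose_timeout_mode(calls_check_deadline=False, does_io=False, cpu_bound=True))
```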

Handling Timeout Errors

from flowengine import FlowTimeoutError, DeadlineCheckError

try:
    result = engine.execute()
except FlowTimeoutError as e:
    print(f"Timed out after {e.elapsed:.2f}s (limit: {e.timeout}s)")
except DeadlineCheckError as e:
    print(f"Component '{e.component}' didn't call check_deadline()")

Best Practices for Timeout Compliance

  1. Cooperative mode: Call self.check_deadline(context) in loops and before I/O
  2. Hard async: Keep components stateless when possible
  3. Hard process: Ensure context data is JSON-serializable
  4. All modes: Implement proper teardown() for cleanup

Component Registry

For YAML-complete flows, you can auto-instantiate components from their type paths:

from flowengine import ConfigLoader, FlowEngine

# Load config and create engine with auto-instantiation
config = ConfigLoader.load("flow.yaml")
engine = FlowEngine.from_config(config)

result = engine.execute()

Or use the registry directly:

from flowengine import ComponentRegistry, FlowEngine

registry = ComponentRegistry()
registry.register_class("greeter", GreetComponent)

# The registry is used when creating the engine
engine = FlowEngine.from_config(config, registry=registry)

Validate that provided components match their declared types:

engine = FlowEngine(config, components)
errors = engine.validate_component_types()
if errors:
    print("Type mismatches:", errors)
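
Resolving a `type:` path such as `myapp.GreetComponent` generally means importing the module and looking up the class. The sketch below does that with `importlib`, then performs an `isinstance` check in the spirit of `validate_component_types()`. Both helpers are hypothetical and are demonstrated on standard-library classes so the example runs anywhere.

```python
import importlib
from typing import Any


def resolve_type(type_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = type_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {type_path!r}")
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name)  # AttributeError if the name is missing
    if not isinstance(cls, type):
        raise TypeError(f"{type_path} is not a class")
    return cls


def type_mismatches(components: dict[str, Any], declared: dict[str, str]) -> list[str]:
    """Report components that are not instances of their declared type."""
    errors = []
    for name, type_path in declared.items():
        if not isinstance(components.get(name), resolve_type(type_path)):
            errors.append(f"{name}: expected {type_path}")
    return errors


OrderedDict = resolve_type("collections.OrderedDict")
declared = {"a": "collections.OrderedDict", "b": "collections.OrderedDict"}
print(type_mismatches({"a": OrderedDict(), "b": []}, declared))
```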

Step Timing Details

Execution metadata tracks timing per step, even for repeated components:

result = engine.execute()

# Individual step timings (preserves order)
for timing in result.metadata.step_timings:
    print(f"Step {timing.step_index}: {timing.component} took {timing.duration:.3f}s")

# Aggregated by component (backward-compatible)
for name, total in result.metadata.component_timings.items():
    print(f"{name}: {total:.3f}s total")
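
Keeping an ordered list of per-step records and deriving per-component totals from it is what lets repeated components be timed individually. A minimal sketch with a hypothetical `SketchStepTiming` dataclass (its field names match the loop above; everything else is an assumption) and a `collections.defaultdict` aggregation:

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class SketchStepTiming:
    step_index: int
    component: str
    duration: float  # seconds


def totals_by_component(timings: list[SketchStepTiming]) -> dict[str, float]:
    """Sum step durations per component name, preserving first-seen order."""
    totals: dict[str, float] = defaultdict(float)
    for timing in timings:
        totals[timing.component] += timing.duration
    return dict(totals)


timings = [
    SketchStepTiming(0, "fetch", 0.25),
    SketchStepTiming(1, "transform", 0.5),
    SketchStepTiming(2, "fetch", 0.25),  # same component, second run
]
print(totals_by_component(timings))
```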

Context Serialization

Contexts can be fully serialized and restored:

from flowengine import FlowContext

# After execution
result = engine.execute()

# Serialize to JSON
json_str = result.to_json()

# Later, restore the context
restored = FlowContext.from_json(json_str)

# All data preserved
print(restored.get("key"))
print(restored.metadata.flow_id)
print(restored.metadata.step_timings)

Contrib Components

LoggingComponent

Logs context state for debugging:

- name: debug
  type: flowengine.contrib.logging.LoggingComponent
  config:
    level: debug  # debug, info, warning, error
    message: "Current state"
    log_data: true
    log_metadata: false
    keys:  # Optional: only log specific keys
      - user
      - result

HTTPComponent

Makes HTTP requests (requires pip install flowengine[http]):

- name: api
  type: flowengine.contrib.http.HTTPComponent
  config:
    base_url: "https://api.example.com"
    timeout: 30
    headers:
      Authorization: "Bearer token"
    method: GET  # GET, POST, PUT, PATCH, DELETE

Usage:

context.set("endpoint", "/users/123")
result = engine.execute(context)
print(result.data.api.data)  # Response JSON

API Reference

Core Classes

| Class | Description |
|---|---|
| BaseComponent | Abstract base class for components |
| FlowContext | Context passed through all components |
| DotDict | Dictionary with attribute-style access |
| ExecutionMetadata | Tracks timing, errors, and execution state |
| StepTiming | Timing info for a single step execution |
| FlowEngine | Orchestrates flow execution |
| GraphExecutor | DAG-based graph flow executor |
| ExecutionHook | Protocol for step lifecycle hooks |
| Checkpoint | Serializable flow execution snapshot |
| CheckpointStore | Abstract base class for checkpoint persistence |
| InMemoryCheckpointStore | In-memory checkpoint store implementation |

Configuration Classes

| Class | Description |
|---|---|
| ConfigLoader | Loads YAML configurations |
| FlowConfig | Complete flow configuration model |
| ComponentConfig | Component configuration model |
| StepConfig | Step configuration model |
| FlowSettings | Execution settings model |
| FlowDefinition | Flow structure and execution definition |
| GraphNodeConfig | Node configuration for graph flows |
| GraphEdgeConfig | Edge configuration for graph flows |
| ComponentRegistry | Registry for dynamic component loading |

Exceptions

| Exception | Description |
|---|---|
| FlowEngineError | Base exception for all errors |
| ConfigurationError | Invalid configuration |
| FlowExecutionError | Runtime execution error |
| FlowTimeoutError | Flow exceeded timeout_seconds |
| DeadlineCheckError | Component didn't call check_deadline() (with require_deadline_check=True) |
| ComponentError | Component processing error |
| ConditionEvaluationError | Invalid/unsafe condition |

Examples

See the examples/ directory for complete examples:

  • simple_flow.py — Basic flow execution
  • conditional_flow.py — Sequential flow with conditional steps
  • routing_flow.py — Conditional flow with first-match branching
  • timeout_modes.py — Timeout enforcement modes (cooperative, hard_async, hard_process)
  • custom_components.py — Advanced component patterns

Run examples:

cd examples
python simple_flow.py
python conditional_flow.py
python routing_flow.py
python timeout_modes.py
python custom_components.py

Development

Setup

git clone https://github.com/yourorg/flowengine.git
cd flowengine
pip install -e ".[dev]"

Running Tests

pytest tests/ -v --cov=flowengine

Type Checking

mypy src/flowengine

Linting

ruff check src/ tests/

License

MIT License - see LICENSE for details.

Contributing

Contributions are welcome! Please feel free to submit issues and pull requests.

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Run tests and linting
  5. Submit a pull request



Download files


Source Distribution

flowengine-0.3.0.tar.gz (41.6 kB)

Uploaded Source

Built Distribution


flowengine-0.3.0-py3-none-any.whl (44.6 kB)

Uploaded Python 3

File details

Details for the file flowengine-0.3.0.tar.gz.

File metadata

  • Download URL: flowengine-0.3.0.tar.gz
  • Size: 41.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for flowengine-0.3.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | 218b362fb62a3fd146eed3042679b88abf9d88e55760c3272538e9764ae6ecd0 |
| MD5 | 2f06080e442f3d72313625e94c3cf78a |
| BLAKE2b-256 | 28933aa45cfc1bd6982a37d8d421df41dbc72656b8db28157d7d8ca61993dcf7 |


File details

Details for the file flowengine-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: flowengine-0.3.0-py3-none-any.whl
  • Size: 44.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.5

File hashes

Hashes for flowengine-0.3.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | b177c3d792cc7cc1d0e8970db1248a0c6b774ae5c3906c473c2966cc4c943801 |
| MD5 | cb0b7e86333e1f2fb7300a462b5cc5d3 |
| BLAKE2b-256 | 88946cef77b26e0aed6dc472acb6f8f732c0c75180ed1c8630d67d8ceca73238 |

