Lightweight YAML-driven workflow engine for Python
FlowEngine
Lightweight YAML-driven state machine for Python
FlowEngine enables developers to define execution flows declaratively in YAML, build pluggable component systems, and execute conditional branching based on runtime state.
Features
- YAML-Driven Configuration — Define flows in human-readable YAML files
- Component-Based Architecture — Build reusable, testable processing units
- Graph-Based DAG Execution — Define flows as directed acyclic graphs with topological ordering
- Cyclic Graph Execution — Build agentic loops with iteration limits and port-based exit conditions
- Port-Based Output Routing — Components route execution through named output ports
- Conditional Execution — Execute steps based on runtime context state
- Async Component Support — Native async processing with automatic sync fallback
- Execution Checkpoints — Suspend and resume flows mid-execution with serializable checkpoints
- Step Lifecycle Hooks — Observe flow execution with pluggable hook callbacks
- Safe Expression Evaluation — Condition expressions are validated against an AST allowlist
- Full Type Hints — Compatible with mypy strict mode
- Execution Metadata — Track timing, errors, and skipped components with step-level detail
- Cooperative Timeout — Protect against runaway flows with deadline-based timeouts
- Component Registry — Auto-instantiate components from type paths or validate types at runtime
- Round-Trip Serialization — Fully serialize and restore context state for replay/debugging
- Minimal Dependencies — Only requires pyyaml and pydantic
Installation
pip install flowengine
For HTTP component support:
pip install flowengine[http]
For development:
pip install flowengine[dev]
Quick Start
1. Define a Component
from flowengine import BaseComponent, FlowContext


class GreetComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.greeting = config.get("greeting", "Hello")

    def process(self, context: FlowContext) -> FlowContext:
        name = context.get("name", "World")
        context.set("message", f"{self.greeting}, {name}!")
        return context
2. Create a Flow Configuration
# flow.yaml
name: "Greeting Flow"
version: "1.0"
components:
  - name: greeter
    type: myapp.GreetComponent
    config:
      greeting: "Hello"
flow:
  steps:
    - component: greeter
      description: "Generate greeting"
3. Execute the Flow
from flowengine import ConfigLoader, FlowEngine, FlowContext
# Load configuration
config = ConfigLoader.load("flow.yaml")
# Create components
components = {"greeter": GreetComponent("greeter")}
# Create engine and execute
engine = FlowEngine(config, components)
context = FlowContext()
context.set("name", "FlowEngine")
result = engine.execute(context)
print(result.data.message) # "Hello, FlowEngine!"
Core Concepts
Components
Components are the building blocks of flows. Each component has a lifecycle:
- __init__(name) — Instance creation
- init(config) — One-time configuration (called once)
- setup(context) — Pre-processing (called each run)
- process(context) — Main logic (called each run) [required]
- teardown(context) — Cleanup (called each run)
from flowengine import BaseComponent, FlowContext


class DatabaseComponent(BaseComponent):
    def init(self, config: dict) -> None:
        super().init(config)
        self.connection_string = config["connection_string"]
        self._conn = None

    def setup(self, context: FlowContext) -> None:
        self._conn = create_connection(self.connection_string)

    def process(self, context: FlowContext) -> FlowContext:
        data = self._conn.query("SELECT * FROM users")
        context.set("users", data)
        return context

    def teardown(self, context: FlowContext) -> None:
        if self._conn:
            self._conn.close()

    def validate_config(self) -> list[str]:
        errors = []
        if not self.config.get("connection_string"):
            errors.append("connection_string is required")
        return errors
Context
The FlowContext carries data through the flow and tracks execution metadata:
from flowengine import FlowContext
context = FlowContext()
# Set values
context.set("user", {"name": "Alice", "age": 30})
# Get values with dot notation
print(context.data.user.name) # "Alice"
# Check for values
print(context.has("user")) # True
print(context.get("missing", "default")) # "default"
# Access metadata
print(context.metadata.flow_id)
print(context.metadata.component_timings)
# Serialize
print(context.to_json())
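The dot-notation access used above (context.data.user.name) can be pictured as a thin wrapper around a dictionary. The following is a minimal sketch of that idea, not FlowEngine's own DotDict; the class name DotDictSketch is hypothetical and the real class may behave differently.

```python
from typing import Any


class DotDictSketch(dict):
    """Hypothetical stand-in for a dict with attribute-style access."""

    def __getattr__(self, key: str) -> Any:
        # Only called when normal attribute lookup fails, so dict methods
        # such as keys() and items() keep working.
        try:
            value = self[key]
        except KeyError:
            raise AttributeError(key) from None
        # Wrap nested dicts on the way out so chained access keeps working.
        return DotDictSketch(value) if isinstance(value, dict) else value

    def __setattr__(self, key: str, value: Any) -> None:
        self[key] = value


data = DotDictSketch()
data["user"] = {"name": "Alice", "age": 30}
user_name = data.user.name
```

A missing key surfaces as AttributeError in this sketch, so hasattr(data, "missing") is False.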
Flow Configuration
name: "My Flow"
version: "1.0"
description: "Optional description"
components:
  - name: component_name
    type: module.path.ComponentClass
    config:
      key: value
flow:
  type: sequential  # or "conditional" for first-match branching
  settings:
    fail_fast: true  # Stop on first error
    timeout_seconds: 300  # Max execution time (cooperative)
    on_condition_error: fail  # fail, skip, or warn
  steps:
    - component: component_name
      description: "What this step does"
      condition: "context.data.ready == True"
      on_error: fail  # fail, skip, or continue
Settings Reference
| Setting | Default | Description |
|---|---|---|
| fail_fast | true | Stop on first component error |
| timeout_seconds | 300 | Maximum flow execution time in seconds |
| timeout_mode | cooperative | Timeout enforcement: cooperative, hard_async, hard_process |
| require_deadline_check | false | Require components to call check_deadline() in cooperative mode |
| on_condition_error | fail | How to handle invalid conditions: fail (raise exception), skip (skip step), warn (log and skip) |
| max_iterations | 10 | Maximum loop iterations for cyclic graphs |
| on_max_iterations | "fail" | Policy when max iterations reached: fail, exit, warn |
Flow Types
FlowEngine supports three flow execution types:
Sequential (Default)
Runs all steps in order. Each step's condition guards whether that individual step runs.
flow:
  type: sequential  # default
  steps:
    - component: fetch_data  # Always runs
    - component: transform_data  # Runs if condition is True
      condition: "context.data.fetch_result.status == 'success'"
    - component: save_data  # Runs if condition is True
      condition: "context.data.transformed is not None"
    - component: notify_error  # Runs if condition is True
      condition: "context.data.fetch_result.status == 'error'"
All four steps are evaluated. Multiple steps can execute if their conditions match.
Conditional (First-Match Branching)
First-match branching like a switch/case statement. Stops after the first step whose condition is True.
flow:
  type: conditional  # first-match branching
  steps:
    - component: handle_user
      condition: "context.data.request_type == 'user'"
    - component: handle_order
      condition: "context.data.request_type == 'order'"
    - component: handle_admin
      condition: "context.data.request_type == 'admin'"
    - component: handle_unknown  # No condition = default case
Only one step executes. Once a condition matches, remaining steps are skipped.
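The difference between the sequential and conditional types comes down to whether evaluation stops at the first match. The sketch below illustrates only that selection logic under simplifying assumptions: steps are plain (name, condition) pairs, conditions are Python callables rather than expression strings, and the helper names run_sequential and run_first_match are hypothetical, not FlowEngine functions.

```python
from typing import Callable, Optional

# Each step is (component name, optional condition over the shared data dict).
Step = tuple[str, Optional[Callable[[dict], bool]]]


def run_sequential(steps: list[Step], data: dict) -> list[str]:
    """Select every step whose condition passes (or that has no condition)."""
    return [name for name, cond in steps if cond is None or cond(data)]


def run_first_match(steps: list[Step], data: dict) -> list[str]:
    """Select only the first step whose condition passes, like switch/case."""
    for name, cond in steps:
        if cond is None or cond(data):
            return [name]
    return []


steps: list[Step] = [
    ("handle_user", lambda d: d["request_type"] == "user"),
    ("handle_order", lambda d: d["request_type"] == "order"),
    ("handle_unknown", None),  # no condition: acts as the default case
]
```

With request_type set to "order", the first-match variant selects only handle_order, while the sequential variant also selects the unconditioned handle_unknown step.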
Graph (DAG Execution)
Define flows as directed acyclic graphs with topological ordering. Supports port-based routing for conditional branching.
flow:
  type: graph
  nodes:
    - id: fetch
      component: fetch_data
    - id: validate
      component: validator
    - id: process_valid
      component: processor
    - id: handle_invalid
      component: error_handler
  edges:
    - source: fetch
      target: validate
    - source: validate
      target: process_valid
      port: "valid"  # Only activates when port == "valid"
    - source: validate
      target: handle_invalid
      port: "invalid"  # Only activates when port == "invalid"
Nodes execute in topological order. Port-based edges enable conditional routing — components call set_output_port(context, "valid") to choose a branch.
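To make topological ordering with port-gated edges concrete, here is a small sketch built on the standard library's graphlib. It is not FlowEngine's GraphExecutor. The run_dag helper is hypothetical, and the activation rule (a node runs if it has no incoming edges, or if at least one incoming edge is active) is an assumption made for illustration.

```python
from graphlib import TopologicalSorter

# Edges as (source, target, port); port None means "always active".
edges = [
    ("fetch", "validate", None),
    ("validate", "process_valid", "valid"),
    ("validate", "handle_invalid", "invalid"),
]


def run_dag(edges, chosen_ports):
    """Return the nodes that execute, in topological order.

    chosen_ports maps a node id to the output port it selects when it runs.
    """
    sorter = TopologicalSorter()
    for source, target, _ in edges:
        sorter.add(target, source)  # target depends on source

    executed = []
    for node in sorter.static_order():
        incoming = [e for e in edges if e[1] == node]
        # Assumed rule: roots always run; other nodes need one active edge,
        # meaning the source ran and the edge port is unset or was chosen.
        active = not incoming or any(
            src in executed and (port is None or chosen_ports.get(src) == port)
            for src, _, port in incoming
        )
        if active:
            executed.append(node)
    return executed
```

Calling run_dag(edges, {"validate": "valid"}) walks fetch, validate, and process_valid, and leaves handle_invalid out because its only incoming edge is gated on the "invalid" port.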
Cyclic Graph (Agent Loops) — v0.3.0
Define flows with cycles for iterative agent patterns. The graph executor automatically detects cycles and switches to a ready-queue BFS executor with iteration limits.
flow:
  type: graph
  settings:
    max_iterations: 10  # Safety limit for loop iterations
    on_max_iterations: exit  # fail | exit | warn
  nodes:
    - id: plan
      component: planner
    - id: execute
      component: executor
    - id: evaluate
      component: evaluator
    - id: deliver
      component: deliverer
  edges:
    - source: plan
      target: execute
    - source: execute
      target: evaluate
    - source: evaluate
      target: plan
      port: "refine"  # Loop back when more work needed
    - source: evaluate
      target: deliver
      port: "done"  # Exit loop when quality threshold met
The evaluator component uses port-based routing to either loop back (refine) or exit to delivery (done):
class EvaluateComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        quality = context.get("quality_score", 0)
        threshold = self.config.get("quality_threshold", 3)
        if quality >= threshold:
            self.set_output_port(context, "done")
        else:
            self.set_output_port(context, "refine")
        return context
After execution, cyclic metadata is available:
result = engine.execute()
print(result.metadata.iteration_count) # Number of loop iterations
print(result.metadata.node_visit_counts) # Per-node execution counts
print(result.metadata.max_iterations_reached) # Whether limit was hit
| Setting | Default | Description |
|---|---|---|
| max_iterations | 10 | Maximum loop iterations before policy triggers |
| on_max_iterations | "fail" | fail (raise MaxIterationsError), exit (stop silently), warn (log + stop) |
| max_visits (per-node) | None | Cap individual node executions (defaults to max_iterations) |
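The loop-with-limits behaviour can be sketched as a ready queue plus per-node visit counts. This is an illustration only: run_cyclic, MaxIterationsSketchError, and the choice to treat max_iterations as a per-node visit cap are assumptions, and FlowEngine's executor may count iterations differently.

```python
from collections import Counter, deque


class MaxIterationsSketchError(RuntimeError):
    """Hypothetical stand-in for a max-iterations failure."""


def run_cyclic(edges, start, choose_port, max_iterations=10, on_max="fail"):
    """Ready-queue execution over a graph that may contain cycles.

    edges: list of (source, target, port); port None is always active.
    choose_port: callable (node, visit_number) -> selected port or None.
    Returns (execution order, per-node visit counts, limit_reached flag).
    """
    visits = Counter()
    order = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if visits[node] >= max_iterations:
            # Assumed policy handling: raise on "fail", otherwise stop.
            if on_max == "fail":
                raise MaxIterationsSketchError(node)
            return order, visits, True
        visits[node] += 1
        order.append(node)
        port = choose_port(node, visits[node])
        for source, target, edge_port in edges:
            if source == node and (edge_port is None or edge_port == port):
                queue.append(target)
    return order, visits, False


edges = [
    ("plan", "execute", None),
    ("execute", "evaluate", None),
    ("evaluate", "plan", "refine"),
    ("evaluate", "deliver", "done"),
]


def evaluator_ports(node, visit):
    # Ask for one refinement, then finish on the second evaluation.
    if node == "evaluate":
        return "refine" if visit < 2 else "done"
    return None
```

Running run_cyclic(edges, "plan", evaluator_ports) goes around the plan, execute, evaluate loop twice and then reaches deliver. An evaluator that always returns "refine" stops once a node hits the cap.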
| Flow Type | Behavior | Use Case |
|---|---|---|
| sequential | All matching steps run | Data pipelines, multi-step processing |
| conditional | First match wins, then stop | Request routing, dispatch, mutually exclusive branches |
| graph (DAG) | DAG with port-based routing | Complex workflows, approval flows |
| graph (cyclic) | Loops with iteration limits | Agent loops, iterative refinement, agentic AI patterns |
Conditional Step Execution
Steps can have conditions that are evaluated at runtime:
steps:
  - component: fetch_data
  - component: process_data
    condition: "context.data.fetch_data.status == 'success'"
  - component: save_data
    condition: "context.data.process_data is not None"
  - component: notify_error
    condition: "context.data.fetch_data.status == 'error'"
Allowed Expressions
Conditions support safe Python expressions:
| Category | Allowed |
|---|---|
| Comparisons | <, >, <=, >=, ==, != |
| Logical | and, or, not |
| Identity | is, is not |
| Membership | in, not in |
| Attributes | context.data.user.name |
| Subscripts | context.data["key"] |
| Constants | True, False, None, numbers, strings |
Disallowed for security:
- Function calls (len(), print(), etc.)
- Imports
- Lambda expressions
- List comprehensions
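An AST allowlist of the kind described above can be sketched with the standard ast module. This is not FlowEngine's validator: is_safe_condition and ALLOWED_NODES are hypothetical names, and the exact set of permitted node types is an assumption derived from the table of allowed expressions.

```python
import ast

# Node types permitted in a condition; anything else is rejected.
ALLOWED_NODES = (
    ast.Expression, ast.BoolOp, ast.UnaryOp, ast.Compare,
    ast.Attribute, ast.Subscript, ast.Name, ast.Constant, ast.Load,
    ast.And, ast.Or, ast.Not, ast.USub,
    ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
    ast.Is, ast.IsNot, ast.In, ast.NotIn,
)


def is_safe_condition(expression: str) -> bool:
    """Return True if every AST node in the expression is on the allowlist."""
    try:
        tree = ast.parse(expression, mode="eval")
    except SyntaxError:
        # Statements such as "import os" are not expressions at all.
        return False
    return all(isinstance(node, ALLOWED_NODES) for node in ast.walk(tree))
```

Because ast.Call, ast.Lambda, and ast.ListComp are absent from the tuple, function calls, lambdas, and comprehensions are rejected before anything is evaluated. The sketch assumes Python 3.9 or later, where subscript keys appear directly as ast.Constant nodes.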
Async Components
Components can implement native async processing:
from flowengine import BaseComponent, FlowContext


class AsyncFetchComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        # Sync fallback
        return context

    async def process_async(self, context: FlowContext) -> FlowContext:
        data = await fetch_data_async()
        context.set("data", data)
        return context
The is_async property detects whether a component overrides process_async:
comp = AsyncFetchComponent("fetch")
print(comp.is_async) # True
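One common way to detect an overridden method, and to provide a sync fallback, is to compare the subclass's method with the base class's. The sketch below shows that pattern with a hypothetical ComponentSketch base class and plain dicts as context; how FlowEngine's BaseComponent implements is_async is not shown in this document.

```python
import asyncio


class ComponentSketch:
    """Hypothetical base class; FlowEngine's BaseComponent may differ."""

    def process(self, context: dict) -> dict:
        raise NotImplementedError

    async def process_async(self, context: dict) -> dict:
        # Default async path: fall back to the synchronous implementation.
        return self.process(context)

    @property
    def is_async(self) -> bool:
        # True only when a subclass supplies its own process_async.
        return type(self).process_async is not ComponentSketch.process_async


class SyncOnly(ComponentSketch):
    def process(self, context: dict) -> dict:
        context["mode"] = "sync"
        return context


class NativeAsync(ComponentSketch):
    def process(self, context: dict) -> dict:
        context["mode"] = "sync"
        return context

    async def process_async(self, context: dict) -> dict:
        await asyncio.sleep(0)  # stands in for real awaitable I/O
        context["mode"] = "async"
        return context
```

Awaiting process_async on SyncOnly still works in this sketch; it simply runs the synchronous process method.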
Execution Checkpoints (Suspend/Resume)
Flows can be suspended mid-execution and resumed later — useful for human-in-the-loop workflows:
# Component suspends the flow
class ApprovalComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        if not context.has("resume_data"):
            context.suspend(self.name, reason="Needs human approval")
        else:
            decision = context.get("resume_data")
            context.set("approved", decision.get("approved", False))
        return context

from flowengine.core.checkpoint import InMemoryCheckpointStore
store = InMemoryCheckpointStore()
engine = FlowEngine(config, components, checkpoint_store=store)
# Execute — flow suspends at approval node
result = engine.execute()
checkpoint_id = result.get("checkpoint_id")
# Later, resume with data
resumed = engine.resume(checkpoint_id, resume_data={"approved": True})
print(resumed.get("approved")) # True
Step Lifecycle Hooks
Observe flow execution with hooks:
class LoggingHook:
    def on_node_start(self, node_id, component_name, context):
        print(f"Starting: {node_id}")

    def on_node_complete(self, node_id, component_name, context, duration):
        print(f"Completed: {node_id} in {duration:.3f}s")

    def on_node_error(self, node_id, component_name, error, context):
        print(f"Error in {node_id}: {error}")

    def on_node_skipped(self, node_id, component_name, reason):
        print(f"Skipped: {node_id} ({reason})")

    def on_flow_suspended(self, node_id, reason, checkpoint_id):
        print(f"Suspended at {node_id}: {reason}")


engine = FlowEngine(config, components, hooks=[LoggingHook()])
Hooks are fault-tolerant — a broken hook never interrupts flow execution.
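Fault tolerance of this kind is typically achieved by wrapping each hook call in its own try/except. The following sketch shows the idea; dispatch_hook, BrokenHook, and RecordingHook are hypothetical names, and FlowEngine's internal dispatch may log or report hook failures differently.

```python
import logging

logger = logging.getLogger("hook_sketch")


def dispatch_hook(hooks, event: str, *args) -> int:
    """Call `event` on every hook that defines it; return how many succeeded.

    Exceptions raised by a hook are logged and swallowed so the flow continues.
    """
    succeeded = 0
    for hook in hooks:
        callback = getattr(hook, event, None)
        if callback is None:
            continue  # hooks may implement only the events they care about
        try:
            callback(*args)
            succeeded += 1
        except Exception:
            logger.exception("Hook %r failed during %s", hook, event)
    return succeeded


class BrokenHook:
    def on_node_start(self, node_id, component_name, context):
        raise RuntimeError("hook bug")


class RecordingHook:
    def __init__(self):
        self.started = []

    def on_node_start(self, node_id, component_name, context):
        self.started.append(node_id)
```

In this sketch a failing hook does not prevent later hooks in the list from being called for the same event.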
Error Handling
Configure error behavior per step:
steps:
  - component: risky_operation
    on_error: continue  # Options: fail, skip, continue
  - component: cleanup
    # Always runs even if previous step failed (with on_error: continue)
Use fail_fast: false in settings to allow continuing after errors:
flow:
  settings:
    fail_fast: false
  steps:
    - component: step1
      on_error: continue  # Log error, continue to next step
    - component: step2
      on_error: skip  # Log error, mark as skipped
    - component: step3
      on_error: fail  # Stop execution (default)
Access errors in context:
result = engine.execute(context)
if result.metadata.has_errors:
    for error in result.metadata.errors:
        print(f"{error['component']}: {error['message']}")
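The three per-step policies can be read as three ways of reacting to the same exception. The sketch below mirrors the YAML comments above with a hypothetical run_steps helper over plain callables; it ignores fail_fast and is an assumption about the general shape, not FlowEngine's implementation.

```python
def run_steps(steps, data):
    """Run (name, func, on_error) steps and collect errors and skips.

    "fail" re-raises, "skip" records the step as skipped, and "continue"
    only records the error before moving on.
    """
    errors, skipped = [], []
    for name, func, on_error in steps:
        try:
            func(data)
        except Exception as exc:
            errors.append({"component": name, "message": str(exc)})
            if on_error == "fail":
                raise
            if on_error == "skip":
                skipped.append(name)
    return errors, skipped


def boom(data):
    raise ValueError("bad input")


def ok(data):
    data["ok"] = True


data = {}
errors, skipped = run_steps(
    [("step1", boom, "continue"), ("step2", boom, "skip"), ("step3", ok, "fail")],
    data,
)
```

Both failing steps are recorded as errors, only step2 is marked as skipped, and step3 still runs.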
Timeout Handling
Flows can have a maximum execution time:
flow:
  settings:
    timeout_seconds: 60  # 60 second limit
    timeout_mode: cooperative  # cooperative (default), hard_async, or hard_process
Timeout Modes
FlowEngine supports three timeout enforcement modes:
| Mode | Enforcement | Use Case |
|---|---|---|
| cooperative | Components call check_deadline() | Default, safest for complex components |
| hard_async | Uses asyncio.wait_for | I/O-bound components, async-friendly code |
| hard_process | Runs in separate process | CPU-bound components, guaranteed termination |
Cooperative Mode (Default)
The engine sets a deadline before each step and checks between steps. Components cooperate by calling check_deadline():
class LongRunningComponent(BaseComponent):
    def process(self, context: FlowContext) -> FlowContext:
        for item in large_dataset:
            self.check_deadline(context)  # Check periodically
            process_item(item)
        return context
Strict Enforcement: Enable require_deadline_check: true to raise an error when long-running components don't call check_deadline():
flow:
  settings:
    timeout_seconds: 60
    timeout_mode: cooperative
    require_deadline_check: true  # Raise error instead of warning
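A cooperative deadline boils down to recording a start time and comparing against it whenever the component asks. The sketch below shows that mechanism with hypothetical Deadline and DeadlineExceeded classes. Counting checks is one assumed way an engine could notice that a component never cooperated; FlowEngine's own bookkeeping is not described here.

```python
import time


class DeadlineExceeded(Exception):
    """Hypothetical stand-in for a flow timeout error."""


class Deadline:
    def __init__(self, timeout_seconds: float) -> None:
        self.timeout = timeout_seconds
        self.started = time.monotonic()
        self.checks = 0  # lets an engine verify that components cooperated

    def check(self) -> None:
        """Raise if the time budget is spent; otherwise record the check."""
        self.checks += 1
        elapsed = time.monotonic() - self.started
        if elapsed > self.timeout:
            raise DeadlineExceeded(f"{elapsed:.2f}s elapsed (limit: {self.timeout}s)")


def process_items(items, deadline: Deadline) -> int:
    """Process items one by one, checking the deadline on each iteration."""
    done = 0
    for _ in items:
        deadline.check()
        time.sleep(0.01)  # stands in for real per-item work
        done += 1
    return done


def run_with_budget(item_count: int, timeout_seconds: float) -> str:
    try:
        done = process_items(range(item_count), Deadline(timeout_seconds))
    except DeadlineExceeded:
        return "timed out"
    return f"completed {done} items"
```

time.monotonic() is used instead of time.time() so that system clock adjustments cannot shorten or extend the budget.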
Hard Async Mode
Uses asyncio.wait_for to enforce timeouts. Components run in a thread executor, allowing cancellation:
flow:
  settings:
    timeout_seconds: 10
    timeout_mode: hard_async
Guarantees:
- Timeout is enforced even if component doesn't call check_deadline()
- Teardown always runs (in main thread)
- Best for I/O-bound operations
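The combination of asyncio.wait_for and a thread executor can be sketched with the standard library alone. The helper names below (run_with_timeout, slow_component, execute) are hypothetical, and this is not FlowEngine's code. One general Python caveat applies to any such design: on timeout the caller stops waiting, but the worker thread keeps running until the blocking call returns.

```python
import asyncio
import time


async def run_with_timeout(func, timeout_seconds: float):
    """Run a blocking callable in a worker thread, bounded by a timeout."""
    return await asyncio.wait_for(asyncio.to_thread(func), timeout=timeout_seconds)


def slow_component():
    time.sleep(0.3)  # stands in for a blocking I/O call
    return "done"


def execute(func, timeout_seconds: float) -> str:
    """Drive the coroutine from synchronous code and report the outcome."""
    try:
        return asyncio.run(run_with_timeout(func, timeout_seconds))
    except asyncio.TimeoutError:
        return "timed out"
```

asyncio.to_thread requires Python 3.9 or later; on older versions loop.run_in_executor plays the same role.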
Hard Process Mode
Runs each step in a separate process with a hard kill on timeout:
flow:
  settings:
    timeout_seconds: 30
    timeout_mode: hard_process
Guarantees:
- Component is forcibly terminated on timeout
- Teardown always runs in main process
- Context is serialized/deserialized across process boundary
- Best for CPU-bound operations that may hang
Requirements:
- Components must be picklable (standard Python classes)
- Context data must be JSON-serializable
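The idea of running work in a separate process, passing context as JSON, and killing the process on timeout can be illustrated with subprocess. Note that this sketch deliberately swaps in a different mechanism from the one implied above: it launches a fresh interpreter and exchanges JSON over stdin and stdout instead of pickling a component. CHILD_SOURCE and run_in_process are hypothetical names.

```python
import json
import subprocess
import sys

# Child program: read a JSON context from stdin, do work, write JSON to stdout.
CHILD_SOURCE = """
import json, sys, time
context = json.load(sys.stdin)
time.sleep(context.get("work_seconds", 0))
context["processed"] = True
json.dump(context, sys.stdout)
"""


def run_in_process(context: dict, timeout_seconds: float) -> dict:
    """Run the child in a separate interpreter; kill it if it overruns."""
    try:
        completed = subprocess.run(
            [sys.executable, "-c", CHILD_SOURCE],
            input=json.dumps(context),
            capture_output=True,
            text=True,
            timeout=timeout_seconds,
            check=True,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child here.
        return {**context, "timed_out": True}
    return json.loads(completed.stdout)
```

Because the context crosses the process boundary as text, anything that json.dumps cannot encode would fail before the child even starts, which is why JSON-serializable context data matters in this mode.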
Timeout Guarantees by Mode
| Scenario | Cooperative | Hard Async | Hard Process |
|---|---|---|---|
| Between steps | ✅ Always | ✅ Always | ✅ Always |
| Component calls check_deadline() | ✅ Yes | ✅ Yes | ✅ Yes |
| Component blocks without checking | ❌ Runs until returns | ✅ Cancelled | ✅ Killed |
| Teardown runs on timeout | ✅ Yes | ✅ Yes | ✅ Yes |
Choosing a Timeout Mode
Components call check_deadline()?
├── YES → Use cooperative (default, safest)
└── NO → Components do I/O operations?
    ├── YES → Use hard_async
    └── NO → Components are CPU-bound?
        ├── YES → Use hard_process
        └── NO → Use cooperative
Handling Timeout Errors
from flowengine import FlowTimeoutError, DeadlineCheckError

try:
    result = engine.execute()
except FlowTimeoutError as e:
    print(f"Timed out after {e.elapsed:.2f}s (limit: {e.timeout}s)")
except DeadlineCheckError as e:
    print(f"Component '{e.component}' didn't call check_deadline()")
Best Practices for Timeout Compliance
- Cooperative mode: Call self.check_deadline(context) in loops and before I/O
- Hard async: Keep components stateless when possible
- Hard process: Ensure context data is JSON-serializable
- All modes: Implement proper teardown() for cleanup
Component Registry
For YAML-complete flows, you can auto-instantiate components from their type paths:
from flowengine import ConfigLoader, FlowEngine
# Load config and create engine with auto-instantiation
config = ConfigLoader.load("flow.yaml")
engine = FlowEngine.from_config(config)
result = engine.execute()
Or use the registry directly:
from flowengine import ComponentRegistry, FlowEngine
registry = ComponentRegistry()
registry.register_class("greeter", GreetComponent)
# Registry is used when creating engine
engine = FlowEngine.from_config(config, registry=registry)
Validate that provided components match their declared types:
engine = FlowEngine(config, components)
errors = engine.validate_component_types()
if errors:
    print("Type mismatches:", errors)
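Instantiating from a type path and validating declared types both rest on resolving a dotted string to a class. The sketch below shows one standard way to do that with importlib; load_class and type_mismatches are hypothetical helpers, not part of ComponentRegistry, and the error message format is invented for illustration.

```python
import importlib


def load_class(type_path: str) -> type:
    """Resolve a dotted path such as "package.module.ClassName" to a class."""
    module_path, _, class_name = type_path.rpartition(".")
    if not module_path:
        raise ValueError(f"Expected 'module.ClassName', got {type_path!r}")
    module = importlib.import_module(module_path)
    obj = getattr(module, class_name)
    if not isinstance(obj, type):
        raise TypeError(f"{type_path} is not a class")
    return obj


def type_mismatches(declared: dict[str, str], instances: dict[str, object]) -> list[str]:
    """List components whose instance is not of the declared type."""
    return [
        f"{name}: expected {path}, got {type(instances[name]).__name__}"
        for name, path in declared.items()
        if not isinstance(instances[name], load_class(path))
    ]
```

For example, load_class("collections.OrderedDict") returns the OrderedDict class, and a plain dict supplied where an OrderedDict was declared is reported as a mismatch.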
Step Timing Details
Execution metadata tracks timing per step, even for repeated components:
result = engine.execute()

# Individual step timings (preserves order)
for timing in result.metadata.step_timings:
    print(f"Step {timing.step_index}: {timing.component} took {timing.duration:.3f}s")

# Aggregated by component (backward-compatible)
for name, total in result.metadata.component_timings.items():
    print(f"{name}: {total:.3f}s total")
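The relationship between the two views can be sketched as a simple fold: per-step records summed by component name. StepTimingSketch and aggregate_by_component are hypothetical; the field names follow the attributes used in the snippet above, and whether FlowEngine computes component_timings this way is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class StepTimingSketch:
    """Hypothetical record; field names follow the attributes used above."""

    step_index: int
    component: str
    duration: float


def aggregate_by_component(step_timings: list[StepTimingSketch]) -> dict[str, float]:
    """Sum per-step durations into one total per component name."""
    totals: dict[str, float] = defaultdict(float)
    for timing in step_timings:
        totals[timing.component] += timing.duration
    return dict(totals)


timings = [
    StepTimingSketch(0, "fetch", 0.25),
    StepTimingSketch(1, "transform", 0.5),
    StepTimingSketch(2, "fetch", 0.25),
]
```

Here the fetch component appears twice in the step list but only once in the aggregate, with its durations added together.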
Context Serialization
Contexts can be fully serialized and restored:
from flowengine import FlowContext
# After execution
result = engine.execute()
# Serialize to JSON
json_str = result.to_json()
# Later, restore the context
restored = FlowContext.from_json(json_str)
# All data preserved
print(restored.get("key"))
print(restored.metadata.flow_id)
print(restored.metadata.step_timings)
Contrib Components
LoggingComponent
Logs context state for debugging:
- name: debug
  type: flowengine.contrib.logging.LoggingComponent
  config:
    level: debug  # debug, info, warning, error
    message: "Current state"
    log_data: true
    log_metadata: false
    keys:  # Optional: only log specific keys
      - user
      - result
HTTPComponent
Makes HTTP requests (requires pip install flowengine[http]):
- name: api
  type: flowengine.contrib.http.HTTPComponent
  config:
    base_url: "https://api.example.com"
    timeout: 30
    headers:
      Authorization: "Bearer token"
    method: GET  # GET, POST, PUT, PATCH, DELETE
Usage:
context.set("endpoint", "/users/123")
result = engine.execute(context)
print(result.data.api.data) # Response JSON
API Reference
Core Classes
| Class | Description |
|---|---|
| BaseComponent | Abstract base class for components |
| FlowContext | Context passed through all components |
| DotDict | Dictionary with attribute-style access |
| ExecutionMetadata | Tracks timing, errors, and execution state |
| StepTiming | Timing info for a single step execution |
| FlowEngine | Orchestrates flow execution |
| GraphExecutor | DAG-based graph flow executor |
| ExecutionHook | Protocol for step lifecycle hooks |
| Checkpoint | Serializable flow execution snapshot |
| CheckpointStore | Abstract base class for checkpoint persistence |
| InMemoryCheckpointStore | In-memory checkpoint store implementation |
Configuration Classes
| Class | Description |
|---|---|
| ConfigLoader | Loads YAML configurations |
| FlowConfig | Complete flow configuration model |
| ComponentConfig | Component configuration model |
| StepConfig | Step configuration model |
| FlowSettings | Execution settings model |
| FlowDefinition | Flow structure and execution definition |
| GraphNodeConfig | Node configuration for graph flows |
| GraphEdgeConfig | Edge configuration for graph flows |
| ComponentRegistry | Registry for dynamic component loading |
Exceptions
| Exception | Description |
|---|---|
| FlowEngineError | Base exception for all errors |
| ConfigurationError | Invalid configuration |
| FlowExecutionError | Runtime execution error |
| FlowTimeoutError | Flow exceeded timeout_seconds |
| MaxIterationsError | Cyclic graph exceeded max_iterations (with on_max_iterations=fail) |
| DeadlineCheckError | Component didn't call check_deadline() (with require_deadline_check=True) |
| ComponentError | Component processing error |
| ConditionEvaluationError | Invalid/unsafe condition |
Examples
See the examples/ directory for complete examples:
- simple_flow.py — Basic flow execution
- conditional_flow.py — Sequential flow with conditional steps
- routing_flow.py — Conditional flow with first-match branching
- timeout_modes.py — Timeout enforcement modes (cooperative, hard_async, hard_process)
- custom_components.py — Advanced component patterns
- agent_loop.py — Cyclic graph agent loop with iterative refinement (v0.3.0)
Run examples:
cd examples
python simple_flow.py
python conditional_flow.py
python routing_flow.py
python timeout_modes.py
python custom_components.py
python agent_loop.py
Development
Setup
git clone https://github.com/yourorg/flowengine.git
cd flowengine
pip install -e ".[dev]"
Running Tests
pytest tests/ -v --cov=flowengine
Type Checking
mypy src/flowengine
Linting
ruff check src/ tests/
License
MIT License - see LICENSE for details.
Contributing
Contributions are welcome! Please feel free to submit issues and pull requests.
- Fork the repository
- Create a feature branch
- Make your changes
- Run tests and linting
- Submit a pull request