Aperion Flow - The Conductor
Pipeline & Workflow Engine for the Aperion Ecosystem
Identity: The Conductor orchestrates the symphony of agents with stateful execution and crash recovery.
Overview
Aperion Flow is a standalone workflow engine extracted from aperion-legendary-ai. It provides:
- DAG-based Pipeline Execution - Define workflows with dependencies
- Crash Recovery - Checkpoint-based state persistence
- LLM Assistance Integration - Pre-validate, analyze, diagnose with AI
- Circuit Breaker Protection - Prevent cascade failures
- Event Emission - Real-time progress tracking via The Nervous System
Installation
cd aperion-flow
pip install -e ".[dev]"
Quick Start
Define a Pipeline
from aperion_flow import Pipeline, PipelineStage, LLMAssistanceMode, LLMAssistanceConfig
# Create stages
stages = [
    PipelineStage(
        id="fetch",
        name="Fetch Data",
        handler="myapp.handlers.fetch_data",
        timeout=30.0,
    ),
    PipelineStage(
        id="process",
        name="Process Data",
        handler="myapp.handlers.process_data",
        depends_on=["fetch"],
        llm_assistance=[
            LLMAssistanceConfig(mode=LLMAssistanceMode.ERROR_DIAGNOSIS)
        ],
    ),
    PipelineStage(
        id="store",
        name="Store Results",
        handler="myapp.handlers.store_results",
        depends_on=["process"],
    ),
]
# Create pipeline
pipeline = Pipeline(
    name="data_pipeline",
    description="ETL pipeline with LLM assistance",
    stages=stages,
    max_parallel=2,
    fail_fast=True,
)
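To illustrate how `depends_on` declarations can translate into an execution order, here is a minimal sketch using only the standard library's `graphlib`. It is not Aperion Flow's executor; the `execution_waves` helper and the `dependencies` mapping are hypothetical names introduced for illustration, mirroring the three stages above.

```python
from graphlib import TopologicalSorter

# Stage ids mapped to the ids they depend on, mirroring depends_on above.
dependencies = {
    "fetch": set(),
    "process": {"fetch"},
    "store": {"process"},
}


def execution_waves(deps):
    """Group stages into waves; every stage in a wave has all dependencies met."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
print(waves)  # [['fetch'], ['process'], ['store']]
```

Stages that land in the same wave have no dependency on each other, so they are the candidates for parallel execution.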
Using the Builder
from aperion_flow import PipelineBuilder, LLMAssistanceMode
pipeline = (
    PipelineBuilder("my_pipeline", "Description")
    .add_stage("fetch", "myapp.fetch", timeout=60.0)
    .add_stage("process", "myapp.process", depends_on=["fetch"], retry_count=3)
    .add_stage("analyze", "myapp.analyze", depends_on=["process"],
               llm_modes=[LLMAssistanceMode.ANALYZE_OUTPUT])
    .with_max_parallel(4)
    .with_fail_fast(True)
    .build()
)
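A parallelism cap such as `with_max_parallel(4)` is commonly enforced in asyncio code with a semaphore. The sketch below shows that general technique with the standard library only; it is an assumption about how such a limit could work, not a description of Aperion Flow's internals, and `run_bounded`, `fake_stage`, and `make_job` are hypothetical names.

```python
import asyncio


async def run_bounded(jobs, max_parallel):
    """Run zero-argument coroutine functions, at most max_parallel at a time."""
    semaphore = asyncio.Semaphore(max_parallel)
    running = 0
    peak = 0  # highest number of jobs observed running at once

    async def guarded(job):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            try:
                return await job()
            finally:
                running -= 1

    # gather preserves the order of the input jobs in its result list.
    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


async def fake_stage(name):
    """Stand-in for a stage handler that does some I/O."""
    await asyncio.sleep(0.01)
    return name


def make_job(name):
    return lambda: fake_stage(name)


results, peak = asyncio.run(
    run_bounded([make_job(n) for n in "abcdef"], max_parallel=4)
)
print(results, peak)
```

Six jobs are submitted, but the semaphore never lets more than four run concurrently.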
Execute a Pipeline
import asyncio
from aperion_flow import PipelineExecutor
async def main():
    executor = PipelineExecutor()
    # Register handlers (or use fully qualified module paths).
    # fetch_handler and process_handler are your own callables;
    # pipeline is the Pipeline defined above.
    executor.register_handler("fetch", fetch_handler)
    executor.register_handler("process", process_handler)
    result = await executor.execute(
        pipeline,
        initial_data={"input": "value"},
        correlation_id="req-123",
    )
    print(f"Status: {result.status}")
    print(f"Completed: {result.stages_completed}/{result.stage_count}")
    print(f"Output: {result.data}")
asyncio.run(main())
Crash Recovery
# Resume from checkpoint (run inside an async function, as in main() above)
result = await executor.execute(
    pipeline,
    resume_from="execution-id-from-previous-run",
)
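The idea behind checkpoint-based resumption can be sketched without the library: persist progress after each stage, and on restart skip whatever is already recorded. The following is a simplified, synchronous illustration under that assumption. The checkpoint layout, the dict-backed `store`, and `run_with_checkpoints` are all hypothetical and do not reflect Aperion Flow's actual checkpoint format.

```python
import json


def run_with_checkpoints(stage_order, handlers, store, execution_id, data=None):
    """Run stages in order, skipping any already recorded in the checkpoint."""
    raw = store.get(execution_id)
    checkpoint = json.loads(raw) if raw else {"completed": [], "data": data or {}}
    for stage_id in stage_order:
        if stage_id in checkpoint["completed"]:
            continue  # finished before the crash; do not re-run
        checkpoint["data"] = handlers[stage_id](checkpoint["data"])
        checkpoint["completed"].append(stage_id)
        # Persist after every stage so a crash loses at most one stage of work.
        store[execution_id] = json.dumps(checkpoint)
    return checkpoint


calls = []


def fetch(data):
    calls.append("fetch")
    return {**data, "rows": [1, 2, 3]}


def process(data):
    calls.append("process")
    if calls.count("process") == 1:
        raise RuntimeError("simulated crash")
    return {**data, "total": sum(data["rows"])}


handlers = {"fetch": fetch, "process": process}
store = {}  # stands in for a durable checkpoint store

try:
    run_with_checkpoints(["fetch", "process"], handlers, store, "exec-1")
except RuntimeError:
    pass  # the run "crashed"; the checkpoint for fetch is already saved

final = run_with_checkpoints(["fetch", "process"], handlers, store, "exec-1")
print(final["data"])  # {'rows': [1, 2, 3], 'total': 6}
print(calls)          # ['fetch', 'process', 'process']
```

Note that `fetch` runs only once: the second call resumes from the saved state and retries only the stage that failed.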
Architecture
aperion-flow/
├── src/aperion_flow/
│ ├── definitions/
│ │ ├── pipeline.py # Pipeline, Stage, LLMConfig models
│ │ └── registry.py # Strategic pipeline catalog
│ ├── engine/
│ │ ├── context.py # PipelineContext, Checkpoint, State
│ │ ├── executor.py # Async DAG runner
│ │ └── recovery.py # Retry, backoff, circuit breaker
│ └── api/
│ └── routes.py # FastAPI endpoints
└── tests/
Key Components
PipelineContext
The "State Bag" that flows through execution:
from aperion_flow import PipelineContext
ctx = PipelineContext(
    pipeline_id="p1",
    pipeline_name="my_pipeline",
)
# Store data between stages
ctx.set("key", "value")
ctx.update({"batch": [1, 2, 3]})
# Track stage results
ctx.record_stage_start("stage_id", "stage_name")
ctx.record_stage_complete("stage_id", {"output": "data"})
Recovery Engine
Resilient execution with multiple strategies:
from aperion_flow import RecoveryConfig, RecoveryEngine, RecoveryStrategy
config = RecoveryConfig(
    strategy=RecoveryStrategy.EXPONENTIAL_BACKOFF,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
    failure_threshold=10,  # Circuit breaker
)
engine = RecoveryEngine(config)
result = await engine.execute_with_recovery(my_handler, circuit_key="api_calls")
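For intuition about what `base_delay` and `max_delay` mean under an exponential backoff strategy, here is a standalone sketch. It assumes a doubling factor and no jitter, which may differ from what `RecoveryEngine` actually does; `backoff_delays` and `retry_with_backoff` are hypothetical helpers, not part of the library.

```python
import asyncio


def backoff_delays(max_retries, base_delay, max_delay, factor=2.0):
    """Delay before each retry: base_delay * factor**attempt, capped at max_delay."""
    return [min(max_delay, base_delay * factor**attempt) for attempt in range(max_retries)]


async def retry_with_backoff(func, *, max_retries, base_delay, max_delay, sleep=asyncio.sleep):
    """Call func, retrying up to max_retries times with growing delays."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted; surface the last error
            await sleep(delays[attempt])


state = {"attempts": 0}


async def flaky():
    """Fails twice, then succeeds."""
    state["attempts"] += 1
    if state["attempts"] < 3:
        raise ConnectionError("transient")
    return "ok"


slept = []


async def fake_sleep(seconds):
    slept.append(seconds)  # record instead of actually waiting


result = asyncio.run(
    retry_with_backoff(flaky, max_retries=5, base_delay=1.0, max_delay=60.0, sleep=fake_sleep)
)
print(backoff_delays(5, 1.0, 60.0))  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(result, slept)                 # ok [1.0, 2.0]
```

Injecting the `sleep` function keeps the sketch fast to run and easy to test; production code would normally also add random jitter to avoid synchronized retries.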
Strategic Registry
Pre-configured pipelines with intelligent defaults:
from aperion_flow import LLMAssistanceMode
from aperion_flow.definitions.registry import get_registry, PipelineCategory
registry = get_registry()
# Get LLM config with smart defaults
llm_config = registry.get_llm_config(
    "pytest_execution",
    LLMAssistanceMode.ERROR_DIAGNOSIS,
)
# List pipelines by category
testing_pipelines = registry.get_by_category(PipelineCategory.TESTING)
API Server
Run the Flow API server:
python -m aperion_flow.api.routes
# or
uvicorn aperion_flow.api.routes:create_app --factory --port 8001
Endpoints
| Method | Path | Description |
|---|---|---|
| POST | /flow/start | Start a pipeline execution |
| POST | /flow/resume | Resume from checkpoint |
| GET | /flow/{id} | Get execution status |
| GET | /flow/{id}/data | Get execution output data |
| GET | /flow/ | List executions |
| DELETE | /flow/{id} | Delete execution record |
| GET | /healthz | Health check |
Integration Points
Aperion Flow integrates with other ecosystem components:
- The Nervous System (Event Bus) - Emits `flow.started`, `flow.stage.completed`, `flow.failed` events
- The Switchboard (LLM Router) - Provides LLM assistance for validation and diagnosis
- The Cortex (State Gateway) - Persists checkpoints and execution history
- The Gatekeeper (Auth) - Validates access to pipeline execution
Event Emitter Protocol
from typing import Protocol
class EventEmitter(Protocol):
    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: str | None = None,
        correlation_id: str | None = None,
    ) -> str:
        ...
LLM Assistant Protocol
from typing import Protocol
class LLMAssistant(Protocol):
    async def pre_validate(self, stage, context, timeout) -> dict:
        ...
    async def analyze_output(self, stage, output, context, timeout) -> dict:
        ...
    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        ...
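A stub that satisfies this protocol without calling any model can be handy for local runs and tests. The sketch below is one possible shape; `NullAssistant` is a hypothetical name, and the keys in the returned dicts are assumptions, since the source only specifies that each method returns a `dict`.

```python
import asyncio


class NullAssistant:
    """No-op LLM assistant: returns canned results and never calls a model."""

    async def pre_validate(self, stage, context, timeout) -> dict:
        return {"ok": True, "notes": []}

    async def analyze_output(self, stage, output, context, timeout) -> dict:
        return {"output_type": type(output).__name__, "notes": []}

    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        # Echo back what is already known about the error; no diagnosis is attempted.
        return {
            "error_type": type(error).__name__,
            "message": str(error),
            "suggestion": None,
        }


assistant = NullAssistant()
diagnosis = asyncio.run(
    assistant.diagnose_error("process", ValueError("bad row"), {}, 5.0)
)
print(diagnosis)
# {'error_type': 'ValueError', 'message': 'bad row', 'suggestion': None}
```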
Migration from aperion-legendary-ai
Source Files Migrated
| Original | New Location |
|---|---|
| stack/aperion/pipelines/executor.py | src/flow/engine/executor.py |
| stack/aperion/pipelines/pipeline.py | src/flow/definitions/pipeline.py |
| stack/aperion/pipelines/recovery.py | src/flow/engine/recovery.py |
| stack/aperion/pipelines/strategic_registry.py | src/flow/definitions/registry.py |
Key Changes
- Async-first - Replaced ThreadPoolExecutor with asyncio
- Stateless Executor - All state in PipelineContext with checkpoints
- Pydantic v2 - Immutable models with `frozen=True`
- Protocol-based Integration - Pluggable EventEmitter, LLMAssistant, CheckpointStore
- No Internal Dependencies - Removed `@with_doc_context` and internal service coupling
Testing
# Run all tests
pytest
# Run with coverage
pytest --cov=aperion_flow --cov-report=html
# Run specific test file
pytest tests/unit/test_executor.py -v
Security Considerations
- Secrets - Never store secrets in PipelineContext; use `SecretRef` patterns
- Timeouts - All stages have configurable timeouts (default 30s)
- Circuit Breakers - Prevent cascade failures from external services
- Audit Trail - Checkpoints provide execution history
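To make the circuit breaker idea concrete, here is a minimal generic sketch: after a number of consecutive failures the breaker opens and rejects calls until a cool-down has passed. It illustrates the pattern only and is not Aperion Flow's implementation; the `CircuitBreaker` class, its `reset_after` parameter, and the injectable `clock` are assumptions made for the example.

```python
import time


class CircuitBreaker:
    """Opens after failure_threshold consecutive failures; allows calls again after reset_after seconds."""

    def __init__(self, failure_threshold, reset_after, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def allow(self):
        """Return True if a call may proceed."""
        if self.opened_at is None:
            return True
        # Half-open: permit trial calls once the cool-down has elapsed.
        return self.clock() - self.opened_at >= self.reset_after

    def record_success(self):
        self.failures = 0
        self.opened_at = None

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = self.clock()  # (re)start the cool-down


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=3, reset_after=30.0, clock=lambda: now[0])

for _ in range(3):
    breaker.record_failure()
blocked = breaker.allow()  # False: the circuit is open

now[0] = 31.0
trial = breaker.allow()    # True: cool-down elapsed, a trial call is permitted
breaker.record_success()   # the trial succeeded, so the circuit closes
print(blocked, trial, breaker.allow())  # False True True
```

While the breaker is open, callers fail immediately instead of piling more requests onto a struggling external service, which is what limits cascade failures.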
Constitution Alignment
| Constitution | Implementation |
|---|---|
| A4 - Agents Analyze, Not Execute | Executor controls flow; handlers execute |
| A5 - Latency SLOs | Stage timeouts, parallel execution |
| B1 - Secrets via Env | SecretRef pattern, no secrets in context |
| D3 - Audit Logging | Checkpoint history, event emission |
License
MIT