
The Conductor - Pipeline & Workflow Engine for Aperion Ecosystem


Aperion Flow - The Conductor

Pipeline & Workflow Engine for the Aperion Ecosystem

Identity: The Conductor orchestrates the symphony of agents with stateful execution and crash recovery.

Overview

Aperion Flow is a standalone workflow engine extracted from aperion-legendary-ai. It provides:

  • DAG-based Pipeline Execution - Define workflows with dependencies
  • Crash Recovery - Checkpoint-based state persistence
  • LLM Assistance Integration - Pre-validate, analyze, diagnose with AI
  • Circuit Breaker Protection - Prevent cascade failures
  • Event Emission - Real-time progress tracking via The Nervous System

Installation

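pip install aperion-flow

For development, from a clone of the repository (editable install with dev extras):

cd aperion-flow
pip install -e ".[dev]"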

Quick Start

Define a Pipeline

from aperion_flow import Pipeline, PipelineStage, LLMAssistanceMode, LLMAssistanceConfig

# Create stages
stages = [
    PipelineStage(
        id="fetch",
        name="Fetch Data",
        handler="myapp.handlers.fetch_data",
        timeout=30.0,
    ),
    PipelineStage(
        id="process",
        name="Process Data",
        handler="myapp.handlers.process_data",
        depends_on=["fetch"],
        llm_assistance=[
            LLMAssistanceConfig(mode=LLMAssistanceMode.ERROR_DIAGNOSIS)
        ],
    ),
    PipelineStage(
        id="store",
        name="Store Results",
        handler="myapp.handlers.store_results",
        depends_on=["process"],
    ),
]

# Create pipeline
pipeline = Pipeline(
    name="data_pipeline",
    description="ETL pipeline with LLM assistance",
    stages=stages,
    max_parallel=2,
    fail_fast=True,
)
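The depends_on lists above turn the stages into a directed acyclic graph. As a rough illustration of how such a graph can be ordered (a generic Kahn's-algorithm sketch, not Aperion Flow's actual executor code), the function below groups stage IDs into "waves". Every stage in a wave depends only on stages in earlier waves, so a wave's members could run concurrently, up to max_parallel. The name execution_waves is hypothetical.

```python
from collections import defaultdict


def execution_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group stage IDs into waves; each wave depends only on earlier waves.

    `deps` maps each stage ID to the IDs it depends on (like `depends_on`).
    Raises ValueError for unknown dependencies or dependency cycles.
    """
    for stage, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise ValueError(f"{stage!r} depends on unknown stage {parent!r}")

    # In-degree = number of unmet dependencies; children = reverse edges.
    indegree = {stage: len(parents) for stage, parents in deps.items()}
    children: dict[str, list[str]] = defaultdict(list)
    for stage, parents in deps.items():
        for parent in parents:
            children[parent].append(stage)

    waves: list[list[str]] = []
    ready = sorted(s for s, d in indegree.items() if d == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for stage in ready:
            for child in children[stage]:
                indegree[child] -= 1
                if indegree[child] == 0:  # all of its dependencies are done
                    next_ready.append(child)
        ready = sorted(next_ready)

    # Stages never reaching in-degree 0 are stuck in a cycle.
    if sum(len(w) for w in waves) != len(deps):
        raise ValueError("dependency cycle detected")
    return waves
```

For the three-stage pipeline above, execution_waves({"fetch": [], "process": ["fetch"], "store": ["process"]}) returns [["fetch"], ["process"], ["store"]]; a dependency cycle raises ValueError instead of hanging.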

Using the Builder

from aperion_flow import PipelineBuilder, LLMAssistanceMode

pipeline = (
    PipelineBuilder("my_pipeline", "Description")
    .add_stage("fetch", "myapp.fetch", timeout=60.0)
    .add_stage("process", "myapp.process", depends_on=["fetch"], retry_count=3)
    .add_stage("analyze", "myapp.analyze", depends_on=["process"],
               llm_modes=[LLMAssistanceMode.ANALYZE_OUTPUT])
    .with_max_parallel(4)
    .with_fail_fast(True)
    .build()
)

Execute a Pipeline

import asyncio
from aperion_flow import PipelineExecutor

async def main():
    executor = PipelineExecutor()

    # Register handlers (or use fully qualified module paths). fetch_handler and
    # process_handler are your own callables; `pipeline` is the one defined above.
    executor.register_handler("fetch", fetch_handler)
    executor.register_handler("process", process_handler)

    result = await executor.execute(
        pipeline,
        initial_data={"input": "value"},
        correlation_id="req-123",
    )

    print(f"Status: {result.status}")
    print(f"Completed: {result.stages_completed}/{result.stage_count}")
    print(f"Output: {result.data}")

asyncio.run(main())
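To make "async DAG runner" concrete, here is a minimal asyncio sketch, assuming handlers are async callables that take the shared data dict and return a result (the real handler signature may differ). It runs pre-ordered waves of stage IDs one after another, caps concurrency with an asyncio.Semaphore sized like max_parallel, and enforces a per-stage timeout with asyncio.wait_for. run_waves and its parameters are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Assumed handler shape: async function taking the shared data dict.
Handler = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_waves(
    waves: list[list[str]],
    handlers: dict[str, Handler],
    data: dict[str, Any],
    *,
    max_parallel: int = 2,
    timeout: float = 30.0,
) -> dict[str, Any]:
    """Run each wave concurrently (capped by max_parallel), wave after wave."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def run_stage(stage_id: str) -> tuple[str, Any]:
        async with semaphore:  # at most max_parallel stages in flight
            result = await asyncio.wait_for(handlers[stage_id](data), timeout)
            return stage_id, result

    for wave in waves:
        # gather propagates the first exception, so no later wave starts.
        results = await asyncio.gather(*(run_stage(s) for s in wave))
        for stage_id, result in results:
            data[stage_id] = result  # later stages can read earlier outputs
    return data
```

Because asyncio.gather propagates the first exception, a failing or timed-out stage stops any later wave from starting, which loosely mirrors fail_fast=True.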

Crash Recovery

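# Resume an interrupted run from its checkpoint (inside an async function such as
# main() above); pass the execution ID of the earlier run.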
result = await executor.execute(
    pipeline,
    resume_from="execution-id-from-previous-run",
)
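The idea behind checkpoint-based resume can be sketched with plain JSON files. This is an illustration under assumptions, not the library's checkpoint format: save_checkpoint (hypothetical) is meant to be called after each stage completes, and resume_plan (hypothetical) reads the file back to report which outputs already exist and which stages still need to run.

```python
import json
from pathlib import Path
from typing import Any


def save_checkpoint(path: Path, execution_id: str, completed: dict[str, Any]) -> None:
    """Persist outputs of completed stages; a crash then only loses in-flight work."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"execution_id": execution_id, "completed": completed}))
    # os.replace semantics: atomic on POSIX within one filesystem, so readers
    # never see a half-written checkpoint.
    tmp.replace(path)


def resume_plan(path: Path, stage_order: list[str]) -> tuple[dict[str, Any], list[str]]:
    """Return (outputs already computed, stages still to run) for a checkpoint file."""
    if not path.exists():
        return {}, list(stage_order)  # no checkpoint: run everything
    completed = json.loads(path.read_text())["completed"]
    return completed, [s for s in stage_order if s not in completed]
```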

Architecture

aperion-flow/
├── src/aperion_flow/
│   ├── definitions/
│   │   ├── pipeline.py    # Pipeline, Stage, LLMConfig models
│   │   └── registry.py    # Strategic pipeline catalog
│   ├── engine/
│   │   ├── context.py     # PipelineContext, Checkpoint, State
│   │   ├── executor.py    # Async DAG runner
│   │   └── recovery.py    # Retry, backoff, circuit breaker
│   └── api/
│       └── routes.py      # FastAPI endpoints
└── tests/

Key Components

PipelineContext

The "State Bag" that flows through execution:

from aperion_flow import PipelineContext

ctx = PipelineContext(
    pipeline_id="p1",
    pipeline_name="my_pipeline",
)

# Store data between stages
ctx.set("key", "value")
ctx.update({"batch": [1, 2, 3]})

# Track stage results
ctx.record_stage_start("stage_id", "stage_name")
ctx.record_stage_complete("stage_id", {"output": "data"})
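A hypothetical, stripped-down version of such a state bag shows why it suits checkpointing: all mutable state lives in one object whose snapshot is a plain dict. MiniContext below only mirrors the method names used above; it is not PipelineContext's implementation, and the record fields (status, started, finished) are assumptions.

```python
import time
from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class MiniContext:
    """Hypothetical stand-in for PipelineContext: shared data plus stage records."""

    pipeline_id: str
    pipeline_name: str
    data: dict[str, Any] = field(default_factory=dict)
    stages: dict[str, dict[str, Any]] = field(default_factory=dict)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value

    def update(self, values: dict[str, Any]) -> None:
        self.data.update(values)

    def record_stage_start(self, stage_id: str, stage_name: str) -> None:
        self.stages[stage_id] = {"name": stage_name, "status": "running", "started": time.time()}

    def record_stage_complete(self, stage_id: str, output: dict[str, Any]) -> None:
        self.stages[stage_id].update(status="completed", output=output, finished=time.time())

    def snapshot(self) -> dict[str, Any]:
        """Deep-copied plain dict, suitable for writing to a checkpoint."""
        return asdict(self)
```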

Recovery Engine

Resilient execution with multiple strategies:

from aperion_flow import RecoveryConfig, RecoveryEngine, RecoveryStrategy

config = RecoveryConfig(
    strategy=RecoveryStrategy.EXPONENTIAL_BACKOFF,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
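    failure_threshold=10,  # circuit breaker: failures before the circuit opens
)

engine = RecoveryEngine(config)
# Run inside an async function; my_handler is your own callable.
result = await engine.execute_with_recovery(my_handler, circuit_key="api_calls")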
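The two mechanisms configured above can be sketched in isolation. This is a generic illustration, not RecoveryEngine's code: backoff_delay doubles the wait per attempt starting from base_delay and caps it at max_delay, and CircuitBreaker opens after failure_threshold consecutive failures. The reset_after cool-down and all names here are assumptions added for the example.

```python
import time
from dataclasses import dataclass
from typing import Optional


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Delay before retry `attempt` (0-based): base_delay * 2**attempt, capped."""
    return min(base_delay * (2 ** attempt), max_delay)


@dataclass
class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures (hypothetical sketch)."""

    failure_threshold: int = 10
    reset_after: float = 30.0  # assumed cool-down before trial calls resume
    failures: int = 0
    opened_at: Optional[float] = None

    def allow(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if self.opened_at is None:
            return True  # closed: calls flow normally
        # After the cool-down, trial calls are allowed; another failure re-opens it.
        return now - self.opened_at >= self.reset_after

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self, now: Optional[float] = None) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic() if now is None else now
```

Keying one breaker per circuit_key (for example "api_calls") keeps a failing dependency from blocking unrelated stages.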

Strategic Registry

Pre-configured pipelines with intelligent defaults:

from aperion_flow import LLMAssistanceMode
from aperion_flow.definitions.registry import get_registry, PipelineCategory

registry = get_registry()

# Get LLM config with smart defaults
llm_config = registry.get_llm_config(
    "pytest_execution",
    LLMAssistanceMode.ERROR_DIAGNOSIS,
)

# List pipelines by category
testing_pipelines = registry.get_by_category(PipelineCategory.TESTING)

API Server

Run the Flow API server:

python -m aperion_flow.api.routes
# or
uvicorn aperion_flow.api.routes:create_app --factory --port 8001

Endpoints

Method  Path              Description
POST    /flow/start       Start a pipeline execution
POST    /flow/resume      Resume from checkpoint
GET     /flow/{id}        Get execution status
GET     /flow/{id}/data   Get execution output data
GET     /flow/            List executions
DELETE  /flow/{id}        Delete execution record
GET     /healthz          Health check
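A client can drive these endpoints with nothing but the standard library. The sketch below only builds the requests. The JSON body fields for /flow/start ("pipeline", "initial_data", "correlation_id") are assumptions borrowed from the executor example, not a documented request schema, and BASE_URL assumes the uvicorn command above (port 8001).

```python
import json
import urllib.request

BASE_URL = "http://localhost:8001"  # assumption: server started with --port 8001


def start_request(payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /flow/start request with a JSON body."""
    return urllib.request.Request(
        f"{BASE_URL}/flow/start",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(execution_id: str) -> urllib.request.Request:
    """Build a GET /flow/{id} status request."""
    return urllib.request.Request(f"{BASE_URL}/flow/{execution_id}", method="GET")


# Sending requires a running server, e.g.:
#   with urllib.request.urlopen(start_request({"pipeline": "data_pipeline"})) as resp:
#       print(json.load(resp))
```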

Integration Points

Aperion Flow integrates with other ecosystem components:

  • The Nervous System (Event Bus) - Emits flow.started, flow.stage.completed, flow.failed events
  • The Switchboard (LLM Router) - Provides LLM assistance for validation and diagnosis
  • The Cortex (State Gateway) - Persists checkpoints and execution history
  • The Gatekeeper (Auth) - Validates access to pipeline execution

Event Emitter Protocol

from typing import Protocol

class EventEmitter(Protocol):
    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: str | None = None,
        correlation_id: str | None = None,
    ) -> str:
        ...
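Because EventEmitter is a typing.Protocol, any class with a matching emit coroutine satisfies it without inheriting from it. A minimal in-memory test double might look like this; InMemoryEmitter is hypothetical, and treating the returned str as an event ID is an assumption.

```python
import uuid
from typing import Any, Optional


class InMemoryEmitter:
    """Test double matching the EventEmitter protocol; records events in a list."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: Optional[str] = None,
        correlation_id: Optional[str] = None,
    ) -> str:
        event_id = str(uuid.uuid4())  # assumption: the returned str is an event ID
        self.events.append({
            "id": event_id,
            "type": event_type,
            "payload": payload,
            "source": source,
            "correlation_id": correlation_id,
        })
        return event_id
```

Tests can then assert on emitter.events, for example that flow.started was emitted with the expected correlation_id.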

LLM Assistant Protocol

from typing import Protocol

class LLMAssistant(Protocol):
    async def pre_validate(self, stage, context, timeout) -> dict:
        ...
    async def analyze_output(self, stage, output, context, timeout) -> dict:
        ...
    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        ...
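An offline stand-in for LLMAssistant lets pipelines run in tests or air-gapped environments without calling a model. The keys in the returned dicts ("valid", "issues", "diagnosis", and so on) are assumptions; the protocol above only says each method returns a dict.

```python
from typing import Any


class NullLLMAssistant:
    """Hypothetical offline LLMAssistant: never calls a model, returns neutral results."""

    async def pre_validate(self, stage: Any, context: Any, timeout: float) -> dict:
        return {"valid": True, "issues": []}

    async def analyze_output(self, stage: Any, output: Any, context: Any, timeout: float) -> dict:
        return {"summary": None, "anomalies": []}

    async def diagnose_error(self, stage: Any, error: BaseException, context: Any, timeout: float) -> dict:
        # Echo the error itself so logs stay useful without an LLM.
        return {"diagnosis": f"{type(error).__name__}: {error}", "suggestions": []}
```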

Migration from aperion-legendary-ai

Source Files Migrated

Original                                        New location
stack/aperion/pipelines/executor.py             src/aperion_flow/engine/executor.py
stack/aperion/pipelines/pipeline.py             src/aperion_flow/definitions/pipeline.py
stack/aperion/pipelines/recovery.py             src/aperion_flow/engine/recovery.py
stack/aperion/pipelines/strategic_registry.py   src/aperion_flow/definitions/registry.py

Key Changes

  1. Async-first - Replaced ThreadPoolExecutor with asyncio
  2. Stateless Executor - All state in PipelineContext with checkpoints
  3. Pydantic v2 - Immutable models with frozen=True
  4. Protocol-based Integration - Pluggable EventEmitter, LLMAssistant, CheckpointStore
  5. No Internal Dependencies - Removed @with_doc_context and internal service coupling
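The pluggable CheckpointStore mentioned in item 4 is not shown in this README, so its methods are unknown here. Assuming a simple async save/load interface (hypothetical names), protocol-based integration could look like the sketch below: a runtime_checkable Protocol plus an in-memory store that deep-copies state, so later mutations of the live context cannot alter a saved checkpoint.

```python
import copy
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class CheckpointStoreLike(Protocol):
    """Hypothetical checkpoint-store shape; the real CheckpointStore may differ."""

    async def save(self, execution_id: str, state: dict[str, Any]) -> None: ...

    async def load(self, execution_id: str) -> Optional[dict[str, Any]]: ...


class MemoryCheckpointStore:
    """Satisfies the protocol structurally; no inheritance required."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def save(self, execution_id: str, state: dict[str, Any]) -> None:
        # Deep-copy so the stored checkpoint is isolated from the live context.
        self._data[execution_id] = copy.deepcopy(state)

    async def load(self, execution_id: str) -> Optional[dict[str, Any]]:
        state = self._data.get(execution_id)
        return copy.deepcopy(state) if state is not None else None
```

Swapping in a database- or Cortex-backed store would then only require another class with the same two coroutines.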

Testing

# Run all tests
pytest

# Run with coverage
pytest --cov=aperion_flow --cov-report=html

# Run specific test file
pytest tests/unit/test_executor.py -v

Security Considerations

  • Secrets - Never store secrets in PipelineContext; use SecretRef patterns
  • Timeouts - All stages have configurable timeouts (default 30s)
  • Circuit Breakers - Prevent cascade failures from external services
  • Audit Trail - Checkpoints provide execution history
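The SecretRef pattern can be sketched as a reference that stores only an environment variable's name and resolves the value at the moment of use, so neither PipelineContext nor a checkpoint ever holds the secret itself. This SecretRef class is a hypothetical illustration; the ecosystem's own SecretRef may have a different interface.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SecretRef:
    """Hypothetical SecretRef: holds the env var *name*, never the secret value."""

    env_var: str

    def resolve(self) -> str:
        # Read at the moment of use so the value never lands in context or checkpoints.
        try:
            return os.environ[self.env_var]
        except KeyError:
            raise LookupError(f"secret {self.env_var!r} is not set") from None
```

A stage would carry SecretRef("MYAPP_API_KEY") (a hypothetical variable name) in its data and call resolve() only when it makes the outbound call; the dataclass repr shows the variable name, not the value.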

Constitution Alignment

Constitution                      Implementation
A4 - Agents Analyze, Not Execute  Executor controls flow; handlers execute
A5 - Latency SLOs                 Stage timeouts, parallel execution
B1 - Secrets via Env              SecretRef pattern, no secrets in context
D3 - Audit Logging                Checkpoint history, event emission

License

MIT


Download files

Download the file for your platform.

Source Distribution

aperion_flow-1.2.1.tar.gz (64.8 kB)

Uploaded Source

Built Distribution

aperion_flow-1.2.1-py3-none-any.whl (47.4 kB)

Uploaded Python 3

File details

Details for the file aperion_flow-1.2.1.tar.gz.

File metadata

  • Download URL: aperion_flow-1.2.1.tar.gz
  • Upload date:
  • Size: 64.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for aperion_flow-1.2.1.tar.gz
Algorithm Hash digest
SHA256 5c1841de35166b68c6b7114e8fd8358732742500130f5e3ddb1f49d9aba6fd58
MD5 fd99255954bc99521a9b2db491e91ed5
BLAKE2b-256 70514804359dc2411dbafc4d138bf3de9c79c3e32e1e75b16b89e59856181cbd

Provenance

The following attestation bundles were made for aperion_flow-1.2.1.tar.gz:

Publisher: release.yml on invictustitan2/aperion-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file aperion_flow-1.2.1-py3-none-any.whl.

File metadata

  • Download URL: aperion_flow-1.2.1-py3-none-any.whl
  • Upload date:
  • Size: 47.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for aperion_flow-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9011f5573caf3940a60b6251d8c11ce421caf31dc571fc85969607a37df36d7c
MD5 4ad5b079a88abd7bd716055913f7e715
BLAKE2b-256 7f29015b90a6bc826088390e132254bfa1cb8f5db3c6e852af3513e75f3dc7c0

Provenance

The following attestation bundles were made for aperion_flow-1.2.1-py3-none-any.whl:

Publisher: release.yml on invictustitan2/aperion-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
