
The Conductor - Pipeline & Workflow Engine for the Aperion Ecosystem


Aperion Flow - The Conductor

Pipeline & Workflow Engine for the Aperion Ecosystem

Identity: The Conductor orchestrates the symphony of agents with stateful execution and crash recovery.

Overview

Aperion Flow is a standalone workflow engine extracted from aperion-legendary-ai. It provides:

  • DAG-based Pipeline Execution - Define workflows with dependencies
  • Crash Recovery - Checkpoint-based state persistence
  • LLM Assistance Integration - Pre-validate, analyze, diagnose with AI
  • Circuit Breaker Protection - Prevent cascade failures
  • Event Emission - Real-time progress tracking via The Nervous System

Installation

# From a local clone of the repository (editable install with dev extras)
cd aperion-flow
pip install -e ".[dev]"

Quick Start

Define a Pipeline

from aperion_flow import Pipeline, PipelineStage, LLMAssistanceMode, LLMAssistanceConfig

# Create stages
stages = [
    PipelineStage(
        id="fetch",
        name="Fetch Data",
        handler="myapp.handlers.fetch_data",
        timeout=30.0,
    ),
    PipelineStage(
        id="process",
        name="Process Data",
        handler="myapp.handlers.process_data",
        depends_on=["fetch"],
        llm_assistance=[
            LLMAssistanceConfig(mode=LLMAssistanceMode.ERROR_DIAGNOSIS)
        ],
    ),
    PipelineStage(
        id="store",
        name="Store Results",
        handler="myapp.handlers.store_results",
        depends_on=["process"],
    ),
]

# Create pipeline
pipeline = Pipeline(
    name="data_pipeline",
    description="ETL pipeline with LLM assistance",
    stages=stages,
    max_parallel=2,
    fail_fast=True,
)
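The depends_on lists above form a directed acyclic graph (DAG). The sketch below uses only the standard library's graphlib and is not Aperion Flow's own code. It shows how such a graph can be checked for unknown dependencies and cycles, and then grouped into "waves" of stages that could run concurrently. The plain-dict view of the stages (stage id mapped to its depends_on list) is an assumption made for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def plan_waves(depends_on: dict[str, list[str]]) -> list[list[str]]:
    """Group stage ids into waves; each stage comes after all of its dependencies.

    depends_on maps stage id -> ids it depends on (a hypothetical plain-dict
    view of PipelineStage.depends_on, used here for illustration only).
    """
    # Reject dependencies on stages that do not exist.
    for stage_id, deps in depends_on.items():
        missing = [d for d in deps if d not in depends_on]
        if missing:
            raise ValueError(f"stage {stage_id!r} depends on unknown {missing}")

    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every dependency of these is done
        waves.append(ready)
        sorter.done(*ready)
    return waves


print(plan_waves({"fetch": [], "process": ["fetch"], "store": ["process"]}))
# [['fetch'], ['process'], ['store']]
```

Stages in the same wave have no ordering constraint between them, so a setting like max_parallel only limits how many of them run at once.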

Using the Builder

from aperion_flow import PipelineBuilder, LLMAssistanceMode

pipeline = (
    PipelineBuilder("my_pipeline", "Description")
    .add_stage("fetch", "myapp.fetch", timeout=60.0)
    .add_stage("process", "myapp.process", depends_on=["fetch"], retry_count=3)
    .add_stage("analyze", "myapp.analyze", depends_on=["process"],
               llm_modes=[LLMAssistanceMode.ANALYZE_OUTPUT])
    .with_max_parallel(4)
    .with_fail_fast(True)
    .build()
)

Execute a Pipeline

import asyncio
from aperion_flow import PipelineExecutor

async def main():
    executor = PipelineExecutor()

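    # Register handlers (or use fully qualified module paths).
    # fetch_handler and process_handler are your own handler callables (not shown),
    # and pipeline is one of the pipelines defined above.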
    executor.register_handler("fetch", fetch_handler)
    executor.register_handler("process", process_handler)

    result = await executor.execute(
        pipeline,
        initial_data={"input": "value"},
        correlation_id="req-123",
    )

    print(f"Status: {result.status}")
    print(f"Completed: {result.stages_completed}/{result.stage_count}")
    print(f"Output: {result.data}")

asyncio.run(main())
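PipelineExecutor is described as an async DAG runner. The sketch below is only a rough mental model, not the library's implementation. It starts each stage once all of its dependencies have completed and keeps at most max_parallel stages in flight. It applies a per-stage timeout with asyncio.wait_for and, in the spirit of fail_fast=True, cancels in-flight stages and stops at the first failure. The handler shape (a coroutine that receives a copy of the shared data and returns a dict of updates) and the status strings are assumptions.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical handler shape: receives a copy of the shared data, returns updates.
Handler = Callable[[dict], Awaitable[dict]]


async def run_dag(
    stages: dict[str, tuple[list[str], Handler]],  # id -> (depends_on, handler)
    data: dict,
    max_parallel: int = 2,
    timeout: float = 30.0,
) -> tuple[str, dict]:
    """Run stages in dependency order; return (status, merged data)."""
    remaining = dict(stages)               # stages not started yet
    completed: set[str] = set()
    running: dict[asyncio.Task, str] = {}  # task -> stage id

    while remaining or running:
        # Start every stage whose dependencies are complete, up to max_parallel.
        for sid, (deps, handler) in list(remaining.items()):
            if len(running) >= max_parallel:
                break
            if all(dep in completed for dep in deps):
                del remaining[sid]
                coro = asyncio.wait_for(handler(dict(data)), timeout)
                running[asyncio.create_task(coro)] = sid
        if not running:
            return "blocked", data  # unmet dependencies (unknown stage or cycle)

        finished, _ = await asyncio.wait(set(running), return_when=asyncio.FIRST_COMPLETED)
        for task in finished:
            sid = running.pop(task)
            if task.exception() is not None:  # handler error or timeout
                # fail_fast: cancel in-flight stages and stop scheduling.
                for other in running:
                    other.cancel()
                await asyncio.gather(*running, return_exceptions=True)
                return "failed", data
            data.update(task.result())
            completed.add(sid)
    return "completed", data


async def fetch(d: dict) -> dict:
    return {"rows": [1, 2, 3]}


async def process(d: dict) -> dict:
    return {"total": sum(d["rows"])}


async def store(d: dict) -> dict:
    return {"stored": True}


status, out = asyncio.run(run_dag(
    {"fetch": ([], fetch), "process": (["fetch"], process), "store": (["process"], store)},
    {"input": "value"},
))
print(status, out)
# completed {'input': 'value', 'rows': [1, 2, 3], 'total': 6, 'stored': True}
```

Each handler gets a copy of the data, and results are merged only after the stage succeeds, so a stage that fails or times out never leaves partial updates in the shared state.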

Crash Recovery

# Resume from a checkpoint (inside an async function such as main() above)
result = await executor.execute(
    pipeline,
    resume_from="execution-id-from-previous-run",
)
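Resuming relies on checkpoints written during the earlier run. The following minimal sketch of the idea assumes a hypothetical in-memory store keyed by execution id; the real CheckpointStore protocol's method names are not documented here. Each completed stage's output is saved, and a resumed run skips any stage that already has a checkpoint.

```python
import json


class InMemoryCheckpointStore:
    """Hypothetical checkpoint store: execution id -> JSON snapshot."""

    def __init__(self) -> None:
        self._snapshots: dict[str, str] = {}

    def save(self, execution_id: str, completed: dict) -> None:
        # Serialize so the snapshot cannot be changed by later in-memory edits.
        self._snapshots[execution_id] = json.dumps(completed)

    def load(self, execution_id: str) -> dict:
        return json.loads(self._snapshots.get(execution_id, "{}"))


def run(execution_id: str, order: list[str], handlers: dict, store) -> dict:
    """Run stages in order, checkpointing after each; skip checkpointed stages."""
    completed = store.load(execution_id)  # empty for a fresh execution
    for stage_id in order:
        if stage_id in completed:
            continue  # finished before the crash: reuse the recorded output
        completed[stage_id] = handlers[stage_id]()
        store.save(execution_id, completed)  # checkpoint after every stage
    return completed


# Simulate a crash in "process" on the first attempt.
calls: list[str] = []
crash = {"armed": True}


def fetch() -> dict:
    calls.append("fetch")
    return {"rows": 3}


def process() -> dict:
    calls.append("process")
    if crash["armed"]:
        crash["armed"] = False
        raise RuntimeError("simulated crash")
    return {"ok": True}


store = InMemoryCheckpointStore()
handlers = {"fetch": fetch, "process": process}
try:
    run("exec-1", ["fetch", "process"], handlers, store)
except RuntimeError:
    pass  # the run "crashed"; the checkpoint for fetch survives
result = run("exec-1", ["fetch", "process"], handlers, store)  # resume
print(calls)  # ['fetch', 'process', 'process']
```

Because fetch was checkpointed before the crash, the resumed run calls only process again.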

Architecture

aperion-flow/
├── src/aperion_flow/
│   ├── definitions/
│   │   ├── pipeline.py    # Pipeline, Stage, LLMConfig models
│   │   └── registry.py    # Strategic pipeline catalog
│   ├── engine/
│   │   ├── context.py     # PipelineContext, Checkpoint, State
│   │   ├── executor.py    # Async DAG runner
│   │   └── recovery.py    # Retry, backoff, circuit breaker
│   └── api/
│       └── routes.py      # FastAPI endpoints
└── tests/

Key Components

PipelineContext

The "state bag" that carries shared data and per-stage results through an execution:

from aperion_flow import PipelineContext

ctx = PipelineContext(
    pipeline_id="p1",
    pipeline_name="my_pipeline",
)

# Store data between stages
ctx.set("key", "value")
ctx.update({"batch": [1, 2, 3]})

# Track stage results
ctx.record_stage_start("stage_id", "stage_name")
ctx.record_stage_complete("stage_id", {"output": "data"})
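To make the state-bag idea concrete, here is a hypothetical stand-in (MiniContext, not aperion_flow.PipelineContext) that mirrors the four calls shown above. It also adds an illustrative snapshot() method that returns the JSON a checkpoint might persist. The StageRecord fields and the "running"/"completed" status strings are assumptions.

```python
from __future__ import annotations

import json
import time
from dataclasses import asdict, dataclass, field


@dataclass
class StageRecord:
    stage_id: str
    name: str
    status: str = "running"
    started_at: float = field(default_factory=time.time)
    finished_at: float | None = None
    output: dict | None = None


@dataclass
class MiniContext:
    """Hypothetical state bag; not the library's PipelineContext."""

    pipeline_id: str
    pipeline_name: str
    data: dict = field(default_factory=dict)
    stages: dict[str, StageRecord] = field(default_factory=dict)

    def set(self, key: str, value) -> None:
        self.data[key] = value

    def update(self, values: dict) -> None:
        self.data.update(values)

    def record_stage_start(self, stage_id: str, name: str) -> None:
        self.stages[stage_id] = StageRecord(stage_id, name)

    def record_stage_complete(self, stage_id: str, output: dict) -> None:
        record = self.stages[stage_id]
        record.status, record.output = "completed", output
        record.finished_at = time.time()

    def snapshot(self) -> str:
        # Shared data plus per-stage records as JSON: what a checkpoint might persist.
        return json.dumps(asdict(self))


ctx = MiniContext(pipeline_id="p1", pipeline_name="my_pipeline")
ctx.set("key", "value")
ctx.update({"batch": [1, 2, 3]})
ctx.record_stage_start("stage_id", "stage_name")
ctx.record_stage_complete("stage_id", {"output": "data"})
restored = json.loads(ctx.snapshot())
print(restored["stages"]["stage_id"]["status"])  # completed
```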

Recovery Engine

Retries with a configurable strategy (exponential backoff below), plus a circuit breaker keyed by circuit_key:

from aperion_flow import RecoveryConfig, RecoveryEngine, RecoveryStrategy

config = RecoveryConfig(
    strategy=RecoveryStrategy.EXPONENTIAL_BACKOFF,
    max_retries=5,
    base_delay=1.0,
    max_delay=60.0,
    failure_threshold=10,  # Circuit breaker
)

engine = RecoveryEngine(config)
# Run inside an async function; my_handler is your own handler callable
result = await engine.execute_with_recovery(my_handler, circuit_key="api_calls")
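A common formulation of exponential backoff, assumed here, is delay = min(base_delay * 2^attempt, max_delay). The library's exact formula, and whether it adds jitter, is not documented above. The following standard-library sketch builds a retry loop on that schedule, using the same parameter names as RecoveryConfig.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int, base_delay: float, max_delay: float) -> list[float]:
    """Delay before each retry: base_delay * 2**attempt, capped at max_delay."""
    return [min(base_delay * 2**attempt, max_delay) for attempt in range(max_retries)]


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
) -> T:
    """Await func(); on failure sleep per the schedule and retry; re-raise at the end."""
    delays = backoff_delays(max_retries, base_delay, max_delay)
    for attempt in range(max_retries + 1):  # first try plus max_retries retries
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")


print(backoff_delays(5, 1.0, 60.0))  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_delays(8, 1.0, 60.0))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the configuration above (max_retries=5, base_delay=1.0, max_delay=60.0), the cap is never reached. It only matters for longer retry budgets, as the second line shows.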

Strategic Registry

A catalog of pre-configured pipelines, with default LLM-assistance settings per pipeline:

from aperion_flow import LLMAssistanceMode
from aperion_flow.definitions.registry import get_registry, PipelineCategory

registry = get_registry()

# Get LLM config with smart defaults
llm_config = registry.get_llm_config(
    "pytest_execution",
    LLMAssistanceMode.ERROR_DIAGNOSIS,
)

# List pipelines by category
testing_pipelines = registry.get_by_category(PipelineCategory.TESTING)
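A registry like this amounts to a catalog keyed by pipeline name, with per-mode defaults that individual entries can override. The following sketch shows only that lookup pattern. Its class names, categories, modes, and config fields (timeout, enabled) are hypothetical and are not the library's.

```python
from dataclasses import dataclass, field, replace
from enum import Enum


class Category(Enum):  # illustrative categories
    TESTING = "testing"
    DATA = "data"


class Mode(Enum):  # illustrative assistance modes
    PRE_VALIDATE = "pre_validate"
    ERROR_DIAGNOSIS = "error_diagnosis"


@dataclass(frozen=True)
class LLMConfig:
    mode: Mode
    timeout: float = 10.0
    enabled: bool = True


# Defaults shared by every pipeline, one per mode.
DEFAULTS = {mode: LLMConfig(mode) for mode in Mode}


@dataclass
class Entry:
    name: str
    category: Category
    overrides: dict = field(default_factory=dict)  # Mode -> {field: value}


class Registry:
    def __init__(self, entries: list[Entry]) -> None:
        self._entries = {e.name: e for e in entries}

    def get_llm_config(self, name: str, mode: Mode) -> LLMConfig:
        # Start from the mode's default, then apply this pipeline's overrides.
        overrides = self._entries[name].overrides.get(mode, {})
        return replace(DEFAULTS[mode], **overrides)

    def get_by_category(self, category: Category) -> list[str]:
        return [n for n, e in self._entries.items() if e.category is category]


registry = Registry([
    Entry("pytest_execution", Category.TESTING, {Mode.ERROR_DIAGNOSIS: {"timeout": 30.0}}),
    Entry("etl_nightly", Category.DATA),
])
print(registry.get_llm_config("pytest_execution", Mode.ERROR_DIAGNOSIS).timeout)  # 30.0
print(registry.get_llm_config("etl_nightly", Mode.ERROR_DIAGNOSIS).timeout)  # 10.0
print(registry.get_by_category(Category.TESTING))  # ['pytest_execution']
```

Keeping the defaults frozen and deriving per-pipeline configs with dataclasses.replace means an override can never leak into another pipeline's settings.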

API Server

Run the Flow API server:

python -m aperion_flow.api.routes
# or
uvicorn aperion_flow.api.routes:create_app --factory --port 8001

Endpoints

Method  Path             Description
POST    /flow/start      Start a pipeline execution
POST    /flow/resume     Resume from checkpoint
GET     /flow/{id}       Get execution status
GET     /flow/{id}/data  Get execution output data
GET     /flow/           List executions
DELETE  /flow/{id}       Delete execution record
GET     /healthz         Health check
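A thin client for these endpoints can be written with the standard library alone. This sketch rests on a few assumptions. The base URL reuses port 8001 from the uvicorn example. The JSON body fields for /flow/start (pipeline, initial_data, correlation_id) are hypothetical, because the request schemas are not documented here. Building a request is kept separate from sending it, so the request shape can be checked without a running server.

```python
from __future__ import annotations

import json
import urllib.request

BASE_URL = "http://localhost:8001"  # port from the uvicorn example above


def build_request(method: str, path: str, body: dict | None = None) -> urllib.request.Request:
    """Build (but do not send) a JSON request for a Flow API endpoint."""
    data = json.dumps(body).encode() if body is not None else None
    headers = {"Content-Type": "application/json"} if body is not None else {}
    return urllib.request.Request(BASE_URL + path, data=data, headers=headers, method=method)


def send(req: urllib.request.Request) -> dict:
    """Send a request and decode the JSON response (requires a running server)."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())


# Hypothetical payload: the field names are illustrative, not a documented schema.
start = build_request("POST", "/flow/start", {
    "pipeline": "data_pipeline",
    "initial_data": {"input": "value"},
    "correlation_id": "req-123",
})
health = build_request("GET", "/healthz")
print(start.get_method(), start.full_url)  # POST http://localhost:8001/flow/start
```

With the server running, send(health) would return the decoded health-check response.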

Integration Points

Aperion Flow integrates with other ecosystem components:

  • The Nervous System (Event Bus) - Emits flow.started, flow.stage.completed, flow.failed events
  • The Switchboard (LLM Router) - Provides LLM assistance for validation and diagnosis
  • The Cortex (State Gateway) - Persists checkpoints and execution history
  • The Gatekeeper (Auth) - Validates access to pipeline execution

Event Emitter Protocol

from typing import Protocol

class EventEmitter(Protocol):
    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: str | None = None,
        correlation_id: str | None = None,
    ) -> str:
        ...
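Because EventEmitter is a typing.Protocol, any object with a matching emit method satisfies it structurally, with no base class required. A minimal in-memory emitter that records events, useful in tests, might look like the following. Treating the returned str as a generated event id is an assumption. The event names come from the Integration Points list, and the source value is illustrative.

```python
from __future__ import annotations

import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Protocol


class EventEmitter(Protocol):  # same signature as the protocol above
    async def emit(self, event_type: str, payload: dict, *, source: str | None = None,
                   correlation_id: str | None = None) -> str: ...


@dataclass
class RecordingEmitter:
    """Test double that satisfies EventEmitter structurally (no inheritance)."""

    events: list[dict] = field(default_factory=list)

    async def emit(
        self,
        event_type: str,
        payload: dict,
        *,
        source: str | None = None,
        correlation_id: str | None = None,
    ) -> str:
        event_id = str(uuid.uuid4())  # assumption: the returned str is an event id
        self.events.append({"id": event_id, "type": event_type, "payload": payload,
                            "source": source, "correlation_id": correlation_id})
        return event_id


async def demo(emitter: EventEmitter) -> None:
    await emitter.emit("flow.started", {"pipeline": "data_pipeline"},
                       source="aperion-flow", correlation_id="req-123")
    await emitter.emit("flow.stage.completed", {"stage": "fetch"},
                       correlation_id="req-123")


emitter = RecordingEmitter()
asyncio.run(demo(emitter))
print([e["type"] for e in emitter.events])  # ['flow.started', 'flow.stage.completed']
```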

LLM Assistant Protocol

from typing import Protocol

class LLMAssistant(Protocol):
    async def pre_validate(self, stage, context, timeout) -> dict:
        ...
    async def analyze_output(self, stage, output, context, timeout) -> dict:
        ...
    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        ...
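For tests or offline runs, a deterministic stub can stand in for the LLM-backed assistant. The sketch below treats the stage and error arguments as opaque objects. The keys in the returned dicts (ok, notes, diagnosis, suggestion) are hypothetical, since the protocol only promises a dict.

```python
import asyncio
from types import SimpleNamespace


class RuleBasedAssistant:
    """Deterministic LLMAssistant stand-in: simple rules, no model calls."""

    async def pre_validate(self, stage, context, timeout) -> dict:
        # Hypothetical rule: a stage is valid if it has a non-empty handler attribute.
        ok = bool(getattr(stage, "handler", None))
        return {"ok": ok, "notes": [] if ok else ["stage has no handler"]}

    async def analyze_output(self, stage, output, context, timeout) -> dict:
        return {"ok": output is not None, "notes": [f"{type(output).__name__} output"]}

    async def diagnose_error(self, stage, error, context, timeout) -> dict:
        transient = isinstance(error, (TimeoutError, ConnectionError))
        return {"diagnosis": f"{type(error).__name__}: {error}",
                "suggestion": "retry later" if transient else "inspect handler"}


assistant = RuleBasedAssistant()
stage = SimpleNamespace(id="fetch", handler="myapp.handlers.fetch_data")
print(asyncio.run(assistant.pre_validate(stage, {}, 5.0)))  # {'ok': True, 'notes': []}
print(asyncio.run(assistant.diagnose_error(stage, TimeoutError("30s"), {}, 5.0)))
# {'diagnosis': 'TimeoutError: 30s', 'suggestion': 'retry later'}
```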

Migration from aperion-legendary-ai

Source Files Migrated

Original                                       New Location
stack/aperion/pipelines/executor.py            src/aperion_flow/engine/executor.py
stack/aperion/pipelines/pipeline.py            src/aperion_flow/definitions/pipeline.py
stack/aperion/pipelines/recovery.py            src/aperion_flow/engine/recovery.py
stack/aperion/pipelines/strategic_registry.py  src/aperion_flow/definitions/registry.py

Key Changes

  1. Async-first - Replaced ThreadPoolExecutor with asyncio
  2. Stateless Executor - All state in PipelineContext with checkpoints
  3. Pydantic v2 - Immutable models with frozen=True
  4. Protocol-based Integration - Pluggable EventEmitter, LLMAssistant, CheckpointStore
  5. No Internal Dependencies - Removed @with_doc_context and internal service coupling
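Point 3 means that model instances cannot be mutated after creation; a change produces a new object instead. The sketch below uses standard-library frozen dataclasses purely as a stand-in for Pydantic v2's frozen=True, whose equivalent copy-with-changes method is model_copy(update=...). The Stage class is hypothetical; its 30-second default timeout mirrors the default given under Security Considerations.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class Stage:  # hypothetical, illustrative model
    id: str
    handler: str
    timeout: float = 30.0
    depends_on: tuple[str, ...] = ()  # a tuple, so the container is immutable too


fetch = Stage("fetch", "myapp.handlers.fetch_data")
try:
    fetch.timeout = 60.0  # attribute assignment is rejected
except FrozenInstanceError:
    print("frozen")

slow_fetch = replace(fetch, timeout=60.0)  # "modify" by creating a new instance
print(fetch.timeout, slow_fetch.timeout)  # 30.0 60.0
```

Immutable stage definitions can be shared safely between concurrently running executions, which fits the stateless-executor design in point 2.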

Testing

# Run all tests
pytest

# Run with coverage
pytest --cov=aperion_flow --cov-report=html

# Run specific test file
pytest tests/unit/test_executor.py -v

Security Considerations

  • Secrets - Never store secrets in PipelineContext; use SecretRef patterns
  • Timeouts - All stages have configurable timeouts (default 30s)
  • Circuit Breakers - Prevent cascade failures from external services
  • Audit Trail - Checkpoints provide execution history
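The circuit-breaker point can be made concrete with the classic three-state pattern. After failure_threshold consecutive failures for a key, the circuit opens and calls are rejected immediately. Once a cooldown has passed, the circuit becomes half-open and lets a trial call through; success closes it, and failure reopens it. This is a generic, synchronous sketch of that pattern. The cooldown parameter, the per-key bookkeeping, and the injectable clock are assumptions and do not describe RecoveryEngine internals.

```python
import time
from typing import Callable


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because its circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 10, cooldown: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown = cooldown
        self.clock = clock
        self.failures: dict[str, int] = {}     # consecutive failures per key
        self.opened_at: dict[str, float] = {}  # when each open circuit opened

    def state(self, key: str) -> str:
        if key not in self.opened_at:
            return "closed"
        if self.clock() - self.opened_at[key] >= self.cooldown:
            return "half_open"  # let a trial call through
        return "open"

    def call(self, key: str, func: Callable[[], object]) -> object:
        if self.state(key) == "open":
            raise CircuitOpenError(f"circuit {key!r} is open")
        try:
            result = func()
        except Exception:
            self.failures[key] = self.failures.get(key, 0) + 1
            if self.failures[key] >= self.failure_threshold or key in self.opened_at:
                self.opened_at[key] = self.clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and resets the failure count.
        self.failures.pop(key, None)
        self.opened_at.pop(key, None)
        return result


now = [0.0]  # fake clock so the example is deterministic
breaker = CircuitBreaker(failure_threshold=2, cooldown=5.0, clock=lambda: now[0])


def failing() -> None:
    raise ConnectionError("upstream down")


for _ in range(2):
    try:
        breaker.call("api_calls", failing)
    except ConnectionError:
        pass
print(breaker.state("api_calls"))  # open
now[0] = 5.0
print(breaker.state("api_calls"))  # half_open
print(breaker.call("api_calls", lambda: "ok"), breaker.state("api_calls"))  # ok closed
```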

Constitution Alignment

Constitution                      Implementation
A4 - Agents Analyze, Not Execute  Executor controls flow; handlers execute
A5 - Latency SLOs                 Stage timeouts, parallel execution
B1 - Secrets via Env              SecretRef pattern, no secrets in context
D3 - Audit Logging                Checkpoint history, event emission

License

MIT

Download files

Source Distribution

aperion_flow-0.1.0.tar.gz (64.7 kB)

Uploaded Source

Built Distribution


aperion_flow-0.1.0-py3-none-any.whl (47.3 kB)

Uploaded Python 3

File details

Details for the file aperion_flow-0.1.0.tar.gz.

File metadata

  • Download URL: aperion_flow-0.1.0.tar.gz
  • Upload date:
  • Size: 64.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for aperion_flow-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6bc33bb363c67e9a831036f50ccd1d1e7d49c2196da7b1c07a90f3092a282ac5
MD5 9500720ea957fe41a56d4aee3a4bf127
BLAKE2b-256 9bb6b2e558fde20e52d9d52fc1b992b51a3269d99bad6018c2253dee1449b17f


Provenance

The following attestation bundles were made for aperion_flow-0.1.0.tar.gz:

Publisher: release.yml on invictustitan2/aperion-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file aperion_flow-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: aperion_flow-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 47.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for aperion_flow-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cd75141e4e7c78bd4d8882ebf1a7dfd18ddbefa566dfe771dd2722616c536fd4
MD5 b512145f3a410fc6dbe4a7a546468a6b
BLAKE2b-256 7f85b722786f87fd3fe150faefbb34af021ea6484657db54a465ef090e7d7401


Provenance

The following attestation bundles were made for aperion_flow-0.1.0-py3-none-any.whl:

Publisher: release.yml on invictustitan2/aperion-flow

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
