
geny-executor


Harness-engineered agent pipeline library built on the Anthropic API.

geny-executor implements a 16-stage pipeline with dual-abstraction architecture — inspired by Claude Code's agent loop and Anthropic's harness design principles. No LangChain. No LangGraph. Just a clean, modular pipeline that gives you full control over every step of agent execution.

Korean README


Why geny-executor?

| Problem | geny-executor's Answer |
|---|---|
| Frameworks hide too much behind abstractions | Every stage is explicit and inspectable |
| Hard to customize one part without rewriting everything | Dual Abstraction — swap stages, or strategies within stages |
| Agent loops are opaque black boxes | 16 clearly defined stages with event-driven observability |
| Vendor lock-in across multiple LLM providers | Single dependency: the Anthropic SDK. One provider, done right |
| Cost tracking is an afterthought | Built-in token tracking, cost calculation, and budget guards |

Architecture

The 16-Stage Pipeline

Phase A (once):   [1: Input]
Phase B (loop):   [2: Context] → [3: System] → [4: Guard] → [5: Cache]
                  → [6: API] → [7: Token] → [8: Think] → [9: Parse]
                  → [10: Tool] → [11: Agent] → [12: Evaluate] → [13: Loop]
Phase C (once):   [14: Emit] → [15: Memory] → [16: Yield]

| # | Stage | Purpose | Example Strategies |
|---|---|---|---|
| 1 | Input | Validate & normalize user input | Default, Strict, Schema, Multimodal |
| 2 | Context | Load conversation history & memory | SimpleLoad, ProgressiveDisclosure, VectorSearch |
| 3 | System | Build system prompt | Static, Composable, Adaptive |
| 4 | Guard | Safety checks & budget enforcement | TokenBudget, Cost, RateLimit, Permission |
| 5 | Cache | Optimize prompt caching | NoCache, System, Aggressive, Adaptive |
| 6 | API | Call Anthropic Messages API | Anthropic, Mock, Recording, Replay |
| 7 | Token | Track usage & calculate costs | Default, Detailed + AnthropicPricing |
| 8 | Think | Process extended thinking blocks | Passthrough, ExtractAndStore, Filter |
| 9 | Parse | Parse response & detect completion | Default, StructuredOutput + SignalDetector |
| 10 | Tool | Execute tool calls | Sequential, Parallel + RegistryRouter |
| 11 | Agent | Multi-agent orchestration | SingleAgent, Delegate, Evaluator |
| 12 | Evaluate | Judge quality & completion | SignalBased, CriteriaBased, AgentEval |
| 13 | Loop | Decide: continue or finish? | Standard, SingleTurn, BudgetAware |
| 14 | Emit | Output results | Text, Callback, VTuber, TTS, Streaming |
| 15 | Memory | Persist conversation memory | AppendOnly, Reflective + File/SQLite |
| 16 | Yield | Format final result | Default, Structured, Streaming |

Dual Abstraction

┌─ Level 1: Stage Abstraction ──────────────────────────┐
│  Swap entire stage modules in/out of the pipeline     │
│                                                       │
│  ┌─ Level 2: Strategy Abstraction ─────────────────┐  │
│  │  Swap internal logic within a stage             │  │
│  │                                                 │  │
│  │  ContextStage can use:                          │  │
│  │    → SimpleLoadStrategy (default)               │  │
│  │    → ProgressiveDisclosureStrategy              │  │
│  │    → VectorSearchStrategy                       │  │
│  │    → YourCustomStrategy                         │  │
│  └─────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────┘

Level 1: Replace an entire stage (e.g., swap APIStage for a custom provider).
Level 2: Change behavior within a stage (e.g., switch context loading strategy from simple to vector search).


Installation

pip install geny-executor

With optional dependencies:

# Memory features (numpy for vector operations)
pip install geny-executor[memory]

# All optional dependencies
pip install geny-executor[all]

# Development
pip install geny-executor[dev]

Requirements

  • Python 3.11+
  • Anthropic API key

Quick Start

Minimal Pipeline

The simplest possible agent — input, API call, parse, output:

import asyncio
from geny_executor import PipelinePresets

async def main():
    pipeline = PipelinePresets.minimal(api_key="sk-ant-...")
    result = await pipeline.run("What is the capital of France?")
    print(result.text)

asyncio.run(main())

Chat Pipeline

Full conversational agent with history, system prompt, and tool support:

from geny_executor import PipelinePresets

pipeline = PipelinePresets.chat(
    api_key="sk-ant-...",
    system_prompt="You are a helpful coding assistant.",
)

result = await pipeline.run("Explain Python decorators")
print(result.text)
print(f"Cost: ${result.total_cost_usd:.4f}")

Agent Pipeline (All 16 Stages)

Autonomous agent with tools, evaluation, memory, and loop control:

from geny_executor import PipelinePresets
from geny_executor.tools import ToolRegistry, Tool, ToolResult, ToolContext

class SearchTool(Tool):
    @property
    def name(self) -> str:
        return "search"

    @property
    def description(self) -> str:
        return "Search the web for information"

    @property
    def input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"],
        }

    async def execute(self, input: dict, context: ToolContext) -> ToolResult:
        # Your search implementation
        return ToolResult(content=f"Results for: {input['query']}")

registry = ToolRegistry()
registry.register(SearchTool())

pipeline = PipelinePresets.agent(
    api_key="sk-ant-...",
    system_prompt="You are a research assistant. Use tools to find answers.",
    tools=registry,
    max_turns=20,
)

result = await pipeline.run("Find the latest Python release version")

Custom Pipeline with Builder

Fine-grained control over every stage:

from geny_executor import PipelineBuilder

pipeline = (
    PipelineBuilder("my-agent", api_key="sk-ant-...")
    .with_model(model="claude-sonnet-4-20250514", max_tokens=4096)
    .with_system(prompt="You are a concise assistant.")
    .with_context()
    .with_guard(cost_budget_usd=1.0, max_iterations=30)
    .with_cache(strategy="aggressive")
    .with_tools(registry=my_registry)
    .with_think(enabled=True, budget_tokens=10000)
    .with_evaluate()
    .with_loop(max_turns=30)
    .with_memory()
    .build()
)

result = await pipeline.run("Complex multi-step task here")

Manual Pipeline Construction

For maximum control, assemble stages directly:

from geny_executor import Pipeline, PipelineConfig
from geny_executor.stages.s01_input import InputStage
from geny_executor.stages.s06_api import APIStage, MockProvider
from geny_executor.stages.s09_parse import ParseStage
from geny_executor.stages.s16_yield import YieldStage

config = PipelineConfig(name="custom", api_key="sk-ant-...")
pipeline = Pipeline(config)

pipeline.register_stage(InputStage())
pipeline.register_stage(APIStage(provider=MockProvider(responses=["Hello!"])))
pipeline.register_stage(ParseStage())
pipeline.register_stage(YieldStage())

result = await pipeline.run("Test input")

Sessions

Persistent state management across multiple interactions:

from geny_executor import PipelinePresets
from geny_executor.session import SessionManager

manager = SessionManager()
pipeline = PipelinePresets.chat(api_key="sk-ant-...")

# Create a session — state persists across runs
session = manager.create(pipeline)
result1 = await session.run("My name is Alice")
result2 = await session.run("What's my name?")  # Remembers context

# List active sessions
for info in manager.list_sessions():
    print(f"Session {info.session_id}: {info.message_count} messages, ${info.total_cost_usd:.4f}")

Event System

Real-time observability with pub/sub events:

from geny_executor import PipelinePresets

pipeline = PipelinePresets.agent(api_key="sk-ant-...")

# Subscribe to specific events
@pipeline.on("stage.enter")
async def on_stage_enter(event):
    print(f"  → Entering: {event.stage}")

@pipeline.on("stage.exit")
async def on_stage_exit(event):
    print(f"  ← Exiting: {event.stage}")

@pipeline.on("pipeline.complete")
async def on_complete(event):
    print(f"Done! Iterations: {event.data.get('iterations')}")

# Wildcard: listen to all events
@pipeline.on("*")
async def on_any(event):
    pass  # Log everything

result = await pipeline.run("Hello")

Streaming

async for event in pipeline.run_stream("Solve this step by step"):
    if event.type == "stage.enter":
        print(f"Stage: {event.stage}")
    elif event.type == "api.response":
        print("Response received")
    elif event.type == "pipeline.complete":
        print(f"Final: {event.data['result'].text}")

Tool System

Creating Tools

from geny_executor.tools import Tool, ToolResult, ToolContext

class CalculatorTool(Tool):
    @property
    def name(self) -> str:
        return "calculator"

    @property
    def description(self) -> str:
        return "Perform arithmetic calculations"

    @property
    def input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Math expression to evaluate"}
            },
            "required": ["expression"],
        }

    async def execute(self, input: dict, context: ToolContext) -> ToolResult:
        try:
            result = eval(input["expression"])  # Use a safe evaluator in production
            return ToolResult(content=str(result))
        except Exception as e:
            return ToolResult(content=str(e), is_error=True)

Tool Registry

from geny_executor.tools import ToolRegistry

registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())

# Filter tools per request
math_tools = registry.filter(include=["calculator"])
api_format = registry.to_api_format()  # Anthropic API format
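For reference, the Anthropic Messages API expects each tool as an object with `name`, `description`, and `input_schema` keys, so `to_api_format()` should yield structures shaped like the following (hand-written here for the calculator tool above, not actual library output):

```python
# Hand-written example of the Anthropic Messages API tool format.
# geny-executor's to_api_format() should produce an equivalent structure
# for each registered tool.
calculator_api_format = {
    "name": "calculator",
    "description": "Perform arithmetic calculations",
    "input_schema": {
        "type": "object",
        "properties": {
            "expression": {
                "type": "string",
                "description": "Math expression to evaluate",
            }
        },
        "required": ["expression"],
    },
}
```

This is the same JSON Schema the `input_schema` property returns, wrapped with the tool's name and description.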

MCP Integration

Connect to Model Context Protocol servers:

from geny_executor.tools.mcp import MCPManager

mcp = MCPManager()
await mcp.connect("filesystem", command="npx", args=["-y", "@anthropic/mcp-filesystem"])

# MCP tools are automatically adapted to the Tool interface
for tool in mcp.list_tools():
    registry.register(tool)

Error Handling

Structured error hierarchy with automatic classification:

from geny_executor import (
    GenyExecutorError,   # Base exception
    PipelineError,       # Pipeline-level errors
    StageError,          # Stage execution errors
    GuardRejectError,    # Guard rejection (budget, permissions)
    APIError,            # Anthropic API errors (with category)
    ToolExecutionError,  # Tool failures
    ErrorCategory,       # rate_limited, timeout, token_limit, etc.
)

try:
    result = await pipeline.run("input")
except GuardRejectError as e:
    print(f"Blocked by guard: {e}")
except APIError as e:
    print(f"API error ({e.category}): {e}")
    if e.category == ErrorCategory.rate_limited:
        # Handle rate limiting
        pass
except GenyExecutorError as e:
    print(f"Pipeline error: {e}")

Pipeline Presets

Five ready-to-use configurations:

| Preset | Stages | Use Case |
|---|---|---|
| PipelinePresets.minimal() | 1 → 6 → 9 → 16 | Simple Q&A, testing |
| PipelinePresets.chat() | 1 → 2 → 3 → 4 → 5 → 6 → 7 → 9 → 13 → 16 | Conversational chatbot |
| PipelinePresets.agent() | All 16 | Autonomous agent with tools |
| PipelinePresets.evaluator() | 1 → 3 → 6 → 9 → 12 → 16 | Quality evaluation |
| PipelinePresets.geny_vtuber() | All 16 + VTuber emit | Geny VTuber system |
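The stage subsets in the table can be written out as plain data (illustrative only; this mapping is not an attribute the library exposes):

```python
# Stage numbers per preset, transcribed from the table above.
PRESET_STAGES = {
    "minimal":   [1, 6, 9, 16],
    "chat":      [1, 2, 3, 4, 5, 6, 7, 9, 13, 16],
    "agent":     list(range(1, 17)),   # all 16 stages
    "evaluator": [1, 3, 6, 9, 12, 16],
}

# Every preset runs the same backbone: Input (1), API (6), Parse (9), Yield (16)
core = set(PRESET_STAGES["minimal"])
assert all(core <= set(stages) for stages in PRESET_STAGES.values())
```

This makes the pattern explicit: each preset is the minimal backbone plus whichever optional stages its use case needs.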

Custom Stages & Strategies

Creating a Custom Strategy

from geny_executor.core.stage import Strategy

class MyContextStrategy(Strategy):
    @property
    def name(self) -> str:
        return "my_context"

    @property
    def description(self) -> str:
        return "Custom context loading with RAG"

    def configure(self, config: dict) -> None:
        self.top_k = config.get("top_k", 5)

    async def load(self, state):
        # Your custom context loading logic
        ...

Creating a Custom Stage

from geny_executor.core.stage import Stage
from geny_executor.core.state import PipelineState

class LoggingStage(Stage[dict, dict]):
    @property
    def name(self) -> str:
        return "logging"

    @property
    def order(self) -> int:
        return 7  # After API, before Think

    @property
    def category(self) -> str:
        return "execution"

    async def execute(self, input: dict, state: PipelineState) -> dict:
        print(f"[{state.iteration}] API response received")
        return input  # Pass through

# Register into pipeline
pipeline.register_stage(LoggingStage())

Configuration Reference

ModelConfig

| Parameter | Type | Default | Description |
|---|---|---|---|
| model | str | "claude-sonnet-4-20250514" | Anthropic model ID |
| max_tokens | int | 8192 | Max output tokens |
| temperature | float | 0.0 | Sampling temperature |
| thinking_enabled | bool | False | Enable extended thinking |
| thinking_budget_tokens | int | 10000 | Thinking token budget |

PipelineConfig

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Pipeline name |
| api_key | str | required | Anthropic API key |
| model | ModelConfig | default | Model configuration |
| max_iterations | int | 50 | Max loop iterations |
| cost_budget_usd | float \| None | None | Cost budget limit |
| context_window_budget | int | 200_000 | Context window token limit |
| stream | bool | False | Enable streaming mode |
| single_turn | bool | False | Single turn (no loop) |
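Putting the two tables together, a fully explicit configuration might look like the following. The import path for ModelConfig is an assumption (PipelineConfig is importable from the top-level package, as shown in Manual Pipeline Construction; adjust the ModelConfig import to match the actual package layout):

```python
from geny_executor import Pipeline, PipelineConfig
# Assumed import path for ModelConfig; adjust if it lives elsewhere.
from geny_executor import ModelConfig

config = PipelineConfig(
    name="budgeted-agent",
    api_key="sk-ant-...",
    model=ModelConfig(
        model="claude-sonnet-4-20250514",
        max_tokens=8192,
        temperature=0.0,
        thinking_enabled=True,
        thinking_budget_tokens=10000,
    ),
    max_iterations=50,
    cost_budget_usd=0.50,            # stop once spend exceeds $0.50
    context_window_budget=200_000,   # token limit for the context window
)
pipeline = Pipeline(config)
```

Values shown are the documented defaults except where noted.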

Project Structure

geny-executor/
├── src/geny_executor/
│   ├── __init__.py          # Public API
│   ├── py.typed             # PEP 561 type marker
│   ├── core/                # Pipeline engine, config, state, errors
│   ├── events/              # EventBus pub/sub system
│   ├── stages/              # 16 pipeline stages (s01-s16)
│   ├── tools/               # Tool system + MCP integration
│   └── session/             # Session management + freshness
├── tests/                   # 81 unit & integration tests
├── pyproject.toml           # Package configuration (Hatch)
└── LICENSE                  # MIT License

Development

# Clone
git clone https://github.com/CocoRoF/geny-executor.git
cd geny-executor

# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=geny_executor --cov-report=term-missing

# Lint
ruff check src/ tests/
ruff format src/ tests/

Roadmap

  • Streaming response support (full implementation)
  • OpenTelemetry integration for tracing
  • Additional memory backends (Redis, PostgreSQL)
  • WebUI pipeline configurator
  • Plugin system for community stages
  • Batch execution mode

License

This project is licensed under the MIT License — see the LICENSE file for details.


Related Projects

  • Anthropic SDK — The foundation geny-executor is built on
  • MCP — Model Context Protocol for tool integration
  • Claude Code — The agent loop that inspired this architecture
