# geny-executor
Harness-engineered agent pipeline library built on the Anthropic API.
geny-executor implements a 16-stage pipeline with dual-abstraction architecture — inspired by Claude Code's agent loop and Anthropic's harness design principles. No LangChain. No LangGraph. Just a clean, modular pipeline that gives you full control over every step of agent execution.
## Why geny-executor?
| Problem | geny-executor's Answer |
|---|---|
| Frameworks hide too much behind abstractions | Every stage is explicit and inspectable |
| Hard to customize one part without rewriting everything | Dual Abstraction — swap stages or strategies within stages |
| Agent loops are opaque black boxes | 16 clearly defined stages with event-driven observability |
| Vendor lock-in across multiple LLM providers | Single dependency: Anthropic SDK. One provider, done right |
| Cost tracking is an afterthought | Built-in token tracking, cost calculation, and budget guards |
## Architecture

### The 16-Stage Pipeline

```
Phase A (once): [1: Input]
Phase B (loop): [2: Context] → [3: System] → [4: Guard] → [5: Cache]
                → [6: API] → [7: Token] → [8: Think] → [9: Parse]
                → [10: Tool] → [11: Agent] → [12: Evaluate] → [13: Loop]
Phase C (once): [14: Emit] → [15: Memory] → [16: Yield]
```
| # | Stage | Purpose | Example Strategies |
|---|---|---|---|
| 1 | Input | Validate & normalize user input | Default, Strict, Schema, Multimodal |
| 2 | Context | Load conversation history & memory | SimpleLoad, ProgressiveDisclosure, VectorSearch |
| 3 | System | Build system prompt | Static, Composable, Adaptive |
| 4 | Guard | Safety checks & budget enforcement | TokenBudget, Cost, RateLimit, Permission |
| 5 | Cache | Optimize prompt caching | NoCache, System, Aggressive, Adaptive |
| 6 | API | Call Anthropic Messages API | Anthropic, Mock, Recording, Replay |
| 7 | Token | Track usage & calculate costs | Default, Detailed + AnthropicPricing |
| 8 | Think | Process extended thinking blocks | Passthrough, ExtractAndStore, Filter |
| 9 | Parse | Parse response & detect completion | Default, StructuredOutput + SignalDetector |
| 10 | Tool | Execute tool calls | Sequential, Parallel + RegistryRouter |
| 11 | Agent | Multi-agent orchestration | SingleAgent, Delegate, Evaluator |
| 12 | Evaluate | Judge quality & completion | SignalBased, CriteriaBased, AgentEval |
| 13 | Loop | Decide: continue or finish? | Standard, SingleTurn, BudgetAware |
| 14 | Emit | Output results | Text, Callback, VTuber, TTS, Streaming |
| 15 | Memory | Persist conversation memory | AppendOnly, Reflective + File/SQLite |
| 16 | Yield | Format final result | Default, Structured, Streaming |
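The control flow above can be sketched in plain Python. This is an illustrative stand-in for the engine, not geny-executor's actual implementation; the `run` driver and `loop_should_stop` check below are hypothetical:

```python
# Sketch of the three-phase flow: Phase A runs once, Phase B repeats until
# the Loop stage signals completion, Phase C runs once at the end.
PHASE_A = ["Input"]
PHASE_B = ["Context", "System", "Guard", "Cache", "API", "Token",
           "Think", "Parse", "Tool", "Agent", "Evaluate", "Loop"]
PHASE_C = ["Emit", "Memory", "Yield"]

def loop_should_stop(turn: int) -> bool:
    # Hypothetical completion check; a real Loop stage would inspect
    # evaluation results, budgets, and stop signals.
    return turn >= 1

def run(max_turns: int = 3) -> list[str]:
    trace: list[str] = []
    trace += PHASE_A                    # Phase A: once
    for turn in range(max_turns):       # Phase B: the agent loop
        trace += PHASE_B
        if loop_should_stop(turn):      # stage 13 decides continue/finish
            break
    trace += PHASE_C                    # Phase C: once
    return trace

trace = run()
assert trace[0] == "Input" and trace[-1] == "Yield"
assert trace.count("API") == 2  # two iterations of the loop in this sketch
```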
### Dual Abstraction

```
┌─ Level 1: Stage Abstraction ─────────────────────────┐
│  Swap entire stage modules in/out of the pipeline    │
│                                                      │
│  ┌─ Level 2: Strategy Abstraction ───────────────┐   │
│  │  Swap internal logic within a stage           │   │
│  │                                               │   │
│  │  ContextStage can use:                        │   │
│  │    → SimpleLoadStrategy (default)             │   │
│  │    → ProgressiveDisclosureStrategy            │   │
│  │    → VectorSearchStrategy                     │   │
│  │    → YourCustomStrategy                       │   │
│  └───────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────┘
```
- **Level 1**: Replace an entire stage (e.g., swap `APIStage` for a custom provider).
- **Level 2**: Change behavior within a stage (e.g., switch the context loading strategy from simple load to vector search).
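The mechanism behind Level 2 is plain strategy injection. A minimal self-contained sketch of the pattern (the classes below are simplified stand-ins, not the library's real signatures; `KeywordSearchStrategy` substitutes for a vector search):

```python
from typing import Protocol

class ContextStrategy(Protocol):
    """Level 2: interchangeable logic inside one stage."""
    def load(self, query: str) -> list[str]: ...

class SimpleLoadStrategy:
    def __init__(self, history: list[str]):
        self.history = history

    def load(self, query: str) -> list[str]:
        # Default behavior: return the full conversation history.
        return list(self.history)

class KeywordSearchStrategy:
    def __init__(self, history: list[str]):
        self.history = history

    def load(self, query: str) -> list[str]:
        # Stand-in for VectorSearchStrategy: keep only relevant entries.
        return [m for m in self.history if query.lower() in m.lower()]

class ContextStage:
    """Level 1: the stage itself can also be swapped out of the pipeline."""
    def __init__(self, strategy: ContextStrategy):
        self.strategy = strategy

    def execute(self, query: str) -> list[str]:
        return self.strategy.load(query)

history = ["User likes Python", "User asked about Rust", "Python tips shared"]
stage = ContextStage(SimpleLoadStrategy(history))
assert stage.execute("python") == history            # full history

stage.strategy = KeywordSearchStrategy(history)      # swap at Level 2
assert stage.execute("python") == ["User likes Python", "Python tips shared"]
```

Because the stage only depends on the `load` protocol, swapping strategies never touches the pipeline wiring.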
## Installation

```bash
pip install geny-executor
```

With optional dependencies:

```bash
# Memory features (numpy for vector operations)
pip install geny-executor[memory]

# All optional dependencies
pip install geny-executor[all]

# Development
pip install geny-executor[dev]
```
## Requirements
- Python 3.11+
- Anthropic API key
## Quick Start

### Minimal Pipeline

The simplest possible agent — input, API call, parse, output:

```python
import asyncio

from geny_executor import PipelinePresets

async def main():
    pipeline = PipelinePresets.minimal(api_key="sk-ant-...")
    result = await pipeline.run("What is the capital of France?")
    print(result.text)

asyncio.run(main())
```
### Chat Pipeline

Full conversational agent with history, system prompt, and tool support:

```python
from geny_executor import PipelinePresets

pipeline = PipelinePresets.chat(
    api_key="sk-ant-...",
    system_prompt="You are a helpful coding assistant.",
)

result = await pipeline.run("Explain Python decorators")
print(result.text)
print(f"Cost: ${result.total_cost_usd:.4f}")
```
### Agent Pipeline (All 16 Stages)

Autonomous agent with tools, evaluation, memory, and loop control:

```python
from geny_executor import PipelinePresets
from geny_executor.tools import ToolRegistry, Tool, ToolResult, ToolContext

class SearchTool(Tool):
    @property
    def name(self) -> str:
        return "search"

    @property
    def description(self) -> str:
        return "Search the web for information"

    @property
    def input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"],
        }

    async def execute(self, input: dict, context: ToolContext) -> ToolResult:
        # Your search implementation
        return ToolResult(content=f"Results for: {input['query']}")

registry = ToolRegistry()
registry.register(SearchTool())

pipeline = PipelinePresets.agent(
    api_key="sk-ant-...",
    system_prompt="You are a research assistant. Use tools to find answers.",
    tools=registry,
    max_turns=20,
)

result = await pipeline.run("Find the latest Python release version")
```
### Custom Pipeline with Builder

Fine-grained control over every stage:

```python
from geny_executor import PipelineBuilder

pipeline = (
    PipelineBuilder("my-agent", api_key="sk-ant-...")
    .with_model(model="claude-sonnet-4-20250514", max_tokens=4096)
    .with_system(prompt="You are a concise assistant.")
    .with_context()
    .with_guard(cost_budget_usd=1.0, max_iterations=30)
    .with_cache(strategy="aggressive")
    .with_tools(registry=my_registry)
    .with_think(enabled=True, budget_tokens=10000)
    .with_evaluate()
    .with_loop(max_turns=30)
    .with_memory()
    .build()
)

result = await pipeline.run("Complex multi-step task here")
```
### Manual Pipeline Construction

For maximum control, assemble stages directly:

```python
from geny_executor import Pipeline, PipelineConfig
from geny_executor.stages.s01_input import InputStage
from geny_executor.stages.s06_api import APIStage, MockProvider
from geny_executor.stages.s09_parse import ParseStage
from geny_executor.stages.s16_yield import YieldStage

config = PipelineConfig(name="custom", api_key="sk-ant-...")
pipeline = Pipeline(config)
pipeline.register_stage(InputStage())
pipeline.register_stage(APIStage(provider=MockProvider(responses=["Hello!"])))
pipeline.register_stage(ParseStage())
pipeline.register_stage(YieldStage())

result = await pipeline.run("Test input")
```
## Sessions

Persistent state management across multiple interactions:

```python
from geny_executor import PipelinePresets
from geny_executor.session import SessionManager

manager = SessionManager()
pipeline = PipelinePresets.chat(api_key="sk-ant-...")

# Create a session — state persists across runs
session = manager.create(pipeline)
result1 = await session.run("My name is Alice")
result2 = await session.run("What's my name?")  # Remembers context

# List active sessions
for info in manager.list_sessions():
    print(f"Session {info.session_id}: {info.message_count} messages, ${info.total_cost_usd:.4f}")
```
## Event System

Real-time observability with pub/sub events:

```python
from geny_executor import PipelinePresets

pipeline = PipelinePresets.agent(api_key="sk-ant-...")

# Subscribe to specific events
@pipeline.on("stage.enter")
async def on_stage_enter(event):
    print(f"  → Entering: {event.stage}")

@pipeline.on("stage.exit")
async def on_stage_exit(event):
    print(f"  ← Exiting: {event.stage}")

@pipeline.on("pipeline.complete")
async def on_complete(event):
    print(f"Done! Iterations: {event.data.get('iterations')}")

# Wildcard: listen to all events
@pipeline.on("*")
async def on_any(event):
    pass  # Log everything

result = await pipeline.run("Hello")
```
## Streaming

```python
async for event in pipeline.run_stream("Solve this step by step"):
    if event.type == "stage.enter":
        print(f"Stage: {event.stage}")
    elif event.type == "api.response":
        print("Response received")
    elif event.type == "pipeline.complete":
        print(f"Final: {event.data['result'].text}")
```
## Tool System

### Creating Tools

```python
from geny_executor.tools import Tool, ToolResult, ToolContext

class CalculatorTool(Tool):
    @property
    def name(self) -> str:
        return "calculator"

    @property
    def description(self) -> str:
        return "Perform arithmetic calculations"

    @property
    def input_schema(self) -> dict:
        return {
            "type": "object",
            "properties": {
                "expression": {"type": "string", "description": "Math expression to evaluate"}
            },
            "required": ["expression"],
        }

    async def execute(self, input: dict, context: ToolContext) -> ToolResult:
        try:
            result = eval(input["expression"])  # Use a safe evaluator in production
            return ToolResult(content=str(result))
        except Exception as e:
            return ToolResult(content=str(e), is_error=True)
```
### Tool Registry

```python
from geny_executor.tools import ToolRegistry

registry = ToolRegistry()
registry.register(CalculatorTool())
registry.register(SearchTool())

# Filter tools per request
math_tools = registry.filter(include=["calculator"])
api_format = registry.to_api_format()  # Anthropic API format
```
### MCP Integration

Connect to Model Context Protocol servers:

```python
from geny_executor.tools.mcp import MCPManager

mcp = MCPManager()
await mcp.connect("filesystem", command="npx", args=["-y", "@anthropic/mcp-filesystem"])

# MCP tools are automatically adapted to the Tool interface
for tool in mcp.list_tools():
    registry.register(tool)
```
## Error Handling

Structured error hierarchy with automatic classification:

```python
from geny_executor import (
    GenyExecutorError,    # Base exception
    PipelineError,        # Pipeline-level errors
    StageError,           # Stage execution errors
    GuardRejectError,     # Guard rejection (budget, permissions)
    APIError,             # Anthropic API errors (with category)
    ToolExecutionError,   # Tool failures
    ErrorCategory,        # rate_limited, timeout, token_limit, etc.
)

try:
    result = await pipeline.run("input")
except GuardRejectError as e:
    print(f"Blocked by guard: {e}")
except APIError as e:
    print(f"API error ({e.category}): {e}")
    if e.category == ErrorCategory.rate_limited:
        # Handle rate limiting
        pass
except GenyExecutorError as e:
    print(f"Pipeline error: {e}")
```
## Pipeline Presets

Five ready-to-use configurations:

| Preset | Stages | Use Case |
|---|---|---|
| `PipelinePresets.minimal()` | 1→6→9→16 | Simple Q&A, testing |
| `PipelinePresets.chat()` | 1→2→3→4→5→6→7→9→13→16 | Conversational chatbot |
| `PipelinePresets.agent()` | All 16 | Autonomous agent with tools |
| `PipelinePresets.evaluator()` | 1→3→6→9→12→16 | Quality evaluation |
| `PipelinePresets.geny_vtuber()` | All 16 + VTuber emit | Geny VTuber system |
## Custom Stages & Strategies

### Creating a Custom Strategy

```python
from geny_executor.core.stage import Strategy

class MyContextStrategy(Strategy):
    @property
    def name(self) -> str:
        return "my_context"

    @property
    def description(self) -> str:
        return "Custom context loading with RAG"

    def configure(self, config: dict) -> None:
        self.top_k = config.get("top_k", 5)

    async def load(self, state):
        # Your custom context loading logic
        ...
```
### Creating a Custom Stage

```python
from geny_executor.core.stage import Stage
from geny_executor.core.state import PipelineState

class LoggingStage(Stage[dict, dict]):
    @property
    def name(self) -> str:
        return "logging"

    @property
    def order(self) -> int:
        return 7  # After API, before Think

    @property
    def category(self) -> str:
        return "execution"

    async def execute(self, input: dict, state: PipelineState) -> dict:
        print(f"[{state.iteration}] API response received")
        return input  # Pass through

# Register into pipeline
pipeline.register_stage(LoggingStage())
```
## Configuration Reference

### ModelConfig

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | `"claude-sonnet-4-20250514"` | Anthropic model ID |
| `max_tokens` | `int` | `8192` | Max output tokens |
| `temperature` | `float` | `0.0` | Sampling temperature |
| `thinking_enabled` | `bool` | `False` | Enable extended thinking |
| `thinking_budget_tokens` | `int` | `10000` | Thinking token budget |
### PipelineConfig

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Pipeline name |
| `api_key` | `str` | required | Anthropic API key |
| `model` | `ModelConfig` | default | Model configuration |
| `max_iterations` | `int` | `50` | Max loop iterations |
| `cost_budget_usd` | `float \| None` | `None` | Cost budget limit |
| `context_window_budget` | `int` | `200_000` | Context window token limit |
| `stream` | `bool` | `False` | Enable streaming mode |
| `single_turn` | `bool` | `False` | Single turn (no loop) |
## Project Structure

```
geny-executor/
├── src/geny_executor/
│   ├── __init__.py        # Public API
│   ├── py.typed           # PEP 561 type marker
│   ├── core/              # Pipeline engine, config, state, errors
│   ├── events/            # EventBus pub/sub system
│   ├── stages/            # 16 pipeline stages (s01-s16)
│   ├── tools/             # Tool system + MCP integration
│   └── session/           # Session management + freshness
├── tests/                 # 81 unit & integration tests
├── pyproject.toml         # Package configuration (Hatch)
└── LICENSE                # MIT License
```
## Development

```bash
# Clone
git clone https://github.com/CocoRoF/geny-executor.git
cd geny-executor

# Install with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run tests with coverage
pytest --cov=geny_executor --cov-report=term-missing

# Lint
ruff check src/ tests/
ruff format src/ tests/
```
## Roadmap
- Streaming response support (full implementation)
- OpenTelemetry integration for tracing
- Additional memory backends (Redis, PostgreSQL)
- WebUI pipeline configurator
- Plugin system for community stages
- Batch execution mode
## License
This project is licensed under the MIT License — see the LICENSE file for details.
## Related Projects
- Anthropic SDK — The foundation geny-executor is built on
- MCP — Model Context Protocol for tool integration
- Claude Code — The agent loop that inspired this architecture