Domain-neutral orchestration SDK (flows/steps, local/remote, DI).
Project description
EAP SDK
A lightweight, domain-neutral orchestration SDK for authoring async flows and steps with a small runtime and CLI. The EAP SDK provides decorator-based flow and step definitions with dependency injection support.
Features
- Flow Decorators:
@flowdecorator for async business logic orchestration - Step/Task Decorators:
@stepand@taskdecorators with retry, timeout, and backoff support - RunContext: Dependency injection container for services and configuration
- Local & Remote Execution: Run flows locally or via HTTP transport
- Remote Execution:
.serve()method for fire-and-forget remote execution - Entrypoints: Module:function format for robot execution
- Flow Specs: Structured flow specifications for orchestration
- Plugin System: Register services via
register_service()for dependency injection - CLI Tool: Command-line interface for running flows
- Telemetry: Optional Prometheus metrics integration
Installation
The EAP SDK is part of the Enterprise Assistant Platform workspace. Install dependencies:
uv sync --all-packages
Or install as a standalone package:
pip install eap-sdk
Quick Start
Defining a Flow
from eap_sdk import RunContext, flow, step, task
@step(retries=3, timeout_s=30.0)
async def fetch_data(ctx: RunContext, source: str) -> dict:
"""Fetch data from a source."""
api = ctx.get("api") # Access injected service
result = await api.fetch(source)
return {"data": result}
# @task is an alias for @step
@task(retries=2, timeout_s=20.0)
async def process_data(ctx: RunContext, data: dict) -> dict:
"""Process data."""
return {"processed": data}
@flow("my_flow")
async def my_flow(ctx: RunContext, name: str, count: int = 1) -> dict:
"""Example flow that orchestrates steps."""
data = await fetch_data(ctx, f"source/{name}")
processed = await process_data(ctx, data)
return {
"success": True,
"processed": count,
"result": processed
}
Running a Flow
Using Python:
from eap_sdk import RunContext
from eap_sdk.decorators import get_flow
# Get registered flow
flow_fn = get_flow("my_flow")
# Create context and inject services
ctx = RunContext(run_id="run_123", tenant="acme")
ctx.set("api", my_api_client)
# Execute flow locally
result = await flow_fn(ctx, name="test", count=5)
# Or use remote execution via .serve() method
import os
os.environ["MAESTRO_HTTP"] = "http://maestro:8000"
# Submit for remote execution (returns run_id immediately)
run_id = my_flow.serve(pool="prod", params={"name": "test", "count": 5})
print(f"Started run: {run_id}")
# With scheduling
run_id = my_flow.serve(
pool="prod",
schedule="0 9 * * *", # Daily at 9 AM
params={"name": "test", "count": 5}
)
Using CLI:
# Run flow locally
uv run eap-sdk run my_flow --runner local \
--param name=test --param count=5
# Run flow remotely (requires MAESTRO_ADDR or ROBOT_ADDR)
uv run eap-sdk run my_flow --runner remote \
--param-json '{"name": "test", "count": 5}'
Core Concepts
RunContext
RunContext provides dependency injection and metadata:
from eap_sdk import RunContext
ctx = RunContext(
run_id="unique_run_id",
tenant="acme",
labels={"env": "production"}
)
# Set services
ctx.set("api", api_client)
ctx.set("db", database_connection)
# Get services
api = ctx.get("api")
db = ctx.get("db")
Flow Decorator
Flows are async functions decorated with @flow(name):
@flow("process_data")
async def process_data(ctx: RunContext, input_data: dict) -> dict:
# Flow logic here
return {"success": True, "output": processed}
Requirements:
- Must be async
- First parameter must be
RunContext - Return type should be
dict - Registered automatically on import
Step/Task Decorator
Steps are functions (sync or async) decorated with @step(...) or @task(...):
@step(retries=3, timeout_s=30.0, backoff=1.5, base_delay=0.5)
async def api_call(ctx: RunContext, endpoint: str) -> dict:
# Step logic with automatic retry
return await http_client.get(endpoint)
# @task is an alias for @step
@task(retries=2, timeout_s=20.0)
async def process_data(ctx: RunContext, data: dict) -> dict:
return {"processed": data}
Parameters:
retries: Number of retry attempts (default: 0)timeout_s: Timeout in seconds (default: None)backoff: Exponential backoff multiplier (default: 1.5)base_delay: Base delay in seconds (default: 0.5)jitter: Random jitter factor (default: 0.2)
Features:
- Automatic retry on failure
- Timeout enforcement (sync steps run in thread pool)
- Exponential backoff with jitter
Note: @task and @step are interchangeable aliases. Use whichever naming convention fits your project.
Service Registration
Register services for dependency injection:
from eap_sdk.plugins import register_service
def create_api_client(ctx: RunContext):
return ApiClient(host=ctx.tenant)
register_service("api", create_api_client)
Services are automatically built when creating a RunContext via build_services().
Runtime
Local Execution
from eap_sdk.runtime import arun
result = await arun("my_flow", runner="local", name="test", count=5)
Remote Execution
import os
os.environ["MAESTRO_ADDR"] = "http://maestro:8000"
result = await arun("my_flow", runner="remote", name="test", count=5)
Transport Protocols
- LocalTransport: Executes flows in-process
- HTTPTransport: Sends flow execution requests via HTTP
CLI Usage
# Basic usage
eap-sdk run <flow_name> [options]
# Options
--runner {local,remote} Execution mode (default: local)
--param k=v Parameter key-value pairs (repeatable)
--param-json JSON JSON object with parameters
# Examples
eap-sdk run my_flow --param name=test --param count=5
eap-sdk run my_flow --param-json '{"name": "test", "count": 5}'
eap-sdk run my_flow --runner remote --param name=test
Parameter Coercion:
--paramvalues are auto-coerced (JSON, bool, number if possible)--param-jsonaccepts a JSON object that's merged into parameters
Error Handling
Flows should return {"success": bool, ...} dictionaries:
@flow("safe_flow")
async def safe_flow(ctx: RunContext, data: dict) -> dict:
try:
result = await process(data)
return {"success": True, "data": result}
except Exception as e:
return {"success": False, "error": str(e)}
The runtime wraps exceptions in RunResponse if they escape the flow.
Telemetry
Optional Prometheus metrics (requires prometheus-client):
eap_jobs_total: Total jobs executedeap_jobs_failed_total: Failed jobseap_step_duration_seconds: Step execution durationeap_step_retries_total: Step retry count
Install with observability extras:
pip install eap-sdk[observability]
Development
Running Tests
pytest tests/eap_sdk/
Linting & Formatting
ruff format src/eap_sdk/
ruff check src/eap_sdk/
Type Checking
pyright src/eap_sdk/
Architecture
eap_sdk/
├── __init__.py # Public API exports
├── context.py # RunContext class
├── decorators.py # @flow, @step, and @task decorators
├── runtime.py # Execution runtime (local/remote), flow specs, entrypoints
├── plugins.py # Service registration system
├── contracts.py # RunRequest/RunResponse models
├── telemetry.py # Prometheus metrics
├── blocks.py # Reusable blocks (HttpAuthBlock)
└── cli.py # Command-line interface
New Features
Remote Execution with .serve()
The .serve() method provides a convenient way to submit flows for remote execution:
@flow("my_flow")
async def my_flow(ctx: RunContext, name: str) -> dict:
return {"result": name}
# Set Maestro endpoint
import os
os.environ["MAESTRO_HTTP"] = "http://maestro:8000"
# Submit for execution
run_id = my_flow.serve(pool="prod", params={"name": "test"})
Benefits:
- Fire-and-forget execution (returns
run_idimmediately) - Supports scheduling via cron expressions
- Automatic flow spec generation
- Better integration with orchestration systems
Entrypoints
Flows automatically generate entrypoints for robot execution:
@flow("my_flow")
async def my_flow(ctx: RunContext, name: str) -> dict:
return {"result": name}
# Entrypoint is automatically stored
print(my_flow._eap_flow_entry) # "my_module:my_flow"
# Robots can execute flows using entrypoints
from eap_sdk.runtime import run_entrypoint
result = run_entrypoint("my_module:my_flow", {"name": "test"})
Task Decorator
The @task decorator is an alias for @step:
from eap_sdk import task
@task(retries=3, timeout_s=30.0)
async def my_task(ctx: RunContext, data: dict) -> dict:
return {"processed": data}
Use @task if you prefer that naming convention, or stick with @step - both work identically.
Requirements
- Python >= 3.10
- httpx >= 0.27, < 1.0
- pydantic >= 2.8, < 3.0
License
MIT License
This README includes:
- Overview and features
- Installation instructions
- Quick start examples
- Core concepts (RunContext, flows, steps)
- Service registration
- Runtime execution modes
- CLI usage
- Error handling
- Telemetry
- Development setup
- Architecture overview
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eap_sdk-0.1.0.tar.gz.
File metadata
- Download URL: eap_sdk-0.1.0.tar.gz
- Upload date:
- Size: 81.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0356cc2811fa4b97e2b3dc1701ca5d9cb140d87e1f0652309b1d87006957a71b
|
|
| MD5 |
836e5d74ef5710a6f3e8c7e8263f9240
|
|
| BLAKE2b-256 |
efce78a7db653e5af32227131fd8dfdfaea0ccb430c0d11324f75c9d2b941fc9
|
File details
Details for the file eap_sdk-0.1.0-py3-none-any.whl.
File metadata
- Download URL: eap_sdk-0.1.0-py3-none-any.whl
- Upload date:
- Size: 12.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
808b9fb73de8d8e88f18f62c62edb6be5911d6a4ce4b8b84f3edb7edb2ee06eb
|
|
| MD5 |
f22fbc246de697028acc5a13a287cee1
|
|
| BLAKE2b-256 |
ad476a07679169472df8a918406c62d5dbf2a755296818e8b00e313a4c505e8e
|