# agentexec

Production-ready orchestration for OpenAI Agents SDK with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.
## The Problem

You've built an AI agent. It works great in development. Now you need to run it in production:

**Customer Support Automation** - A user submits a ticket. Your agent needs to research their account, check previous interactions, and draft a response. This takes 2-3 minutes. You can't block the HTTP request.

**Document Processing Pipeline** - Users upload contracts for analysis. Each document needs OCR, entity extraction, clause identification, and risk scoring. You need to process dozens concurrently while tracking progress.

**Research & Reporting** - Your agent researches companies, gathers data from multiple sources, and generates reports. Users need to see "Gathering financials... 40%", not just a spinning loader.

**Multi-Agent Workflows** - One agent discovers leads, fans out to research each one, then a final agent aggregates the results. You need coordination, not chaos.
Running AI agents in production requires:

- **Background execution** - Agents take minutes; users shouldn't wait
- **Progress tracking** - Know what your agents are doing in real time
- **Fault tolerance** - Handle failures gracefully with full error traces
- **Scalability** - Process multiple tasks across worker processes
- **Observability** - Complete audit trail of agent activities
- **User interfaces** - Components to build status dashboards and CLI monitors
agentexec provides all of this out of the box.
## Installation

```bash
uv add agentexec
```
Requirements:
- Python 3.12+
- Redis
- SQLAlchemy-compatible database (PostgreSQL, MySQL, SQLite)
## Quick Start
A typical agentexec application has a few files working together. Here's a complete working example showing each part:
### 1. Database Setup

```python
# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

ax.CONF.redis_url = "redis://localhost:6379/0"

engine = create_engine("sqlite:///agents.db")
ax.Base.metadata.create_all(engine)  # Creates activity tracking tables

def get_db():
    with Session(engine) as db:
        yield db
```
### 2. Define Your Task

```python
# worker.py
from uuid import UUID

from pydantic import BaseModel
from agents import Agent

import agentexec as ax
from .db import engine

class ResearchContext(BaseModel):
    company: str

pool = ax.Pool(engine=engine)

@pool.task("research_company")
async def research_company(agent_id: UUID, context: ResearchContext) -> str:
    runner = ax.OpenAIRunner(agent_id)
    agent = Agent(
        name="Research Agent",
        instructions=f"Research {context.company}. {runner.prompts.report_status}",
        tools=[runner.tools.report_status],  # Agent can report its own progress
        model="gpt-4o",
    )
    result = await runner.run(agent, input="Begin research")
    return result.final_output

if __name__ == "__main__":
    pool.run()
```
### 3. Queue Tasks and Track Progress

```python
# views.py
from uuid import UUID

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

import agentexec as ax
from .worker import ResearchContext
from .db import get_db

router = APIRouter()

@router.post("/research")
async def start_research(company: str) -> dict:
    task = await ax.enqueue("research_company", ResearchContext(company=company))
    return {"agent_id": str(task.agent_id), "status": "queued"}  # Return agent_id for status polling

@router.get("/research/{agent_id}")
def get_status(agent_id: UUID, db: Session = Depends(get_db)) -> ax.activity.ActivityDetailSchema:
    return ax.activity.detail(db, agent_id=agent_id)  # Query by agent_id
```
### 4. Run Workers

```bash
python worker.py
```
That's it. Tasks are queued to Redis, workers process them in parallel, progress is tracked in your database, and your API stays responsive.
## Supported Patterns
### Activity Metadata (Multi-Tenancy)

Attach arbitrary metadata when enqueueing tasks for filtering and tenant isolation:

```python
task = await ax.enqueue(
    "process_document",
    context,
    metadata={"organization_id": "org-123", "user_id": "user-456"},
)

# Filter activities by metadata
activities = ax.activity.list(db, metadata_filter={"organization_id": "org-123"})
detail = ax.activity.detail(db, agent_id, metadata_filter={"organization_id": "org-123"})

# Access metadata programmatically (excluded from API serialization by default)
org_id = detail.metadata["organization_id"]
```
### Automatic Activity Tracking

Every task gets full lifecycle tracking without manual updates:

```python
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")

# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)
```
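The lifecycle can be pictured as a small state machine. The sketch below is illustrative only: the status names match those above, but the transition table itself is an assumption, not the library's internals.

```python
# Illustrative transition table for the activity lifecycle.
# The QUEUED -> CANCELED edge reflects cancel_pending(), which cancels
# tasks that never started; this table is an assumption for illustration.
VALID_TRANSITIONS: dict[str, set[str]] = {
    "QUEUED": {"RUNNING", "CANCELED"},
    "RUNNING": {"COMPLETE", "ERROR"},
}

def can_transition(current: str, new: str) -> bool:
    # Terminal states (COMPLETE, ERROR, CANCELED) have no outgoing edges.
    return new in VALID_TRANSITIONS.get(current, set())

print(can_transition("QUEUED", "RUNNING"))    # True
print(can_transition("COMPLETE", "RUNNING"))  # False
```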
### Agent Self-Reporting

Agents can report their own progress:

```python
agent = Agent(
    instructions=f"Do research. {runner.prompts.report_status}",
    tools=[runner.tools.report_status],
)

# Agent calls: report_status("Analyzing financials", 60)
```
### Manual Progress Updates

Update progress explicitly from your task:

```python
ax.activity.update(agent_id, "Processing batch 3 of 10", percentage=30)
```
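For batched work, the message and percentage usually derive from the batch index. A small helper like the one below (hypothetical, not part of agentexec) keeps the updates consistent:

```python
def batch_progress(current: int, total: int) -> tuple[str, int]:
    """Map a 1-based batch index onto a status message and a 0-100 percentage."""
    pct = int(current / total * 100)
    return f"Processing batch {current} of {total}", pct

# Inside a task you might then call:
#   message, pct = batch_progress(i, total)
#   ax.activity.update(agent_id, message, percentage=pct)
msg, pct = batch_progress(3, 10)
print(msg, pct)  # Processing batch 3 of 10 30
```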
### Task Locking

When multiple tasks of the same type are queued for the same user, they may need to run sequentially because each task reads and writes shared state. Use `lock_key` to ensure only one task with the same evaluated key runs at a time:

```python
@pool.task("associate_observation", lock_key="user:{user_id}")
async def associate(agent_id: UUID, context: ObservationContext):
    ...

# Or with add_task()
pool.add_task("associate_observation", handler, lock_key="user:{user_id}")
```
The `lock_key` is a string template evaluated against the task context fields. When a worker dequeues a task whose lock is held, it puts the task back at the end of the queue and moves on. The lock is released automatically when the task completes or errors.

The lock TTL (`AGENTEXEC_LOCK_TTL`, default 1800s) is a safety net for worker process death; locks are always explicitly released on task completion or error. Set it higher than your longest expected task duration.

**Note:** When a task is requeued due to a held lock, it goes to the back of the queue. This means strict FIFO ordering is not guaranteed between tasks sharing the same lock key: if tasks T2 and T3 are both waiting on T1's lock, either could run next after T1 completes.
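To make the template semantics concrete, here is a sketch of how a key like `user:{user_id}` could resolve against context fields. This mimics the documented behavior with plain `str.format`; the library's actual evaluation logic may differ.

```python
from dataclasses import dataclass, asdict

@dataclass
class ObservationContext:
    user_id: str
    observation: str

def evaluate_lock_key(template: str, context: ObservationContext) -> str:
    # Fill the template's placeholders from the context's field values.
    # str.format ignores fields the template does not reference.
    return template.format(**asdict(context))

ctx = ObservationContext(user_id="user-42", observation="login event")
print(evaluate_lock_key("user:{user_id}", ctx))  # user:user-42
```

Two tasks whose contexts share the same `user_id` evaluate to the same key, so they serialize; tasks for different users run concurrently.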
### Priority Queue

Control task execution order:

```python
await ax.enqueue("urgent_task", context, priority=ax.Priority.HIGH)  # Front of queue
await ax.enqueue("batch_job", context, priority=ax.Priority.LOW)     # Back of queue
```
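The two priorities map to which end of the queue a task lands on. An in-memory sketch of that semantics (the real queue is a Redis list; this is illustrative only):

```python
from collections import deque

queue: deque[str] = deque()

def enqueue(task: str, high_priority: bool = False) -> None:
    if high_priority:
        queue.appendleft(task)  # HIGH: front of queue, dequeued first
    else:
        queue.append(task)      # LOW: back of queue

enqueue("batch_job")
enqueue("urgent_task", high_priority=True)
print(list(queue))  # ['urgent_task', 'batch_job']
```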
### Max Turns Recovery

Gracefully handle conversation limits:

```python
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Please summarize your findings.",
)
result = await runner.run(agent, max_turns=15)

# If the agent hits max turns, it automatically continues with the wrap-up prompt
```
### Multi-Step Pipelines

Orchestrate complex workflows with parallel execution:

```python
import asyncio

pipeline = ax.Pipeline(pool)

class ResearchPipeline(pipeline.Base):
    @pipeline.step(0, "parallel research")
    async def gather_data(self, context: InputContext) -> tuple[BrandResult, MarketResult]:
        return await asyncio.gather(
            research_brand(context),
            research_market(context),
        )

    @pipeline.step(1, "analysis")
    async def analyze(self, brand: BrandResult, market: MarketResult) -> FinalReport:
        return await analyze_results(brand, market)

# Queue the pipeline
task = await pipeline.enqueue(context=InputContext(company="Anthropic"))
```
### Dynamic Fan-Out with Tracker

Coordinate dynamically queued tasks:

```python
tracker = ax.Tracker("research", batch_id)

@function_tool
async def queue_research(company: str) -> None:
    """Discovery agent calls this for each company found."""
    tracker.incr()
    await ax.enqueue("research", ResearchContext(company=company, batch_id=batch_id))

@function_tool
async def save_result(result: ResearchResult) -> None:
    """Research agent calls this when done."""
    save_to_database(result)
    tracker.decr()
    if tracker.complete:
        await ax.enqueue("aggregate", AggregateContext(batch_id=batch_id))
```
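The tracker is essentially a shared counter: `incr()` on fan-out, `decr()` on completion, and `complete` once the count returns to zero. Here is an in-memory stand-in for those semantics (the real `Tracker` uses atomic Redis counters so it works across worker processes):

```python
class CounterTracker:
    """In-memory illustration of the Tracker's counting semantics."""

    def __init__(self) -> None:
        self._pending = 0
        self._started = False

    def incr(self) -> None:
        self._started = True
        self._pending += 1

    def decr(self) -> None:
        self._pending -= 1

    @property
    def complete(self) -> bool:
        # Complete only after at least one task was queued and all finished.
        return self._started and self._pending == 0

tracker = CounterTracker()
for _ in range(3):
    tracker.incr()  # discovery queues three research tasks
for _ in range(3):
    tracker.decr()  # each research task reports done
print(tracker.complete)  # True
```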
## Integration

### Running Alongside Your Application

If you have an existing FastAPI/Flask/Django backend, run the worker pool in a separate process:

```python
# main.py - Your API server
from fastapi import FastAPI

import agentexec as ax

app = FastAPI()

@app.post("/process")
async def process(data: str) -> dict:
    task = await ax.enqueue("process_data", ProcessContext(data=data))
    return {"agent_id": task.agent_id}
```

```python
# worker.py - Run separately
from .tasks import pool

if __name__ == "__main__":
    pool.run()
```

Terminal 1: Start your API server

```bash
uvicorn main:app
```

Terminal 2: Start the workers

```bash
python worker.py
```
### As a Standalone Worker Service

```python
# worker.py
import os
from uuid import UUID

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    # Your task implementation
    pass

if __name__ == "__main__":
    try:
        pool.run()
    except KeyboardInterrupt:
        with Session(engine) as db:
            ax.activity.cancel_pending(db)
```
### Docker Deployment

1. Create your worker Dockerfile:

```dockerfile
# Dockerfile.worker
FROM ghcr.io/agent-ci/agentexec-worker:latest
COPY ./src /app/src
ENV AGENTEXEC_WORKER_MODULE=src.worker
```
2. Create your worker module:

```python
# src/worker.py
import atexit
import os
from uuid import UUID

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

import agentexec as ax

engine = create_engine(os.environ["DATABASE_URL"])
pool = ax.Pool(engine=engine)

def cleanup() -> None:
    with Session(engine) as db:
        ax.activity.cancel_pending(db)

atexit.register(cleanup)

@pool.task("my_task")
async def my_task(agent_id: UUID, context: MyContext) -> None:
    pass
```
3. Build and run:

```bash
docker build -f Dockerfile.worker -t my-worker .
docker run -e DATABASE_URL=... -e REDIS_URL=... -e OPENAI_API_KEY=... my-worker
```
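For local development, a docker-compose sketch wiring the worker to Redis and Postgres might look like this. Service names, images, and credentials are illustrative, not prescribed by agentexec:

```yaml
# docker-compose.yml (illustrative)
services:
  redis:
    image: redis:7
  db:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: example
  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
      AGENTEXEC_REDIS_URL: redis://redis:6379/0
      DATABASE_URL: postgresql://postgres:example@db:5432/postgres
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on: [redis, db]
```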
## Backend Architecture

### Redis

agentexec uses Redis for task queuing, result storage, real-time log streaming, and coordination between workers. We chose Redis because it provides exactly the primitives we need (lists, pub/sub, atomic counters) with minimal operational overhead.

**AWS compatible:** Since we use standard Redis features, AWS ElastiCache works out of the box.

```bash
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
# or
AGENTEXEC_REDIS_URL=redis://my-cluster.abc123.use1.cache.amazonaws.com:6379
```
### Extensible State Backend

The state backend is pluggable. We're adding support for additional backends (DynamoDB, PostgreSQL, in-memory for testing). You can also implement your own:

```bash
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend  # Default
AGENTEXEC_STATE_BACKEND=myapp.state.dynamodb_backend   # Custom
```
### Database

Activity tracking uses SQLAlchemy with two tables:

**agentexec_activity** - Main activity records

- `agent_id` - Unique identifier (UUID)
- `agent_type` - Task name
- `metadata` - JSON field for custom data (e.g., tenant info)
- `created_at`, `updated_at` - Timestamps

**agentexec_activity_log** - Status and progress

- `activity_id` - Foreign key
- `message` - Log message
- `status` - QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
- `percentage` - Progress (0-100)
## Building Your Own Interface

### Data Structures

The activity tracking module exposes Pydantic schemas for building APIs:

```python
from agentexec.activity.schemas import (
    ActivityListSchema,      # Paginated list response
    ActivityListItemSchema,  # Single item in list (lightweight)
    ActivityDetailSchema,    # Full activity with log history
    ActivityLogSchema,       # Single log entry
)
```
List activities:

```python
with Session(engine) as db:
    result = ax.activity.list(db, page=1, page_size=20)

# Returns ActivityListSchema:
# {
#     "items": [...],  # List of ActivityListItemSchema
#     "total": 150,
#     "page": 1,
#     "page_size": 20,
#     "total_pages": 8
# }
```
Get activity detail:

```python
activity = ax.activity.detail(db, agent_id=agent_id)

# Returns ActivityDetailSchema:
# {
#     "id": "...",
#     "agent_id": "...",
#     "agent_type": "research_company",
#     "created_at": "2024-01-15T10:30:00Z",
#     "updated_at": "2024-01-15T10:32:45Z",
#     "logs": [
#         {"status": "queued", "message": "Waiting to start", "percentage": 0, ...},
#         {"status": "running", "message": "Gathering data", "percentage": 30, ...},
#         {"status": "complete", "message": "Done", "percentage": 100, ...}
#     ]
# }
```
Count active agents:

```python
count = ax.activity.active_count(db)
# Returns the number of agents with status QUEUED or RUNNING
```
### Building a CLI Monitor

```python
# cli_monitor.py
import time

from rich.live import Live
from rich.table import Table
from sqlalchemy import Engine
from sqlalchemy.orm import Session

import agentexec as ax

def build_table(db: Session) -> Table:
    table = Table(title=f"Active Agents: {ax.activity.active_count(db)}")
    table.add_column("Status")
    table.add_column("Task")
    table.add_column("Message")
    table.add_column("Progress")
    for item in ax.activity.list(db, page=1, page_size=10).items:
        table.add_row(
            item.status,
            item.agent_type,
            item.latest_log_message or "",
            f"{item.percentage}%",
        )
    return table

def monitor(engine: Engine) -> None:
    with Live(refresh_per_second=1) as live:
        while True:
            with Session(engine) as db:
                live.update(build_table(db))
            time.sleep(1)  # Avoid hammering the database between refreshes

if __name__ == "__main__":
    from .db import engine

    monitor(engine)
```
## UI Components

The agentexec-ui package provides React components for building monitoring interfaces:

```bash
npm install agentexec-ui
```
### Components

```tsx
import {
  TaskList,
  TaskDetail,
  ActiveAgentsBadge,
  StatusBadge,
  ProgressBar,
} from 'agentexec-ui';

// Display a paginated task list
<TaskList
  items={activities.items}
  loading={isLoading}
  onTaskClick={(agentId) => setSelected(agentId)}
  selectedAgentId={selectedId}
/>

// Full activity detail view
<TaskDetail
  activity={activityDetail}
  loading={isDetailLoading}
  error={error}
  onClose={() => setSelected(null)}
/>

// Active count badge
<ActiveAgentsBadge count={activeCount} loading={isLoading} />

// Individual status indicators
<StatusBadge status="running" />
<ProgressBar percentage={65} status="running" />
```
### TypeScript Types

```typescript
import type {
  Status,  // 'queued' | 'running' | 'complete' | 'error' | 'canceled'
  ActivityLog,
  ActivityDetail,
  ActivityListItem,
  ActivityList,
} from 'agentexec-ui';
```

These types mirror the Python API schemas (`ActivityDetailSchema`, `ActivityListSchema`, etc.), so your API responses integrate directly with the components.

The components are headless (no built-in styling) and work with any CSS framework. See `examples/openai-agents-fastapi/ui/` for a complete React app with TanStack Query integration.
## Module Reference

### Task Queue

```python
import agentexec as ax

task = await ax.enqueue(task_name, context, priority=ax.Priority.LOW)
result = await ax.get_result(task, timeout=300)
results = await ax.gather(task1, task2, task3)
```
### Worker Pool

```python
import agentexec as ax

pool = ax.Pool(engine=engine)
pool = ax.Pool(database_url="postgresql://...")

@pool.task("name")
async def handler(agent_id: UUID, context: MyContext) -> None: ...

@pool.task("name", lock_key="user:{user_id}")  # Sequential per user
async def locked(agent_id: UUID, context: MyContext) -> None: ...

pool.run()       # Blocking - runs workers
pool.start()     # Non-blocking - starts workers in background
pool.shutdown()  # Graceful shutdown
```
### Activity Tracking

```python
import agentexec as ax

# Create activity (returns agent_id for tracking)
agent_id = ax.activity.create(task_name, message="Starting...")

# Update progress
ax.activity.update(agent_id, message, percentage=50)
ax.activity.complete(agent_id, message="Done")
ax.activity.error(agent_id, error="Failed: ...")

# Query activities
activities = ax.activity.list(db, page=1, page_size=20)
activity = ax.activity.detail(db, agent_id=agent_id)
count = ax.activity.active_count(db)

# Cleanup
canceled = ax.activity.cancel_pending(db)
```
### Runners

```python
import agentexec as ax

runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Summarize...",
)

runner.prompts.report_status  # Instruction text for agents
runner.tools.report_status    # Pre-bound function tool

result = await runner.run(agent, input="...", max_turns=15)
result = await runner.run_streamed(agent, input="...", max_turns=15)

# Base class for custom runners
class MyRunner(ax.BaseAgentRunner):
    async def run(self, agent: Agent, input: str) -> RunResult: ...
```
### Pipelines

```python
import agentexec as ax

pipeline = ax.Pipeline(pool)

class MyPipeline(pipeline.Base):
    @pipeline.step(0, "description")
    async def step_one(self, context): ...
```
### Tracker

```python
import agentexec as ax

tracker = ax.Tracker("name", batch_id)
tracker.incr()
tracker.decr()
if tracker.complete: ...  # All tasks done
```
### Database

```python
import agentexec as ax

ax.Base  # SQLAlchemy declarative base for activity tables
```
## Configuration

All settings are provided via environment variables:

```bash
# Redis (required)
AGENTEXEC_REDIS_URL=redis://localhost:6379/0

# Workers
AGENTEXEC_NUM_WORKERS=4
AGENTEXEC_QUEUE_NAME=agentexec_tasks
AGENTEXEC_GRACEFUL_SHUTDOWN_TIMEOUT=300

# Database
AGENTEXEC_TABLE_PREFIX=agentexec_

# Results
AGENTEXEC_RESULT_TTL=3600

# Task locking
AGENTEXEC_LOCK_TTL=1800

# State backend
AGENTEXEC_STATE_BACKEND=agentexec.state.redis_backend
AGENTEXEC_KEY_PREFIX=agentexec

# Activity messages (customizable)
AGENTEXEC_ACTIVITY_MESSAGE_CREATE="Waiting to start."
AGENTEXEC_ACTIVITY_MESSAGE_STARTED="Task started."
AGENTEXEC_ACTIVITY_MESSAGE_COMPLETE="Task completed successfully."
AGENTEXEC_ACTIVITY_MESSAGE_ERROR="Task failed with error: {error}"
```
Development
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec
# Install dependencies
uv sync
# Run tests
uv run pytest
# Type checking
uv run ty check
# Linting
uv run ruff check src/
# Formatting
uv run ruff format src/
## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes with tests
4. Run `uv run pytest` and `uv run ty check`
5. Submit a pull request
## License

MIT License - see LICENSE for details.
## Links

- PyPI: agentexec
- npm: agentexec-ui
- Documentation: docs/
- Example App: examples/openai-agents-fastapi/
- Multi-Tenancy Example: examples/multi-tenancy/
- Issues: GitHub Issues