agentexec
Production-ready orchestration for OpenAI Agents SDK with Redis-backed task queues, SQLAlchemy activity tracking, and multiprocessing worker pools.
Build reliable, scalable AI agent applications with automatic lifecycle management, progress tracking, and fault tolerance.
Running AI agents in production requires more than just the SDK. You need:
- Background execution - Agents can take minutes to complete; users shouldn't wait
- Progress tracking - Know what your agents are doing and when they finish
- Fault tolerance - Handle failures gracefully with automatic error tracking
- Scalability - Process multiple agent tasks concurrently across worker processes
- Observability - Full audit trail of agent activities and status updates
agentexec provides all of this out of the box, with a simple API that integrates seamlessly with the OpenAI Agents SDK (and is designed to be extended to other frameworks).
Features
- Multi-process worker pool - True parallelism for concurrent agent execution
- Redis task queue - Reliable job distribution with priority support
- Automatic activity tracking - Full lifecycle management (QUEUED → RUNNING → COMPLETE/ERROR)
- OpenAI Agents integration - Drop-in runner with max turns recovery
- Agent self-reporting - Built-in tools for agents to report progress
- SQLAlchemy-based storage - Flexible database support (PostgreSQL, MySQL, SQLite)
- Type-safe - Full type annotations with Pydantic schemas
- Production-ready - Graceful shutdown, error handling, configurable timeouts
Installation
uv add agentexec
Requirements:
- Python 3.11+
- Redis (for task queue)
- SQLAlchemy-compatible database (for activity tracking)
- Agents that you want to parallelize!
Quick Start
1. Set Up Your Worker
from uuid import UUID

import agentexec as ax
from agents import Agent
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
# database for activity tracking (share with your app)
engine = create_engine("sqlite:///agents.db")
# create worker pool
pool = ax.WorkerPool(engine=engine)
@pool.task("research_company")
async def research_company(agent_id: UUID, payload: dict) -> None:
    """Background task that runs an AI agent."""
    runner = ax.OpenAIRunner(
        agent_id=agent_id,
        max_turns_recovery=True,
    )
    agent = Agent(
        name="Research Agent",
        instructions=(
            f"Research {payload['company']}.\n"
            "\n"
            f"{runner.prompts.report_status}"
        ),
        tools=[
            runner.tools.report_status,
        ],
        model="gpt-5.1",
    )
    result = await runner.run(
        agent,
        input="Start research",
        max_turns=15,
    )
    print(f"Done! {result.final_output}")

if __name__ == "__main__":
    pool.start()  # start workers
2. Queue Tasks from Your Application
import agentexec as ax
# enqueue a task (from your API, web app, etc.)
task = ax.enqueue(
    "research_company",
    {"company": "Anthropic"},
)
print(f"Task queued: {task.agent_id}")
3. Track Progress
with Session(engine) as db:
    # list recent activities
    activities = ax.activity.list(db, page=1, page_size=10)
    for activity in activities:
        print(f"Agent {activity.agent_id} - Status: {activity.status}")

    # get activity with full log history
    activity = ax.activity.detail(db, agent_id=task.agent_id)
    print(f"Activity for {activity.agent_id}:")
    for log in activity.logs:
        print(f"  - {log.created_at}: {log.message} ({log.status})")
What You Get
Automatic Activity Tracking
Every task gets full lifecycle tracking without manual updates:
runner = ax.OpenAIRunner(agent_id=agent_id)
result = await runner.run(agent, input="...")
# Activity automatically transitions:
# QUEUED → RUNNING → COMPLETE (or ERROR on failure)
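The lifecycle above behaves like a small state machine. The sketch below is not agentexec's internal implementation, just a minimal illustration of the allowed transitions (names like `Status` and `advance` are hypothetical):

```python
from enum import Enum

class Status(Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETE = "complete"
    ERROR = "error"

# Legal transitions in the lifecycle described above:
# QUEUED → RUNNING → COMPLETE (or ERROR)
TRANSITIONS = {
    Status.QUEUED: {Status.RUNNING},
    Status.RUNNING: {Status.COMPLETE, Status.ERROR},
}

def advance(current: Status, nxt: Status) -> Status:
    """Move an activity to its next status, rejecting invalid jumps."""
    if nxt not in TRANSITIONS.get(current, set()):
        raise ValueError(f"invalid transition {current} -> {nxt}")
    return nxt
```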
Agent Self-Reporting
Agents can report their own progress using a built-in tool:
agent = Agent(
    instructions=f"Do research. {runner.prompts.report_status}",
    tools=[runner.tools.report_status],  # Agent can call this
)
# Agent will report: "Gathering data" (40%), "Analyzing results" (80%), etc.
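Conceptually, the self-reporting tool appends a log entry with a message and a completion percentage. This is a hypothetical sketch of that behavior with in-memory dataclasses, not agentexec's actual tool body:

```python
from dataclasses import dataclass, field

@dataclass
class ActivityLog:
    message: str
    completion_percentage: int

@dataclass
class Activity:
    logs: list[ActivityLog] = field(default_factory=list)

def report_status(activity: Activity, message: str, completion_percentage: int) -> str:
    """Hypothetical tool body: record agent-reported progress as a log entry."""
    if not 0 <= completion_percentage <= 100:
        raise ValueError("completion_percentage must be between 0 and 100")
    activity.logs.append(ActivityLog(message, completion_percentage))
    return "status recorded"
```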
Max Turns Recovery
Automatically handle conversation limits with graceful wrap-up:
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Please summarize your findings.",
)
# If agent hits max turns, runner automatically:
# 1. Catches MaxTurnsExceeded
# 2. Continues with wrap-up prompt
# 3. Returns final result
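The catch-and-continue pattern can be sketched in a few lines. `MaxTurnsExceeded` here is a stand-in for the SDK's exception, and `run_with_recovery` is a hypothetical simplification of what the runner does, not its real code:

```python
import asyncio
from collections.abc import Awaitable, Callable

class MaxTurnsExceeded(Exception):
    """Stand-in for the SDK's max-turns exception."""

async def run_with_recovery(
    run: Callable[[str], Awaitable[str]],
    wrap_up_prompt: str,
) -> str:
    """Run the agent once; if the turn limit is hit, continue with a wrap-up turn."""
    try:
        return await run("Start research")
    except MaxTurnsExceeded:
        # The limit was hit: ask the agent to wrap up and return that result
        return await run(wrap_up_prompt)
```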
Priority Queue
Control task execution order:
# High priority - processed first
ax.enqueue("urgent_task", payload, priority=ax.Priority.HIGH)
# Low priority - processed later
ax.enqueue("batch_job", payload, priority=ax.Priority.LOW)
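The ordering guarantee can be illustrated with an in-memory heap: HIGH beats LOW, and tasks within the same priority stay FIFO. This sketch only mimics the queue semantics; the real queue lives in Redis:

```python
import heapq
import itertools
from enum import IntEnum

class Priority(IntEnum):
    HIGH = 0  # smaller value is popped first
    LOW = 1

class TaskQueue:
    """In-memory illustration: HIGH before LOW, FIFO within a priority level."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str, dict]] = []
        self._counter = itertools.count()  # tie-breaker preserves insertion order

    def enqueue(self, task_name: str, payload: dict,
                priority: Priority = Priority.LOW) -> None:
        heapq.heappush(self._heap, (int(priority), next(self._counter), task_name, payload))

    def dequeue(self) -> tuple[str, dict]:
        _, _, task_name, payload = heapq.heappop(self._heap)
        return task_name, payload
```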
Full Example: FastAPI Integration
See examples/openai-agents-fastapi/ for a complete production application showing:
- Background worker pool with task handlers
- FastAPI routes for queueing tasks and checking status
- Database session management with SQLAlchemy
- Custom agents with function tools
- Real-time progress monitoring
- Graceful shutdown with cleanup
Configuration
Configure via environment variables or .env file:
# Worker settings
AGENTEXEC_NUM_WORKERS=4
# Redis settings
AGENTEXEC_REDIS_URL=redis://localhost:6379/0
AGENTEXEC_QUEUE_NAME=agentexec:tasks
# Database table prefix
AGENTEXEC_TABLE_PREFIX=agentexec_
Public API
Task Queue
# Enqueue task
task = ax.enqueue(task_name, payload, priority=ax.Priority.LOW)
Activity Tracking
# Query activities
activities = ax.activity.list(session, page=1, page_size=50)
activity = ax.activity.detail(session, agent_id)
Worker Pool
pool = ax.WorkerPool(engine=engine)
@pool.task("task_name")
async def handler(agent_id: UUID, payload: dict) -> None:
    # Task implementation
    pass

pool.start()  # Start worker processes
OpenAI Runner
runner = ax.OpenAIRunner(
    agent_id=agent_id,
    max_turns_recovery=True,
    wrap_up_prompt="Summarize...",
)
# Run agent
result = await runner.run(agent, input="...", max_turns=15)
# Streaming
result = await runner.run_streamed(agent, input="...", max_turns=15)
Architecture
┌─────────────┐           ┌──────────┐           ┌─────────────┐
│    Your     │ ────────> │  Redis   │ <──────── │   Worker    │
│ Application │  enqueue  │  Queue   │  dequeue  │    Pool     │
└─────────────┘           └──────────┘           └─────────────┘
       │                                                │
       │                                         Runner │
       │                          (+ Activity Tracking) │
       v                                                v
┌──────────────────────────────────────────────────────────────┐
│                     SQLAlchemy Database                      │
│                 (Activities, Logs, Progress)                 │
└──────────────────────────────────────────────────────────────┘
Flow:
- Application enqueues task → Activity created (QUEUED)
- Worker dequeues task → Executes with OpenAIRunner
- Runner updates activity → RUNNING
- Agent reports progress → Log entries created
- Task completes → Activity marked COMPLETE/ERROR
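Steps 2-5 of the flow can be simulated with a single worker iteration: dequeue, mark RUNNING, execute the handler, and record COMPLETE or ERROR. This is a toy illustration of the flow, not agentexec's worker loop:

```python
import queue

def worker_step(
    tasks: "queue.Queue[tuple[str, dict]]",
    handlers: dict,
    activities: dict,
) -> None:
    """One worker iteration: dequeue a task, run its handler, record the outcome."""
    task_name, payload = tasks.get()
    activities[task_name] = "RUNNING"
    try:
        handlers[task_name](payload)
        activities[task_name] = "COMPLETE"
    except Exception:
        activities[task_name] = "ERROR"
```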
Database Models
AgentExec creates two tables (prefix configurable):
agentexec_activity - Main activity records
- id - Primary key (UUID)
- agent_id - Unique agent identifier (UUID)
- agent_type - Task name/type
- created_at - When activity was created
- updated_at - Last update timestamp
agentexec_activity_log - Status and progress logs
- id - Primary key (UUID)
- activity_id - Foreign key to activity
- message - Log message
- status - QUEUED, RUNNING, COMPLETE, ERROR, CANCELED
- completion_percentage - Progress (0-100)
- created_at - When log was created
Development
# Clone repository
git clone https://github.com/Agent-CI/agentexec
cd agentexec
# Install dependencies
uv sync
# Run tests
uv run pytest
# Type checking
uv run mypy src/agentexec
# Linting
uv run ruff check src/
# Formatting
uv run ruff format src/
License
MIT License - see LICENSE for details
Links
- Documentation: See example application in examples/openai-agents-fastapi/
- Issues: GitHub Issues
- PyPI: agentexec