Transactional safety layer for AI agent tool execution — auto-rollback via Saga pattern
saga-agent
Transactional safety layer for AI agent tool execution.
When an AI agent fails mid-workflow, saga-agent automatically rolls back every completed step — in reverse order.
The Problem
AI agents can now call real APIs, write to databases, and trigger external services. But what happens when step 3 of 4 fails?
Step 1. Charge payment ✅ $150 charged
Step 2. Deduct inventory ✅ -1 item
Step 3. Register shipping 💥 Connection timeout
Step 4. (never reached)
The payment went through. The inventory was deducted. But the order never completed.
Most agent frameworks (LangChain, LlamaIndex, CrewAI) decide what to call next — but none of them handle what to undo when something goes wrong halfway through. That cleanup logic gets written by hand, once per workflow, and junior developers routinely get it wrong or skip it entirely.
saga-agent solves this. Declare a compensate() alongside every execute(), and the framework handles rollback automatically.
How It Works
saga-agent implements the Saga pattern for AI agent tool execution.
Every tool declares two methods:
- execute(): the forward action
- compensate(): what to undo if a later step fails
When a failure occurs, saga-agent walks the execution stack in reverse and calls compensate() on every step that already succeeded.
Failure detected at Step 3
→ compensate Step 2 (InventoryTool) ↩️
→ compensate Step 1 (PaymentTool) ↩️
Every execution is recorded as a structured audit log — what ran, what failed, what was rolled back.
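The reverse walk described above can be sketched in a few lines of plain Python. This is a minimal illustration of the Saga pattern, not saga-agent's actual implementation: run_saga, the Step tuple shape, and the log strings are all hypothetical names chosen for this example.

```python
from typing import Any, Callable

# Hypothetical step shape for illustration: (name, execute, compensate).
Step = tuple[str, Callable[[], Any], Callable[[Any], None]]


def run_saga(steps: list[Step]) -> list[str]:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    log: list[str] = []
    completed: list[tuple[str, Callable[[Any], None], Any]] = []
    for name, execute, compensate in steps:
        try:
            result = execute()
        except Exception as exc:
            log.append(f"FAILED {name}: {exc}")
            # Pop the stack so the most recent success is undone first.
            while completed:
                done_name, undo, done_result = completed.pop()
                undo(done_result)
                log.append(f"COMPENSATED {done_name}")
            return log
        completed.append((name, compensate, result))
        log.append(f"SUCCESS {name}")
    return log


def fail_shipping() -> dict:
    raise TimeoutError("Connection timeout")


undone: list[str] = []  # records the order in which compensations ran
log = run_saga([
    ("PaymentTool", lambda: {"tx_id": "TX-1"}, lambda r: undone.append(r["tx_id"])),
    ("InventoryTool", lambda: {"item_id": "ITEM-A"}, lambda r: undone.append(r["item_id"])),
    ("ShippingTool", fail_shipping, lambda r: None),
])
print("\n".join(log))
```

Each compensate receives the result its own execute returned, which is why the stack stores results alongside the undo callables.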
Features
| Feature | Description |
|---|---|
| Auto rollback | Compensates completed steps in reverse order on failure |
| Parallel execution | Independent tools in the same group run concurrently |
| Retry policy | Per-tool retry count with configurable rollback scope |
| Audit log | Structured JSON log of every step, status, and timestamp |
| LLM-agnostic | Works with OpenAI, Anthropic, or any function-calling LLM |
| Sync + Async | Supports both def and async def tool implementations |
| Zero dependencies | Standard library only |
Installation
# From GitHub
pip install git+https://github.com/rlgh135/saga-agent.git
# For local development
git clone https://github.com/rlgh135/saga-agent.git
cd saga-agent
pip install -e ".[dev]"
Quickstart
1. Define your tools
from saga_agent import SagaAgent

# payment_api, inventory, and shipping stand in for your own service clients.
agent = SagaAgent()

@agent.tool
class PaymentTool:
    def execute(self, order_id: str) -> dict:
        result = payment_api.charge(order_id)
        return {"tx_id": result.tx_id}

    def compensate(self, result: dict) -> None:
        payment_api.refund(result["tx_id"])

@agent.tool
class InventoryTool:
    def execute(self, item_id: str, qty: int) -> dict:
        inventory.deduct(item_id, qty)
        return {"item_id": item_id, "qty": qty}

    def compensate(self, result: dict) -> None:
        inventory.restore(result["item_id"], result["qty"])

@agent.tool
class ShippingTool:
    def execute(self, address: str) -> dict:
        return {"tracking_id": shipping.register(address)}

    def compensate(self, result: dict) -> None:
        shipping.cancel(result["tracking_id"])
2. Pass the LLM's tool call sequence
saga-agent is LLM-agnostic. Pass whatever sequence your LLM decides on:
steps = [
    {"tool": "PaymentTool", "args": {"order_id": "ORD-001"}},
    {"tool": "InventoryTool", "args": {"item_id": "ITEM-A", "qty": 2}},
    {"tool": "ShippingTool", "args": {"address": "Seoul, Korea"}},
]

context = agent.run(steps)
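If your LLM returns tool calls in the shape used by OpenAI's Chat Completions API (a function name plus a JSON-encoded arguments string), a small adapter can produce the step dicts shown above. The sketch below works on plain dicts; tool_calls_to_steps is a hypothetical helper written for this example, not part of saga-agent.

```python
import json


def tool_calls_to_steps(tool_calls: list[dict]) -> list[dict]:
    """Convert OpenAI-style tool calls into {"tool": ..., "args": ...} steps."""
    steps = []
    for call in tool_calls:
        function = call["function"]
        steps.append({
            "tool": function["name"],
            # Arguments arrive as a JSON-encoded string, so decode them.
            "args": json.loads(function["arguments"]),
        })
    return steps


# Sample data in the assumed shape; real responses carry more fields.
raw_calls = [
    {"id": "call_1", "type": "function",
     "function": {"name": "PaymentTool", "arguments": '{"order_id": "ORD-001"}'}},
    {"id": "call_2", "type": "function",
     "function": {"name": "InventoryTool", "arguments": '{"item_id": "ITEM-A", "qty": 2}'}},
]
steps = tool_calls_to_steps(raw_calls)
print(steps)
```

Other providers use different field names, so the two dictionary lookups are the only part that should need changing.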
3. Automatic rollback on failure
If ShippingTool fails:
──────────────────────────────────────────────────
Saga ID : fae31d40-17ae-4dda-857d-1fe14d698dbf
Status : ↩️ COMPENSATED
──────────────────────────────────────────────────
Step 1. [↩️ COMPENSATED] PaymentTool
Step 2. [↩️ COMPENSATED] InventoryTool
Step 3. [❌ FAILED ] ShippingTool
└─ error: Connection timeout
──────────────────────────────────────────────────
Payment refunded. Inventory restored. Automatically.
Parallel Execution (Async)
Tools with no dependencies can run concurrently. Group them in a nested list:
from saga_agent import AsyncSagaExecutor

executor = AsyncSagaExecutor(agent._registry, ...)

# Note: "amount" assumes a PaymentTool.execute() that accepts it;
# the Quickstart version takes only order_id.
steps = [
    # Group 1: payment + inventory run at the same time
    [
        {"tool": "PaymentTool", "args": {"order_id": "ORD-001", "amount": 15000}},
        {"tool": "InventoryTool", "args": {"item_id": "ITEM-A", "qty": 2}},
    ],
    # Group 2: runs after Group 1 completes
    [
        {"tool": "ShippingTool", "args": {"address": "Seoul, Korea"}},
    ],
]

context = await executor.run(steps)  # call from inside an async function
On failure, rollback is also parallel within each group — and groups are compensated in reverse order.
The flat list[dict] format from the sync API is also accepted — each step becomes its own group automatically.
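To make the grouping rules concrete, here is a rough sketch of how flat-list normalization and group-level rollback could work with asyncio.gather. It is an illustration under assumptions, not the AsyncSagaExecutor source: normalize, run_groups, and the Registry shape are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical registry shape for this sketch: tool name -> (execute, compensate).
Registry = dict[str, tuple[Callable[..., Awaitable[Any]], Callable[[Any], Awaitable[None]]]]


def normalize(steps: list) -> list[list[dict]]:
    """Turn bare step dicts into one-step groups; keep nested groups as-is."""
    return [step if isinstance(step, list) else [step] for step in steps]


async def run_groups(steps: list, registry: Registry, events: list[str]) -> bool:
    """Run groups in order; on any failure, undo finished work newest group first."""
    finished_groups: list[list[tuple[str, Any]]] = []
    for group in normalize(steps):
        outcomes = await asyncio.gather(
            *(registry[step["tool"]][0](**step["args"]) for step in group),
            return_exceptions=True,
        )
        succeeded = [
            (step["tool"], outcome)
            for step, outcome in zip(group, outcomes)
            if not isinstance(outcome, BaseException)
        ]
        if succeeded:
            finished_groups.append(succeeded)
        if len(succeeded) < len(group):
            for finished in reversed(finished_groups):
                # Members of one group are compensated concurrently.
                await asyncio.gather(*(registry[name][1](result) for name, result in finished))
                events.append("undo:" + ",".join(sorted(name for name, _ in finished)))
            return False
    return True


async def pay(order_id: str) -> dict:
    return {"tx_id": f"TX-{order_id}"}


async def deduct(item_id: str, qty: int) -> dict:
    return {"item_id": item_id, "qty": qty}


async def ship(address: str) -> dict:
    raise TimeoutError("Connection timeout")


async def undo(result: dict) -> None:
    await asyncio.sleep(0)  # placeholder for a real refund/restore call


registry: Registry = {
    "PaymentTool": (pay, undo),
    "InventoryTool": (deduct, undo),
    "ShippingTool": (ship, undo),
}
events: list[str] = []
ok = asyncio.run(run_groups([
    [{"tool": "PaymentTool", "args": {"order_id": "ORD-001"}},
     {"tool": "InventoryTool", "args": {"item_id": "ITEM-A", "qty": 2}}],
    [{"tool": "ShippingTool", "args": {"address": "Seoul, Korea"}}],
], registry, events))
print(ok, events)
```

Collecting exceptions with return_exceptions=True lets the sketch see which members of a failed group still succeeded, so they are compensated too.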
Retry Policy
Configure retries per tool with @agent.tool(retries=N, retry_scope=...):
# retry_scope="transaction" (default)
# → exhausted retries trigger full Saga rollback
@agent.tool(retries=3, retry_scope="transaction")
class ShippingTool:
    async def execute(self, address: str) -> dict: ...
    async def compensate(self, result: dict) -> None: ...

# retry_scope="tool"
# → exhausted retries mark only this tool as FAILED, Saga continues
@agent.tool(retries=3, retry_scope="tool")
class NotificationTool:
    async def execute(self, user_id: str) -> dict: ...
    async def compensate(self, result: dict) -> None: ...
| retry_scope | On exhausted retries |
|---|---|
| "transaction" (default) | Full Saga rollback |
| "tool" | This tool FAILED, Saga continues |
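The two scopes in the table can be pictured with a small retry wrapper. This is a sketch, not the library's RetryPolicy: call_with_retries and ToolFailed are hypothetical names, and it assumes retries=N means one initial attempt plus up to N more, which the README does not specify.

```python
from typing import Any, Callable, Optional


class ToolFailed(Exception):
    """Raised when retries are exhausted under retry_scope="transaction"."""


def call_with_retries(
    execute: Callable[[], Any], retries: int, retry_scope: str = "transaction"
) -> dict:
    """Try once plus up to `retries` more times (assumed), then apply the scope rule."""
    last_error: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            return {"status": "SUCCESS", "result": execute(), "attempts": attempt + 1}
        except Exception as exc:
            last_error = exc
    if retry_scope == "tool":
        # Only this tool is marked FAILED; the caller keeps running the saga.
        return {"status": "FAILED", "error": str(last_error), "attempts": retries + 1}
    # Default scope: signal the caller to roll back the whole saga.
    raise ToolFailed(str(last_error))


calls = {"n": 0}


def flaky() -> str:
    """Fails twice, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("timeout")
    return "ok"


def always_fails() -> str:
    raise ConnectionError("down")


outcome = call_with_retries(flaky, retries=3)
tolerated = call_with_retries(always_fails, retries=2, retry_scope="tool")
print(outcome, tolerated)
```

Under the default scope the wrapper raises instead of returning, which is the point where a saga executor would start compensating.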
LLM Integration
saga-agent is middleware — it sits between your LLM and your tools.
from saga_agent import SagaAgent, LLMRunner
from openai import OpenAI

agent = SagaAgent()
# ... register tools with @agent.tool ...

runner = LLMRunner(
    client=OpenAI(),
    model="gpt-4o",
    registry=agent._registry,
)

# LLMRunner handles the function-calling loop and passes
# the decided sequence to AsyncSagaExecutor automatically.
context = runner.run("Process order ORD-001 for item ITEM-A, qty 2")
No OpenAI key? Use the built-in mock for local development:
from saga_agent import MockLLMClient

# Note: "amount" assumes a PaymentTool.execute() that accepts it.
client = MockLLMClient(tool_sequence=[
    ("PaymentTool", {"order_id": "ORD-001", "amount": 15000}),
    ("InventoryTool", {"item_id": "ITEM-A", "qty": 2}),
    ("ShippingTool", {"address": "Seoul"}),
])

runner = LLMRunner(client=client, model="mock", registry=agent._registry)
context = runner.run("Process order ORD-001")
Audit Log
Every execution produces a structured log entry:
{
  "saga_id": "fae31d40-17ae-4dda-857d-1fe14d698dbf",
  "status": "COMPENSATED",
  "created_at": "2024-06-01T09:00:00+00:00",
  "finished_at": "2024-06-01T09:00:01+00:00",
  "steps": [
    {
      "tool_name": "PaymentTool",
      "status": "COMPENSATED",
      "result": {"tx_id": "TX-ORD-001"},
      "executed_at": "2024-06-01T09:00:00.100000+00:00",
      "compensated_at": "2024-06-01T09:00:01.300000+00:00"
    },
    {
      "tool_name": "ShippingTool",
      "status": "FAILED",
      "error": "Connection timeout",
      "executed_at": "2024-06-01T09:00:00.900000+00:00"
    }
  ]
}
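Because the entry is plain JSON, it can be inspected with the standard library alone. The sketch below groups steps by status and computes the saga's duration from a trimmed copy of the sample entry; summarize is a hypothetical helper, and the field names are taken from the sample above rather than from a documented schema.

```python
import json
from datetime import datetime

# Trimmed copy of the sample audit entry shown above.
entry = json.loads("""
{
  "saga_id": "fae31d40-17ae-4dda-857d-1fe14d698dbf",
  "status": "COMPENSATED",
  "created_at": "2024-06-01T09:00:00+00:00",
  "finished_at": "2024-06-01T09:00:01+00:00",
  "steps": [
    {"tool_name": "PaymentTool", "status": "COMPENSATED"},
    {"tool_name": "ShippingTool", "status": "FAILED", "error": "Connection timeout"}
  ]
}
""")


def summarize(entry: dict) -> dict[str, list[str]]:
    """Group tool names by their recorded step status."""
    by_status: dict[str, list[str]] = {}
    for step in entry["steps"]:
        by_status.setdefault(step["status"], []).append(step["tool_name"])
    return by_status


summary = summarize(entry)
started = datetime.fromisoformat(entry["created_at"])
finished = datetime.fromisoformat(entry["finished_at"])
duration = (finished - started).total_seconds()
print(summary, duration)
```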
Status Reference
| Status | Meaning |
|---|---|
| SUCCESS | All steps completed |
| COMPENSATED | A step failed; all prior steps rolled back successfully |
| COMPENSATION_FAILED | A step failed and at least one rollback also failed |
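The table maps onto a simple decision rule. The sketch below is one way to express it; SagaStatus and overall_status are hypothetical names rather than the enums in models.py, and it ignores the retry_scope="tool" case, where a tool may fail while the saga continues.

```python
from enum import Enum


class SagaStatus(str, Enum):
    SUCCESS = "SUCCESS"
    COMPENSATED = "COMPENSATED"
    COMPENSATION_FAILED = "COMPENSATION_FAILED"


def overall_status(step_failed: bool, rollback_errors: int) -> SagaStatus:
    """Derive the saga status from whether a step failed and how many rollbacks failed."""
    if not step_failed:
        return SagaStatus.SUCCESS
    if rollback_errors == 0:
        return SagaStatus.COMPENSATED
    return SagaStatus.COMPENSATION_FAILED


print(overall_status(step_failed=True, rollback_errors=0).value)
```

COMPENSATION_FAILED is the status that calls for manual follow-up, since some side effects may still be in place.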
Project Structure
saga_agent/
├── models.py — SagaContext, StepRecord, status enums
├── retry.py — RetryPolicy dataclass
├── registry.py — @agent.tool decorator, tool + policy storage
├── executor.py — Synchronous saga executor
├── async_executor.py — Parallel async executor with retry support
├── audit.py — Structured audit logger
├── llm_runner.py — OpenAI function-calling loop integration
├── mock_llm.py — Zero-dependency mock LLM client
└── __init__.py — SagaAgent public API
examples/
├── order_flow.py — Basic sync usage
├── async_order_flow.py — Parallel async execution
├── retry_flow.py — Retry policy scenarios
└── llm_integration.py — End-to-end LLM integration
Running the Examples
git clone https://github.com/rlgh135/saga-agent.git
cd saga-agent
pip install -e ".[dev]"
# Basic sync
python examples/order_flow.py
# Parallel async
python examples/async_order_flow.py
# Retry policy
python examples/retry_flow.py
# LLM integration (mock, no API key needed)
python examples/llm_integration.py
# LLM integration (real OpenAI)
OPENAI_API_KEY=sk-... python examples/llm_integration.py --real
Running Tests
pytest tests/ -v
Design Philosophy
saga-agent does one thing: make AI agent tool execution transactionally safe.
It does not decide which tools to call (that's your LLM), manage conversation history (that's your framework), or handle retries beyond the declared policy.
The interface is a deliberate constraint — if you can't define compensate(), you probably shouldn't be calling that tool from an autonomous agent.
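One way to enforce that constraint is to reject tools without a compensate() at registration time. The decorator below is a sketch of the idea only; require_compensation is a hypothetical name, and whether @agent.tool performs such a check is not stated in this README.

```python
def require_compensation(cls: type) -> type:
    """Reject tool classes that lack a callable execute() or compensate()."""
    for method in ("execute", "compensate"):
        if not callable(getattr(cls, method, None)):
            raise TypeError(f"{cls.__name__} must define {method}()")
    return cls


@require_compensation
class RefundablePayment:
    def execute(self, order_id: str) -> dict:
        return {"tx_id": f"TX-{order_id}"}

    def compensate(self, result: dict) -> None:
        pass  # a real tool would issue the refund here


class SendEmail:
    """An action with no undo: it has execute() but no compensate()."""

    def execute(self, user_id: str) -> dict:
        return {"sent_to": user_id}


try:
    require_compensation(SendEmail)
    error_message = ""
except TypeError as exc:
    error_message = str(exc)
print(error_message)
```

Failing at import time surfaces a missing undo path before the agent ever runs, rather than in the middle of a rollback.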
Roadmap
- Synchronous saga execution with auto rollback
- Parallel async execution with group-level rollback
- Per-tool retry policy with configurable rollback scope
- Structured audit log
- LLM-agnostic integration layer (OpenAI function calling)
- Persistent saga log (SQLite / PostgreSQL)
- LangChain tool adapter
- Backoff strategy for retries (exponential, jitter)
License
MIT
Download files
File details
Details for the file saga_agent-0.1.0.tar.gz.
File metadata
- Download URL: saga_agent-0.1.0.tar.gz
- Upload date:
- Size: 6.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7c400da04851dd3c497a3178586f84cd6c29457feb6b48f3a77c154e8d2109fd |
| MD5 | 7cea1f6b0177e2a983797a930b1c3434 |
| BLAKE2b-256 | ffd4336e2938c700c792ee8fe232fc73110ef37ac23b8da936b6056fa120be8e |
File details
Details for the file saga_agent-0.1.0-py3-none-any.whl.
File metadata
- Download URL: saga_agent-0.1.0-py3-none-any.whl
- Upload date:
- Size: 4.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.14.2
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d14a4db5f57a6d8447f9599fff01144fa0934b1c2b04844216ce0c9db85ab6da |
| MD5 | 0dc585f4c7c3add4b54e3624bb1154f1 |
| BLAKE2b-256 | 85b3e48000a99b2464b5aeab527968ee43a9943c90471a108b32a331291b78ae |