Transactional safety layer for AI agent tool execution — auto-rollback via Saga pattern

saga-agent

Transactional safety layer for AI agent tool execution.
When an AI agent fails mid-workflow, saga-agent automatically rolls back every completed step — in reverse order.


The Problem

AI agents can now call real APIs, write to databases, and trigger external services. But what happens when step 3 of 4 fails?

Step 1. Charge payment     ✅ $150 charged
Step 2. Deduct inventory   ✅ -1 item
Step 3. Register shipping  💥 Connection timeout
Step 4. (never reached)

The payment went through. The inventory was deducted. But the order never completed.

Most agent frameworks (LangChain, LlamaIndex, CrewAI) decide what to call next, but they leave it to you to decide what to undo when a workflow fails halfway through. That cleanup logic is usually written by hand, once per workflow, and it is easy to get wrong or skip entirely.

saga-agent solves this. Declare a compensate() alongside every execute(), and the framework handles rollback automatically.


How It Works

saga-agent implements the Saga pattern for AI agent tool execution.

Every tool declares two methods:

  • execute() — the forward action
  • compensate() — what to undo if a later step fails

When a failure occurs, saga-agent walks the execution stack in reverse and calls compensate() on every step that already succeeded.

Failure detected at Step 3
  → compensate Step 2 (InventoryTool)  ↩️
  → compensate Step 1 (PaymentTool)    ↩️

Every execution is recorded as a structured audit log — what ran, what failed, what was rolled back.
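
To make the reverse walk concrete, here is a minimal standalone sketch of the idea. It is not saga-agent's implementation: the Step dataclass, run_saga, and the log strings are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Step:
    """Pairs a forward action with the action that undoes it (hypothetical type)."""
    name: str
    execute: Callable[[], Any]
    compensate: Callable[[Any], None]


def run_saga(steps: list[Step]) -> list[str]:
    """Run steps in order; on failure, undo completed steps newest-first."""
    log: list[str] = []
    completed: list[tuple[Step, Any]] = []  # execution stack
    for step in steps:
        try:
            result = step.execute()
        except Exception as exc:
            log.append(f"{step.name}: FAILED ({exc})")
            while completed:  # pop = reverse order of execution
                done, done_result = completed.pop()
                done.compensate(done_result)
                log.append(f"{done.name}: COMPENSATED")
            return log
        completed.append((step, result))
        log.append(f"{step.name}: SUCCESS")
    return log


def fail_shipping() -> dict:
    raise ConnectionError("Connection timeout")


demo_log = run_saga([
    Step("PaymentTool", lambda: {"tx_id": "TX-1"}, lambda result: None),
    Step("InventoryTool", lambda: {"qty": 1}, lambda result: None),
    Step("ShippingTool", fail_shipping, lambda result: None),
])
print("\n".join(demo_log))
```

The last two log lines show InventoryTool being compensated before PaymentTool, mirroring the diagram above.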


Features

| Feature | Description |
|---|---|
| Auto rollback | Compensates completed steps in reverse order on failure |
| Parallel execution | Independent tools in the same group run concurrently |
| Retry policy | Per-tool retry count with configurable rollback scope |
| Audit log | Structured JSON log of every step, status, and timestamp |
| LLM-agnostic | Works with OpenAI, Anthropic, or any function-calling LLM |
| Sync + Async | Supports both def and async def tool implementations |
| Zero dependencies | Standard library only |
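
One way an executor could support both def and async def tools with only the standard library is sketched below. This is an assumption about the general technique, not a description of saga-agent's internals; call_tool, charge, and register are hypothetical names.

```python
import asyncio
import inspect


async def call_tool(func, *args, **kwargs):
    """Await coroutine functions directly; run plain functions in a worker thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # asyncio.to_thread (Python 3.9+) keeps blocking calls off the event loop
    return await asyncio.to_thread(func, *args, **kwargs)


def charge(order_id: str) -> dict:  # a sync tool method
    return {"tx_id": f"TX-{order_id}"}


async def register(address: str) -> dict:  # an async tool method
    return {"tracking_id": "TRK-1"}


async def demo() -> list:
    return [
        await call_tool(charge, "ORD-001"),
        await call_tool(register, address="Seoul, Korea"),
    ]


results = asyncio.run(demo())
print(results)
```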

Installation

# From GitHub
pip install git+https://github.com/rlgh135/saga-agent.git

# For local development
git clone https://github.com/rlgh135/saga-agent.git
cd saga-agent
pip install -e ".[dev]"

Quickstart

1. Define your tools

from saga_agent import SagaAgent

# payment_api, inventory, and shipping below stand in for your own service clients
agent = SagaAgent()


@agent.tool
class PaymentTool:
    def execute(self, order_id: str) -> dict:
        result = payment_api.charge(order_id)
        return {"tx_id": result.tx_id}

    def compensate(self, result: dict) -> None:
        payment_api.refund(result["tx_id"])


@agent.tool
class InventoryTool:
    def execute(self, item_id: str, qty: int) -> dict:
        inventory.deduct(item_id, qty)
        return {"item_id": item_id, "qty": qty}

    def compensate(self, result: dict) -> None:
        inventory.restore(result["item_id"], result["qty"])


@agent.tool
class ShippingTool:
    def execute(self, address: str) -> dict:
        return {"tracking_id": shipping.register(address)}

    def compensate(self, result: dict) -> None:
        shipping.cancel(result["tracking_id"])

2. Pass the LLM's tool call sequence

saga-agent is LLM-agnostic. Pass whatever sequence your LLM decides on:

steps = [
    {"tool": "PaymentTool",   "args": {"order_id": "ORD-001"}},
    {"tool": "InventoryTool", "args": {"item_id": "ITEM-A", "qty": 2}},
    {"tool": "ShippingTool",  "args": {"address": "Seoul, Korea"}},
]

context = agent.run(steps)
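
If your LLM returns tool calls in the OpenAI Chat Completions shape (a function name plus a JSON-encoded arguments string), a small adapter can produce the steps list shown above. The helper name tool_calls_to_steps is hypothetical and not part of saga-agent; the sample calls are hand-written.

```python
import json


def tool_calls_to_steps(tool_calls: list[dict]) -> list[dict]:
    """Convert OpenAI-style tool calls into {"tool": ..., "args": ...} steps."""
    steps = []
    for call in tool_calls:
        function = call["function"]
        # arguments arrive as a JSON string; an empty string means no arguments
        args = json.loads(function["arguments"] or "{}")
        steps.append({"tool": function["name"], "args": args})
    return steps


llm_tool_calls = [
    {"function": {"name": "PaymentTool", "arguments": '{"order_id": "ORD-001"}'}},
    {"function": {"name": "InventoryTool", "arguments": '{"item_id": "ITEM-A", "qty": 2}'}},
]

converted = tool_calls_to_steps(llm_tool_calls)
print(converted)
```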

3. Automatic rollback on failure

If ShippingTool fails:

──────────────────────────────────────────────────
  Saga ID : fae31d40-17ae-4dda-857d-1fe14d698dbf
  Status  : ↩️  COMPENSATED
──────────────────────────────────────────────────
  Step 1. [↩️  COMPENSATED] PaymentTool
  Step 2. [↩️  COMPENSATED] InventoryTool
  Step 3. [❌ FAILED      ] ShippingTool
           └─ error: Connection timeout
──────────────────────────────────────────────────

Payment refunded. Inventory restored. Automatically.


Parallel Execution (Async)

Tools with no dependencies can run concurrently. Group them in a nested list:

import asyncio

from saga_agent import AsyncSagaExecutor

# `agent` is the SagaAgent from the Quickstart; the remaining arguments are elided here
executor = AsyncSagaExecutor(agent._registry, ...)

steps = [
    # Group 1: payment + inventory run at the same time
    [
        {"tool": "PaymentTool",   "args": {"order_id": "ORD-001"}},
        {"tool": "InventoryTool", "args": {"item_id": "ITEM-A", "qty": 2}},
    ],
    # Group 2: runs after Group 1 completes
    [
        {"tool": "ShippingTool",  "args": {"address": "Seoul, Korea"}},
    ],
]


async def main():
    # await is only valid inside a coroutine, so wrap the call
    return await executor.run(steps)


context = asyncio.run(main())

On failure, rollback is also parallel within each group — and groups are compensated in reverse order.
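
The group semantics can be sketched with asyncio.gather. The code below is an illustrative approximation under stated assumptions, not the AsyncSagaExecutor source: run_groups, make_tool, and the event strings are hypothetical.

```python
import asyncio


def make_tool(name, fail=False):
    """Build a hypothetical (name, execute, compensate) triple for the demo."""
    async def execute():
        if fail:
            raise ConnectionError("Connection timeout")
        return {"tool": name}

    async def compensate(result):
        return None

    return name, execute, compensate


async def run_groups(groups):
    events = []
    finished_groups = []  # each entry: [(name, compensate, result), ...]
    for group in groups:
        # Every tool in a group starts together; exceptions come back as values.
        outcomes = await asyncio.gather(
            *(execute() for _, execute, _ in group), return_exceptions=True
        )
        succeeded, failed = [], []
        for (name, _, compensate), outcome in zip(group, outcomes):
            if isinstance(outcome, BaseException):
                failed.append(name)
            else:
                succeeded.append((name, compensate, outcome))
        finished_groups.append(succeeded)
        if failed:
            events += [f"{name}: FAILED" for name in failed]
            # Newest group first; tools inside a group are undone concurrently.
            for finished in reversed(finished_groups):
                if finished:
                    await asyncio.gather(*(comp(res) for _, comp, res in finished))
                    events += [f"{name}: COMPENSATED" for name, _, _ in finished]
            return events
    events.append("SUCCESS")
    return events


events = asyncio.run(run_groups([
    [make_tool("PaymentTool"), make_tool("InventoryTool")],
    [make_tool("ShippingTool", fail=True)],
]))
print(events)
```

Note that tools which succeeded in the same group as the failing tool are compensated as well, since they already produced side effects.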

The flat list[dict] format from the sync API is also accepted — each step becomes its own group automatically.
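
Accepting both shapes comes down to a small normalization step. A possible version is sketched below; normalize_steps is a hypothetical helper, and the library's own handling may differ.

```python
def normalize_steps(steps: list) -> list[list[dict]]:
    """Wrap each bare step dict in its own single-item group; keep lists as groups."""
    return [step if isinstance(step, list) else [step] for step in steps]


flat = [
    {"tool": "PaymentTool", "args": {"order_id": "ORD-001"}},
    {"tool": "ShippingTool", "args": {"address": "Seoul, Korea"}},
]
grouped = normalize_steps(flat)
print(grouped)
```

With one step per group, nothing runs concurrently, so the flat format behaves like the sequential sync API.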


Retry Policy

Configure retries per tool with @agent.tool(retries=N, retry_scope=...):

# retry_scope="transaction" (default)
# → exhausted retries trigger full Saga rollback
@agent.tool(retries=3, retry_scope="transaction")
class ShippingTool:
    async def execute(self, address: str) -> dict: ...
    async def compensate(self, result: dict) -> None: ...


# retry_scope="tool"
# → exhausted retries mark only this tool as FAILED, Saga continues
@agent.tool(retries=3, retry_scope="tool")
class NotificationTool:
    async def execute(self, user_id: str) -> dict: ...
    async def compensate(self, result: dict) -> None: ...

| retry_scope | On exhausted retries |
|---|---|
| "transaction" (default) | Full Saga rollback |
| "tool" | This tool FAILED, Saga continues |

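The difference between the two scopes can be sketched as a retry loop that reports whether a rollback is needed. This is a simplified illustration, not saga-agent's retry code: run_with_retries is a hypothetical helper, and it assumes retries=N means N extra attempts after the first one, which the README does not specify.

```python
def run_with_retries(execute, retries: int, retry_scope: str = "transaction"):
    """Return (status, result_or_error, rollback_needed)."""
    last_error = None
    for _ in range(1 + retries):  # assumption: first attempt plus N retries
        try:
            return "SUCCESS", execute(), False
        except Exception as exc:
            last_error = exc
    # Retries exhausted: only "transaction" scope asks for a full rollback.
    return "FAILED", str(last_error), retry_scope == "transaction"


attempts = {"count": 0}


def flaky():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("Connection timeout")
    return {"tracking_id": "TRK-1"}


def always_down():
    raise TimeoutError("Connection timeout")


recovered = run_with_retries(flaky, retries=3)
exhausted_tx = run_with_retries(always_down, retries=3, retry_scope="transaction")
exhausted_tool = run_with_retries(always_down, retries=3, retry_scope="tool")
print(recovered, exhausted_tx, exhausted_tool, sep="\n")
```
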
LLM Integration

saga-agent is middleware — it sits between your LLM and your tools.

from saga_agent import SagaAgent, LLMRunner
from openai import OpenAI

agent = SagaAgent()

# ... register tools with @agent.tool ...

runner = LLMRunner(
    client=OpenAI(),
    model="gpt-4o",
    registry=agent._registry,
)

# LLMRunner handles the function-calling loop and passes
# the decided sequence to AsyncSagaExecutor automatically.
context = runner.run("Process order ORD-001 for item ITEM-A, qty 2")

No OpenAI key? Use the built-in mock for local development:

from saga_agent import MockLLMClient

client = MockLLMClient(tool_sequence=[
    ("PaymentTool",   {"order_id": "ORD-001"}),
    ("InventoryTool", {"item_id": "ITEM-A",   "qty": 2}),
    ("ShippingTool",  {"address": "Seoul"}),
])

runner = LLMRunner(client=client, model="mock", registry=agent._registry)
context = runner.run("Process order ORD-001")

Audit Log

Every execution produces a structured log entry:

{
  "saga_id": "fae31d40-17ae-4dda-857d-1fe14d698dbf",
  "status": "COMPENSATED",
  "created_at": "2024-06-01T09:00:00+00:00",
  "finished_at": "2024-06-01T09:00:01.300000+00:00",
  "steps": [
    {
      "tool_name": "PaymentTool",
      "status": "COMPENSATED",
      "result": {"tx_id": "TX-ORD-001"},
      "executed_at": "2024-06-01T09:00:00.100000+00:00",
      "compensated_at": "2024-06-01T09:00:01.300000+00:00"
    },
    {
      "tool_name": "ShippingTool",
      "status": "FAILED",
      "error": "Connection timeout",
      "executed_at": "2024-06-01T09:00:00.900000+00:00"
    }
  ]
}
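
A log entry of this shape can be built with standard-library dataclasses. The sketch below is only an approximation of the format above: StepEntry, SagaLogEntry, and now_iso are hypothetical names, not the classes in models.py or audit.py.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional


def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass
class StepEntry:
    tool_name: str
    status: str
    result: Optional[dict] = None
    error: Optional[str] = None
    executed_at: str = field(default_factory=now_iso)
    compensated_at: Optional[str] = None


@dataclass
class SagaLogEntry:
    saga_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: str = "RUNNING"
    created_at: str = field(default_factory=now_iso)
    finished_at: Optional[str] = None
    steps: list = field(default_factory=list)

    def to_json(self) -> str:
        data = asdict(self)
        # Omit unset fields so a FAILED step carries no "result" key
        data["steps"] = [
            {key: value for key, value in step.items() if value is not None}
            for step in data["steps"]
        ]
        return json.dumps(data, indent=2)


entry = SagaLogEntry()
entry.steps.append(StepEntry("PaymentTool", "COMPENSATED",
                             result={"tx_id": "TX-ORD-001"},
                             compensated_at=now_iso()))
entry.steps.append(StepEntry("ShippingTool", "FAILED", error="Connection timeout"))
entry.status = "COMPENSATED"
entry.finished_at = now_iso()

parsed = json.loads(entry.to_json())
print(entry.to_json())
```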

Status Reference

| Status | Meaning |
|---|---|
| SUCCESS | All steps completed |
| COMPENSATED | A step failed; all prior steps rolled back successfully |
| COMPENSATION_FAILED | A step failed and at least one rollback also failed |
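
Read as a decision rule, the three statuses depend on two facts: whether any step failed, and whether any rollback failed. A minimal sketch follows; SagaStatus and final_status here are hypothetical and may not match the enums in models.py.

```python
from enum import Enum


class SagaStatus(str, Enum):
    SUCCESS = "SUCCESS"
    COMPENSATED = "COMPENSATED"
    COMPENSATION_FAILED = "COMPENSATION_FAILED"


def final_status(step_failed: bool, rollback_errors: list) -> SagaStatus:
    """Derive the saga status from what happened during the run."""
    if not step_failed:
        return SagaStatus.SUCCESS
    if rollback_errors:
        # Some side effects could not be undone; this needs manual follow-up.
        return SagaStatus.COMPENSATION_FAILED
    return SagaStatus.COMPENSATED


print(final_status(False, []).value)
print(final_status(True, []).value)
print(final_status(True, ["refund API unavailable"]).value)
```

COMPENSATION_FAILED is the status worth alerting on, because it means the system may be left in a partially applied state.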

Project Structure

saga_agent/
├── models.py         — SagaContext, StepRecord, status enums
├── retry.py          — RetryPolicy dataclass
├── registry.py       — @agent.tool decorator, tool + policy storage
├── executor.py       — Synchronous saga executor
├── async_executor.py — Parallel async executor with retry support
├── audit.py          — Structured audit logger
├── llm_runner.py     — OpenAI function-calling loop integration
├── mock_llm.py       — Zero-dependency mock LLM client
└── __init__.py       — SagaAgent public API

examples/
├── order_flow.py       — Basic sync usage
├── async_order_flow.py — Parallel async execution
├── retry_flow.py       — Retry policy scenarios
└── llm_integration.py  — End-to-end LLM integration

Running the Examples

git clone https://github.com/rlgh135/saga-agent.git
cd saga-agent
pip install -e ".[dev]"

# Basic sync
python examples/order_flow.py

# Parallel async
python examples/async_order_flow.py

# Retry policy
python examples/retry_flow.py

# LLM integration (mock, no API key needed)
python examples/llm_integration.py

# LLM integration (real OpenAI)
OPENAI_API_KEY=sk-... python examples/llm_integration.py --real

Running Tests

pytest tests/ -v

Design Philosophy

saga-agent does one thing: make AI agent tool execution transactionally safe.

It does not decide which tools to call (that's your LLM), manage conversation history (that's your framework), or handle retries beyond the declared policy.

The interface is a deliberate constraint — if you can't define compensate(), you probably shouldn't be calling that tool from an autonomous agent.
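
That constraint can be enforced at registration time. The decorator below sketches the idea under the assumption that a tool is any class with callable execute and compensate attributes; validate_tool is a hypothetical name and not the @agent.tool implementation.

```python
def validate_tool(cls):
    """Reject tool classes that do not define both execute() and compensate()."""
    missing = [
        method for method in ("execute", "compensate")
        if not callable(getattr(cls, method, None))
    ]
    if missing:
        raise TypeError(f"{cls.__name__} is missing: {', '.join(missing)}")
    return cls


@validate_tool
class RefundableCharge:
    def execute(self, order_id: str) -> dict:
        return {"tx_id": "TX-1"}

    def compensate(self, result: dict) -> None:
        return None


rejection = None
try:
    @validate_tool
    class SendEmail:  # no compensate(): an email cannot be unsent
        def execute(self, to: str) -> dict:
            return {"sent": True}
except TypeError as exc:
    rejection = str(exc)

print(rejection)
```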


Roadmap

Done:

  • Synchronous saga execution with auto rollback
  • Parallel async execution with group-level rollback
  • Per-tool retry policy with configurable rollback scope
  • Structured audit log
  • LLM-agnostic integration layer (OpenAI function calling)

Planned:

  • Persistent saga log (SQLite / PostgreSQL)
  • LangChain tool adapter
  • Backoff strategy for retries (exponential, jitter)

License

MIT
