Skip to main content

Idempotency and audit ledger for AI agent tool calls

Project description

Pixel fonts

AI agents retry. Tool calls shouldn't.


Idempotent tool execution for AI agents (Postgres for crash recovery + multi-worker).
Human sign-off locked to the exact tool call + args (no "approve X, run Y").

Idempotent side effects. Human approval gates. Queryable audit trail.

Works with any async Python tool executor. Examples: LangGraph / LangChain. (more coming)

PyPI version CI Python 3.10+ License: Apache-2.0

Installation

pip install agent-ledger

# With PostgreSQL support
pip install agent-ledger[postgres]

Quick Start

import asyncio
from agent_ledger import EffectLedger, EffectLedgerOptions, MemoryStore, ToolCall

async def charge_customer(effect):
    print(f"Charging {effect.args_canonical}...")
    return {"status": "charged", "id": "ch_123"}

async def main():
    store = MemoryStore()
    ledger = EffectLedger(EffectLedgerOptions(store=store))

    # First call: executes the handler
    result = await ledger.run(
        ToolCall(
            workflow_id="order-123",
            tool="stripe.charge",
            args={"amount": 1000, "currency": "usd"},
        ),
        handler=charge_customer,
    )
    print(f"First call: {result}")

    # Second call: same inputs → returns recorded result
    result2 = await ledger.run(
        ToolCall(
            workflow_id="order-123",
            tool="stripe.charge",
            args={"amount": 1000, "currency": "usd"},
        ),
        handler=charge_customer,
    )
    print(f"Second call: {result2}")
    # Handler only executed once. No double charge.

asyncio.run(main())

Output:

Charging {"amount":1000,"currency":"usd"}...
First call: {'status': 'charged', 'id': 'ch_123'}
Second call: {'status': 'charged', 'id': 'ch_123'}

Same inputs → same hash → same result. The handler only runs once.


Why This Exists

LLMs are non-deterministic. Your payment API isn't.

When an agent crashes or retries, it doesn't remember what it already did. A timeout becomes a double charge. A retry loop becomes inbox spam. A crashed deploy leaves you guessing what actually ran.

agent-ledger sits between your agent and the outside world:

Problem How agent-ledger helps
Agent retried → customer charged twice Idempotency: Same inputs → same hash → recorded result replayed, handler skipped
Agent deployed without permission Approvals: Sensitive tools pause until a human approves
"What did the agent actually do?" Audit trail: Every call recorded with inputs, outputs, timing
Process crashed mid-execution Crash recovery: New process reads ledger, resumes safely
Two workers hit the same task Concurrency: First writer wins, others wait for result

The "Pause Button" for Your Agent (Human-in-the-loop)

High-stakes operations can require human approval before execution (example of a Slack message as approval request integration):

from agent_ledger import RunOptions, LedgerHooks
import json

# Agent side: request approval and wait
async def notify_slack_approval_hook(effect):
    """Hook fires once when approval is required."""
    await slack.post_message(
        channel="#deployments",
        text=f"Approval needed: {effect.tool}",
        blocks=[...],
    )


result = await ledger.run(
    ToolCall(
        workflow_id="deploy-prod",
        tool="k8s.deploy",
        args={"image": "app:v2"},
    ),
    handler=deploy_to_k8s,
    run_options=RunOptions(requires_approval=True),
    hooks=LedgerHooks(on_approval_required=notify_slack_approval_hook),
)

# Slack bot side: handle button click
@slack_app.action("approve")
async def handle_approve(ack, body):
    await ack()
    idem_key = body["actions"][0]["value"]  # From button payload

    await ledger.approve(idem_key)
    await slack.post_message(channel="#deployments", text=f"✅ Approved: {idem_key}")

@slack_app.action("deny")
async def handle_deny(ack, body):
    await ack()
    idem_key = body["actions"][0]["value"]

    await ledger.deny(idem_key, reason="Denied by operator")
    await slack.post_message(channel="#deployments", text=f"❌ Denied: {idem_key}")

The agent waits. The human decides. The ledger records everything.

run() polls until the effect is approved, with exponential backoff (default: 30s timeout, 50ms initial interval, 1.5x backoff multiplier). Configurable via RunOptions.concurrency (see Configuration). After approval, the handler executes and the result is returned.

The on_approval_required hook fires once when the approval request is created—not on retries or replays. Hook errors are logged but don't abort the run.

Key flow: The hook receives effect.idem_key—this is the approval handle. External systems (Slack, admin panels, CLIs) store this key in button payloads/URLs and pass it to approve(idem_key) or deny(idem_key, reason).

Intent-bound approval: The approval is tied to the exact payload hash. If the agent retries with different arguments, that's a new approval request—not a bypass of the previous one.


With LangChain

from agent_ledger import EffectLedger, EffectLedgerOptions, MemoryStore, ToolCall

ledger = EffectLedger(EffectLedgerOptions(store=MemoryStore()))

async def send_email(to: str, subject: str, body: str) -> str:
    # Your actual email-sending logic
    return f"Email sent to {to}"

async def execute_tool_safely(tool_name: str, args: dict, workflow_id: str):
    """Wrap any async function with idempotency."""
    async def handler(_):
        return await send_email(**args)

    return await ledger.run(
        ToolCall(workflow_id=workflow_id, tool=tool_name, args=args),
        handler=handler,
    )

# In your agent's tool execution loop:
result = await execute_tool_safely(
    tool_name="send_email",
    args={"to": "customer@example.com", "subject": "Order confirmed", "body": "..."},
    workflow_id="order-456",
)
# Retries won't send duplicate emails

This pattern works with LangChain, LangGraph, CrewAI, or any other framework. See examples/ for framework-specific integrations.


How It Works

Every tool call becomes a transaction in the ledger:

ToolCall(workflow_id, tool, args)
              │
              ▼
    SHA256(workflow_id | tool | args) → idem_key
              │
              ▼
         ┌─────────┐
         │ LEDGER  │
         └────┬────┘
              │
   ┌──────────┼──────────┐
   │          │          │
 fresh    in-flight   terminal
   │          │          │
   ▼          ▼          ▼
execute     wait       replay
handler   for result   recorded

Effect lifecycle:

processing → succeeded
           → failed
           → requires_approval → ready → succeeded/failed
                               → denied
           → canceled

Design Constraints

  • At-most-once commit per idem_key: Each unique (workflow_id, tool, args) tuple is recorded at most once, enforced by the store's unique constraint on idem_key and atomic upsert semantics
  • Downstream exactly-once: Depends on your handler being idempotent or the downstream API supporting idempotency keys
  • Deterministic canonicalization: Args are JSON-serialized with sorted keys; non-deterministic values (timestamps, UUIDs) in args will create new records
  • Multi-tenant isolation: Include tenant/user/principal in workflow_id (or in args) to prevent cross-actor deduplication. The library does not enforce tenant boundaries—your application must scope workflow_id appropriately

More Examples

Custom Idempotency Keys

Hash only specific fields—ignore the rest:

await ledger.run(
    ToolCall(
        workflow_id="ticket-456",
        tool="github.create_issue",
        args={"owner": "acme", "repo": "app", "title": "Bug", "body": "Details..."},
        idempotency_keys=["owner", "repo", "title"],  # body changes won't re-execute
    ),
    handler=create_issue,
)

PostgreSQL for Production

Your agent's state belongs in Postgres, not ephemeral memory:

from psycopg_pool import AsyncConnectionPool
from agent_ledger.stores.postgres import PostgresStore, SCHEMA_SQL

pool = AsyncConnectionPool(conninfo="postgresql://localhost/mydb")
async with pool.connection() as conn:
    await conn.execute(SCHEMA_SQL)

store = PostgresStore(pool=pool)
ledger = EffectLedger(EffectLedgerOptions(store=store))
# Query your audit trail with SQL

Fine-Grained Control

For custom execution logic:

from agent_ledger import CommitSucceeded, CommitFailed, EffectError

begin_result = await ledger.begin(call)

if begin_result.cached:
    return begin_result.cached_result

try:
    result = await execute_tool(call.args)
    await ledger.commit(begin_result.effect.id, CommitSucceeded(result=result))
    return result
except Exception as e:
    await ledger.commit(
        begin_result.effect.id,
        CommitFailed(error=EffectError(message=str(e))),
    )
    raise

When to Use This

Good fit:

  • Agents calling payment APIs, sending emails, creating tickets
  • Workflows requiring human-in-the-loop oversight
  • Workflows that retry on failure or resume after crashes
  • Operations requiring human sign-off before execution
  • Systems needing audit trails of what the agent did

Probably not needed:

  • Read-only agents (RAG, summarization, search)
  • One-off scripts without retry logic
  • Prototypes where duplicates are acceptable

Why Not...?

Alternative What's missing
Retry libraries (Tenacity, Stamina) Retry the call, but don't deduplicate across processes or restarts
In-memory cache Lost on restart, can't coordinate multiple workers
DB unique constraints Good start, but no lifecycle states, result caching, or approval flows
Workflow engines (Temporal, Celery) Full orchestration systems; agent-ledger is a lightweight layer you can use inside them

Library-only: no sidecar, no agent runtime, no SaaS. Bring your own store (Memory/Postgres). pip install and go.


FAQ

Does this replace Temporal? No. Temporal is a full workflow orchestration engine. agent-ledger is a lightweight idempotency layer you can use inside Temporal activities, or standalone.

Can it prevent double Stripe charges? Yes—by replaying the recorded result instead of re-executing the handler. For extra safety, also pass Stripe's own idempotency_key in your API call.

What is workflow_id? A scope boundary for idempotency. Same (workflow_id, tool, args) = same effect. Different workflow_id = independent effects, even with identical tool+args. You can use this, for example, if an agent is invoked via a webhook to deduplicate all side effects across multiple retries by passing the webhook's id as the workflow_id.

Important: In multi-tenant systems, include the tenant/user identifier in workflow_id (e.g., "tenant-123:order-456") to prevent unintended deduplication across different actors or security boundaries.


Core API

Method Purpose
run(call, handler) Execute with idempotency—the main entry point
begin(call) / commit(id, outcome) Manual transaction control
approve(key) / deny(key) Human-in-the-loop approval
get_effect(id) / find_by_idem_key(key) Query ledger state

See ledger.py for full API with type signatures.

Configuration

Control polling, timeouts, and stale effect handling:

from agent_ledger import RunOptions, ConcurrencyOptions, StaleOptions

# Per-call configuration
await ledger.run(
    call,
    handler=my_handler,
    run_options=RunOptions(
        requires_approval=False,
        concurrency=ConcurrencyOptions(
            wait_timeout_ms=30_000,      # Max wait time for in-flight effects (default: 30s)
            initial_interval_ms=50,      # First poll interval (default: 50ms)
            max_interval_ms=1_000,       # Cap for backoff interval (default: 1s)
            backoff_multiplier=1.5,      # Exponential backoff rate (default: 1.5x)
            jitter_factor=0.3,           # Random jitter to avoid thundering herd (default: 0.3)
        ),
        stale=StaleOptions(
            after_ms=60_000,             # Take over stale PROCESSING effects (default: 0 = disabled)
        ),
    ),
)

# Global defaults (applied to all run() calls unless overridden)
from agent_ledger import EffectLedger, EffectLedgerOptions, LedgerDefaults

ledger = EffectLedger(
    EffectLedgerOptions(
        store=store,
        defaults=LedgerDefaults(
            run=RunOptions(
                requires_approval=True,  # All calls need approval by default
                concurrency=ConcurrencyOptions(wait_timeout_ms=60_000),  # 60s timeout
            ),
        ),
    ),
)

Polling behavior: When waiting for an in-flight or approval-required effect, run() polls the store with exponential backoff. Initial interval is 50ms, increasing by 1.5x each retry (up to 1s max), with 30% jitter. Times out after 30s by default.


License

Apache-2.0


Disclaimer: This library is designed to help reduce duplicate executions through idempotency patterns. It does not guarantee exactly-once semantics in all failure scenarios—correct behavior depends on proper integration, idempotent handlers, and appropriate storage configuration. The authors are not liable for any damages arising from the use of this software. Always test thoroughly before deploying to production. See LICENSE for full terms.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

agent_ledger-0.1.1.tar.gz (29.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

agent_ledger-0.1.1-py3-none-any.whl (24.9 kB view details)

Uploaded Python 3

File details

Details for the file agent_ledger-0.1.1.tar.gz.

File metadata

  • Download URL: agent_ledger-0.1.1.tar.gz
  • Upload date:
  • Size: 29.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for agent_ledger-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9a3c206bb0b0f9522bcad23a2c519564b52ebe4a07e5c39e525659fd41e30299
MD5 5ef696071282c28c88706aa382123c60
BLAKE2b-256 3a58c67928c92b662649b3768a82eeccaab7323b69c673ed91dc71ecfc8aac8d

See more details on using hashes here.

Provenance

The following attestation bundles were made for agent_ledger-0.1.1.tar.gz:

Publisher: ci.yml on rune0-dev/agent-ledger

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file agent_ledger-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: agent_ledger-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 24.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for agent_ledger-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 535405f76f0c6a709202bd42a67f29ed8cffb175a4dc3f17689cb74bf70c22ab
MD5 bbcacc93446cea8da7479cf7b172f383
BLAKE2b-256 bd333025fdb14436b0cce54122c45372475e664a8d6d5f5a8b4d789537c53a60

See more details on using hashes here.

Provenance

The following attestation bundles were made for agent_ledger-0.1.1-py3-none-any.whl:

Publisher: ci.yml on rune0-dev/agent-ledger

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page