LangChain Compensation

v0.5.8 — Composable compensation middleware for LangChain agents with automatic rollback, retry-before-rollback strategies, pluggable error detection, multi-agent support, and LLM-driven parameter extraction. Implements the Saga pattern for distributed transactions in agent workflows.

Features

  • Automatic Rollback: Compensates completed actions when failures occur using the Saga pattern
  • Retry Before Rollback: Configurable retry strategies with exponential backoff before triggering rollback
  • Partial Rollback: Only rollback dependent actions, preserving independent successful operations
  • DAG-Based Ordering: Rolls back actions in correct dependency order (not just LIFO)
  • Pluggable Error Detection: Extensible strategies for detecting tool failures
  • Pluggable Parameter Extraction: Multiple strategies including LLM-based extraction
  • Multi-Agent Support: Shared compensation log for coordinated rollback across agents
  • Parallel Execution Safe: Thread-safe with atomic batch operations
  • Fault Tolerance: Checkpoint integration with LangGraph for crash recovery
  • Simple API: Drop-in replacement for create_agent() with one additional parameter

Installation

pip install langchain-compensation

For LLM-based parameter extraction (optional):

pip install langchain-compensation langchain-openai
# or
pip install langchain-compensation langchain-anthropic

Quick Start

from langchain_compensation import create_comp_agent
from langchain_core.tools import tool

@tool
def book_flight(destination: str) -> dict:
    """Books a flight to the destination."""
    return {"id": f"FL-{destination.upper()}-123", "status": "confirmed"}

@tool
def cancel_flight(id: str) -> str:
    """Cancels a flight booking."""
    return f"Flight {id} cancelled"

@tool
def book_hotel(location: str) -> dict:
    """Books a hotel at the location."""
    if "fail" in location.lower():
        return {"status": "error", "message": "No rooms available"}
    return {"id": f"HT-{location.upper()}-456", "status": "confirmed"}

@tool
def cancel_hotel(id: str) -> str:
    """Cancels a hotel booking."""
    return f"Hotel {id} cancelled"

# Create agent with automatic compensation
agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight, book_hotel, cancel_hotel],
    compensation_mapping={
        "book_flight": "cancel_flight",
        "book_hotel": "cancel_hotel",
    },
)

# When hotel booking fails, flight is automatically cancelled!
result = agent.invoke({
    "messages": [("user", "Book a flight to Paris and a hotel in FailCity")]
})

How It Works

1. Agent calls book_flight("Paris")     → Success, records compensation action
2. Agent calls book_hotel("FailCity")   → Fails with error status
3. Middleware detects failure           → Triggers automatic rollback
4. Middleware calls cancel_flight()     → Compensates the successful booking
5. Agent continues with clean state

The compensation log tracks all compensatable actions with their results, enabling intelligent parameter extraction for compensation calls.
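The flow above can be sketched in plain Python, without LangChain, to show the bookkeeping involved. This is only an illustration of the Saga idea and not the library's implementation: `SagaLog`, `record`, and `rollback` are hypothetical names, and the sketch undoes actions in simple reverse order rather than by dependency graph.

```python
from typing import Any, Callable, Dict, List, Tuple


class SagaLog:
    """Hypothetical stand-in for a compensation log (not the library's class)."""

    def __init__(self) -> None:
        # Each entry pairs a compensation callable with the arguments it needs.
        self._entries: List[Tuple[Callable[..., Any], Dict[str, Any]]] = []

    def record(self, compensate: Callable[..., Any], **kwargs: Any) -> None:
        self._entries.append((compensate, kwargs))

    def rollback(self) -> List[Any]:
        """Run recorded compensations, most recent first, and empty the log."""
        results = []
        while self._entries:
            compensate, kwargs = self._entries.pop()
            results.append(compensate(**kwargs))
        return results


def book_flight(destination: str) -> dict:
    return {"id": f"FL-{destination.upper()}-123", "status": "confirmed"}


def cancel_flight(id: str) -> str:
    return f"Flight {id} cancelled"


def book_hotel(location: str) -> dict:
    if "fail" in location.lower():
        return {"status": "error", "message": "No rooms available"}
    return {"id": f"HT-{location.upper()}-456", "status": "confirmed"}


log = SagaLog()

flight = book_flight("Paris")
log.record(cancel_flight, id=flight["id"])  # Step 1: success, record compensation

hotel = book_hotel("FailCity")  # Step 2: returns an error status
# Steps 3-4: detect the failure and run the recorded compensations
rolled_back = log.rollback() if hotel["status"] == "error" else []
print(rolled_back)
```

The middleware performs the equivalent of `record` and `rollback` around tool calls, so the tools themselves stay unaware of compensation.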


Using CompensationMiddleware with create_agent

While create_comp_agent is the recommended way to get started, you can also use CompensationMiddleware directly with create_agent for more control:

from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_compensation.middleware import CompensationMiddleware

# Define your tools
@tool
def book_flight(destination: str) -> str:
    """Books a flight."""
    return f"flight_id_for_{destination}"

@tool
def cancel_flight(booking_id: str) -> str:
    """Cancels a flight."""
    return "Cancelled"

# Create the compensation middleware
comp_middleware = CompensationMiddleware(
    compensation_mapping={
        "book_flight": "cancel_flight",
    },
    tools=[book_flight, cancel_flight],  # Required for compensation execution
    state_mappers={  # Optional: custom parameter extraction
        "book_flight": lambda result, params: {"booking_id": result}
    },
)

# Create agent with middleware
agent = create_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    middleware=[comp_middleware],  # Pass middleware as a list
    system_prompt="You are a helpful travel assistant.",
)

CompensationMiddleware Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| compensation_mapping | dict | Required. Maps tool names to compensation tools |
| tools | list | Tools needed to execute compensations |
| state_mappers | dict | Custom functions to extract compensation parameters |
| shared_log | CompensationLog | Shared log for multi-agent coordination |
| agent_id | str | Identifier for this agent instance |
| compensation_schemas | dict | Declarative schemas for parameter extraction |
| error_strategies | list | Custom error detection strategies |
| extraction_strategies | list | Custom parameter extraction strategies |

create_comp_agent vs create_agent + Middleware

| Feature | create_comp_agent | create_agent + Middleware |
| --- | --- | --- |
| Setup Required | Minimal - single function call | More boilerplate |
| LLM Extraction | Automatic with use_llm_extraction=True | Manual strategy building |
| Checkpointing | Auto-configured with use_checkpointing=True | Manual middleware stacking |
| Recursion Limit | Auto-set to 1000 | Must set manually |
| Human-in-the-Loop | Auto-added with interrupt_on | Manual middleware stacking |
| Control | Limited - opinionated setup | Full control over middleware stack |
| Learning Curve | Shallow - get started quickly | Steeper - need middleware knowledge |

When to use each:

  • Use create_comp_agent for:

    • Getting started quickly
    • Standard compensation patterns
    • Using LLM extraction or checkpointing
    • Minimal boilerplate preferred
  • Use create_agent + Middleware for:

    • Fine-grained control over middleware ordering
    • Combining with other custom middleware
    • Debugging middleware behavior
    • Complex middleware stacks

Core Concepts

Compensation Mapping

Maps each tool to its compensation (rollback) tool:

compensation_mapping = {
    "book_flight": "cancel_flight",      # book_flight → cancel_flight
    "reserve_inventory": "release_inventory",
    "charge_payment": "refund_payment",
}

Parameter Extraction Priority

When calling compensation tools, parameters are extracted in this order:

  1. state_mappers - Custom functions (highest priority)
  2. CompensationSchema - Declarative field mapping
  3. Heuristic - Common ID fields (id, booking_id, etc.)
  4. Recursive Search - Deep nested structure search
  5. LLM Extraction - AI-powered extraction (if enabled)
  6. Passthrough - Return entire result (last resort)
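The priority order works as a chain in which each strategy either returns parameters or defers to the next one. The sketch below illustrates that pattern with three of the six steps. It is not the library's code: the function names, the `COMMON_ID_FIELDS` tuple, and the shape of the passthrough result are assumptions made for the example.

```python
from typing import Any, Callable, Dict, List, Optional

Extractor = Callable[[Any, Dict[str, Any]], Optional[Dict[str, Any]]]

# Assumed field names; the real heuristic list may differ.
COMMON_ID_FIELDS = ("id", "booking_id")


def heuristic(result: Any, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Look for a common ID field at the top level of a dict result."""
    if isinstance(result, dict):
        for field in COMMON_ID_FIELDS:
            if field in result:
                return {field: result[field]}
    return None  # Defer to the next strategy


def recursive_search(result: Any, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Apply the heuristic at every nesting level, depth first."""
    if isinstance(result, dict):
        found = heuristic(result, params)
        if found is not None:
            return found
        children = list(result.values())
    elif isinstance(result, list):
        children = result
    else:
        return None
    for child in children:
        found = recursive_search(child, params)
        if found is not None:
            return found
    return None


def passthrough(result: Any, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Last resort: hand over the whole result (the wrapping key is an assumption)."""
    return {"result": result}


def extract(result: Any, params: Dict[str, Any], chain: List[Extractor]) -> Dict[str, Any]:
    """Return the output of the first strategy that does not defer."""
    for strategy in chain:
        extracted = strategy(result, params)
        if extracted is not None:
            return extracted
    return {}


chain = [heuristic, recursive_search, passthrough]
flat = extract({"id": "123", "status": "confirmed"}, {}, chain)
nested = extract({"booking": {"details": {"booking_id": "B-9"}}}, {}, chain)
opaque = extract("flight_id_for_paris", {}, chain)
```

Here `flat` is resolved by the heuristic, `nested` by the recursive search, and `opaque` falls through to the passthrough step.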

Parameter Extraction Options

Option 1: Automatic (Default)

Works out of the box for tools returning common ID patterns:

# Tool returns: {"id": "123", "status": "confirmed"}
# Compensation receives: {"id": "123"}

agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    compensation_mapping={"book_flight": "cancel_flight"},
    # No additional configuration needed!
)

Option 2: Custom State Mappers

For complex extraction logic:

def extract_flight_params(result, original_params):
    """Custom extraction from nested result."""
    return {
        "booking_id": result["booking"]["confirmation_code"],
        "reason": f"Rollback for {original_params.get('destination')}",
    }

agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    compensation_mapping={"book_flight": "cancel_flight"},
    state_mappers={"book_flight": extract_flight_params},
)

Option 3: Declarative Schema

Use CompensationSchema for path-based extraction without code:

from langchain_compensation import CompensationSchema, create_comp_agent

agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    compensation_mapping={"book_flight": "cancel_flight"},
    compensation_schemas={
        "book_flight": CompensationSchema(
            param_mapping={
                "booking_id": "result.id",              # result["id"]
                "confirmation": "result.conf_code",     # result["conf_code"]
                "origin": "params.origin",              # original params
            },
            static_params={
                "reason": "Automatic rollback",
            },
        ),
    },
)

Path Syntax:

  • result.field → result["field"]
  • result.nested.field → result["nested"]["field"]
  • params.field → original_params["field"]
  • result.items[0].id → result["items"][0]["id"]
  • result.optional? → Optional field (no error if missing)
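To make the path syntax concrete, here is a small resolver that handles the forms listed above. It is a sketch written for this explanation, not the parser used by CompensationSchema. The name `resolve_path` is hypothetical, and returning None for a missing optional field is an assumption.

```python
import re
from typing import Any, Dict

# Matches either a field name or a [index] segment.
_TOKEN = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def resolve_path(path: str, result: Any, params: Dict[str, Any]) -> Any:
    """Resolve a path such as 'result.items[0].id' or 'params.origin'."""
    optional = path.endswith("?")
    if optional:
        path = path[:-1]

    root, _, rest = path.partition(".")
    if root not in ("result", "params"):
        raise ValueError(f"path must start with 'result' or 'params': {path!r}")

    current = result if root == "result" else params
    for name, index in _TOKEN.findall(rest):
        try:
            current = current[int(index)] if index else current[name]
        except (KeyError, IndexError, TypeError):
            if optional:
                return None  # Assumed behaviour for a missing optional field
            raise KeyError(f"cannot resolve {path!r}") from None
    return current


result = {"id": "FL-1", "nested": {"field": 42}, "items": [{"id": "IT-7"}]}
params = {"origin": "LHR"}

booking_id = resolve_path("result.id", result, params)
first_item = resolve_path("result.items[0].id", result, params)
origin = resolve_path("params.origin", result, params)
missing = resolve_path("result.conf_code?", result, params)
```

Without the trailing `?`, the last call would raise a KeyError instead of returning None.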

Option 4: LLM-Based Extraction

For complex results where heuristics fail:

agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    compensation_mapping={"book_flight": "cancel_flight"},
    use_llm_extraction=True,           # Enable LLM extraction
    extraction_model="gpt-4o-mini",    # Use cheap model for extraction
)

The LLM receives the tool result and compensation tool schema, then intelligently extracts the required parameters.


Multi-Agent Support

Multiple agents can share a compensation log for coordinated rollback:

from langchain_compensation import create_comp_agent, CompensationLog

# Create shared log
shared_log = CompensationLog()

# Flight booking agent
flight_agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight],
    compensation_mapping={"book_flight": "cancel_flight"},
    shared_log=shared_log,
    agent_id="flight-agent",
)

# Hotel booking agent
hotel_agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_hotel, cancel_hotel],
    compensation_mapping={"book_hotel": "cancel_hotel"},
    shared_log=shared_log,
    agent_id="hotel-agent",
)

# If hotel_agent fails, flight_agent's bookings are also rolled back!

Multi-Agent Flow

Flight Agent: book_flight()   → Success → Recorded in shared_log
Hotel Agent:  book_hotel()    → Success → Recorded in shared_log
Hotel Agent:  book_rental()   → FAILS!
                ↓
Shared log triggers coordinated rollback:
  1. cancel_rental() (if it was recorded)
  2. cancel_hotel()  (Hotel Agent's action)
  3. cancel_flight() (Flight Agent's action)
                ↓
Both agents' actions compensated atomically
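The coordination shown above relies on a single log that several agents can write to safely. A minimal sketch of that idea follows, assuming a lock-protected list and reverse-order rollback. `Record`, `SharedLog`, and `rollback_plan` are hypothetical names and do not reproduce the CompensationLog implementation.

```python
import threading
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Record:
    agent_id: str
    tool_name: str
    compensation_tool: str
    compensated: bool = False


class SharedLog:
    """Hypothetical thread-safe log shared by several agents."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._records: List[Record] = []

    def add(self, record: Record) -> None:
        with self._lock:
            self._records.append(record)

    def rollback_plan(self, agent_id: Optional[str] = None) -> List[Record]:
        """Pending records, newest first, optionally limited to one agent."""
        with self._lock:
            pending = [
                r
                for r in self._records
                if not r.compensated and (agent_id is None or r.agent_id == agent_id)
            ]
        return list(reversed(pending))


shared = SharedLog()
shared.add(Record("flight-agent", "book_flight", "cancel_flight"))
shared.add(Record("hotel-agent", "book_hotel", "cancel_hotel"))

# A failure in either agent yields one plan covering both agents' actions.
plan = [r.compensation_tool for r in shared.rollback_plan()]
flight_only = [r.compensation_tool for r in shared.rollback_plan("flight-agent")]
```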

Fault Tolerance with Checkpointing

Integrate with LangGraph checkpointers for crash recovery:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

# Development: In-memory
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    checkpointer=MemorySaver(),
    use_checkpointing=True,
)

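# Production: PostgreSQL
# PostgresSaver.from_conn_string() is a context manager that yields the saver
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # Creates the checkpoint tables on first use
    agent = create_comp_agent(
        model="gpt-4o",
        tools=[...],
        compensation_mapping={...},
        checkpointer=checkpointer,
        use_checkpointing=True,
    )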

Recovery After Crash

from langchain_compensation import CheckpointMiddleware, CompensationMiddleware

# After process restart, restore state
checkpoint_middleware = CheckpointMiddleware(checkpointer=saved_checkpointer)
restored_log = checkpoint_middleware.restore_from_failure({"thread_id": "abc123"})

if restored_log:
    # Resume with restored compensation state
    middleware = CompensationMiddleware(
        compensation_mapping={...},
        shared_log=restored_log,
    )

Pluggable Strategies

Custom Error Detection

from typing import Optional

from langchain_compensation import (
    ErrorStrategy,
    ExplicitStatusStrategy,
    ContentDictStrategy,
    create_comp_agent,
)

class CustomAPIErrorStrategy(ErrorStrategy):
    """Detect errors from custom API responses."""

    # Optional[bool] rather than `bool | None` so the annotation also works on Python 3.9
    def is_error(self, result) -> Optional[bool]:
        content = result.content
        if isinstance(content, dict):
            if content.get("api_status") == "failed":
                return True
            if content.get("error_code") is not None:
                return True
        return None  # Defer to next strategy

agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    error_strategies=[
        ExplicitStatusStrategy(),    # Check ToolMessage.status
        ContentDictStrategy(),       # Check {"error": ...}
        CustomAPIErrorStrategy(),    # Your custom logic
    ],
)

Custom Parameter Extraction

from langchain_compensation import (
    ExtractionStrategy,
    HeuristicExtractionStrategy,
    create_comp_agent,
)

class GraphQLExtractionStrategy(ExtractionStrategy):
    """Extract IDs from GraphQL-style responses."""

    def extract(self, result, original_params, comp_tool=None, tool_name=None):
        if isinstance(result, dict) and "data" in result:
            data = result["data"]
            # Find the first mutation result with an ID
            for key, value in data.items():
                if isinstance(value, dict) and "id" in value:
                    return {"id": value["id"]}
        return None  # Defer to next strategy

agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    extraction_strategies=[
        GraphQLExtractionStrategy(),
        HeuristicExtractionStrategy(),
    ],
)

Retry Before Rollback

By default, when a tool fails, the middleware immediately triggers a rollback of all completed actions. With retry strategies, you can attempt to recover from transient failures before resorting to rollback.

Basic Retry Configuration

from langchain_compensation import create_comp_agent

agent = create_comp_agent(
    model="gpt-4o",
    tools=[book_flight, cancel_flight, book_hotel, cancel_hotel],
    compensation_mapping={
        "book_flight": "cancel_flight",
        "book_hotel": "cancel_hotel",
    },
    max_retries=3,           # Retry up to 3 times before rollback
    retry_backoff=1.0,       # 1 second base delay with exponential backoff
)
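To give a feel for what "base delay with exponential backoff" means in practice, the sketch below computes a delay schedule. The formula and the defaults for `exponential_base` and `max_delay` are assumptions chosen for illustration (the parameter names mirror the RetryConfig fields listed in the API reference), and the library's actual schedule may differ.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    exponential_base: float = 2.0,  # Assumed default
    max_delay: float = 60.0,  # Assumed default
    jitter: bool = False,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Delay before each retry: base_delay * exponential_base**attempt, capped."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(base_delay * exponential_base ** attempt, max_delay)
        if jitter:
            # "Full jitter": pick a random point between 0 and the capped delay
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


schedule = backoff_delays(3, base_delay=1.0)
capped = backoff_delays(4, base_delay=10.0, max_delay=30.0)
print(schedule, capped)
```

Under these assumptions, max_retries=3 with retry_backoff=1.0 waits 1, 2, and 4 seconds before the three retries.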

Retry with Parameter Transformation

When a resource fails (e.g., machine breakdown), try alternative resources:

from langchain_compensation import (
    create_comp_agent,
    AlternativeResourceTransformer,
)

agent = create_comp_agent(
    model="gpt-4o",
    tools=[schedule_job, cancel_job],
    compensation_mapping={"schedule_job": "cancel_job"},
    max_retries=3,
    retry_transformer=AlternativeResourceTransformer(
        resource_field="machine_id",
        alternatives_fn=lambda current, ctx: ["machine_2", "machine_3", "machine_4"],
    ),
    partial_rollback=True,  # Only rollback dependent actions
)
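The idea behind swapping resources between retries can be shown with a small standalone function. This is a sketch of the concept only: `with_alternative` is a hypothetical helper, and the rule it uses (skip the resource that just failed, then take alternatives in order) is an assumption, not the documented behaviour of AlternativeResourceTransformer.

```python
from typing import Any, Dict, List


def with_alternative(
    params: Dict[str, Any],
    resource_field: str,
    alternatives: List[str],
    attempt: int,
) -> Dict[str, Any]:
    """Return a copy of params with resource_field set for retry number `attempt` (1-based)."""
    # Never retry on the resource that just failed
    candidates = [a for a in alternatives if a != params.get(resource_field)]
    if not candidates or attempt < 1:
        return dict(params)
    # Stay on the last alternative once the list is exhausted
    choice = candidates[min(attempt, len(candidates)) - 1]
    return {**params, resource_field: choice}


original = {"machine_id": "machine_1", "job": "J-42"}
alternatives = ["machine_2", "machine_3", "machine_4"]

first_retry = with_alternative(original, "machine_id", alternatives, attempt=1)
third_retry = with_alternative(original, "machine_id", alternatives, attempt=3)
```

The original parameters are left untouched, so each retry can be derived from the same starting point.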

Custom Retry Strategies

Control which failures should be retried:

from langchain_compensation import (
    create_comp_agent,
    RetryStrategy,
    FailureType,
    NetworkErrorStrategy,
    ToolSpecificRetryStrategy,
    RetryConfig,
)

# Per-tool retry configuration
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    max_retries=5,
    retry_strategies=[
        ToolSpecificRetryStrategy({
            "schedule_job": RetryConfig(max_retries=5),   # More retries for flaky tool
            "validate_input": RetryConfig(max_retries=0), # Never retry validation
        }),
        NetworkErrorStrategy(),  # Retry network errors
    ],
)

Failure Classification

The retry system classifies failures as:

| Type | Behavior | Examples |
| --- | --- | --- |
| TRANSIENT | Will retry | Network timeout, rate limit, temporary unavailability |
| PERMANENT | No retry | Validation error, not found, unauthorized |
| UNKNOWN | Defer to default | Unrecognized error patterns |
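The classification above can be pictured as a mapping from an error message to one of three outcomes, followed by a retry decision. The sketch below uses simple substring matching. The enum `FailureKind`, the hint tuples, and the `retry_unknown` flag are all hypothetical, and the library's strategies may classify failures quite differently.

```python
from enum import Enum


class FailureKind(Enum):
    """Local stand-in mirroring the three categories in the table."""

    TRANSIENT = "transient"
    PERMANENT = "permanent"
    UNKNOWN = "unknown"


# Assumed keyword lists, derived from the examples in the table
TRANSIENT_HINTS = ("timeout", "rate limit", "temporarily unavailable")
PERMANENT_HINTS = ("validation", "not found", "unauthorized")


def classify(message: str) -> FailureKind:
    text = message.lower()
    if any(hint in text for hint in TRANSIENT_HINTS):
        return FailureKind.TRANSIENT
    if any(hint in text for hint in PERMANENT_HINTS):
        return FailureKind.PERMANENT
    return FailureKind.UNKNOWN


def should_retry(
    message: str, attempt: int, max_retries: int, retry_unknown: bool = False
) -> bool:
    """Retry only while attempts remain and the failure looks recoverable."""
    if attempt >= max_retries:
        return False
    kind = classify(message)
    if kind is FailureKind.TRANSIENT:
        return True
    if kind is FailureKind.UNKNOWN:
        return retry_unknown  # The "default" for unknown failures is a choice
    return False


timeout_kind = classify("Connection timeout after 30s")
validation_kind = classify("Validation error: missing field 'date'")
```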

Built-in Retry Strategies

| Strategy | Description |
| --- | --- |
| NetworkErrorStrategy | Classifies network/timeout errors as transient |
| ValidationErrorStrategy | Classifies validation/auth errors as permanent |
| ResourceUnavailableStrategy | Configurable busy vs broken detection |
| ToolSpecificRetryStrategy | Per-tool retry configuration |
| CompositeRetryStrategy | Chains multiple strategies |

Partial Rollback

With partial_rollback=True, only actions that depend on the failed action are rolled back. Independent successful actions are preserved:

agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    max_retries=3,
    partial_rollback=True,  # Preserve independent successful actions
)

Example Flow:

1. book_flight() → Success (FL-123)
2. book_hotel(near_airport="FL-123") → Success (depends on flight)
3. book_car() → Success (independent)
4. book_restaurant() → FAILS!

With partial_rollback=True:
  - book_car is PRESERVED (independent)
  - book_hotel is rolled back (depends on flight conceptually)
  - book_flight may be rolled back (based on dependency chain)

With partial_rollback=False (default):
  - ALL actions are rolled back
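Dependency-aware rollback can be pictured as a graph walk: pick the action that has to be undone, collect everything that transitively depends on it, and undo dependents before the things they depend on. The sketch below assumes the dependency map is already known. How the library infers dependencies is not described here, and `dependents_of` and `rollback_order` are hypothetical helpers.

```python
from typing import Dict, List, Set


def dependents_of(action: str, depends_on: Dict[str, Set[str]]) -> Set[str]:
    """All actions that directly or transitively depend on `action`."""
    found: Set[str] = set()
    frontier = [action]
    while frontier:
        current = frontier.pop()
        for candidate, deps in depends_on.items():
            if current in deps and candidate not in found:
                found.add(candidate)
                frontier.append(candidate)
    return found


def rollback_order(targets: Set[str], completed: List[str]) -> List[str]:
    """Undo targets in reverse execution order.

    Assumes each action ran after the actions it depends on, so reversing
    the execution order always undoes dependents first.
    """
    return [action for action in reversed(completed) if action in targets]


# Hypothetical dependency map: book_hotel used the flight's result
depends_on = {
    "book_flight": set(),
    "book_hotel": {"book_flight"},
    "book_car": set(),
}
completed = ["book_flight", "book_hotel", "book_car"]

# Suppose book_flight has to be undone: take it and everything built on it
targets = {"book_flight"} | dependents_of("book_flight", depends_on)
plan = rollback_order(targets, completed)
preserved = [action for action in completed if action not in targets]
```

In this example the plan undoes book_hotel and then book_flight, while the independent book_car is preserved.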

API Reference

create_comp_agent

def create_comp_agent(
    model: str | BaseChatModel,
    tools: Sequence[BaseTool | Callable],
    *,
    compensation_mapping: dict[str, str],

    # Parameter Extraction
    state_mappers: dict[str, Callable] = None,
    compensation_schemas: dict[str, CompensationSchema] = None,
    use_llm_extraction: bool = False,
    extraction_model: str = "gpt-4o-mini",

    # Multi-Agent
    shared_log: CompensationLog = None,
    agent_id: str = None,

    # Pluggable Strategies
    error_strategies: list[ErrorStrategy] = None,
    extraction_strategies: list[ExtractionStrategy] = None,

    # Retry Configuration (v0.5.8)
    retry_strategies: list[RetryStrategy] = None,
    retry_transformer: RetryTransformer = None,
    max_retries: int = 0,            # 0 = no retry (default)
    retry_backoff: float = 1.0,      # Base delay in seconds
    partial_rollback: bool = False,  # Only rollback dependent actions

    # Parallel Execution Control
    enable_batch_abort: bool = True,
    sequential_execution: bool = False,
    track_intent: bool = False,

    # Fault Tolerance
    checkpointer: Checkpointer = None,
    use_checkpointing: bool = False,

    # Standard Agent Parameters
    system_prompt: str = None,
    middleware: Sequence[AgentMiddleware] = (),
    ...
) -> CompiledStateGraph

CompensationSchema

@dataclass
class CompensationSchema:
    param_mapping: Dict[str, str]   # Maps param name → path expression
    static_params: Dict[str, Any]   # Static values to always include

    def extract(self, result, original_params) -> Dict[str, Any]

CompensationLog

class CompensationLog:
    def add(self, record: CompensationRecord) -> None
    def update(self, record_id: str, **kwargs) -> None
    def get(self, record_id: str) -> CompensationRecord | None
    def snapshot(self) -> Dict[str, CompensationRecord]  # Thread-safe copy
    def atomic_batch(self, operations: List[tuple]) -> None  # Atomic ops
    def filter_by_agent(self, agent_id: str) -> List[CompensationRecord]
    def get_rollback_plan(self, agent_id: str = None) -> List[CompensationRecord]
    def mark_compensated(self, record_id: str) -> None
    def clear(self, agent_id: str = None) -> None

CompensationMiddleware

class CompensationMiddleware(AgentMiddleware):
    def __init__(
        self,
        compensation_mapping: Dict[str, str],
        tools: List[BaseTool] = None,
        state_mappers: Dict[str, Callable] = None,
        shared_log: CompensationLog = None,
        agent_id: str = None,
        compensation_schemas: Dict[str, CompensationSchema] = None,
        error_strategies: List[ErrorStrategy] = None,
        extraction_strategies: List[ExtractionStrategy] = None,
        # Parallel execution control
        enable_batch_abort: bool = True,
        sequential_execution: bool = False,
        track_intent: bool = False,
        # Retry configuration (v0.5.8)
        retry_strategies: List[RetryStrategy] = None,
        retry_transformer: RetryTransformer = None,
        max_retries: int = 0,
        retry_backoff: float = 1.0,
        partial_rollback: bool = False,
    )

Error Strategies

| Strategy | Description |
| --- | --- |
| ExplicitStatusStrategy | Checks ToolMessage.status == "error" |
| ContentDictStrategy | Checks for {"error": ...}, {"success": false}, {"ok": false} |
| ExceptionContentStrategy | Detects exception-like patterns in content |
| CompositeErrorStrategy | Chains multiple strategies together |

Extraction Strategies

| Strategy | Description |
| --- | --- |
| StateMappersStrategy | Uses developer-provided functions |
| SchemaExtractionStrategy | Uses CompensationSchema declarations |
| HeuristicExtractionStrategy | Looks for common ID fields |
| RecursiveSearchStrategy | Deep searches nested structures |
| LLMExtractionStrategy | Uses LLM for intelligent extraction |
| PassthroughStrategy | Returns entire result as-is |
| CompositeExtractionStrategy | Chains multiple strategies |

Retry Strategies

| Strategy | Description |
| --- | --- |
| NetworkErrorStrategy | Classifies network/timeout errors as transient |
| ValidationErrorStrategy | Classifies validation/auth errors as permanent |
| ResourceUnavailableStrategy | Configurable busy vs broken resource detection |
| ToolSpecificRetryStrategy | Per-tool retry configuration via RetryConfig |
| CompositeRetryStrategy | Chains multiple strategies together |

Retry Transformers

| Transformer | Description |
| --- | --- |
| IdentityTransformer | Returns parameters unchanged (default) |
| AlternativeResourceTransformer | Picks alternative resources on failure |
| CompositeTransformer | Chains multiple transformers |

Retry Data Classes

| Class | Description |
| --- | --- |
| FailureType | Enum: TRANSIENT, PERMANENT, UNKNOWN |
| RetryConfig | Configuration: max_retries, base_delay, max_delay, exponential_base, jitter |
| RetryContext | Context passed to transformers: tool_name, attempt, original_params, last_result |
| RetryResult | Result: success, attempt, result, elapsed_time, failure_type, params_used |

Use Cases

  • Travel Booking: Cancel flights/hotels/cars if any booking fails
  • E-commerce: Reverse inventory, refund payments on order failure
  • Financial: Rollback transfers, cancel holds on transaction failure
  • Infrastructure: Delete created resources if provisioning fails
  • Multi-Service: Coordinate rollback across microservices
  • AI Agents: Clean up side effects when agent workflows fail

Migration Guide

From v0.4.x to v0.5.8

v0.5.8 is fully backwards compatible. New retry features are opt-in:

# v0.4.x code still works unchanged
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
)

# v0.5.8 additions (all optional)
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    # New in v0.5.8:
    max_retries=3,                   # Retry before rollback
    retry_backoff=1.0,               # Exponential backoff
    retry_strategies=[...],          # Custom failure classification
    retry_transformer=...,           # Parameter transformation between retries
    partial_rollback=True,           # Only rollback dependent actions
)

From v0.3.x to v0.4.0+

# v0.3.x code still works
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    state_mappers={...},  # Still supported
)

# v0.4.0 additions (all optional)
agent = create_comp_agent(
    model="gpt-4o",
    tools=[...],
    compensation_mapping={...},
    compensation_schemas={...},      # Declarative extraction
    shared_log=CompensationLog(),    # Multi-agent support
    agent_id="my-agent",             # Agent identification
    use_llm_extraction=True,         # LLM-based extraction
    use_checkpointing=True,          # Fault tolerance
)

Requirements

  • Python 3.9+
  • langchain >= 0.3.0
  • langchain-core >= 0.3.0
  • langgraph >= 0.2.0

Optional:

  • langchain-openai (for LLM extraction with GPT models)
  • langchain-anthropic (for LLM extraction with Claude models)

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE file for details.

Credits

Inspired by the Saga pattern for distributed transactions and built on LangChain's middleware system.
