Composable compensation middleware for LangChain agents with automatic rollback, pluggable strategies, multi-agent support, and LLM-driven parameter extraction
Project description
LangChain Compensation
v0.4.0 — Composable compensation middleware for LangChain agents with automatic rollback, pluggable strategies, multi-agent support, and LLM-driven parameter extraction. Implements the Saga pattern for distributed transactions in agent workflows.
Features
- Automatic Rollback: Compensates completed actions when failures occur using the Saga pattern
- DAG-Based Ordering: Rolls back actions in correct dependency order (not just LIFO)
- Pluggable Error Detection: Extensible strategies for detecting tool failures
- Pluggable Parameter Extraction: Multiple strategies including LLM-based extraction
- Multi-Agent Support: Shared compensation log for coordinated rollback across agents
- Parallel Execution Safe: Thread-safe with atomic batch operations
- Fault Tolerance: Checkpoint integration with LangGraph for crash recovery
- Simple API: Drop-in replacement for
create_agent()with one additional parameter
Installation
pip install langchain-compensation
For LLM-based parameter extraction (optional):
pip install langchain-compensation langchain-openai
# or
pip install langchain-compensation langchain-anthropic
Quick Start
from langchain_compensation import create_comp_agent
from langchain_core.tools import tool
@tool
def book_flight(destination: str) -> dict:
"""Books a flight to the destination."""
return {"id": f"FL-{destination.upper()}-123", "status": "confirmed"}
@tool
def cancel_flight(id: str) -> str:
"""Cancels a flight booking."""
return f"Flight {id} cancelled"
@tool
def book_hotel(location: str) -> dict:
"""Books a hotel at the location."""
if "fail" in location.lower():
return {"status": "error", "message": "No rooms available"}
return {"id": f"HT-{location.upper()}-456", "status": "confirmed"}
@tool
def cancel_hotel(id: str) -> str:
"""Cancels a hotel booking."""
return f"Hotel {id} cancelled"
# Create agent with automatic compensation
agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight, book_hotel, cancel_hotel],
compensation_mapping={
"book_flight": "cancel_flight",
"book_hotel": "cancel_hotel",
},
)
# When hotel booking fails, flight is automatically cancelled!
result = agent.invoke({
"messages": [("user", "Book a flight to Paris and a hotel in FailCity")]
})
How It Works
1. Agent calls book_flight("Paris") → Success, records compensation action
2. Agent calls book_hotel("FailCity") → Fails with error status
3. Middleware detects failure → Triggers automatic rollback
4. Middleware calls cancel_flight() → Compensates the successful booking
5. Agent continues with clean state
The compensation log tracks all compensatable actions with their results, enabling intelligent parameter extraction for compensation calls.
Core Concepts
Compensation Mapping
Maps each tool to its compensation (rollback) tool:
compensation_mapping = {
"book_flight": "cancel_flight", # book_flight → cancel_flight
"reserve_inventory": "release_inventory",
"charge_payment": "refund_payment",
}
Parameter Extraction Priority
When calling compensation tools, parameters are extracted in this order:
- state_mappers - Custom functions (highest priority)
- CompensationSchema - Declarative field mapping
- Heuristic - Common ID fields (id, booking_id, etc.)
- Recursive Search - Deep nested structure search
- LLM Extraction - AI-powered extraction (if enabled)
- Passthrough - Return entire result (last resort)
Parameter Extraction Options
Option 1: Automatic (Default)
Works out of the box for tools returning common ID patterns:
# Tool returns: {"id": "123", "status": "confirmed"}
# Compensation receives: {"id": "123"}
agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight],
compensation_mapping={"book_flight": "cancel_flight"},
# No additional configuration needed!
)
Option 2: Custom State Mappers
For complex extraction logic:
def extract_flight_params(result, original_params):
"""Custom extraction from nested result."""
return {
"booking_id": result["booking"]["confirmation_code"],
"reason": f"Rollback for {original_params.get('destination')}",
}
agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight],
compensation_mapping={"book_flight": "cancel_flight"},
state_mappers={"book_flight": extract_flight_params},
)
Option 3: Declarative Schema
Use CompensationSchema for path-based extraction without code:
from langchain_compensation import CompensationSchema, create_comp_agent
agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight],
compensation_mapping={"book_flight": "cancel_flight"},
compensation_schemas={
"book_flight": CompensationSchema(
param_mapping={
"booking_id": "result.id", # result["id"]
"confirmation": "result.conf_code", # result["conf_code"]
"origin": "params.origin", # original params
},
static_params={
"reason": "Automatic rollback",
},
),
},
)
Path Syntax:
result.field→result["field"]result.nested.field→result["nested"]["field"]params.field→original_params["field"]result.items[0].id→result["items"][0]["id"]result.optional?→ Optional field (no error if missing)
Option 4: LLM-Based Extraction
For complex results where heuristics fail:
agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight],
compensation_mapping={"book_flight": "cancel_flight"},
use_llm_extraction=True, # Enable LLM extraction
extraction_model="gpt-4o-mini", # Use cheap model for extraction
)
The LLM receives the tool result and compensation tool schema, then intelligently extracts the required parameters.
Multi-Agent Support
Multiple agents can share a compensation log for coordinated rollback:
from langchain_compensation import create_comp_agent, CompensationLog
# Create shared log
shared_log = CompensationLog()
# Flight booking agent
flight_agent = create_comp_agent(
model="gpt-4o",
tools=[book_flight, cancel_flight],
compensation_mapping={"book_flight": "cancel_flight"},
shared_log=shared_log,
agent_id="flight-agent",
)
# Hotel booking agent
hotel_agent = create_comp_agent(
model="gpt-4o",
tools=[book_hotel, cancel_hotel],
compensation_mapping={"book_hotel": "cancel_hotel"},
shared_log=shared_log,
agent_id="hotel-agent",
)
# If hotel_agent fails, flight_agent's bookings are also rolled back!
Multi-Agent Flow
Flight Agent: book_flight() → Success → Recorded in shared_log
Hotel Agent: book_hotel() → Success → Recorded in shared_log
Hotel Agent: book_rental() → FAILS!
↓
Shared log triggers coordinated rollback:
1. cancel_rental() (if it was recorded)
2. cancel_hotel() (Hotel Agent's action)
3. cancel_flight() (Flight Agent's action)
↓
Both agents' actions compensated atomically
Fault Tolerance with Checkpointing
Integrate with LangGraph checkpointers for crash recovery:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
# Development: In-memory
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
checkpointer=MemorySaver(),
use_checkpointing=True,
)
# Production: PostgreSQL
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
checkpointer=PostgresSaver(conn_string="postgresql://..."),
use_checkpointing=True,
)
Recovery After Crash
from langchain_compensation import CheckpointMiddleware, CompensationMiddleware
# After process restart, restore state
checkpoint_middleware = CheckpointMiddleware(checkpointer=saved_checkpointer)
restored_log = checkpoint_middleware.restore_from_failure({"thread_id": "abc123"})
if restored_log:
# Resume with restored compensation state
middleware = CompensationMiddleware(
compensation_mapping={...},
shared_log=restored_log,
)
Pluggable Strategies
Custom Error Detection
from langchain_compensation import (
ErrorStrategy,
ExplicitStatusStrategy,
ContentDictStrategy,
create_comp_agent,
)
class CustomAPIErrorStrategy(ErrorStrategy):
"""Detect errors from custom API responses."""
def is_error(self, result) -> bool | None:
content = result.content
if isinstance(content, dict):
if content.get("api_status") == "failed":
return True
if content.get("error_code") is not None:
return True
return None # Defer to next strategy
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
error_strategies=[
ExplicitStatusStrategy(), # Check ToolMessage.status
ContentDictStrategy(), # Check {"error": ...}
CustomAPIErrorStrategy(), # Your custom logic
],
)
Custom Parameter Extraction
from langchain_compensation import (
ExtractionStrategy,
HeuristicExtractionStrategy,
create_comp_agent,
)
class GraphQLExtractionStrategy(ExtractionStrategy):
"""Extract IDs from GraphQL-style responses."""
def extract(self, result, original_params, comp_tool=None, tool_name=None):
if isinstance(result, dict) and "data" in result:
data = result["data"]
# Find the first mutation result with an ID
for key, value in data.items():
if isinstance(value, dict) and "id" in value:
return {"id": value["id"]}
return None # Defer to next strategy
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
extraction_strategies=[
GraphQLExtractionStrategy(),
HeuristicExtractionStrategy(),
],
)
API Reference
create_comp_agent
def create_comp_agent(
model: str | BaseChatModel,
tools: Sequence[BaseTool | Callable],
*,
compensation_mapping: dict[str, str],
# Parameter Extraction
state_mappers: dict[str, Callable] = None,
compensation_schemas: dict[str, CompensationSchema] = None,
use_llm_extraction: bool = False,
extraction_model: str = "gpt-4o-mini",
# Multi-Agent
shared_log: CompensationLog = None,
agent_id: str = None,
# Pluggable Strategies
error_strategies: list[ErrorStrategy] = None,
extraction_strategies: list[ExtractionStrategy] = None,
# Fault Tolerance
checkpointer: Checkpointer = None,
use_checkpointing: bool = False,
# Standard Agent Parameters
system_prompt: str = None,
middleware: Sequence[AgentMiddleware] = (),
...
) -> CompiledStateGraph
CompensationSchema
@dataclass
class CompensationSchema:
param_mapping: Dict[str, str] # Maps param name → path expression
static_params: Dict[str, Any] # Static values to always include
def extract(self, result, original_params) -> Dict[str, Any]
CompensationLog
class CompensationLog:
def add(self, record: CompensationRecord) -> None
def update(self, record_id: str, **kwargs) -> None
def get(self, record_id: str) -> CompensationRecord | None
def snapshot(self) -> Dict[str, CompensationRecord] # Thread-safe copy
def atomic_batch(self, operations: List[tuple]) -> None # Atomic ops
def filter_by_agent(self, agent_id: str) -> List[CompensationRecord]
def get_rollback_plan(self, agent_id: str = None) -> List[CompensationRecord]
def mark_compensated(self, record_id: str) -> None
def clear(self, agent_id: str = None) -> None
CompensationMiddleware
class CompensationMiddleware(AgentMiddleware):
def __init__(
self,
compensation_mapping: Dict[str, str],
tools: List[BaseTool] = None,
state_mappers: Dict[str, Callable] = None,
shared_log: CompensationLog = None,
agent_id: str = None,
compensation_schemas: Dict[str, CompensationSchema] = None,
error_strategies: List[ErrorStrategy] = None,
extraction_strategies: List[ExtractionStrategy] = None,
)
Error Strategies
| Strategy | Description |
|---|---|
ExplicitStatusStrategy |
Checks ToolMessage.status == "error" |
ContentDictStrategy |
Checks for {"error": ...}, {"success": false}, {"ok": false} |
ExceptionContentStrategy |
Detects exception-like patterns in content |
CompositeErrorStrategy |
Chains multiple strategies together |
Extraction Strategies
| Strategy | Description |
|---|---|
StateMappersStrategy |
Uses developer-provided functions |
SchemaExtractionStrategy |
Uses CompensationSchema declarations |
HeuristicExtractionStrategy |
Looks for common ID fields |
RecursiveSearchStrategy |
Deep searches nested structures |
LLMExtractionStrategy |
Uses LLM for intelligent extraction |
PassthroughStrategy |
Returns entire result as-is |
CompositeExtractionStrategy |
Chains multiple strategies |
Use Cases
- Travel Booking: Cancel flights/hotels/cars if any booking fails
- E-commerce: Reverse inventory, refund payments on order failure
- Financial: Rollback transfers, cancel holds on transaction failure
- Infrastructure: Delete created resources if provisioning fails
- Multi-Service: Coordinate rollback across microservices
- AI Agents: Clean up side effects when agent workflows fail
Migration from v0.3.x
v0.4.0 is fully backwards compatible. New features are opt-in:
# v0.3.x code still works
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
state_mappers={...}, # Still supported
)
# v0.4.0 additions (all optional)
agent = create_comp_agent(
model="gpt-4o",
tools=[...],
compensation_mapping={...},
# New in v0.4.0:
compensation_schemas={...}, # Declarative extraction
shared_log=CompensationLog(), # Multi-agent support
agent_id="my-agent", # Agent identification
use_llm_extraction=True, # LLM-based extraction
use_checkpointing=True, # Fault tolerance
)
Requirements
- Python 3.9+
- langchain >= 0.3.0
- langchain-core >= 0.3.0
- langgraph >= 0.2.0
Optional:
- langchain-openai (for LLM extraction with GPT models)
- langchain-anthropic (for LLM extraction with Claude models)
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
MIT License - see LICENSE file for details.
Credits
Inspired by the Saga pattern for distributed transactions and built on LangChain's middleware system.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file langchain_compensation-0.4.0.tar.gz.
File metadata
- Download URL: langchain_compensation-0.4.0.tar.gz
- Upload date:
- Size: 30.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0dbdeea1f35bd568079fd4e1370a7640f443741b9341836bfd976146a560a8e6
|
|
| MD5 |
2a3fb70f5cc188328bd1cef44bfdd16b
|
|
| BLAKE2b-256 |
cb24d32677f956e9bdc010cf424bb095bfda219dff988bc3238cc32a23fda835
|
File details
Details for the file langchain_compensation-0.4.0-py3-none-any.whl.
File metadata
- Download URL: langchain_compensation-0.4.0-py3-none-any.whl
- Upload date:
- Size: 31.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.10
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
31c4d8c2a8bf9908f23c2f478e3d0cc9a49a24ccc3f689f3cec82f7ae8ae3a29
|
|
| MD5 |
e4c4876c8733fcc4bc4c6111f0a2d073
|
|
| BLAKE2b-256 |
211d4e39ea50b9190502821747eafbd01df402f9e0404c43bca91e157d1a14f2
|