Skip to main content

Keep humans in the loop. HITL control library for AI agent workflows with LangGraph.

Project description

hitloop

Human-in-the-Loop control library for AI agent workflows with LangGraph integration.

hitloop provides explicit control nodes for human oversight in AI agent workflows, with strong instrumentation for research experiments. Unlike passive monitoring, human approval is a first-class control signal and event in the execution trace.

Core Concept

LLM proposes action → HITL policy decides → Human approves/rejects → Tool executes → Telemetry logs all

Human approval is not a UI gimmick. It is:

  • A control signal that gates execution
  • A first-class event in the trace
  • A research artifact for measuring oversight effectiveness

Quick Start

Installation

# Clone the repository
git clone https://github.com/ebaenamar/hitloop.git
cd hitloop

# Install with uv (recommended)
uv pip install -e .

# Or with pip
pip install -e .

# For development
pip install -e ".[dev]"

Run an Example

# Recommended: Using LangGraph's native interrupt() (v0.4.0+)
python examples/interrupt_example.py

# Legacy: CLI-based approval prompts
python examples/basic_workflow.py

# Auto-approve mode (no prompts)
python examples/basic_workflow.py --auto

# Run a full experiment
python examples/run_experiment.py --n-trials 20

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      LangGraph Workflow                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐  │
│  │   LLM    │───►│  HITL    │───►│   Tool Executor      │  │
│  │  Node    │    │  Gate    │    │                      │  │
│  └──────────┘    └────┬─────┘    └──────────────────────┘  │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │  HITL Policy   │                             │
│              │  ┌──────────┐  │                             │
│              │  │ Approval │  │◄──► Human (CLI/Web/etc)    │
│              │  │ Backend  │  │                             │
│              │  └──────────┘  │                             │
│              └────────────────┘                             │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │   Telemetry    │───► SQLite / Analysis      │
│              │    Logger      │                             │
│              └────────────────┘                             │
└─────────────────────────────────────────────────────────────┘

API Overview

Core Models

from hitloop import Action, Decision, RiskClass

# Define an action
action = Action(
    tool_name="send_email",
    tool_args={"recipient": "alice@example.com", "subject": "Hello"},
    risk_class=RiskClass.MEDIUM,
    side_effects=["email_sent"],
    rationale="Sending follow-up email to client",
)

# Decisions from human review
decision = Decision(
    action_id=action.id,
    approved=True,
    reason="Verified recipient is correct",
    decided_by="human:operator",
    latency_ms=1500.0,
)

Policies

Three built-in policies for different oversight tiers:

from hitloop import AlwaysApprovePolicy, RiskBasedPolicy, AuditPlusEscalatePolicy

# Tier 4: No human oversight (baseline)
policy = AlwaysApprovePolicy()

# Risk-based: Approve high-risk actions only
policy = RiskBasedPolicy(
    require_approval_for_high=True,
    require_approval_for_medium=False,
    high_risk_tools=["send_email", "delete_record"],
)

# Audit + Escalate: Random sampling + anomaly detection
policy = AuditPlusEscalatePolicy(
    audit_sample_rate=0.1,  # 10% random audit
    escalate_on_high_risk=True,
    anomaly_signals=["unusual_recipient", "large_amount"],
)

Adding a New Policy

Create a single file in src/hitloop/policies/:

# src/hitloop/policies/my_policy.py
from hitloop.core.interfaces import HITLPolicy
from hitloop.core.models import Action, Decision

class MyCustomPolicy(HITLPolicy):
    @property
    def name(self) -> str:
        return "my_custom"

    def should_request_approval(
        self, action: Action, state: dict
    ) -> tuple[bool, str]:
        # Your logic here
        if action.tool_name in self.critical_tools:
            return True, "Critical tool requires approval"
        return False, "Auto-approved"

LangGraph Integration

Add human approval to any LangGraph agent:

from hitloop import RiskBasedPolicy, ApprovalRequest, Action, Decision, RiskClass
from hitloop.backends import AutoApproveBackend, CLIBackend

# 1. Configure policy - which actions need approval?
policy = RiskBasedPolicy(
    require_approval_for_high=True,
    high_risk_tools=["send_email", "delete_file", "transfer_money"],
)

# 2. Choose backend - how to get approval?
backend = CLIBackend()  # Interactive CLI prompts
# backend = AutoApproveBackend()  # For testing

# 3. In your LangGraph node, check if approval is needed:
async def hitl_gate_node(state):
    action = state["pending_action"]
    needs_approval, reason = policy.should_request_approval(action, state)
    
    if needs_approval:
        request = ApprovalRequest(run_id="my-run", action=action)
        decision = await backend.request_approval(request)
        return {"approval_decision": decision}
    else:
        # Auto-approve low-risk actions
        return {"approval_decision": Decision(action_id=action.id, approved=True)}

Full working example: See examples/langgraph_agent.py

# Run with simulated LLM (no API key needed)
python examples/langgraph_agent.py --simulate --auto

# With CLI approval prompts
python examples/langgraph_agent.py --simulate

Third-Party Integrations (Slack, Telegram, Discord, etc.)

hitloop is completely agnostic to your approval channel. Use WebhookBackend to integrate with any service:

from hitloop.backends import WebhookBackend

# Your custom function to send to Slack/Telegram/Discord/etc.
async def send_to_slack(request, callback_id, callback_url):
    await slack_client.chat_postMessage(
        channel="#approvals",
        text=f"Approve {request.action.tool_name}?",
        # Include buttons that POST to callback_url
    )

backend = WebhookBackend(
    send_request=send_to_slack,
    timeout_seconds=300,  # 5 min timeout (rejects if no response)
    callback_base_url="https://your-app.com/hitloop/callback",
)

# In your webhook handler (FastAPI/Flask):
@app.post("/hitloop/callback/{callback_id}")
async def handle_callback(callback_id: str, data: dict):
    await backend.handle_callback(
        callback_id=callback_id,
        approved=data["approved"],
        decided_by=f"slack:{data['user']}",
    )

Full working example: See examples/webhook_server.py

# Install server dependencies
pip install hitloop[server]

# Run the webhook server
uvicorn examples.webhook_server:app --port 8000

# Test with curl
curl -X POST http://localhost:8000/test/request-approval
# Then approve/reject via the callback URL shown in console

Using LangGraph's Native interrupt() (Recommended)

New in v0.4.0: hitloop now supports LangGraph's native interrupt() function for human-in-the-loop. This is the recommended approach as it:

  • Uses LangGraph's built-in checkpointing for persistence
  • Is compatible with agent-inbox and LangGraph Studio
  • Follows LangGraph's standard patterns
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

from hitloop import RiskBasedPolicy
from hitloop.langgraph import create_interrupt_gate_node, create_interrupt_tool_node, should_execute

# Setup
policy = RiskBasedPolicy(high_risk_tools=["send_email", "delete_file"])
gate = create_interrupt_gate_node(policy)
executor = create_interrupt_tool_node({"send_email": send_email_func})

# Build graph
builder = StateGraph(MyState)
builder.add_node("hitl_gate", gate)
builder.add_node("execute", executor)
builder.add_conditional_edges("hitl_gate", should_execute, {"execute": "execute", "skip": END})

graph = builder.compile(checkpointer=MemorySaver())

# Run - will pause at interrupt for high-risk actions
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"proposed_action": action}, config)

# Check for interrupt
if "__interrupt__" in result:
    print(result["__interrupt__"])  # Shows approval request
    # Resume with human decision
    graph.invoke(Command(resume={"approved": True, "decided_by": "human:alice"}), config)

Full example: See examples/interrupt_example.py

Production Deployment

Persistent Storage (Survives Restarts)

hitloop follows LangGraph's pattern for pluggable storage backends:

# Install with your preferred backend
pip install hitloop[postgres]  # PostgreSQL
pip install hitloop[redis]     # Redis
from hitloop.persistence import PostgresApprovalStore, RedisApprovalStore
from hitloop.backends import PersistentWebhookBackend

# PostgreSQL (recommended for most cases)
store = await PostgresApprovalStore.from_conn_string(
    "postgresql://user:pass@localhost:5432/hitloop"
)
await store.setup()  # Creates tables

# Or Redis (faster, built-in TTL)
store = await RedisApprovalStore.from_url("redis://localhost:6379/0")

# Use with PersistentWebhookBackend
backend = PersistentWebhookBackend(
    send_request=my_slack_sender,
    store=store,
    timeout_seconds=300,
)

Retry & Circuit Breaker

Built-in resilience for production:

from hitloop.backends import PersistentWebhookBackend, RetryConfig, CircuitBreakerConfig

backend = PersistentWebhookBackend(
    send_request=my_sender,
    store=store,
    retry_config=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        exponential_base=2.0,
    ),
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,    # Open after 5 failures
        recovery_timeout=30.0,  # Try again after 30s
    ),
)

# Check circuit state
print(backend.get_circuit_state())  # CLOSED, OPEN, or HALF_OPEN

Recovery After Restart

# On startup, recover pending requests
pending = await store.list_pending(thread_id="user-123")
for record in pending:
    # Re-send or resolve as needed
    future = backend.register_pending(record)

Running Experiments

from hitloop import TelemetryLogger
from hitloop.eval import ExperimentRunner, ExperimentCondition
from hitloop.eval.runner import create_standard_conditions
from hitloop.scenarios import EmailDraftScenario

# Setup
logger = TelemetryLogger("experiment.db")
scenario = EmailDraftScenario()

# Create standard conditions (4 policies × scenarios)
conditions = create_standard_conditions(
    scenario=scenario,
    n_trials=20,
    injection_rate=0.2,  # 20% error injection
)

# Run
runner = ExperimentRunner(logger)
for c in conditions:
    runner.add_condition(c)

await runner.run_all()

# Export
runner.export_results("results.csv", "summary.json")

Output: results.csv

run_id scenario_id condition_id policy_name task_success approval_requested injected_error error_caught
abc123 email_draft risk_based risk_based 1 1 0 0
def456 email_draft risk_based risk_based 0 1 1 1

Output: summary.json

{
  "risk_based": {
    "n_runs": 20,
    "success_rate": 0.85,
    "approval_rate": 0.65,
    "error_catch_rate": 0.75,
    "false_reject_rate": 0.05,
    "human_latency_mean_ms": 1200.5
  }
}

Research Alignment

hitloop metrics map directly to the research framework:

Metric Research Concept Description
success_rate Quality Task completion rate
approval_rate Leverage proxy Human involvement frequency
error_catch_rate Appropriate reliance Injected error detection
false_reject_rate Appropriate reliance Unnecessary rejections
human_latency_ms Human burden Time cost per decision
cost_proxy Efficiency Token/call overhead

Injected Errors for Ground Truth

The error injector provides ground truth for measuring oversight effectiveness:

from hitloop.eval import ErrorInjector, InjectionConfig

injector = ErrorInjector(InjectionConfig(
    injection_rate=0.2,
    injection_types=[
        InjectionType.WRONG_RECIPIENT,
        InjectionType.WRONG_RECORD_ID,
    ]
))

# Every action has known correctness
result = injector.maybe_inject(action)
if result.injected:
    # This action is KNOWN to be wrong
    # If human approves it → false negative
    # If human rejects it → true positive (error caught)

Project Structure

hitloop/
├── src/hitloop/
│   ├── core/
│   │   ├── models.py      # Action, Decision, TraceEvent
│   │   ├── interfaces.py  # ApprovalBackend, HITLPolicy
│   │   └── logger.py      # TelemetryLogger (SQLite)
│   ├── policies/
│   │   ├── always_approve.py
│   │   ├── risk_based.py
│   │   └── audit_plus_escalate.py
│   ├── backends/
│   │   ├── cli_backend.py
│   │   └── humanlayer_backend.py  # Optional
│   ├── langgraph/
│   │   └── nodes.py       # hitl_gate_node, execute_tool_node
│   ├── scenarios/
│   │   ├── email_draft.py
│   │   └── record_update.py
│   └── eval/
│       ├── runner.py      # ExperimentRunner
│       ├── injectors.py   # ErrorInjector
│       └── metrics.py     # MetricsCalculator
├── tests/
├── examples/
├── pyproject.toml
└── README.md

Instrumentation

Every run emits structured events:

# Per run
{
    "run_id": "abc123",
    "scenario_id": "email_draft",
    "condition_id": "risk_based",
    "seed": 42
}

# Per action
{
    "action_id": "xyz789",
    "tool_name": "send_email",
    "args_hash": "a1b2c3d4",
    "risk_class": "medium",
    "injected_error": false
}

# Per approval
{
    "channel": "cli",
    "latency_ms": 1250.0,
    "decision": true,
    "decided_by": "human"
}

# Per execution
{
    "success": true,
    "execution_time_ms": 45.2
}

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/hitloop

# Linting
ruff check src/hitloop
ruff format src/hitloop

License

MIT License - see LICENSE file.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hitloop-0.4.0.tar.gz (67.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hitloop-0.4.0-py3-none-any.whl (65.1 kB view details)

Uploaded Python 3

File details

Details for the file hitloop-0.4.0.tar.gz.

File metadata

  • Download URL: hitloop-0.4.0.tar.gz
  • Upload date:
  • Size: 67.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for hitloop-0.4.0.tar.gz
Algorithm Hash digest
SHA256 fe21f1164b3be41095851d71eef44948690588861da53898fd8e30f0b4c54d5a
MD5 16a37d77f2a6bc6a4bd2dcbc63a6270f
BLAKE2b-256 2a35bb369a5d2972cda4f72887000fc99011cabc59c8f26b4d29b8b52a5ec641

See more details on using hashes here.

File details

Details for the file hitloop-0.4.0-py3-none-any.whl.

File metadata

  • Download URL: hitloop-0.4.0-py3-none-any.whl
  • Upload date:
  • Size: 65.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.7

File hashes

Hashes for hitloop-0.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1ded40c50abec5ef10fd214865647a198b873071203eaf5fd0d54aa63e01b0c5
MD5 0c4df883bc396c18504ea40493395b9b
BLAKE2b-256 45d64e96f77b659746b01bc37b08be36c846433cfe2f486747a98122fd1af0a5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page