Keep humans in the loop. HITL control library for AI agent workflows with LangGraph.

hitloop

Human-in-the-Loop control library for AI agent workflows with LangGraph integration.

hitloop provides explicit control nodes for human oversight in AI agent workflows, with strong instrumentation for research experiments. Unlike passive monitoring, human approval is a first-class control signal and event in the execution trace.

Core Concept

LLM proposes action → HITL policy decides → Human approves/rejects → Tool executes → Telemetry logs all

Human approval is not a UI gimmick. It is:

  • A control signal that gates execution
  • A first-class event in the trace
  • A research artifact for measuring oversight effectiveness
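
The flow above can be sketched without any dependencies. This is a minimal illustration, not hitloop's implementation: `ProposedAction`, `Trace`, and `run_step` are hypothetical names used only here, and the policy and human are stand-in callables.

```python
from dataclasses import dataclass, field


@dataclass
class ProposedAction:
    # Hypothetical stand-in for an LLM-proposed tool call
    tool_name: str
    tool_args: dict
    high_risk: bool = False


@dataclass
class Trace:
    # Collects every event so approvals appear next to executions
    events: list = field(default_factory=list)

    def log(self, kind: str, **data) -> None:
        self.events.append({"kind": kind, **data})


def run_step(action, needs_approval, ask_human, tools, trace):
    """Propose -> policy decides -> human approves/rejects -> execute -> log."""
    trace.log("proposed", tool=action.tool_name)
    approved = True
    if needs_approval(action):
        approved = ask_human(action)
        trace.log("approval", tool=action.tool_name, approved=approved)
    if not approved:
        # The rejection gates execution: the tool is never called
        trace.log("skipped", tool=action.tool_name)
        return None
    result = tools[action.tool_name](**action.tool_args)
    trace.log("executed", tool=action.tool_name)
    return result


trace = Trace()
tools = {"send_email": lambda recipient: f"sent to {recipient}"}
action = ProposedAction(
    "send_email", {"recipient": "alice@example.com"}, high_risk=True
)

# Same action, once rejected and once approved by the (simulated) human
rejected = run_step(action, lambda a: a.high_risk, lambda a: False, tools, trace)
approved = run_step(action, lambda a: a.high_risk, lambda a: True, tools, trace)
```

The approval shows up in `trace.events` as its own event between "proposed" and "executed" (or "skipped"), which is what makes it usable as a research artifact.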

Quick Start

Installation

# Clone the repository
git clone https://github.com/ebaenamar/hitloop.git
cd hitloop

# Install with uv (recommended)
uv pip install -e .

# Or with pip
pip install -e .

# For development
pip install -e ".[dev]"

Run an Example

# Recommended: Using LangGraph's native interrupt() (v0.4.0+)
python examples/interrupt_example.py

# Legacy: CLI-based approval prompts
python examples/basic_workflow.py

# Auto-approve mode (no prompts)
python examples/basic_workflow.py --auto

# Run a full experiment
python examples/run_experiment.py --n-trials 20

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      LangGraph Workflow                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐  │
│  │   LLM    │───►│  HITL    │───►│   Tool Executor      │  │
│  │  Node    │    │  Gate    │    │                      │  │
│  └──────────┘    └────┬─────┘    └──────────────────────┘  │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │  HITL Policy   │                             │
│              │  ┌──────────┐  │                             │
│              │  │ Approval │  │◄──► Human (CLI/Web/etc)    │
│              │  │ Backend  │  │                             │
│              │  └──────────┘  │                             │
│              └────────────────┘                             │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │   Telemetry    │───► SQLite / Analysis      │
│              │    Logger      │                             │
│              └────────────────┘                             │
└─────────────────────────────────────────────────────────────┘

API Overview

Core Models

from hitloop import Action, Decision, RiskClass

# Define an action
action = Action(
    tool_name="send_email",
    tool_args={"recipient": "alice@example.com", "subject": "Hello"},
    risk_class=RiskClass.MEDIUM,
    side_effects=["email_sent"],
    rationale="Sending follow-up email to client",
)

# Decisions from human review
decision = Decision(
    action_id=action.id,
    approved=True,
    reason="Verified recipient is correct",
    decided_by="human:operator",
    latency_ms=1500.0,
)

Policies

Three built-in policies for different oversight tiers:

from hitloop import AlwaysApprovePolicy, RiskBasedPolicy, AuditPlusEscalatePolicy

# Tier 4: No human oversight (baseline)
policy = AlwaysApprovePolicy()

# Risk-based: Require human approval for high-risk actions only
policy = RiskBasedPolicy(
    require_approval_for_high=True,
    require_approval_for_medium=False,
    high_risk_tools=["send_email", "delete_record"],
)

# Audit + Escalate: Random sampling + anomaly detection
policy = AuditPlusEscalatePolicy(
    audit_sample_rate=0.1,  # 10% random audit
    escalate_on_high_risk=True,
    anomaly_signals=["unusual_recipient", "large_amount"],
)
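
To make the audit-plus-escalate idea concrete, here is one way such a decision could be ordered: escalate on high risk first, then on anomaly signals, then fall back to random sampling. This is a sketch under assumptions, not the logic of `AuditPlusEscalatePolicy`; the function `audit_or_escalate` and its parameters are hypothetical.

```python
import random


def audit_or_escalate(risk, signals, *, sample_rate, watched_signals, rng):
    """Return (needs_approval, reason) for one proposed action.

    Hypothetical ordering: high risk, then anomaly signals, then random audit.
    """
    if risk == "high":
        return True, "escalated: high risk"
    hits = sorted(set(signals) & set(watched_signals))
    if hits:
        return True, f"escalated: anomaly {hits[0]}"
    # rng.random() is in [0.0, 1.0), so sample_rate=0.1 audits about 10% of calls
    if rng.random() < sample_rate:
        return True, "random audit"
    return False, "auto-approved"


rng = random.Random(42)  # seeded so an experiment run is reproducible
watched = ["unusual_recipient", "large_amount"]

high = audit_or_escalate(
    "high", [], sample_rate=0.1, watched_signals=watched, rng=rng
)
anomaly = audit_or_escalate(
    "low", ["large_amount"], sample_rate=0.1, watched_signals=watched, rng=rng
)
never_sampled = audit_or_escalate(
    "low", [], sample_rate=0.0, watched_signals=watched, rng=rng
)
always_sampled = audit_or_escalate(
    "low", [], sample_rate=1.0, watched_signals=watched, rng=rng
)
```

Passing the random generator in explicitly keeps the sampling deterministic under a fixed seed, which matters when comparing conditions across trials.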

Adding a New Policy

Create a single file in src/hitloop/policies/:

# src/hitloop/policies/my_policy.py
from hitloop.core.interfaces import HITLPolicy
from hitloop.core.models import Action, Decision

class MyCustomPolicy(HITLPolicy):
    def __init__(self, critical_tools: list[str]) -> None:
        # Tools that must always be reviewed by a human
        self.critical_tools = set(critical_tools)

    @property
    def name(self) -> str:
        return "my_custom"

    def should_request_approval(
        self, action: Action, state: dict
    ) -> tuple[bool, str]:
        # Your logic here
        if action.tool_name in self.critical_tools:
            return True, "Critical tool requires approval"
        return False, "Auto-approved"

LangGraph Integration

Add human approval to any LangGraph agent:

from hitloop import RiskBasedPolicy, ApprovalRequest, Action, Decision, RiskClass
from hitloop.backends import AutoApproveBackend, CLIBackend

# 1. Configure policy - which actions need approval?
policy = RiskBasedPolicy(
    require_approval_for_high=True,
    high_risk_tools=["send_email", "delete_file", "transfer_money"],
)

# 2. Choose backend - how to get approval?
backend = CLIBackend()  # Interactive CLI prompts
# backend = AutoApproveBackend()  # For testing

# 3. In your LangGraph node, check if approval is needed:
async def hitl_gate_node(state):
    action = state["pending_action"]
    needs_approval, reason = policy.should_request_approval(action, state)
    
    if needs_approval:
        request = ApprovalRequest(run_id="my-run", action=action)
        decision = await backend.request_approval(request)
        return {"approval_decision": decision}
    else:
        # Auto-approve low-risk actions, keeping the policy's reason in the trace
        return {"approval_decision": Decision(action_id=action.id, approved=True, reason=reason)}

Full working example: See examples/langgraph_agent.py

# Run with simulated LLM (no API key needed)
python examples/langgraph_agent.py --simulate --auto

# With CLI approval prompts
python examples/langgraph_agent.py --simulate

Third-Party Integrations (Slack, Telegram, Discord, etc.)

hitloop is completely agnostic to your approval channel. Use WebhookBackend to integrate with any service:

from hitloop.backends import WebhookBackend

# Your custom function to send to Slack/Telegram/Discord/etc.
# (slack_client is assumed to be an async Slack client you have already configured)
async def send_to_slack(request, callback_id, callback_url):
    await slack_client.chat_postMessage(
        channel="#approvals",
        text=f"Approve {request.action.tool_name}?",
        # Include buttons that POST to callback_url
    )

backend = WebhookBackend(
    send_request=send_to_slack,
    timeout_seconds=300,  # 5 min timeout (rejects if no response)
    callback_base_url="https://your-app.com/hitloop/callback",
)

# In your webhook handler (FastAPI-style shown; `app` is your existing application):
@app.post("/hitloop/callback/{callback_id}")
async def handle_callback(callback_id: str, data: dict):
    await backend.handle_callback(
        callback_id=callback_id,
        approved=data["approved"],
        decided_by=f"slack:{data['user']}",
    )

Full working example: See examples/webhook_server.py

# Install server dependencies
pip install "hitloop[server]"

# Run the webhook server
uvicorn examples.webhook_server:app --port 8000

# Test with curl
curl -X POST http://localhost:8000/test/request-approval
# Then approve/reject via the callback URL shown in console
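
The callback pattern above (send a request, wait, reject on timeout) can be sketched with plain asyncio futures. This is an illustrative sketch, not `WebhookBackend`'s internals; `PendingApprovals` and the `"system:timeout"` label are hypothetical.

```python
import asyncio


class PendingApprovals:
    """Hypothetical registry mapping callback IDs to awaiting futures."""

    def __init__(self):
        self._futures = {}

    async def wait_for(self, callback_id, timeout_seconds):
        future = asyncio.get_running_loop().create_future()
        self._futures[callback_id] = future
        try:
            return await asyncio.wait_for(future, timeout_seconds)
        except asyncio.TimeoutError:
            # No human response in time: fail closed by rejecting
            return {"approved": False, "decided_by": "system:timeout"}
        finally:
            self._futures.pop(callback_id, None)

    def resolve(self, callback_id, approved, decided_by):
        """Called from the webhook handler; returns False if nobody is waiting."""
        future = self._futures.get(callback_id)
        if future is None or future.done():
            return False
        future.set_result({"approved": approved, "decided_by": decided_by})
        return True


async def demo():
    pending = PendingApprovals()
    waiter = asyncio.create_task(pending.wait_for("cb-1", timeout_seconds=1.0))
    await asyncio.sleep(0)  # let the waiter register its future
    delivered = pending.resolve("cb-1", approved=True, decided_by="slack:alice")
    answered = await waiter
    timed_out = await pending.wait_for("cb-2", timeout_seconds=0.01)
    late = pending.resolve("cb-2", approved=True, decided_by="slack:bob")
    return delivered, answered, timed_out, late


delivered, answered, timed_out, late = asyncio.run(demo())
```

Rejecting on timeout means a missed notification can never silently approve an action, and a late callback for an expired request is simply reported as undelivered.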

Using LangGraph's Native interrupt() (Recommended)

New in v0.4.0: hitloop now supports LangGraph's native interrupt() function for human-in-the-loop. This is the recommended approach as it:

  • Uses LangGraph's built-in checkpointing for persistence
  • Is compatible with agent-inbox and LangGraph Studio
  • Follows LangGraph's standard patterns

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

from hitloop import RiskBasedPolicy
from hitloop.langgraph import create_interrupt_gate_node, create_interrupt_tool_node, should_execute

# Setup (MyState, send_email_func, and action are assumed to be defined elsewhere)
policy = RiskBasedPolicy(high_risk_tools=["send_email", "delete_file"])
gate = create_interrupt_gate_node(policy)
executor = create_interrupt_tool_node({"send_email": send_email_func})

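# Build graph
builder = StateGraph(MyState)
builder.add_node("hitl_gate", gate)
builder.add_node("execute", executor)
builder.add_edge(START, "hitl_gate")  # Entry point: every run starts at the gate
builder.add_conditional_edges("hitl_gate", should_execute, {"execute": "execute", "skip": END})
builder.add_edge("execute", END)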

graph = builder.compile(checkpointer=MemorySaver())

# Run - will pause at interrupt for high-risk actions
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"proposed_action": action}, config)

# Check for interrupt
if "__interrupt__" in result:
    print(result["__interrupt__"])  # Shows approval request
    # Resume with human decision
    graph.invoke(Command(resume={"approved": True, "decided_by": "human:alice"}), config)

Full example: See examples/interrupt_example.py

Production Deployment

Persistent Storage (Survives Restarts)

hitloop follows LangGraph's pattern for pluggable storage backends:

# Install with your preferred backend
pip install "hitloop[postgres]"  # PostgreSQL
pip install "hitloop[redis]"     # Redis

from hitloop.persistence import PostgresApprovalStore, RedisApprovalStore
from hitloop.backends import PersistentWebhookBackend

# PostgreSQL (recommended for most cases)
store = await PostgresApprovalStore.from_conn_string(
    "postgresql://user:pass@localhost:5432/hitloop"
)
await store.setup()  # Creates tables

# Or Redis (faster, built-in TTL)
store = await RedisApprovalStore.from_url("redis://localhost:6379/0")

# Use with PersistentWebhookBackend
backend = PersistentWebhookBackend(
    send_request=my_slack_sender,
    store=store,
    timeout_seconds=300,
)

Retry & Circuit Breaker

Built-in resilience for production:

from hitloop.backends import PersistentWebhookBackend, RetryConfig, CircuitBreakerConfig

backend = PersistentWebhookBackend(
    send_request=my_sender,
    store=store,
    retry_config=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        exponential_base=2.0,
    ),
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,    # Open after 5 failures
        recovery_timeout=30.0,  # Try again after 30s
    ),
)

# Check circuit state
print(backend.get_circuit_state())  # CLOSED, OPEN, or HALF_OPEN
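
For readers unfamiliar with the two patterns, the sketch below shows what the configuration values above would mean under a conventional reading: delays grow as `initial_delay * exponential_base ** attempt`, and the breaker moves CLOSED → OPEN → HALF_OPEN. This is an assumption about semantics, not hitloop's code; `backoff_delays` and `CircuitBreaker` are hypothetical.

```python
import time


def backoff_delays(max_retries, initial_delay, exponential_base):
    """Delay before each retry: initial_delay * exponential_base ** attempt."""
    return [initial_delay * exponential_base ** i for i in range(max_retries)]


class CircuitBreaker:
    """Minimal breaker: opens after N failures, half-opens after a timeout."""

    def __init__(self, failure_threshold, recovery_timeout, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None

    @property
    def state(self):
        if self._opened_at is None:
            return "CLOSED"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "HALF_OPEN"  # allow one trial call through
        return "OPEN"

    def record_failure(self):
        if self.state == "HALF_OPEN":
            self._opened_at = self._clock()  # trial call failed: reopen
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()

    def record_success(self):
        self._failures = 0
        self._opened_at = None


delays = backoff_delays(max_retries=3, initial_delay=1.0, exponential_base=2.0)

now = [0.0]  # fake clock so the example does not actually sleep
breaker = CircuitBreaker(
    failure_threshold=5, recovery_timeout=30.0, clock=lambda: now[0]
)
states = []
for _ in range(5):
    breaker.record_failure()
    states.append(breaker.state)
now[0] = 30.0
states.append(breaker.state)
breaker.record_success()
states.append(breaker.state)
```

Injecting the clock keeps the state transitions testable without waiting for the real recovery timeout.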

Recovery After Restart

# On startup, recover pending requests
pending = await store.list_pending(thread_id="user-123")
for record in pending:
    # Re-send or resolve as needed
    future = backend.register_pending(record)
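
The recovery step relies on pending approvals being written to durable storage before the process waits on them. The sketch below illustrates that idea with SQLite from the standard library; it is not the schema of `PostgresApprovalStore` or `RedisApprovalStore`, and the table layout and helper names are hypothetical.

```python
import os
import sqlite3
import tempfile

SCHEMA = """
CREATE TABLE IF NOT EXISTS approvals (
    callback_id TEXT PRIMARY KEY,
    thread_id   TEXT NOT NULL,
    tool_name   TEXT NOT NULL,
    status      TEXT NOT NULL DEFAULT 'pending'
)
"""


def open_store(path):
    """Open (or create) the approval store; safe to call on every startup."""
    conn = sqlite3.connect(path)
    conn.execute(SCHEMA)
    return conn


def list_pending(conn, thread_id):
    """Return unresolved approvals for one thread, oldest callback ID first."""
    cur = conn.execute(
        "SELECT callback_id, tool_name FROM approvals "
        "WHERE thread_id = ? AND status = 'pending' ORDER BY callback_id",
        (thread_id,),
    )
    return cur.fetchall()


db_path = os.path.join(tempfile.mkdtemp(), "approvals.db")

# First process: record requests, then "crash" by closing the connection
conn = open_store(db_path)
conn.executemany(
    "INSERT INTO approvals (callback_id, thread_id, tool_name, status) "
    "VALUES (?, ?, ?, ?)",
    [
        ("cb-1", "user-123", "send_email", "pending"),
        ("cb-2", "user-123", "delete_file", "approved"),
        ("cb-3", "user-456", "send_email", "pending"),
    ],
)
conn.commit()
conn.close()

# After restart: reopen the same file and pick up where we left off
conn = open_store(db_path)
recovered = list_pending(conn, "user-123")
conn.close()
```

Only the still-pending request for the given thread comes back, so the restarted process can re-send it or resolve it as needed.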

Running Experiments

from hitloop import TelemetryLogger
from hitloop.eval import ExperimentRunner, ExperimentCondition
from hitloop.eval.runner import create_standard_conditions
from hitloop.scenarios import EmailDraftScenario

# Setup
logger = TelemetryLogger("experiment.db")
scenario = EmailDraftScenario()

# Create standard conditions (4 policies × scenarios)
conditions = create_standard_conditions(
    scenario=scenario,
    n_trials=20,
    injection_rate=0.2,  # 20% error injection
)

# Run
runner = ExperimentRunner(logger)
for c in conditions:
    runner.add_condition(c)

await runner.run_all()

# Export
runner.export_results("results.csv", "summary.json")

Output: results.csv

run_id  scenario_id  condition_id  policy_name  task_success  approval_requested  injected_error  error_caught
abc123  email_draft  risk_based    risk_based   1             1                   0               0
def456  email_draft  risk_based    risk_based   0             1                   1               1

Output: summary.json

{
  "risk_based": {
    "n_runs": 20,
    "success_rate": 0.85,
    "approval_rate": 0.65,
    "error_catch_rate": 0.75,
    "false_reject_rate": 0.05,
    "human_latency_mean_ms": 1200.5
  }
}

Research Alignment

hitloop metrics map directly to the research framework:

Metric             Research Concept      Description
success_rate       Quality               Task completion rate
approval_rate      Leverage proxy        Human involvement frequency
error_catch_rate   Appropriate reliance  Injected error detection
false_reject_rate  Appropriate reliance  Unnecessary rejections
human_latency_ms   Human burden          Time cost per decision
cost_proxy         Efficiency            Token/call overhead
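
As a rough illustration of how some of these metrics relate to the per-run rows, the sketch below recomputes three of them from the two sample rows shown under results.csv. It assumes a comma-separated file with those column names and the obvious definitions (for example, error_catch_rate = caught / injected); hitloop's `MetricsCalculator` may define them differently, and `summarize` is a hypothetical helper.

```python
import csv
import io

# The two sample rows from results.csv, assumed to be comma-separated on disk
RESULTS_CSV = """\
run_id,scenario_id,condition_id,policy_name,task_success,approval_requested,injected_error,error_caught
abc123,email_draft,risk_based,risk_based,1,1,0,0
def456,email_draft,risk_based,risk_based,0,1,1,1
"""


def summarize(rows):
    """Aggregate per-run rows into rates (assumed definitions, see lead-in)."""
    n = len(rows)
    injected = [r for r in rows if r["injected_error"] == "1"]
    caught = [r for r in injected if r["error_caught"] == "1"]
    return {
        "n_runs": n,
        "success_rate": sum(r["task_success"] == "1" for r in rows) / n,
        "approval_rate": sum(r["approval_requested"] == "1" for r in rows) / n,
        # Undefined when no errors were injected, so report None instead of 0
        "error_catch_rate": len(caught) / len(injected) if injected else None,
    }


rows = list(csv.DictReader(io.StringIO(RESULTS_CSV)))
summary = summarize(rows)
```

false_reject_rate is left out because it needs the human's decision per run, which is not among the columns shown.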

Injected Errors for Ground Truth

The error injector provides ground truth for measuring oversight effectiveness:

from hitloop.eval import ErrorInjector, InjectionConfig
# Assumption: InjectionType is exported from the same module; adjust the import if it lives elsewhere
from hitloop.eval import InjectionType

injector = ErrorInjector(InjectionConfig(
    injection_rate=0.2,
    injection_types=[
        InjectionType.WRONG_RECIPIENT,
        InjectionType.WRONG_RECORD_ID,
    ],
))

# Every action has known correctness
result = injector.maybe_inject(action)
if result.injected:
    # This action is KNOWN to be wrong
    # If human approves it → false negative
    # If human rejects it → true positive (error caught)
    print("Injected error: a correct review rejects this action")
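
Because injection gives every action a known label, each human decision falls into one of four cells. The sketch below spells out that classification; the label names and the `classify_review` helper are hypothetical, chosen to match the wording of the comments above.

```python
from collections import Counter


def classify_review(injected: bool, approved: bool) -> str:
    """Label one human decision against the injected ground truth."""
    if injected and not approved:
        return "true_positive"    # error caught
    if injected and approved:
        return "false_negative"   # error slipped through
    if not injected and not approved:
        return "false_reject"     # unnecessary rejection
    return "correct_approve"


# (injected, approved) pairs for five simulated reviews
reviews = [
    (True, False),
    (True, True),
    (False, True),
    (False, False),
    (True, False),
]
counts = Counter(classify_review(i, a) for i, a in reviews)

injected_total = counts["true_positive"] + counts["false_negative"]
error_catch_rate = counts["true_positive"] / injected_total
```

Here two of three injected errors are rejected, so the catch rate is 2/3, and the single "false_reject" is the kind of decision that false_reject_rate counts.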

Project Structure

hitloop/
├── src/hitloop/
│   ├── core/
│   │   ├── models.py      # Action, Decision, TraceEvent
│   │   ├── interfaces.py  # ApprovalBackend, HITLPolicy
│   │   └── logger.py      # TelemetryLogger (SQLite)
│   ├── policies/
│   │   ├── always_approve.py
│   │   ├── risk_based.py
│   │   └── audit_plus_escalate.py
│   ├── backends/
│   │   ├── cli_backend.py
│   │   └── humanlayer_backend.py  # Optional
│   ├── langgraph/
│   │   └── nodes.py       # hitl_gate_node, execute_tool_node
│   ├── scenarios/
│   │   ├── email_draft.py
│   │   └── record_update.py
│   └── eval/
│       ├── runner.py      # ExperimentRunner
│       ├── injectors.py   # ErrorInjector
│       └── metrics.py     # MetricsCalculator
├── tests/
├── examples/
├── pyproject.toml
└── README.md

Instrumentation

Every run emits structured events:

# Per run
{
    "run_id": "abc123",
    "scenario_id": "email_draft",
    "condition_id": "risk_based",
    "seed": 42
}

# Per action
{
    "action_id": "xyz789",
    "tool_name": "send_email",
    "args_hash": "a1b2c3d4",
    "risk_class": "medium",
    "injected_error": false
}

# Per approval
{
    "channel": "cli",
    "latency_ms": 1250.0,
    "decision": true,
    "decided_by": "human"
}

# Per execution
{
    "success": true,
    "execution_time_ms": 45.2
}
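
A small sketch of how events shaped like these could be written to SQLite and read back per run. This is not `TelemetryLogger`'s schema: the `events` table, `log_event`, and the way `args_hash` is derived (a truncated SHA-256 of the canonical JSON arguments) are all assumptions for illustration.

```python
import hashlib
import json
import sqlite3


def args_hash(tool_args: dict) -> str:
    """Short, order-independent fingerprint of the tool arguments."""
    canonical = json.dumps(tool_args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]


conn = sqlite3.connect(":memory:")  # use a file path to keep the trace
conn.execute("CREATE TABLE events (run_id TEXT, kind TEXT, payload TEXT)")


def log_event(run_id: str, kind: str, payload: dict) -> None:
    """Append one structured event; the payload is stored as JSON text."""
    conn.execute(
        "INSERT INTO events (run_id, kind, payload) VALUES (?, ?, ?)",
        (run_id, kind, json.dumps(payload)),
    )
    conn.commit()


tool_args = {"recipient": "alice@example.com", "subject": "Hello"}
log_event("abc123", "action", {
    "tool_name": "send_email",
    "args_hash": args_hash(tool_args),
    "risk_class": "medium",
    "injected_error": False,
})
log_event("abc123", "approval", {
    "channel": "cli",
    "latency_ms": 1250.0,
    "decision": True,
    "decided_by": "human",
})

rows = conn.execute(
    "SELECT kind, payload FROM events WHERE run_id = ? ORDER BY rowid",
    ("abc123",),
).fetchall()
events = [(kind, json.loads(payload)) for kind, payload in rows]
```

Hashing the arguments instead of storing them lets the trace link identical calls together without keeping potentially sensitive values such as recipients.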

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Type checking
mypy src/hitloop

# Linting
ruff check src/hitloop
ruff format src/hitloop

License

MIT License - see LICENSE file.
