Keep humans in the loop. HITL control library for AI agent workflows with LangGraph.
hitloop
Human-in-the-Loop control library for AI agent workflows with LangGraph integration.
hitloop provides explicit control nodes for human oversight in AI agent workflows, with strong instrumentation for research experiments. Unlike passive monitoring, human approval is a first-class control signal and event in the execution trace.
Core Concept
LLM proposes action → HITL policy decides → Human approves/rejects → Tool executes → Telemetry logs all
Human approval is not a UI gimmick. It is:
- A control signal that gates execution
- A first-class event in the trace
- A research artifact for measuring oversight effectiveness
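The flow above can be sketched in plain Python without hitloop or LangGraph. Every name below (`ProposedAction`, `Trace`, `run_gated`) is hypothetical and exists only to show the decision acting as a gate and appearing as an event in the same trace as the tool call.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class ProposedAction:
    """Hypothetical stand-in for an LLM-proposed tool call."""
    tool_name: str
    tool_args: dict
    risk: str = "low"  # "low", "medium", or "high"


@dataclass
class Trace:
    """Collects every step so approvals sit alongside tool calls."""
    events: list = field(default_factory=list)

    def log(self, kind: str, **data) -> None:
        self.events.append({"kind": kind, **data})


def run_gated(
    action: ProposedAction,
    needs_approval: Callable[[ProposedAction], bool],
    ask_human: Callable[[ProposedAction], bool],
    tools: dict,
    trace: Trace,
):
    """Propose -> policy -> (human) -> execute, logging each step."""
    trace.log("proposed", tool=action.tool_name, risk=action.risk)
    if needs_approval(action):
        approved = ask_human(action)
        trace.log("decision", approved=approved, decided_by="human")
    else:
        approved = True
        trace.log("decision", approved=True, decided_by="policy")
    if not approved:
        # The gate blocks execution: the tool is never called.
        trace.log("skipped", tool=action.tool_name)
        return None
    result = tools[action.tool_name](**action.tool_args)
    trace.log("executed", tool=action.tool_name)
    return result


trace = Trace()
tools = {"send_email": lambda recipient: f"sent to {recipient}"}
action = ProposedAction("send_email", {"recipient": "alice@example.com"}, risk="high")

# The "human" here is a stub that rejects everything.
outcome = run_gated(
    action,
    needs_approval=lambda a: a.risk == "high",
    ask_human=lambda a: False,
    tools=tools,
    trace=trace,
)
print(outcome)
print([event["kind"] for event in trace.events])
```

Because the rejection is logged as an ordinary event, the same trace can later answer both "what ran" and "what a human stopped".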
Quick Start
Installation
# Clone the repository
git clone https://github.com/ebaenamar/hitloop.git
cd hitloop
# Install with uv (recommended)
uv pip install -e .
# Or with pip
pip install -e .
# For development
pip install -e ".[dev]"
Run an Example
# Recommended: Using LangGraph's native interrupt() (v0.4.0+)
python examples/interrupt_example.py
# Legacy: CLI-based approval prompts
python examples/basic_workflow.py
# Auto-approve mode (no prompts)
python examples/basic_workflow.py --auto
# Run a full experiment
python examples/run_experiment.py --n-trials 20
Architecture
┌─────────────────────────────────────────────────────────────┐
│                     LangGraph Workflow                      │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐   │
│  │   LLM    │───►│   HITL   │───►│    Tool Executor     │   │
│  │   Node   │    │   Gate   │    │                      │   │
│  └──────────┘    └────┬─────┘    └──────────────────────┘   │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │  HITL Policy   │                             │
│              │  ┌──────────┐  │                             │
│              │  │ Approval │  │◄──► Human (CLI/Web/etc)     │
│              │  │ Backend  │  │                             │
│              │  └──────────┘  │                             │
│              └────────────────┘                             │
│                       │                                     │
│                       ▼                                     │
│              ┌────────────────┐                             │
│              │   Telemetry    │───► SQLite / Analysis       │
│              │     Logger     │                             │
│              └────────────────┘                             │
└─────────────────────────────────────────────────────────────┘
API Overview
Core Models
from hitloop import Action, Decision, RiskClass
# Define an action
action = Action(
    tool_name="send_email",
    tool_args={"recipient": "alice@example.com", "subject": "Hello"},
    risk_class=RiskClass.MEDIUM,
    side_effects=["email_sent"],
    rationale="Sending follow-up email to client",
)
# Decisions from human review
decision = Decision(
    action_id=action.id,
    approved=True,
    reason="Verified recipient is correct",
    decided_by="human:operator",
    latency_ms=1500.0,
)
Policies
Three built-in policies for different oversight tiers:
from hitloop import AlwaysApprovePolicy, RiskBasedPolicy, AuditPlusEscalatePolicy
# Tier 4: No human oversight (baseline)
policy = AlwaysApprovePolicy()
# Risk-based: require human approval for high-risk actions only
policy = RiskBasedPolicy(
    require_approval_for_high=True,
    require_approval_for_medium=False,
    high_risk_tools=["send_email", "delete_record"],
)
# Audit + Escalate: random sampling plus anomaly detection
policy = AuditPlusEscalatePolicy(
    audit_sample_rate=0.1,  # 10% random audit
    escalate_on_high_risk=True,
    anomaly_signals=["unusual_recipient", "large_amount"],
)
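To make the audit-plus-escalate idea concrete, here is a minimal standalone sketch of one way such a decision could be ordered: escalate on high risk, then on any matching anomaly signal, and otherwise audit a random sample. The function `audit_or_escalate` and its parameters are hypothetical, and the ordering is an assumption; this is not hitloop's `AuditPlusEscalatePolicy` implementation.

```python
import random


def audit_or_escalate(
    risk: str,
    signals: set,
    anomaly_signals: set,
    sample_rate: float,
    rng: random.Random,
) -> tuple:
    """Return (needs_approval, reason) for one action."""
    if risk == "high":
        return True, "escalated: high risk"
    hits = signals & anomaly_signals
    if hits:
        return True, f"escalated: anomaly {sorted(hits)}"
    # random() is in [0, 1), so a rate of 0.0 never audits and 1.0 always does.
    if rng.random() < sample_rate:
        return True, "random audit"
    return False, "auto-approved"


watched = {"unusual_recipient", "large_amount"}
rng = random.Random(42)  # Seeded so an experiment run is reproducible.

print(audit_or_escalate("high", set(), watched, 0.1, rng))
print(audit_or_escalate("low", {"large_amount"}, watched, 0.1, rng))
print(audit_or_escalate("low", set(), watched, 0.0, rng))
```

Passing the random generator in explicitly keeps the sampling reproducible under a fixed seed, which matters when audit decisions are part of an experiment's results.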
Adding a New Policy
Create a single file in src/hitloop/policies/:
# src/hitloop/policies/my_policy.py
from hitloop.core.interfaces import HITLPolicy
from hitloop.core.models import Action, Decision
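class MyCustomPolicy(HITLPolicy):
    def __init__(self, critical_tools: list[str]):
        # Assumption: HITLPolicy needs no constructor arguments of its own.
        super().__init__()
        self.critical_tools = set(critical_tools)

    @property
    def name(self) -> str:
        return "my_custom"

    def should_request_approval(
        self, action: Action, state: dict
    ) -> tuple[bool, str]:
        # Your logic here
        if action.tool_name in self.critical_tools:
            return True, "Critical tool requires approval"
        return False, "Auto-approved"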
LangGraph Integration
Add human approval to any LangGraph agent:
from hitloop import RiskBasedPolicy, ApprovalRequest, Action, Decision, RiskClass
from hitloop.backends import AutoApproveBackend, CLIBackend
# 1. Configure policy - which actions need approval?
policy = RiskBasedPolicy(
require_approval_for_high=True,
high_risk_tools=["send_email", "delete_file", "transfer_money"],
)
# 2. Choose backend - how to get approval?
backend = CLIBackend() # Interactive CLI prompts
# backend = AutoApproveBackend() # For testing
# 3. In your LangGraph node, check if approval is needed:
async def hitl_gate_node(state):
    action = state["pending_action"]
    needs_approval, reason = policy.should_request_approval(action, state)
    if needs_approval:
        request = ApprovalRequest(run_id="my-run", action=action)
        decision = await backend.request_approval(request)
        return {"approval_decision": decision}
    else:
        # Auto-approve low-risk actions
        return {"approval_decision": Decision(action_id=action.id, approved=True)}
Full working example: See examples/langgraph_agent.py
# Run with simulated LLM (no API key needed)
python examples/langgraph_agent.py --simulate --auto
# With CLI approval prompts
python examples/langgraph_agent.py --simulate
Third-Party Integrations (Slack, Telegram, Discord, etc.)
hitloop is completely agnostic to your approval channel. Use WebhookBackend to integrate with any service:
from hitloop.backends import WebhookBackend
# Your custom function to send to Slack/Telegram/Discord/etc.
async def send_to_slack(request, callback_id, callback_url):
    await slack_client.chat_postMessage(
        channel="#approvals",
        text=f"Approve {request.action.tool_name}?",
        # Include buttons that POST to callback_url
    )

backend = WebhookBackend(
    send_request=send_to_slack,
    timeout_seconds=300,  # 5 min timeout (rejects if no response)
    callback_base_url="https://your-app.com/hitloop/callback",
)

# In your webhook handler (FastAPI/Flask):
@app.post("/hitloop/callback/{callback_id}")
async def handle_callback(callback_id: str, data: dict):
    await backend.handle_callback(
        callback_id=callback_id,
        approved=data["approved"],
        decided_by=f"slack:{data['user']}",
    )
Full working example: See examples/webhook_server.py
# Install server dependencies
pip install hitloop[server]
# Run the webhook server
uvicorn examples.webhook_server:app --port 8000
# Test with curl
curl -X POST http://localhost:8000/test/request-approval
# Then approve/reject via the callback URL shown in console
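The request/callback pattern behind a webhook backend can be sketched with plain asyncio: each outgoing request registers a future under a callback ID, the webhook handler resolves it, and a timeout rejects by default. `PendingApprovals` and its methods are hypothetical names for illustration and do not describe `WebhookBackend` internals.

```python
import asyncio
import uuid


class PendingApprovals:
    """Hypothetical registry mapping callback IDs to waiting futures."""

    def __init__(self, timeout_seconds: float):
        self.timeout_seconds = timeout_seconds
        self._pending: dict = {}

    async def request(self, send) -> dict:
        """Send a request, then wait for the callback or time out."""
        callback_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self._pending[callback_id] = future
        try:
            await send(callback_id)
            return await asyncio.wait_for(future, self.timeout_seconds)
        except asyncio.TimeoutError:
            # No answer in time: fail closed by rejecting.
            return {"approved": False, "decided_by": "timeout"}
        finally:
            self._pending.pop(callback_id, None)

    def resolve(self, callback_id: str, approved: bool, decided_by: str) -> bool:
        """Called by the webhook handler; returns False for unknown IDs."""
        future = self._pending.get(callback_id)
        if future is None or future.done():
            return False
        future.set_result({"approved": approved, "decided_by": decided_by})
        return True


async def demo() -> tuple:
    registry = PendingApprovals(timeout_seconds=0.05)

    async def approving_sender(callback_id: str) -> None:
        # Simulate a human clicking "approve" shortly after the message lands.
        asyncio.get_running_loop().call_later(
            0.01, registry.resolve, callback_id, True, "slack:alice"
        )

    async def silent_sender(callback_id: str) -> None:
        pass  # Nobody ever responds.

    answered = await registry.request(approving_sender)
    timed_out = await registry.request(silent_sender)
    return answered, timed_out


answered, timed_out = asyncio.run(demo())
print(answered)
print(timed_out)
```

Rejecting on timeout mirrors the "rejects if no response" behavior noted in the configuration above: an unanswered request never turns into an approval.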
Using LangGraph's Native interrupt() (Recommended)
New in v0.4.0: hitloop supports LangGraph's native interrupt() function for human-in-the-loop approval. This is the recommended approach because it:
- Uses LangGraph's built-in checkpointing for persistence
- Is compatible with agent-inbox and LangGraph Studio
- Follows LangGraph's standard patterns
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command
from hitloop import RiskBasedPolicy
from hitloop.langgraph import create_interrupt_gate_node, create_interrupt_tool_node, should_execute
# Setup
policy = RiskBasedPolicy(high_risk_tools=["send_email", "delete_file"])
gate = create_interrupt_gate_node(policy)
executor = create_interrupt_tool_node({"send_email": send_email_func})
# Build graph
builder = StateGraph(MyState)
builder.add_node("hitl_gate", gate)
builder.add_node("execute", executor)
builder.add_edge(START, "hitl_gate")  # Entry point; compile() fails without one
builder.add_conditional_edges("hitl_gate", should_execute, {"execute": "execute", "skip": END})
builder.add_edge("execute", END)
graph = builder.compile(checkpointer=MemorySaver())
# Run - will pause at interrupt for high-risk actions
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"proposed_action": action}, config)
# Check for interrupt
if "__interrupt__" in result:
    print(result["__interrupt__"])  # Shows approval request
# Resume with human decision
graph.invoke(Command(resume={"approved": True, "decided_by": "human:alice"}), config)
Full example: See examples/interrupt_example.py
Production Deployment
Persistent Storage (Survives Restarts)
hitloop follows LangGraph's pattern for pluggable storage backends:
# Install with your preferred backend
pip install hitloop[postgres] # PostgreSQL
pip install hitloop[redis] # Redis
from hitloop.persistence import PostgresApprovalStore, RedisApprovalStore
from hitloop.backends import PersistentWebhookBackend
# PostgreSQL (recommended for most cases)
store = await PostgresApprovalStore.from_conn_string(
    "postgresql://user:pass@localhost:5432/hitloop"
)
await store.setup()  # Creates tables
# Or Redis (faster, built-in TTL)
store = await RedisApprovalStore.from_url("redis://localhost:6379/0")
# Use with PersistentWebhookBackend
backend = PersistentWebhookBackend(
    send_request=my_slack_sender,
    store=store,
    timeout_seconds=300,
)
Retry & Circuit Breaker
Built-in resilience for production:
from hitloop.backends import PersistentWebhookBackend, RetryConfig, CircuitBreakerConfig
backend = PersistentWebhookBackend(
    send_request=my_sender,
    store=store,
    retry_config=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        exponential_base=2.0,
    ),
    circuit_breaker_config=CircuitBreakerConfig(
        failure_threshold=5,  # Open after 5 failures
        recovery_timeout=30.0,  # Try again after 30s
    ),
)
# Check circuit state
print(backend.get_circuit_state()) # CLOSED, OPEN, or HALF_OPEN
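For readers unfamiliar with the two patterns, the sketch below shows what the configuration values above conventionally mean: an exponential backoff schedule and a three-state circuit breaker. `backoff_delays` and `CircuitBreaker` are hypothetical illustrations; whether hitloop adds jitter, caps delays, or counts failures differently is not stated in this README.

```python
def backoff_delays(max_retries: int, initial_delay: float, exponential_base: float) -> list:
    """Delay before each retry: initial_delay * exponential_base ** attempt."""
    return [initial_delay * exponential_base ** attempt for attempt in range(max_retries)]


class CircuitBreaker:
    """Minimal CLOSED -> OPEN -> HALF_OPEN state machine (illustrative)."""

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # Time the circuit last opened, or None if closed.

    def state(self, now: float) -> str:
        if self.opened_at is None:
            return "CLOSED"
        if now - self.opened_at >= self.recovery_timeout:
            return "HALF_OPEN"  # One probe call is allowed through.
        return "OPEN"

    def record_failure(self, now: float) -> None:
        self.failures += 1
        # A failed probe, or reaching the threshold, (re)opens the circuit.
        if self.opened_at is not None or self.failures >= self.failure_threshold:
            self.opened_at = now

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None


print(backoff_delays(max_retries=3, initial_delay=1.0, exponential_base=2.0))

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
for _ in range(5):
    breaker.record_failure(now=0.0)
print(breaker.state(now=1.0))
print(breaker.state(now=30.0))
breaker.record_success()
print(breaker.state(now=31.0))
```

Time is passed in as a plain number rather than read from a clock, which keeps the state machine easy to test.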
Recovery After Restart
# On startup, recover pending requests
pending = await store.list_pending(thread_id="user-123")
for record in pending:
    # Re-send or resolve as needed
    future = backend.register_pending(record)
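Why a persistent store makes recovery possible can be shown with a small file-backed example. `SqliteApprovalStore` below is a hypothetical stand-in built on the standard library; it is not one of hitloop's stores, and its schema and method names (apart from mirroring `list_pending`) are assumptions.

```python
import os
import sqlite3
import tempfile


class SqliteApprovalStore:
    """Hypothetical file-backed store for pending approval requests."""

    def __init__(self, path: str):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS approvals ("
            "callback_id TEXT PRIMARY KEY, thread_id TEXT, status TEXT)"
        )
        self.conn.commit()

    def add_pending(self, callback_id: str, thread_id: str) -> None:
        self.conn.execute(
            "INSERT INTO approvals VALUES (?, ?, 'pending')", (callback_id, thread_id)
        )
        self.conn.commit()

    def resolve(self, callback_id: str, approved: bool) -> None:
        status = "approved" if approved else "rejected"
        self.conn.execute(
            "UPDATE approvals SET status = ? WHERE callback_id = ?", (status, callback_id)
        )
        self.conn.commit()

    def list_pending(self, thread_id: str) -> list:
        rows = self.conn.execute(
            "SELECT callback_id FROM approvals "
            "WHERE thread_id = ? AND status = 'pending' ORDER BY callback_id",
            (thread_id,),
        )
        return [row[0] for row in rows]

    def close(self) -> None:
        self.conn.close()


path = os.path.join(tempfile.mkdtemp(), "approvals.db")
store = SqliteApprovalStore(path)
store.add_pending("cb-1", "user-123")
store.add_pending("cb-2", "user-123")
store.resolve("cb-1", approved=True)
store.close()  # Simulate the process going away.

# A fresh instance on the same file still sees the unresolved request.
restarted = SqliteApprovalStore(path)
recovered = restarted.list_pending(thread_id="user-123")
restarted.close()
print(recovered)
```

An in-memory registry would lose `cb-2` at the restart; writing each state change through to disk is what lets a startup hook find it again.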
Running Experiments
from hitloop import TelemetryLogger
from hitloop.eval import ExperimentRunner, ExperimentCondition
from hitloop.eval.runner import create_standard_conditions
from hitloop.scenarios import EmailDraftScenario
# Setup
logger = TelemetryLogger("experiment.db")
scenario = EmailDraftScenario()
# Create standard conditions (4 policies × scenarios)
conditions = create_standard_conditions(
    scenario=scenario,
    n_trials=20,
    injection_rate=0.2,  # 20% error injection
)
# Run
runner = ExperimentRunner(logger)
for c in conditions:
    runner.add_condition(c)
await runner.run_all()
# Export
runner.export_results("results.csv", "summary.json")
Output: results.csv
| run_id | scenario_id | condition_id | policy_name | task_success | approval_requested | injected_error | error_caught |
|---|---|---|---|---|---|---|---|
| abc123 | email_draft | risk_based | risk_based | 1 | 1 | 0 | 0 |
| def456 | email_draft | risk_based | risk_based | 0 | 1 | 1 | 1 |
Output: summary.json
{
  "risk_based": {
    "n_runs": 20,
    "success_rate": 0.85,
    "approval_rate": 0.65,
    "error_catch_rate": 0.75,
    "false_reject_rate": 0.05,
    "human_latency_mean_ms": 1200.5
  }
}
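The relationship between the per-run CSV and the per-condition summary can be sketched with the standard library. The aggregation below is an assumption about how such rates are typically derived (in particular, `error_catch_rate` is taken as caught errors divided by injected errors); `summarize` is a hypothetical helper, not hitloop's `MetricsCalculator`. The input reuses the two sample rows shown above.

```python
import csv
import io

CSV_TEXT = """run_id,scenario_id,condition_id,policy_name,task_success,approval_requested,injected_error,error_caught
abc123,email_draft,risk_based,risk_based,1,1,0,0
def456,email_draft,risk_based,risk_based,0,1,1,1
"""


def summarize(rows: list) -> dict:
    """Aggregate per-run rows into rates, grouped by condition_id."""
    groups: dict = {}
    for row in rows:
        groups.setdefault(row["condition_id"], []).append(row)
    summary = {}
    for condition, runs in groups.items():
        n = len(runs)
        injected = [r for r in runs if r["injected_error"] == "1"]
        caught = sum(r["error_caught"] == "1" for r in injected)
        summary[condition] = {
            "n_runs": n,
            "success_rate": sum(r["task_success"] == "1" for r in runs) / n,
            "approval_rate": sum(r["approval_requested"] == "1" for r in runs) / n,
            # Undefined when nothing was injected, so report None rather than 0.
            "error_catch_rate": caught / len(injected) if injected else None,
        }
    return summary


rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))
summary = summarize(rows)
print(summary)
```

`false_reject_rate` and latency are left out because the sample CSV columns shown here do not carry the rejection or timing data they would need.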
Research Alignment
hitloop metrics map directly to the research framework:
| Metric | Research Concept | Description |
|---|---|---|
| success_rate | Quality | Task completion rate |
| approval_rate | Leverage proxy | Human involvement frequency |
| error_catch_rate | Appropriate reliance | Injected error detection |
| false_reject_rate | Appropriate reliance | Unnecessary rejections |
| human_latency_ms | Human burden | Time cost per decision |
| cost_proxy | Efficiency | Token/call overhead |
Injected Errors for Ground Truth
The error injector provides ground truth for measuring oversight effectiveness:
# InjectionType is assumed to be importable from hitloop.eval alongside the others
from hitloop.eval import ErrorInjector, InjectionConfig, InjectionType
injector = ErrorInjector(InjectionConfig(
    injection_rate=0.2,
    injection_types=[
        InjectionType.WRONG_RECIPIENT,
        InjectionType.WRONG_RECORD_ID,
    ],
))
# Every action has known correctness
result = injector.maybe_inject(action)
if result.injected:
    # This action is KNOWN to be wrong
    # If human approves it → false negative
    # If human rejects it → true positive (error caught)
    pass  # Record the outcome here
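With ground truth available, each reviewed action falls into one of four cells. The sketch below labels (injected, approved) pairs and derives two rates from them. `classify` and `oversight_rates` are hypothetical helpers, and the denominators (injected actions for the catch rate, clean actions for the false-reject rate) are assumptions about how these metrics would usually be defined.

```python
from collections import Counter


def classify(injected: bool, approved: bool) -> str:
    """Label one reviewed action against the injected ground truth."""
    if injected:
        return "false_negative" if approved else "true_positive"
    return "true_negative" if approved else "false_positive"


def oversight_rates(reviews: list) -> dict:
    """reviews is a list of (injected, approved) pairs."""
    counts = Counter(classify(injected, approved) for injected, approved in reviews)
    injected_total = counts["true_positive"] + counts["false_negative"]
    clean_total = counts["true_negative"] + counts["false_positive"]
    return {
        "error_catch_rate": counts["true_positive"] / injected_total if injected_total else None,
        "false_reject_rate": counts["false_positive"] / clean_total if clean_total else None,
    }


reviews = [
    (True, False),   # Injected error, rejected: caught.
    (True, True),    # Injected error, approved: missed.
    (False, True),   # Clean action, approved.
    (False, True),
    (False, False),  # Clean action, rejected unnecessarily.
    (False, True),
]
print(oversight_rates(reviews))
```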
Project Structure
hitloop/
├── src/hitloop/
│   ├── core/
│   │   ├── models.py          # Action, Decision, TraceEvent
│   │   ├── interfaces.py      # ApprovalBackend, HITLPolicy
│   │   └── logger.py          # TelemetryLogger (SQLite)
│   ├── policies/
│   │   ├── always_approve.py
│   │   ├── risk_based.py
│   │   └── audit_plus_escalate.py
│   ├── backends/
│   │   ├── cli_backend.py
│   │   └── humanlayer_backend.py  # Optional
│   ├── langgraph/
│   │   └── nodes.py           # hitl_gate_node, execute_tool_node
│   ├── scenarios/
│   │   ├── email_draft.py
│   │   └── record_update.py
│   └── eval/
│       ├── runner.py          # ExperimentRunner
│       ├── injectors.py       # ErrorInjector
│       └── metrics.py         # MetricsCalculator
├── tests/
├── examples/
├── pyproject.toml
└── README.md
Instrumentation
Every run emits structured events:
# Per run
{
  "run_id": "abc123",
  "scenario_id": "email_draft",
  "condition_id": "risk_based",
  "seed": 42
}
# Per action
{
  "action_id": "xyz789",
  "tool_name": "send_email",
  "args_hash": "a1b2c3d4",
  "risk_class": "medium",
  "injected_error": false
}
# Per approval
{
  "channel": "cli",
  "latency_ms": 1250.0,
  "decision": true,
  "decided_by": "human"
}
# Per execution
{
  "success": true,
  "execution_time_ms": 45.2
}
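As an illustration of how events like these could be produced and stored, the sketch below computes a short argument digest and writes events to SQLite. The hashing scheme (SHA-256 over canonical JSON, truncated to 8 hex characters) and the `events` table layout are assumptions chosen to resemble the sample above; hitloop's `TelemetryLogger` may work differently.

```python
import hashlib
import json
import sqlite3


def args_hash(tool_args: dict, length: int = 8) -> str:
    """Stable short digest of tool arguments (key order does not matter)."""
    canonical = json.dumps(tool_args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:length]


def log_event(conn: sqlite3.Connection, run_id: str, kind: str, payload: dict) -> None:
    """Append one structured event as a JSON payload."""
    conn.execute(
        "INSERT INTO events (run_id, kind, payload) VALUES (?, ?, ?)",
        (run_id, kind, json.dumps(payload)),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, run_id TEXT, kind TEXT, payload TEXT)"
)

tool_args = {"recipient": "alice@example.com", "subject": "Hello"}
log_event(conn, "abc123", "action", {
    "tool_name": "send_email",
    "args_hash": args_hash(tool_args),
    "risk_class": "medium",
    "injected_error": False,
})
log_event(conn, "abc123", "approval", {"channel": "cli", "latency_ms": 1250.0, "decision": True})
log_event(conn, "abc123", "execution", {"success": True, "execution_time_ms": 45.2})

query = "SELECT kind FROM events WHERE run_id = ? ORDER BY id"
kinds = [row[0] for row in conn.execute(query, ("abc123",))]
print(kinds)
```

Logging a digest instead of raw arguments lets two runs be compared for identical tool calls without copying potentially sensitive values into the telemetry database.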
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Type checking
mypy src/hitloop
# Linting
ruff check src/hitloop
ruff format src/hitloop
License
MIT License - see LICENSE file.