Lightweight zero-dependency Agent orchestration framework: Tool Registry, Pipeline/ReAct Agent, Multi-Agent Orchestrator, Pluggable Verification, and Loop Engineering
Project description
harnessloop
轻量级零依赖 Agent 编排框架
Model + Harness = Agent
概述
harnessloop 是一个零外部依赖的 Agent 编排框架,提供从 Tool 注册、Agent 执行循环、多 Agent 编排到验证规则引擎的完整控制流。
核心思想:Agent 循环的本质是 决策→执行→观察→验证→收敛 这套骨架,Pipeline、ReAct、多 Agent 协作都只是在骨架上的不同变体。
架构
harnessloop/
├── harnessloop/
│ ├── exceptions.py # 结构化异常体系(类型优先 + 字符串降级)
│ ├── tool_registry.py # Tool / ToolResult / ToolRegistry
│ ├── agent.py # Agent(Pipeline)+ ReActAgent(LLM 动态推理)
│ ├── orchestrator.py # Orchestrator(约束→执行→验证→纠错→收敛 + HITL + Streaming)
│ ├── verification.py # RuleEngine(输入/输出分层护栏 + 5 条内置规则)
│ └── loop/ # Loop Engineering 生产级加固
│ ├── termination.py # Terminator:6 种可组合终止条件
│ ├── smart_retry.py # SmartRetry:错误分类 + 指数退避 + 死信队列
│ ├── loop_state.py # LoopState:结构化循环状态追踪
│ ├── hitl.py # HITLManager:异步/同步人机协作
│ ├── checkpoint.py # Checkpointer:JSON 检查点,可恢复中断执行
│ ├── callbacks.py # CallbackManager:8 阶段步骤回调(监控/通知/日志)
│ └── plan_agent.py # PlanAgent:先规划再执行
└── setup.py
执行流程图
flowchart TD
Start(["📋 接收任务"]) --> Guard["🛡️ 输入护栏<br/>MaxInputLength / SensitiveKeyword"]
Guard -->|通过| Decide{"🤔 decide_next_action()<br/>Pipeline 按序 / ReAct LLM 推理"}
Guard -->|阻断 tripwire| Fail(["❌ 终止"])
Decide -->|无更多动作| Converge["🏁 收敛"]
Decide -->|选择 Tool| CB_BEFORE["📡 BEFORE_EXECUTE"]
CB_BEFORE --> Retry{"♻️ SmartRetry<br/>错误分类 + 指数退避"}
Retry -->|瞬时故障| Backoff["⏳ 退避等待"]
Backoff --> Retry
Retry -->|永久故障| Dead["📬 死信队列"]
Dead --> HITL{"🧑 HITL 人工决策?"}
Retry -->|成功| Execute["✅ Tool.execute()"]
HITL -->|异步挂起| Ckpt["💾 Checkpoint 保存"]
Ckpt --> Pause["⏸️ 返回 AWAITING_HUMAN_DECISION"]
HITL -->|同步重试| Decide
HITL -->|同步跳过| Decide
HITL -->|同步中止| Fail
Execute --> Observe["👁️ Observation + memory"]
Observe --> CB_AFTER["📡 AFTER_EXECUTE / ON_ERROR"]
CB_AFTER --> Verify{"🔍 RuleEngine<br/>输出验证"}
Verify -->|未通过| Decide
Verify -->|通过| CB_VERIFY["📡 AFTER_VERIFY"]
CB_VERIFY --> Term{"🛑 Terminator<br/>6 种终止条件"}
Term -->|完成/步数上限/连续失败/循环检测/工具耗尽| Converge
Term -->|继续| Decide
Converge --> CB_CONV["📡 ON_CONVERGE"]
CB_CONV --> Output["📊 OrchestrationResult<br/>或 StreamEvent 流式输出"]
三层架构
flowchart LR
subgraph Orchestrator["🎯 Orchestrator 编排层"]
direction TB
O1["多 Agent 串行协作"]
O2["验证 + 纠错 + 收敛"]
O3["HITL 人工决策"]
O4["Streaming 流式输出"]
O1 --> O2 --> O3 --> O4
end
subgraph Agent["🤖 Agent 执行层"]
direction TB
A1["Pipeline 模式<br/>按工具注册顺序执行"]
A2["ReAct 模式<br/>LLM think → act → observe"]
A3["PlanAgent<br/>先规划再执行"]
end
subgraph Tools["🔧 Tool 注册层"]
direction TB
T1["Tool 抽象 + Registry"]
T2["重试 / 超时 / Fallback"]
T3["死信队列"]
end
subgraph LoopEng["🔄 Loop Engineering(灰度可拔插)"]
direction LR
L1["Terminator"]
L2["SmartRetry"]
L3["Callbacks"]
L4["Checkpoint"]
end
Orchestrator --> Agent
Agent --> Tools
LoopEng -.->|LoopConfig 灰度开关注入| Orchestrator
LoopEng -.->|LoopConfig 灰度开关注入| Agent
安装
pip install harnessloop
# 或
pip install harnessloop
快速开始
1. 定义 Tool
from harnessloop import Tool, ToolResult
class GreetingTool(Tool):
name = "greet"
description = "Say hello to someone"
input_schema = {"name": "str"}
def execute(self, name: str = "World", **kwargs) -> ToolResult:
return ToolResult(success=True, data={"greeting": f"Hello, {name}!"})
class WeatherTool(Tool):
name = "weather"
description = "Get weather for a city"
input_schema = {"city": "str"}
def execute(self, city: str = "Beijing", **kwargs) -> ToolResult:
return ToolResult(success=True, data={"city": city, "temp": "22°C"})
2. Pipeline 模式(固定管线)
from harnessloop import Agent, Orchestrator
agent = Agent(
name="greeter",
tools=[GreetingTool(), WeatherTool()],
system_prompt="You are a helpful assistant.",
)
orchestrator = Orchestrator(agents=[agent], max_loops=10)
result = orchestrator.run(
task="Greet Alice and check Beijing weather",
context={"name": "Alice", "city": "Beijing"},
)
print(result.to_dict())
3. ReAct 模式(LLM 动态推理)
from harnessloop import ReActAgent
def my_llm(prompt: str) -> str:
"""替换为你自己的 LLM 调用"""
import openai
client = openai.OpenAI(api_key="your-key")
response = client.chat.completions.create(
model="gpt-4", messages=[{"role": "user", "content": prompt}],
temperature=0.1, max_tokens=2000,
)
return response.choices[0].message.content
agent = ReActAgent(
name="data_analyst",
tools=[GreetingTool(), WeatherTool()],
system_prompt="You are a helpful assistant. Use tools to answer questions.",
llm_call=my_llm,
max_steps=5,
)
orchestrator = Orchestrator(agents=[agent])
result = orchestrator.run(task="What's the weather in Tokyo?")
4. Loop Engineering 加固(可选启用)
from harnessloop import LoopConfig, LoopState, Terminator
config = LoopConfig(
max_steps=10,
consecutive_failures_limit=3,
enable_terminator=True,
enable_smart_retry=True,
enable_loop_state=True,
)
orchestrator = Orchestrator(
agents=[agent],
loop_config=config,
)
5. 多 Agent 协作
agent_a = Agent(name="parser", tools=[...])
agent_b = Agent(name="auditor", tools=[...])
orchestrator = Orchestrator(agents=[agent_a, agent_b])
# agent_a 的 final_data 自动注入 agent_b 的 context["previous_agent_output"]
result = orchestrator.run(task="Parse and audit the document")
6. 验证规则
from harnessloop import RuleEngine, NotNullRule, ColumnConsistencyRule
verifier = RuleEngine()
verifier.register(NotNullRule())
verifier.register(ColumnConsistencyRule())
orchestrator = Orchestrator(agents=[agent], verifier=verifier)
模块说明
| 模块 | 作用 | 关键类 |
|---|---|---|
| exceptions | 结构化异常体系 | HarnessLoopError, ToolTimeoutError, ToolNotFoundError, ToolExecutionError, GuardrailTripError |
| tool_registry | Tool 抽象与注册中心 | Tool, ToolResult, ToolRegistry |
| agent | Agent 基类 + ReActAgent | Agent(Pipeline), ReActAgent(LLM驱动推理) |
| orchestrator | 多 Agent 编排器 | Orchestrator, OrchestrationResult, StreamEvent |
| verification | 输入/输出分层护栏 | Rule, RuleEngine, NotNullRule, ColumnConsistencyRule, TableCountRule, MaxInputLengthRule, SensitiveKeywordRule |
| loop/termination | 多条件终止引擎 | Terminator, TermReason(6 种) |
| loop/smart_retry | 智能重试(类型优先) | SmartRetry, ErrorCategory, DeadLetter, RetryResult |
| loop/loop_state | 结构化状态追踪 | LoopState, AttemptRecord, Strategy |
| loop/hitl | 人机协作 | HITLManager, HITLRequest(同步+异步双模式) |
| loop/checkpoint | 可恢复执行检查点 | Checkpoint, Checkpointer(JSON 持久化) |
| loop/callbacks | 8 阶段步骤回调 | CallbackManager, StepPhase |
| loop/plan_agent | 先规划再执行 | PlanAgent, PlanStep |
Loop Engineering
Agent 循环的生产级加固,全部通过 LoopConfig 灰度开关逐模块启用:
config = LoopConfig(
# Terminator
max_steps=10,
consecutive_failures_limit=3,
loop_detection_threshold=5,
# SmartRetry
base_delay=1.0,
max_delay=30.0,
jitter_factor=0.1,
# HITL
hitl_timeout_seconds=300,
# 灰度开关
enable_terminator=True,
enable_smart_retry=True,
enable_loop_state=True,
enable_hitl=True,
enable_plan_agent=False, # 先规划再执行(默认关闭)
enable_checkpoint=False, # Checkpoint 持久化(默认关闭)
)
Terminator 终止条件优先级
P0: EXPLICIT_ABORT / TASK_COMPLETE
P1: MAX_STEPS / CONSECUTIVE_FAILURES
P2: LOOP_DETECTED / ALL_TOOLS_EXHAUSTED
SmartRetry 错误分类
- 类型优先:
ToolTimeoutError→ TRANSIENT,ToolNotFoundError→ PERMANENT - 字符串降级:兼容旧版错误消息,中英文关键词匹配
- TRANSIENT(瞬时故障)→ 指数退避重试
- PERMANENT(永久故障)→ 立即返回失败,进死信队列
- UNKNOWN(未知)→ 保守重试
Callbacks 8 阶段回调(v0.3.0 新增)
from harnessloop import CallbackManager, StepPhase
cbm = CallbackManager()
@cbm.on(StepPhase.AFTER_EXECUTE)
def log_to_monitor(agent, tool, step, success, **kw):
print(f"[{agent}] {tool} step={step} {'OK' if success else 'FAIL'}")
orchestrator = Orchestrator(agents=[agent], callback_manager=cbm)
支持的阶段:BEFORE_DECIDE, AFTER_DECIDE, BEFORE_EXECUTE, AFTER_EXECUTE, ON_ERROR, ON_CONVERGE, BEFORE_VERIFY, AFTER_VERIFY
Checkpoint 可恢复中断(v0.3.0 新增)
from harnessloop import Checkpointer
checkpointer = Checkpointer(store_dir="./.checkpoints")
cp = checkpointer.save(agent, task, context, hitl_request_id="...")
# 进程重启后恢复
cp = checkpointer.load(cp.checkpoint_id)
# cp.memory_snapshot, cp.loop_state_snapshot, cp.react_trace 均可用
纯 JSON 文件存储,零数据库依赖。与 HITL 异步模式联动:触发人工决策时自动保存检查点。
HITL 双模式
- 异步模式:挂起任务 → 保存 Checkpoint → 返回
AWAITING_HUMAN_DECISION→ 等待外部 API 决策 - 同步模式:调用回调函数阻塞等待(兼容旧接口)
设计原则
- 零依赖:只使用 Python 标准库(
dataclasses,enum,concurrent.futures,hashlib,json,re,time) - LLM 由外部注入:Agent 框架本身不绑定任何 LLM SDK,通过
llm_call回调注入 - 分层明确:Tool → Agent → Orchestrator 三层独立,每层可单独测试
- 灰度可控:Loop Engineering 所有模块通过
LoopConfig按需启用
实战验证
已在 BankDataViz 项目中实战验证:
- 12 家银行 2020-2024 年报全流程处理
- Pipeline:OCR → LLM分析 → 表格重建 → 审计
- ReAct:自然语言驱动的多 Tool 协作数据分析
- Loop Engineering:连续失败保护、循环检测、死信队列
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file harnessloop-0.3.0.tar.gz.
File metadata
- Download URL: harnessloop-0.3.0.tar.gz
- Upload date:
- Size: 42.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0efde9e34c1e3b94512ae4b10cfe55d7acaa6610334ea0126c607c9f8398aa95
|
|
| MD5 |
b747e457528e47d72f57bec09491b7da
|
|
| BLAKE2b-256 |
e5b1c46210e1ce53b23bcb2d6c307e29b4ab812994353f9a3f501b3d1a703efe
|
File details
Details for the file harnessloop-0.3.0-py3-none-any.whl.
File metadata
- Download URL: harnessloop-0.3.0-py3-none-any.whl
- Upload date:
- Size: 46.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5b64933184f9bb75374109f8e950484ebb72c1af8ad84b9812f80b6f78d21b40
|
|
| MD5 |
a84137b27a3a2639fe51a672915dcb2c
|
|
| BLAKE2b-256 |
8b5efc644bd0fd38ddb826e90048e2f2b457a0c61eb62a292c93edac1a8d3465
|