
Core utilities and shared types for turbo-agent


Turbo-Agent Core Design Document

Last updated: 2025-12-12
Version: 2.0
Scope: the data-model and protocol-definition layer of the Turbo-Agent backend


📖 Table of Contents

  1. What is Turbo-Agent Core
  2. Core Design Principles
  3. Quick Start
  4. Module Tour
  5. Core Concepts in Depth
  6. Usage Guide
  7. Extension Development
  8. Best Practices

What is Turbo-Agent Core

turbo-agent-core is the data-model and protocol-definition layer of the Turbo-Agent system. It defines all of the core data structures, I/O contracts, and event protocols for the agent runtime.

Design Goals

| Goal | Description |
| --- | --- |
| Minimal runtime set | Contains only the data structures an agent needs to run; peripheral concerns such as permissions, tenancy, and auditing are stripped out |
| Domain purity | Strictly follows DDD principles; models express business semantics only and carry no infrastructure fields |
| Protocol uniformity | Unifies the I/O contracts of Tool/Agent/LLM; the event protocol is compatible with CopilotKit and LangGraph |
| Controlled evolution | Version tracing is done via fields; interfaces are enforced through ABCs to avoid runtime errors |

When to Use It

  • ✅ Building a new agent runtime (Runtime)
  • ✅ Developing a tool plugin system
  • ✅ Implementing conversation-history management
  • ✅ Designing streaming event handling
  • ✅ Adapting a data-persistence layer (Prisma/SQLAlchemy)
  • ❌ Direct user-permission management (handled by the upstream auth module)
  • ❌ HTTP API layer implementation (handled by the server module)

Core Design Principles

1️⃣ Abstract Base Class Pattern (TurboEntity as ABC)

All runnable entities inherit from TurboEntity (an abstract base class), which enforces a uniform interface:

from typing import AsyncIterator, Iterator

from turbo_agent_core.schema import JSON, RunType
from turbo_agent_core.schema.basic import TurboEntity
from turbo_agent_core.schema.events import BaseEvent, TextMessageDeltaEvent

class MyTool(TurboEntity):
    run_type = RunType.API
    
    def run(self, input: JSON, **kwargs) -> JSON:
        # Synchronous execution logic
        return {"result": "success"}
    
    async def a_run(self, input: JSON, **kwargs) -> JSON:
        # Asynchronous execution logic
        return {"result": "success"}
    
    def stream(self, input: JSON, **kwargs) -> Iterator[BaseEvent]:
        # Synchronous streaming execution
        yield TextMessageDeltaEvent(...)
    
    async def a_stream(self, input: JSON, **kwargs) -> AsyncIterator[BaseEvent]:
        # Asynchronous streaming execution
        yield TextMessageDeltaEvent(...)

Core Benefits

  • ✅ Early type checking that avoids runtime errors
  • ✅ IDE autocompletion and type inference
  • ✅ A unified I/O contract definition

2️⃣ Version Tracing Instead of Version Containers

There is no separate ToolVersion class; entities carry version fields directly:

tool = Tool(
    id="tool_123",
    version_id="v_456",
    version_tag="v1.2.0",
    prev_version_id="v_455",
    is_default=True,
    # ... other fields
)

Separation of Responsibilities

  • Core layer: holds the running snapshot of the current version
  • Data layer: maintains the full version history and its relationships
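This split can be sketched as follows. The `VersionedEntity` stand-in and `version_history` helper below are hypothetical, illustrating how a Data-layer component might reconstruct history by walking `prev_version_id` links while the Core entity only carries its own snapshot fields:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical snapshot of the version fields a Core entity carries.
@dataclass
class VersionedEntity:
    version_id: str
    version_tag: str
    prev_version_id: Optional[str] = None

def version_history(latest: VersionedEntity,
                    store: Dict[str, VersionedEntity]) -> List[str]:
    """Walk prev_version_id links (a Data-layer concern) to list tags, newest first."""
    tags: List[str] = []
    current: Optional[VersionedEntity] = latest
    while current is not None:
        tags.append(current.version_tag)
        current = store.get(current.prev_version_id) if current.prev_version_id else None
    return tags

v1 = VersionedEntity("v_454", "v1.0.0")
v2 = VersionedEntity("v_455", "v1.1.0", prev_version_id="v_454")
v3 = VersionedEntity("v_456", "v1.2.0", prev_version_id="v_455")
store = {e.version_id: e for e in (v1, v2, v3)}
print(version_history(v3, store))  # ['v1.2.0', 'v1.1.0', 'v1.0.0']
```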

3️⃣ I/O Contracts Keyed by run_type

| Entity type | run_type | Input | Output |
| --- | --- | --- | --- |
| Tool | RunType.API | JSON | JSON |
| LLMModel | RunType.LLM | Conversation | Message |
| Agent/Character | RunType.AGENT | Conversation | Message |

4️⃣ Composition over Inheritance

LLMTool is an "LLM-driven tool"; it uses single inheritance plus composition:

class LLMTool(Tool):  # inherits Tool's JSON I/O contract
    # Composes the Agent's configuration fields
    model: Optional[LLMModel] = None
    modelParameter: ModelParameters = Field(default_factory=ModelParameters)
    setting: Optional[BusinessSetting] = None

This avoids class LLMTool(Tool, BasicAgent) and its multiple-inheritance conflicts.

5️⃣ Built-in Parameter Declaration and Validation

Tool ships with parameter definitions and automatic validation:

tool = Tool(
    name="search_api",
    input=[
        Parameter(name="query", type=BasicType.string, required=True),
        Parameter(name="limit", type=BasicType.integer, default=10)
    ]
)

# Automatic validation (a Pydantic model is built dynamically under the hood)
validated_input = tool.validate_input({"query": "hello", "limit": 5})

Automatic JSON Schema Derivation

  • List[Parameter] automatically generates the input_schema
  • Caching is supported (in-memory + on-disk)
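A minimal sketch of that derivation, assuming a simplified parameter shape (the real `param_schema.py` handles more types, caching, and edge cases; `Param` and `to_json_schema` below are illustrative stand-ins):

```python
from dataclasses import dataclass, field
from typing import List

# Simplified stand-in for schema.basic.Parameter (the real class has more fields).
@dataclass
class Param:
    name: str
    type: str                      # "string" | "integer" | "object" | ...
    required: bool = False
    parameters: List["Param"] = field(default_factory=list)  # nested params

def to_json_schema(params: List[Param]) -> dict:
    """Derive a JSON Schema object from a (possibly nested) parameter list."""
    props, required = {}, []
    for p in params:
        if p.type == "object" and p.parameters:
            props[p.name] = to_json_schema(p.parameters)  # recurse into nesting
        else:
            props[p.name] = {"type": p.type}
        if p.required:
            required.append(p.name)
    schema = {"type": "object", "properties": props}
    if required:
        schema["required"] = required
    return schema

schema = to_json_schema([
    Param("query", "string", required=True),
    Param("limit", "integer"),
])
print(schema)
# {'type': 'object', 'properties': {'query': {'type': 'string'},
#  'limit': {'type': 'integer'}}, 'required': ['query']}
```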

Quick Start

Installation

# With uv (recommended)
cd backend/packages/turbo-agent-core
uv sync

# Or with pip
pip install -e .

Basic Usage

from turbo_agent_core.schema import (
    Tool, Agent, LLMModel, Conversation, Message,
    ModelParameters, Parameter, BasicType, RunType, MessageRole
)

# 1. Create a tool
tool = Tool(
    id="search_001",
    name="web_search",
    belong_to_project_path="proj_1",
    name_id="search",
    run_type=RunType.API,
    input=[
        Parameter(name="query", type=BasicType.string, required=True)
    ],
    output=[
        Parameter(name="results", type=BasicType.array)
    ]
)

# 2. Create an agent
agent = Agent(
    id="agent_001",
    name="Assistant",
    belong_to_project_path="proj_1",
    name_id="assistant",
    run_type=RunType.AGENT,
    model=llm_model,  # an LLMModel instance created beforehand
    tools=[tool],
    modelParameter=ModelParameters(temperature=0.7)
)

# 3. Create a conversation
conversation = Conversation(
    id="conv_001",
    assistant_id="agent_001",
    messages=[]
)

# 4. Add a message
user_msg = Message(
    id="msg_001",
    role=MessageRole.user,
    content="Search for Python tutorials for me"
)
conversation.add_child(user_msg)

Module Tour

turbo_agent_core/
├── schema/
│   ├── enums.py          # Shared enums (RunType/MessageRole/BasicType, etc.)
│   ├── basic.py          # Base classes (TurboEntity/Parameter/ModelParameters)
│   ├── agents.py         # Entity classes (Tool/LLMModel/Agent/Character)
│   ├── states.py         # State models (Message/Conversation/ToolCallRecord)
│   ├── resources.py      # Resource classes (KnowledgeResource/BusinessSetting/Workset)
│   ├── external.py       # External platforms (Platform/AuthMethod/Secret)
│   ├── refs.py           # Reference types (BasicRef and its subclasses)
│   └── events.py         # Event protocol (BaseEvent/Content/State/Control events)
└── utils/
    ├── json_stream.py    # Streaming JSON parser
    ├── json_assembler.py # JSON event assembler
    ├── param_schema.py   # Parameter -> JSON Schema conversion
    └── schema_tool.py    # JSON Schema -> dynamic Pydantic model construction

Module Overview

| Module | Responsibility | Key types |
| --- | --- | --- |
| enums.py | System-level enums and type aliases | JSON, RunType, MessageRole, BasicType |
| basic.py | Abstract base classes and basic types | TurboEntity, Parameter, ModelParameters, Endpoint |
| agents.py | Runnable entities (tools/models/agents) | Tool, LLMModel, BasicAgent, Agent, Character |
| states.py | Runtime state and conversation structure | Message, Action, Conversation, ToolCallRecord |
| resources.py | Business resources and knowledge bases | KnowledgeResource, BusinessSetting, Workset |
| events.py | Streaming event protocol | BaseEvent, ContentTextDeltaEvent, ActionExecutionEvent |

Core Concepts in Depth

🔧 The TurboEntity Hierarchy

TurboEntity (ABC)
├── Tool (run_type=API)
│   ├── APITool      # HTTP/HTTPS API tool
│   ├── LLMTool      # LLM-driven tool
│   └── AgentTool    # An agent used as a tool
├── LLMModel (run_type=LLM)
└── BasicAgent (run_type=AGENT)
    ├── Character    # Character entity
    └── Agent        # Full agent

Key Attributes

  • id: globally unique identifier
  • name_id: identifier unique within a project (used for references)
  • run_type: run type (determines the I/O contract)
  • version_id/version_tag/prev_version_id: version tracing

💬 Message and Conversation

The Message Aggregate

A Message aggregates the full lifecycle of a single conversation turn:

message = Message(
    id="msg_001",
    role=MessageRole.assistant,
    content="Final answer",                   # final response
    reasoning_content="Let me think...",      # chain of thought (CoT)
    actions=[                                 # tool-call sequence
        Action(
            name="search",
            input={"query": "Python"},
            records=[ToolCallRecord(...)]
        )
    ],
    status=MessageStatus.success,
    token_cost=1500,
    ancestors=["msg_000"],               # parent message IDs
    children=["msg_002", "msg_003"]      # child message IDs
)

Mapping to LangChain

  • Runtime expansion: Message → [AIMessage(content=reasoning), ToolMessage(...), AIMessage(content=final)]
  • Persistence collapse: [AIMessage, ToolMessage, ...] → a single Message (with an actions array)
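The expand side of that round trip can be sketched with toy stand-ins (plain tuples in place of real LangChain message classes; the `expand` helper and the simplified `Message`/`Action` dataclasses here are hypothetical):

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Toy stand-ins: a Message aggregates reasoning, tool actions, and final content.
@dataclass
class Action:
    name: str
    result: str

@dataclass
class Message:
    reasoning_content: str = ""
    content: str = ""
    actions: List[Action] = field(default_factory=list)

def expand(msg: Message) -> List[Tuple[str, str]]:
    """Runtime expansion: one aggregated Message -> a LangChain-style message list."""
    out: List[Tuple[str, str]] = []
    if msg.reasoning_content:
        out.append(("ai", msg.reasoning_content))      # AIMessage(reasoning)
    for a in msg.actions:
        out.append(("tool", f"{a.name}: {a.result}"))  # one ToolMessage per action
    if msg.content:
        out.append(("ai", msg.content))                # AIMessage(final answer)
    return out

msg = Message(reasoning_content="think", content="answer",
              actions=[Action("search", "3 hits")])
print(expand(msg))
# [('ai', 'think'), ('tool', 'search: 3 hits'), ('ai', 'answer')]
```

Persistence runs the same mapping in reverse: the list of runtime messages collapses back into a single aggregated Message row.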

Conversation as a Tree

Non-linear conversation branching is supported:

conversation = Conversation(id="conv_001", assistant_id="agent_001")

# Add the root message
root_msg = Message(id="msg_root", role=MessageRole.user, content="Hello")
conversation.add_child(root_msg)

# Add a child message
reply_1 = Message(id="msg_1", role=MessageRole.assistant, content="Hello!")
conversation.add_child(reply_1, parent_id="msg_root")

# Add a sibling message (a branch)
reply_2 = Message(id="msg_2", role=MessageRole.assistant, content="Hi!")
conversation.add_sibling(reply_2, sibling_id="msg_1")

# Fetch the history
history = conversation.get_history(leaf_message_id="msg_1")
# Returns: [root_msg, reply_1]

🛠️ The Tool Parameter System

Parameter Definition

from turbo_agent_core.schema import Tool, Parameter, BasicType

search_tool = Tool(
    name="search",
    input=[
        Parameter(
            name="query",
            type=BasicType.string,
            required=True,
            description="Search keywords"
        ),
        Parameter(
            name="filters",
            type=BasicType.object,
            parameters=[  # nested parameters
                Parameter(name="date_from", type=BasicType.datetime),
                Parameter(name="date_to", type=BasicType.datetime)
            ]
        ),
        Parameter(
            name="category",
            type=BasicType.enum,
            enum_values=["tech", "news", "blog"]
        )
    ]
)

Automatic Validation

# A valid input
valid_input = {
    "query": "Python",
    "filters": {"date_from": "2024-01-01T00:00:00Z"},
    "category": "tech"
}
result = search_tool.validate_input(valid_input)  # ✅ passes

# An invalid input
invalid_input = {"query": 123}  # query must be a string
try:
    search_tool.validate_input(invalid_input)
except ValueError as e:
    print(e)  # ❌ a validation error is raised

📊 ToolCallRecord Execution Records

record = ToolCallRecord(
    id="call_001",
    tool_name_id="search",
    tool_version_id="v_001",
    input={"query": "Python"},
    result={"items": [...]},
    status="succeeded",
    token_cost=100,
    time_cost=0.5,  # seconds
    pre_edges=["call_000"],   # upstream dependencies
    next_edges=["call_002"],  # downstream calls
    agent_caller_id="agent_001"
)

Graph structures are supported (parallel/serial tool calls):

call_000 (search)
    ├── call_001 (analyze) ──┐
    └── call_002 (filter)   ─┤
                             ├──> call_003 (summarize)
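One way a runtime might derive an execution order from such a graph is a topological sort over the `pre_edges` relation. The helper below is an illustrative sketch (Kahn's algorithm over plain dicts), not part of the package:

```python
from collections import deque
from typing import Dict, List

def execution_order(pre_edges: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm over ToolCallRecord-style pre_edges (call -> prerequisites)."""
    indegree = {call: len(pres) for call, pres in pre_edges.items()}
    next_edges: Dict[str, List[str]] = {call: [] for call in pre_edges}
    for call, pres in pre_edges.items():
        for p in pres:
            next_edges[p].append(call)          # invert: prerequisite -> dependents
    ready = deque(sorted(c for c, d in indegree.items() if d == 0))
    order: List[str] = []
    while ready:
        call = ready.popleft()
        order.append(call)
        for nxt in sorted(next_edges[call]):    # release dependents as deps complete
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    return order

# The graph from the diagram above: call_000 fans out, call_003 joins.
graph = {
    "call_000": [],
    "call_001": ["call_000"],
    "call_002": ["call_000"],
    "call_003": ["call_001", "call_002"],
}
print(execution_order(graph))
# ['call_000', 'call_001', 'call_002', 'call_003']
```

call_001 and call_002 have no edge between them, so a scheduler could run them in parallel once call_000 completes.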

Usage Guide

Scenario 1: Creating a Custom Tool

from typing import AsyncIterator

from turbo_agent_core.schema import Tool, Parameter, BasicType, RunType
from turbo_agent_core.schema.events import ContentTextDeltaEvent

class WeatherTool(Tool):
    run_type = RunType.API
    
    def __init__(self):
        super().__init__(
            id="weather_001",
            name="Weather lookup",
            belong_to_project_path="proj_1",
            name_id="weather",
            input=[
                Parameter(name="city", type=BasicType.string, required=True)
            ],
            output=[
                Parameter(name="temperature", type=BasicType.number),
                Parameter(name="condition", type=BasicType.string)
            ]
        )
    
    async def a_run(self, input: dict, **kwargs) -> dict:
        city = input["city"]
        # Call the weather API here
        return {"temperature": 25.5, "condition": "sunny"}
    
    async def a_stream(self, input: dict, **kwargs) -> AsyncIterator:
        # Streaming response (if needed)
        yield ContentTextDeltaEvent(...)

Scenario 2: Building Conversation History

from turbo_agent_core.schema import Conversation, Message, MessageRole, Action

# Initialize the conversation
conv = Conversation(id="conv_123", assistant_id="agent_001")

# The user asks a question
user_msg = Message(
    id="msg_001",
    role=MessageRole.user,
    content="What's the weather like today?"
)
conv.add_child(user_msg)

# The assistant responds (with a tool call)
assistant_msg = Message(
    id="msg_002",
    role=MessageRole.assistant,
    reasoning_content="I need to look up the weather",
    actions=[
        Action(
            name="weather",
            input={"city": "Beijing"},
            records=[...]
        )
    ],
    content="It's sunny in Beijing today, 25.5°C"
)
conv.add_child(assistant_msg, parent_id="msg_001")

# Fetch the full history
history = conv.get_history(leaf_message_id="msg_002")
# [user_msg, assistant_msg]

Scenario 3: Handling Streaming Events

from typing import AsyncIterator

from turbo_agent_core.schema.events import BaseEvent

async def process_events(event_stream: AsyncIterator[BaseEvent]):
    async for event in event_stream:
        match event.type:
            case "content.text.delta":
                # Handle a text delta
                print(event.payload.delta, end="", flush=True)
            
            case "content.action.start":
                # A tool call is starting
                print(f"\n[Calling tool: {event.payload.name}]")
            
            case "content.action.result.end":
                # The tool finished executing
                print(f"[Result: {event.payload.result}]")
            
            case "run.lifecycle":
                if event.payload.stage == "completed":
                    print("\n✅ Run completed")

Scenario 4: Data-Persistence Adaptation

from turbo_agent_core.schema import Conversation, Message
from prisma import Prisma

async def save_conversation_to_db(conv: Conversation):
    db = Prisma()
    await db.connect()
    
    # Core DTO -> Prisma Model
    await db.assistantconversation.create(
        data={
            "id": conv.id,
            "assistant_id": conv.assistant_id,
            "title": conv.title,
            "status": conv.status.value,
            # Create the associated Message records
            "messages": {
                "create": [
                    {
                        "id": msg.id,
                        "role": msg.role.value,
                        "content": msg.content,
                        "reasoning_content": msg.reasoning_content,
                        "ancestors": msg.ancestors,
                        # ...
                    }
                    for msg in conv.messages
                ]
            }
        }
    )

Extension Development

Implementing a Custom Agent

from typing import List, Optional

from turbo_agent_core.schema import (
    BasicAgent, Conversation, Message, MessageRole, RunType, Tool
)

class MyCustomAgent(BasicAgent):
    run_type = RunType.AGENT
    
    async def a_run(
        self, 
        conversation: Conversation,
        leaf_message_id: Optional[str] = None,
        tools: Optional[List[Tool]] = None,
        **kwargs
    ) -> Message:
        # 1. Fetch the history
        history = conversation.get_history(leaf_message_id)
        
        # 2. Build the prompt
        prompt = self._build_prompt(history)
        
        # 3. Call the LLM
        llm_response = await self.model.a_run(conversation, ...)
        
        # 4. Resolve tool calls
        if llm_response.actions:
            for action in llm_response.actions:
                tool = self._find_tool(action.name)
                result = await tool.a_run(action.input)
                # Record the call in a ToolCallRecord
        
        # 5. Return the final message
        return Message(
            role=MessageRole.assistant,
            content=llm_response.content,
            actions=llm_response.actions
        )

Custom Event Types

from turbo_agent_core.schema.events import BaseEvent
from pydantic import BaseModel
from typing import Literal

class CustomThinkingPayload(BaseModel):
    step: int
    reasoning: str

class CustomThinkingEvent(BaseEvent):
    type: Literal["custom.thinking"] = "custom.thinking"
    payload: CustomThinkingPayload

Best Practices

✅ Recommended

  1. Use type annotations

    def process_tool(tool: Tool) -> JSON:
        result: JSON = tool.run({"query": "test"})
        return result
    
  2. Fill in every version-tracing field

    tool = Tool(
        version_id=generate_uuid(),
        version_tag="v1.0.0",
        prev_version_id=old_tool.version_id,
        is_default=True
    )
    
  3. Validate parameters up front

    # Validate before executing
    validated_input = tool.validate_input(user_input)
    result = tool.run(validated_input)
    
  4. Use the Conversation tree API

    # Correct: use add_child/add_sibling
    conv.add_child(message, parent_id="msg_001")
    
    # Wrong: manipulating the messages list directly
    conv.messages.append(message)  # ❌ does not update ancestors/children
    

❌ Anti-Patterns

  1. Do not multiply-inherit from TurboEntity

    # ❌ Wrong
    class BadTool(Tool, BasicAgent):
        pass
    
    # ✅ Correct: use composition
    class GoodTool(Tool):
        agent_config: BasicAgent = ...
    
  2. Do not hard-code permission logic in the Core layer

    # ❌ Wrong
    class Tool(TurboEntity):
        def run(self, input, user_id):
            if not self.check_permission(user_id):  # permission checks belong upstream
                raise PermissionError()
    
  3. Do not bypass parameter validation

    # ❌ Wrong
    result = tool._execute(raw_input)  # skips validate_input
    
    # ✅ Correct
    validated = tool.validate_input(raw_input)
    result = tool.run(validated)

Alignment with the Prisma Schema

The Core layer focuses on runtime data structures; Prisma handles persistence. The two are mapped via the adapter pattern:

| Core model | Prisma model | Mapping |
| --- | --- | --- |
| Tool | Tool + ToolVersion | Core holds the current-version snapshot; Prisma maintains the history |
| Agent | Agent + AgentVersion | Same as above |
| Message | Message + MessageAction | Core aggregates; Prisma uses relation tables |
| Conversation | AssistantConversation | Direct mapping plus tree-relationship handling |
| ToolCallRecord | ToolCallRecord | Trimmed, field-for-field alignment |

Adapter example (implemented by the config module):

# Core -> Prisma
def to_prisma_tool(core_tool: Tool) -> PrismaToolCreateInput:
    return {
        "id": core_tool.id,
        "name": core_tool.name,
        "version_id": core_tool.version_id,
        # ... map the remaining fields
    }

# Prisma -> Core
def to_core_tool(prisma_tool: PrismaTool) -> Tool:
    return Tool(
        id=prisma_tool.id,
        name=prisma_tool.name,
        # ... rebuild into the Core structure
    )

Related Documentation


FAQ

Q: Why doesn't Tool simply inherit from Agent?
A: Tool and Agent have different I/O contracts (JSON vs. Conversation), so multiple inheritance would cause field conflicts. Composition (as in LLMTool) is cleaner.

Q: When are a Message's ancestors and children updated?
A: They are maintained automatically when you call Conversation.add_child/add_sibling. Do not modify the messages list directly.

Q: How do I add a new event type?
A: Subclass BaseEvent, define a new type and payload, and optionally register it in the EventType enum.

Q: Does the Core layer include database operations?
A: No. Core defines the data structures; concrete CRUD is implemented by the config module's ConfigProvider.
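A toy sketch of the bookkeeping involved (hypothetical classes, not the real Conversation API) shows why appending to the list directly loses information:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Toy message/conversation pair illustrating the ancestors/children maintenance
# that add_child is described as performing.
@dataclass
class Msg:
    id: str
    ancestors: List[str] = field(default_factory=list)
    children: List[str] = field(default_factory=list)

class Conv:
    def __init__(self) -> None:
        self.messages: Dict[str, Msg] = {}

    def add_child(self, msg: Msg, parent_id: Optional[str] = None) -> None:
        if parent_id is not None:
            parent = self.messages[parent_id]
            parent.children.append(msg.id)                  # link downward
            msg.ancestors = parent.ancestors + [parent_id]  # link upward
        self.messages[msg.id] = msg

conv = Conv()
conv.add_child(Msg("msg_root"))
conv.add_child(Msg("msg_1"), parent_id="msg_root")
print(conv.messages["msg_1"].ancestors)    # ['msg_root']
print(conv.messages["msg_root"].children)  # ['msg_1']
```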


Last updated: 2025-12-12
Maintainer: Turbo-Agent Team
