SSE thin-layer SDK: LangchainAgentEvent → standard SSE for langchain_agentx_stream_ui
langchain-agentx-stream-ui-backend
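SSE thin-layer SDK, UI line only: it serializes LangchainAgentEvent (the out-of-band / artifact projection produced by the SDK's LangGraphToLangchainAgentEventAdapter) into the frontend SSE protocol.
It does not expose raw LangGraph dicts, and it does not push ToolMessage.content to the browser (that is the model path, intended for the LLM only).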
Quick start
from fastapi import FastAPI, Request

from langchain_agentx_stream_ui_backend import stream_to_sse, wrap_langgraph

app = FastAPI()


@app.get("/wiki/build")
async def wiki_build(task_id: str, request: Request):
    # build_wiki_workflow is defined by your application; it is not imported from this SDK
    graph = build_wiki_workflow(task_id)
    # adapter_config may be omitted: wrap_langgraph ships with DEFAULT_UI_ADAPTER_CONFIG
    events = wrap_langgraph(graph, {"task_id": task_id}, graph_name="WikiBuilder")
    return stream_to_sse(events, request=request, session_id=f"wiki-{task_id}")
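For orientation, the wire format that a browser's EventSource consumes can be sketched as follows. This is a minimal illustration of standard SSE framing, not the SDK's actual serializer: `format_sse_frame` is a hypothetical helper, and the payload shape is an assumption.

```python
import json
from typing import Any, Optional


def format_sse_frame(event_type: str, data: Any, event_id: Optional[str] = None) -> str:
    """Serialize one event into an SSE frame (hypothetical helper, not the SDK's code)."""
    lines = []
    if event_id is not None:
        # The id: line is what the browser echoes back as Last-Event-ID on reconnect.
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event_type}")
    # json.dumps never emits raw newlines, so the payload fits on a single data: line.
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    lines.append(f"data: {payload}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


frame = format_sse_frame("tool-result", {"content": "ok"}, event_id="7")
```

Here `frame` holds three field lines followed by the blank line that tells the client the event is complete.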
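Test organization (aligned with the T10 pyramid)
| Tier | Location | Description |
|---|---|---|
| L0 unit tests | langchain_agentx_stream_ui_backend/test_*.py, tests/test_*.py | Function/class-level behavior |
| L1 integration | tests/integration/ | FastAPI routes + SSE / buffer / permission, end to end |
| Smoke | tests/smoke/ | Fast CI regression (-m smoke, about 7 tests) |
The former examples/ directory has been merged into the integration tests under tests/integration/sse/.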
cd backend
python -m pip install -e ".[dev]"
# All tests (103 tests)
pytest -v
# L0 unit tests only
pytest langchain_agentx_stream_ui_backend/ tests/test_*.py -v
# L1 integration tests only
pytest tests/integration/ -v -m integration
# Smoke (pre-release / fast CI gate)
pytest tests/smoke/ -v -m smoke
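The -m smoke and -m integration selections rely on pytest markers. A minimal sketch of registering them in a conftest.py, assuming the project does not already declare them elsewhere (for example in pyproject.toml); the file location and the marker descriptions are illustrative, not taken from the repository.

```python
# conftest.py (hypothetical location; the project may register its markers elsewhere)


def pytest_configure(config):
    """Register custom markers so `pytest -m smoke` runs without unknown-marker warnings."""
    config.addinivalue_line("markers", "smoke: fast regression gate for CI and pre-release")
    config.addinivalue_line("markers", "integration: L1 tests covering FastAPI routes and SSE")
```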
Integration test coverage
| Module | File | Coverage |
|---|---|---|
| Wiki / LangGraph | test_wiki_workflow_integration.py | wrap_langgraph + SSE |
| AgentSession | test_agent_session_integration.py | wrap_agent_session + SSE |
| EventBuffer | test_buffer_multi_subscriber_integration.py | Multiple subscribers; a subscriber disconnect does not stop ingest |
| Last-Event-ID | test_last_event_id_integration.py | Resume via SSE id:; 400 validation |
| Middleware | test_middleware_http_integration.py | HTTP route-level middleware |
| Permission raise | test_permission_raise_integration.py | ingest suspends → POST write-back → resumes |
| Protocol invariants | test_sse_protocol_integration.py | meta fields, bypass/raise modes |
| Disconnect | test_stream_disconnect_integration.py | aclose triggered within 2 seconds |
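The Last-Event-ID row covers resumption and rejection of malformed ids. The idea can be sketched as below, under the assumption that event ids are monotonically increasing integers; the real LastEventIdValidator may use a different id format, and every name in this block is hypothetical.

```python
from typing import List, Optional, Sequence, Tuple


class InvalidLastEventId(ValueError):
    """Raised for a malformed header; a route would typically map this to HTTP 400."""


def parse_last_event_id(header: Optional[str]) -> int:
    """Return the last id the client saw, or 0 when the header is absent."""
    if header is None or header == "":
        return 0
    # Assumption: ids are plain ASCII decimal integers.
    if not (header.isascii() and header.isdigit()):
        raise InvalidLastEventId(f"not a valid event id: {header!r}")
    return int(header)


def events_after(buffered: Sequence[Tuple[int, str]], last_id: int) -> List[Tuple[int, str]]:
    """Select the buffered (id, payload) pairs the client has not seen yet."""
    return [(event_id, payload) for event_id, payload in buffered if event_id > last_id]


buffered = [(1, "start"), (2, "token"), (3, "done")]
replay = events_after(buffered, parse_last_event_id("2"))
```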
Public API
- stream_to_sse(events, *, request, session_id, ...) → StreamingResponse (SseStreamingService facade)
- stream_to_sse(..., assign_event_ids=True): Form A, emits SSE id lines within a single connection (off by default, for backward compatibility)
- stream_to_sse_from_buffer: automatically reads the Last-Event-ID request header and writes SSE id: lines
- LastEventIdValidator / TaggedAgentEvent: resume validation and the id-carrying event payload
- EventBuffer / ingest_stream_to_buffer: long-running tasks, "POST triggers ingest + GET subscribes"
- stream_to_sse(..., middlewares=[]): optional event pipeline applied before serialization (the default empty chain matches the MVP)
- permission_mode="raise" + create_permission_raise_setup / PermissionWaitRegistry: SDK permission suspension → permission-request event (requires agentx ≥ 0.7.9)
- EventMiddleware / EventMiddlewarePipeline / apply_middlewares: redaction, trace injection, debug filtering
- wrap_langgraph(graph, input_data, ...) → LangGraphEventStream (with built-in DEFAULT_UI_ADAPTER_CONFIG)
- wrap_agent_session(session_stream, *, cancel=None) → AgentSessionEventStream; cancel is an optional SessionCancelHandle
- safe_aclose_events / EventSourceLifecycle: upstream release (used in the disconnect finally block)
- DEFAULT_UI_ADAPTER_CONFIG / merge_ui_adapter_config(): UI adapter defaults (UiAdapterConfigMerger)
- PROTOCOL_VERSION: currently "1"
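The "POST triggers ingest + GET subscribes" pattern behind EventBuffer can be illustrated with a small append-only buffer in which every subscriber reads from its own offset. This is a sketch of the general technique only: `SketchEventBuffer` and its methods are hypothetical and do not reflect the SDK's real EventBuffer interface.

```python
import asyncio
from typing import Any, AsyncIterator, List


class SketchEventBuffer:
    """Append-only buffer; each subscriber keeps its own read offset (illustrative only)."""

    def __init__(self) -> None:
        self._events: List[Any] = []
        self._closed = False
        self._changed = asyncio.Condition()

    async def append(self, event: Any) -> None:
        async with self._changed:
            self._events.append(event)
            self._changed.notify_all()

    async def close(self) -> None:
        async with self._changed:
            self._closed = True
            self._changed.notify_all()

    async def subscribe(self, start: int = 0) -> AsyncIterator[Any]:
        index = start
        while True:
            async with self._changed:
                # Sleep until there is something new to read or ingest has finished.
                await self._changed.wait_for(
                    lambda: index < len(self._events) or self._closed
                )
                if index >= len(self._events):
                    return  # closed and fully drained
                event = self._events[index]
            index += 1
            yield event


async def demo() -> List[List[str]]:
    buf = SketchEventBuffer()  # created inside the running loop

    async def collect(start: int) -> List[str]:
        return [event async for event in buf.subscribe(start)]

    early = asyncio.create_task(collect(0))
    for name in ("start", "token", "done"):
        await buf.append(name)
    await buf.close()
    late = await collect(1)  # a late subscriber replays from offset 1
    return [await early, late]
```

Because the events stay in the buffer, a subscriber that disconnects or arrives late does not affect ingest; it simply resumes from whatever offset it asks for.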
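AgentSession cancellation semantics
On disconnect, or when stream_to_sse finishes, the thin layer calls events.aclose() in a finally block. For the AgentSession path:
- If cancel=SessionCancelHandle was passed, aclose() prefers await cancel.cancel()
- Otherwise it falls back to await session_stream.aclose()
Passing an explicit cancel from application code is recommended, so that the semantics stay stable across SDK upgrades:
# Inside a route handler:
events = wrap_agent_session(session.astream(question), cancel=session)
return stream_to_sse(events, request=request)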
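The two-branch rule above can be sketched as a small wrapper. This is an assumption-laden illustration rather than the SDK's AgentSessionEventStream: the class names are hypothetical, and whether the real implementation also closes the stream after a successful cancel is not stated in this document.

```python
import asyncio
from typing import Any, List, Optional


class SketchSessionEventStream:
    """Illustrative wrapper: aclose() prefers the cancel handle, else closes the stream."""

    def __init__(self, session_stream: Any, cancel: Optional[Any] = None) -> None:
        self._session_stream = session_stream
        self._cancel = cancel

    async def aclose(self) -> None:
        if self._cancel is not None:
            await self._cancel.cancel()
        else:
            await self._session_stream.aclose()


class _Recorder:
    """Stand-in for both a stream and a cancel handle; records which method ran."""

    def __init__(self, log: List[str], name: str) -> None:
        self._log, self._name = log, name

    async def cancel(self) -> None:
        self._log.append(f"{self._name}.cancel")

    async def aclose(self) -> None:
        self._log.append(f"{self._name}.aclose")


async def demo() -> List[str]:
    log: List[str] = []
    stream = _Recorder(log, "stream")
    await SketchSessionEventStream(stream, cancel=_Recorder(log, "handle")).aclose()
    await SketchSessionEventStream(stream).aclose()
    return log
```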
EventMiddleware example (tool-result redaction)
import dataclasses

from langchain_agentx.observability.events.langchain_agentx_event_adapter import (
    LangchainAgentEvent,
    LangchainAgentEventType,
)
from langchain_agentx_stream_ui_backend import stream_to_sse


def redact_tool_result(event: LangchainAgentEvent) -> LangchainAgentEvent | None:
    # Pass every event other than tool results through unchanged
    if event.event_type != LangchainAgentEventType.TOOL_RESULT:
        return event
    data = dict(event.data)  # copy so the original event is not mutated
    data["content"] = "[REDACTED]"
    if isinstance(data.get("display"), dict):
        display = dict(data["display"])
        display["value"] = "[REDACTED]"
        data["display"] = display
    return dataclasses.replace(event, data=data)


# Inside a route handler:
return stream_to_sse(events, request=request, middlewares=[redact_tool_result])
A middleware that returns None drops the event; one that raises is converted into a terminal thin_layer error frame.
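The drop-on-None and error-on-raise behavior can be sketched with a tiny pipeline runner. Everything here is hypothetical: `SketchEvent` stands in for LangchainAgentEvent, and the shape of the error frame is an assumption, not the SDK's documented payload.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional


@dataclass(frozen=True)
class SketchEvent:
    event_type: str
    data: Dict[str, Any] = field(default_factory=dict)


Middleware = Callable[[SketchEvent], Optional[SketchEvent]]


def run_pipeline(events: Iterable[SketchEvent], middlewares: List[Middleware]) -> Iterator[SketchEvent]:
    """Apply each middleware in order; None drops the event, an exception ends the stream."""
    for event in events:
        current: Optional[SketchEvent] = event
        try:
            for middleware in middlewares:
                current = middleware(current)
                if current is None:
                    break  # dropped: later middlewares never see it
        except Exception as exc:
            # Assumed shape of the terminal error frame.
            yield SketchEvent("error", {"source": "thin_layer", "message": str(exc)})
            return
        if current is not None:
            yield current


def drop_debug(event: SketchEvent) -> Optional[SketchEvent]:
    return None if event.event_type == "debug" else event


def fail_on_bad(event: SketchEvent) -> Optional[SketchEvent]:
    if event.event_type == "bad":
        raise RuntimeError("boom")
    return event


source = [SketchEvent("text"), SketchEvent("debug"), SketchEvent("bad"), SketchEvent("text")]
emitted = [e.event_type for e in run_pipeline(source, [drop_debug, fail_on_bad])]
```

The trailing "text" event is never emitted, because the error frame is terminal.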
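permission_mode="raise" (V2-D, feature flag)
The default permission_mode="bypass" matches the MVP. Coding-oriented scenarios can enable raise, which turns an SDK PermissionPromptHandler suspension into the documented extension event permission-request and resumes once the application writes the decision back.
Dependency: langchain-agentx >= 0.7.9 (L3 PermissionPromptHandler).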
import asyncio

from fastapi import FastAPI, HTTPException, Request

from langchain_agentx_stream_ui_backend import (
    EventBuffer,
    PermissionWaitRegistry,
    create_permission_raise_setup,
    ingest_stream_to_buffer,
    stream_to_sse_from_buffer,
    wrap_langgraph,
)

app = FastAPI()
_BUFFERS: dict[str, EventBuffer] = {}
_PERMISSIONS: dict[str, PermissionWaitRegistry] = {}
# Hold references so running ingest tasks are not garbage-collected
_INGEST_TASKS: set[asyncio.Task] = set()


@app.post("/wiki/{task_id}/start")
async def start(task_id: str):
    perm = create_permission_raise_setup(session_id=task_id)
    _PERMISSIONS[task_id] = perm.registry
    buf = EventBuffer()
    _BUFFERS[task_id] = buf
    # build_graph is the application's own factory; inject the handler into the agentx graph
    graph = build_graph(services={"prompt_handler": perm.handler})
    events = wrap_langgraph(graph, {"task_id": task_id}, permission_broker=perm.broker)
    task = asyncio.create_task(ingest_stream_to_buffer(perm.wrap_stream(events), buf))
    _INGEST_TASKS.add(task)
    task.add_done_callback(_INGEST_TASKS.discard)
    return {"task_id": task_id}


@app.post("/wiki/{task_id}/permission")
async def permission(task_id: str, body: dict):
    registry = _PERMISSIONS[task_id]
    # resolve returns a falsy value when no matching request is pending
    ok = await registry.resolve(body["request_id"], body)
    if not ok:
        raise HTTPException(404)
    return {"ok": True}


@app.get("/wiki/{task_id}/stream")
async def stream(task_id: str, request: Request):
    return stream_to_sse_from_buffer(
        _BUFFERS[task_id],
        request=request,
        permission_mode="raise",
        session_id=task_id,
    )
To fall back to bypass: do not pass prompt_handler, and call stream_to_sse(..., permission_mode="bypass") (the default).
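The suspend, write-back, and resume cycle can be sketched with asyncio futures keyed by request id. This is an illustration of the pattern only: `SketchWaitRegistry` is hypothetical and is not the SDK's PermissionWaitRegistry, whose internals are not described here.

```python
import asyncio
from typing import Any, Dict, Tuple


class SketchWaitRegistry:
    """Illustrative registry: one pending future per permission request."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    async def wait(self, request_id: str) -> Any:
        """Suspend the caller (the ingest side) until a decision is written back."""
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            return await future
        finally:
            self._pending.pop(request_id, None)

    async def resolve(self, request_id: str, decision: Any) -> bool:
        """Deliver a decision; False means no such request is pending (a 404 in the route)."""
        future = self._pending.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(decision)
        return True


async def demo() -> Tuple[bool, bool, Any]:
    registry = SketchWaitRegistry()
    waiter = asyncio.create_task(registry.wait("req-1"))
    await asyncio.sleep(0)  # let the waiter register its future
    unknown = await registry.resolve("req-404", {"allow": False})
    known = await registry.resolve("req-1", {"allow": True})
    return unknown, known, await waiter
```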
Contract fixtures
SSOT path: backend/tests/fixtures/sse/*.events.json. See the README.md in that directory for how the fixtures are generated.