
SSE thin-layer SDK: LangchainAgentEvent → standard SSE for langchain_agentx_stream_ui


langchain-agentx-stream-ui-backend

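SSE thin-layer SDK, UI track only: it serializes LangchainAgentEvent (the out-of-band / artifact projection produced by the SDK's LangGraphToLangchainAgentEventAdapter) into the frontend SSE protocol.

It does not expose raw LangGraph dicts, and it does not push ToolMessage.content (the model path, intended only for the LLM) to the browser.

Quick start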
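
For readers unfamiliar with the wire format, here is a minimal sketch of what "serializing an event into SSE" means at the text/event-stream level. It is not this package's serializer: the function name format_sse_frame and the payload fields are assumptions made for illustration, and the real frame layout is defined by the package's protocol fixtures.

```python
import json
from typing import Any, Optional


def format_sse_frame(event: str, data: Any, event_id: Optional[str] = None) -> str:
    """Serialize one event into a text/event-stream frame (illustrative only)."""
    lines = []
    if event_id is not None:
        # Optional id line; a browser echoes it back as Last-Event-ID on reconnect.
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits on one data: line.
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    lines.append(f"data: {payload}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


# Hypothetical payload shape, chosen only to show the framing.
frame = format_sse_frame("tool-result", {"display": {"value": "ok"}}, event_id="7")
print(frame, end="")
```

Keeping the payload as single-line JSON avoids having to split data across several data: lines.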

from fastapi import FastAPI, Request
from langchain_agentx_stream_ui_backend import stream_to_sse, wrap_langgraph

app = FastAPI()

@app.get("/wiki/build")
async def wiki_build(task_id: str, request: Request):
    graph = build_wiki_workflow(task_id)
    # adapter_config may be omitted: wrap_langgraph has DEFAULT_UI_ADAPTER_CONFIG built in
    events = wrap_langgraph(graph, {"task_id": task_id}, graph_name="WikiBuilder")
    return stream_to_sse(events, request=request, session_id=f"wiki-{task_id}")

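Test organization (aligned with the T10 pyramid)

| Level | Location | Description |
| --- | --- | --- |
| L0 unit | langchain_agentx_stream_ui_backend/test_*.py, tests/test_*.py | Function- and class-level behavior |
| L1 integration | tests/integration/ | FastAPI routes + SSE / buffer / permission, end to end |
| Smoke | tests/smoke/ | Fast CI regression (-m smoke, about 7 tests) |

The examples/ directory has been merged into the integration tests under tests/integration/sse/.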

cd backend
python -m pip install -e ".[dev]"

# All tests (103 tests)
pytest -v

# L0 unit tests only
pytest langchain_agentx_stream_ui_backend/ tests/test_*.py -v

# L1 integration tests only
pytest tests/integration/ -v -m integration

# Smoke tests (pre-release / fast CI gate)
pytest tests/smoke/ -v -m smoke

Integration test coverage

| Module | File | Coverage |
| --- | --- | --- |
| Wiki / LangGraph | test_wiki_workflow_integration.py | wrap_langgraph + SSE |
| AgentSession | test_agent_session_integration.py | wrap_agent_session + SSE |
| EventBuffer | test_buffer_multi_subscriber_integration.py | Multiple subscribers; a disconnect does not tear down ingest |
| Last-Event-ID | test_last_event_id_integration.py | SSE id: resumption, 400 validation |
| Middleware | test_middleware_http_integration.py | HTTP route-level middleware |
| Permission raise | test_permission_raise_integration.py | ingest suspends → POST write-back → resume |
| Protocol invariants | test_sse_protocol_integration.py | meta fields, bypass/raise modes |
| Disconnect | test_stream_disconnect_integration.py | aclose is triggered within 2 seconds |

Public API

  • stream_to_sse(events, *, request, session_id, ...) → StreamingResponse (SseStreamingService facade)
  • stream_to_sse(..., assign_event_ids=True): Form A, SSE id lines within a single connection (off by default, backward compatible)
  • stream_to_sse_from_buffer: automatically reads the Last-Event-ID request header and writes SSE id: lines
  • LastEventIdValidator / TaggedAgentEvent: resumption validation and id-tagged event payloads
  • EventBuffer / ingest_stream_to_buffer: long-running tasks, "POST triggers ingest + GET subscribes"
  • stream_to_sse(..., middlewares=[]): optional event pipeline applied before serialization (the default empty chain matches MVP behavior)
  • permission_mode="raise" + create_permission_raise_setup / PermissionWaitRegistry: SDK permission suspension → permission-request event (requires agentx ≥ 0.7.9)
  • EventMiddleware / EventMiddlewarePipeline / apply_middlewares: redaction, trace injection, debug filtering
  • wrap_langgraph(graph, input_data, ...) → LangGraphEventStream (built-in DEFAULT_UI_ADAPTER_CONFIG)
  • wrap_agent_session(session_stream, *, cancel=None) → AgentSessionEventStream; cancel is an optional SessionCancelHandle
  • safe_aclose_events / EventSourceLifecycle: upstream release (used in the disconnect finally block)
  • DEFAULT_UI_ADAPTER_CONFIG / merge_ui_adapter_config(): UI adapter defaults (UiAdapterConfigMerger)
  • PROTOCOL_VERSION: currently "1"
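
To make the buffer and Last-Event-ID entries above more concrete, the following is a rough sketch of how an append-only event log can serve several subscribers and replay from a given id. It is not the package's EventBuffer or LastEventIdValidator: the class names SketchReplayBuffer and InvalidLastEventId, the 1-based integer ids, and the validation rules are all assumptions made for illustration.

```python
from typing import Any, List, Optional, Tuple


class InvalidLastEventId(ValueError):
    """Malformed or out-of-range Last-Event-ID (a route would map this to HTTP 400)."""


class SketchReplayBuffer:
    """Append-only log; an event's id is its 1-based position in the log."""

    def __init__(self) -> None:
        self._events: List[Any] = []

    def append(self, event: Any) -> int:
        """Store an event and return the id assigned to it."""
        self._events.append(event)
        return len(self._events)

    def replay_after(self, last_event_id: Optional[str]) -> List[Tuple[int, Any]]:
        """Return (id, event) pairs that come after the id the client last saw."""
        if last_event_id is None:
            start = 0  # fresh subscriber: replay everything
        else:
            if not (last_event_id.isascii() and last_event_id.isdigit()):
                raise InvalidLastEventId(last_event_id)
            start = int(last_event_id)
            if start > len(self._events):
                raise InvalidLastEventId(last_event_id)
        return [(i + 1, e) for i, e in enumerate(self._events[start:], start=start)]


buf = SketchReplayBuffer()
for name in ["a", "b", "c"]:
    buf.append(name)

# Two independent subscribers read the same log without affecting each other.
fresh = buf.replay_after(None)
resumed = buf.replay_after("1")
print(fresh, resumed)
```

Because subscribers only read from the log, one of them disconnecting has no effect on ingestion or on the others, which is the property the multi-subscriber test row describes.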

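AgentSession cancellation semantics

On disconnect, or when stream_to_sse finishes, the thin layer calls events.aclose() in a finally block. For the AgentSession path:

  1. If cancel=SessionCancelHandle is passed, aclose() prefers await cancel.cancel()
  2. Otherwise it falls back to await session_stream.aclose()

We recommend passing an explicit cancel on the application side so that the semantics stay stable across SDK upgrades: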

events = wrap_agent_session(session.astream(question), cancel=session)
return stream_to_sse(events, request=request)
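
The two-step rule above can be sketched with plain asyncio. This is an illustration of the described behavior, not the package's AgentSessionEventStream: the classes SketchSessionEvents, FakeStream, and FakeCancel are hypothetical, and the sketch assumes the two branches are mutually exclusive.

```python
import asyncio
from typing import List, Optional


class FakeStream:
    """Stand-in for a session stream that exposes aclose()."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    async def aclose(self) -> None:
        self.log.append("stream.aclose")


class FakeCancel:
    """Stand-in for a cancel handle that exposes cancel()."""

    def __init__(self, log: List[str]) -> None:
        self.log = log

    async def cancel(self) -> None:
        self.log.append("cancel.cancel")


class SketchSessionEvents:
    """Hypothetical wrapper: prefer the explicit cancel handle, else close the stream."""

    def __init__(self, stream: FakeStream, cancel: Optional[FakeCancel] = None) -> None:
        self._stream = stream
        self._cancel = cancel

    async def aclose(self) -> None:
        if self._cancel is not None:
            await self._cancel.cancel()   # rule 1: explicit handle wins
        else:
            await self._stream.aclose()   # rule 2: fall back to the stream


async def demo() -> List[str]:
    log: List[str] = []
    await SketchSessionEvents(FakeStream(log), cancel=FakeCancel(log)).aclose()
    await SketchSessionEvents(FakeStream(log)).aclose()
    return log


result = asyncio.run(demo())
print(result)
```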

EventMiddleware example (tool-result redaction)

import dataclasses
from langchain_agentx.observability.events.langchain_agentx_event_adapter import (
    LangchainAgentEvent,
    LangchainAgentEventType,
)

def redact_tool_result(event: LangchainAgentEvent) -> LangchainAgentEvent | None:
    if event.event_type != LangchainAgentEventType.TOOL_RESULT:
        return event
    data = dict(event.data)
    data["content"] = "[REDACTED]"
    if isinstance(data.get("display"), dict):
        display = dict(data["display"])
        display["value"] = "[REDACTED]"
        data["display"] = display
    return dataclasses.replace(event, data=data)

return stream_to_sse(events, request=request, middlewares=[redact_tool_result])

A middleware that returns None drops the event; a middleware that raises is converted into a thin_layer error terminal frame.
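
The drop and error rules can be pictured with a small synchronous pipeline. This is a sketch under stated assumptions, not the package's EventMiddlewarePipeline: events are plain dicts here, the name run_pipeline is hypothetical, and the fields of the error frame are invented for illustration.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

Event = Dict[str, Any]
Middleware = Callable[[Event], Optional[Event]]


def run_pipeline(events: Iterable[Event], middlewares: List[Middleware]) -> Iterator[Event]:
    """Apply each middleware in order; None drops the event, an exception ends the stream."""
    for event in events:
        current: Optional[Event] = event
        try:
            for middleware in middlewares:
                current = middleware(current)
                if current is None:
                    break  # dropped: later middlewares never see it
        except Exception as exc:
            # Assumed shape of the terminal error frame.
            yield {"type": "error", "source": "thin_layer", "message": str(exc)}
            return
        if current is not None:
            yield current


def drop_debug(event: Event) -> Optional[Event]:
    return None if event["type"] == "debug" else event


def fail_on_bad(event: Event) -> Optional[Event]:
    if event["type"] == "bad":
        raise ValueError("boom")
    return event


source = [{"type": "text"}, {"type": "debug"}, {"type": "bad"}, {"type": "late"}]
out = list(run_pipeline(source, [drop_debug, fail_on_bad]))
print(out)
```

With an empty middleware list every event passes through unchanged, which corresponds to the default empty chain.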

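permission_mode="raise" (V2-D, feature flag)

The default, permission_mode="bypass", matches MVP behavior. Coding-type scenarios can enable raise, which turns an SDK PermissionPromptHandler suspension into the documented extension event permission-request and resumes once the application writes back a response.

Depends on langchain-agentx >= 0.7.9 (L3 PermissionPromptHandler).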

import asyncio

from fastapi import FastAPI, HTTPException, Request
from langchain_agentx_stream_ui_backend import (
    EventBuffer,
    PermissionWaitRegistry,
    create_permission_raise_setup,
    ingest_stream_to_buffer,
    stream_to_sse_from_buffer,
    wrap_langgraph,
)

app = FastAPI()
_BUFFERS: dict[str, EventBuffer] = {}
_PERMISSIONS: dict[str, PermissionWaitRegistry] = {}

@app.post("/wiki/{task_id}/start")
async def start(task_id: str):
    perm = create_permission_raise_setup(session_id=task_id)
    _PERMISSIONS[task_id] = perm.registry
    buf = EventBuffer()
    _BUFFERS[task_id] = buf
    graph = build_graph(services={"prompt_handler": perm.handler})  # inject into the agentx graph
    events = wrap_langgraph(graph, {"task_id": task_id}, permission_broker=perm.broker)
    asyncio.create_task(ingest_stream_to_buffer(perm.wrap_stream(events), buf))
    return {"task_id": task_id}

@app.post("/wiki/{task_id}/permission")
async def permission(task_id: str, body: dict):
    registry = _PERMISSIONS[task_id]
    ok = await registry.resolve(body["request_id"], body)
    if not ok:
        raise HTTPException(404)
    return {"ok": True}

@app.get("/wiki/{task_id}/stream")
async def stream(task_id: str, request: Request):
    return stream_to_sse_from_buffer(
        _BUFFERS[task_id],
        request=request,
        permission_mode="raise",
        session_id=task_id,
    )

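To fall back to bypass: do not pass prompt_handler, and call stream_to_sse(..., permission_mode="bypass") (the default).

Contract fixtures

SSOT path: backend/tests/fixtures/sse/*.events.json. See README.md in that directory for how the fixtures are generated.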
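
The suspend → write-back → resume flow used in raise mode can be sketched with asyncio futures. This is an illustration only and does not reproduce PermissionWaitRegistry: the class SketchWaitRegistry, its register method, the synchronous resolve, and the decision payload are assumptions.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class SketchWaitRegistry:
    """Hypothetical registry: one pending future per permission request id."""

    def __init__(self) -> None:
        self._pending: Dict[str, asyncio.Future] = {}

    def register(self, request_id: str) -> "asyncio.Future[Any]":
        """Called by the ingest side just before it suspends."""
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return future

    def resolve(self, request_id: str, decision: Any) -> bool:
        """Called by the POST handler; False means unknown or already resolved."""
        future = self._pending.pop(request_id, None)
        if future is None or future.done():
            return False
        future.set_result(decision)
        return True


async def demo() -> Tuple[List[str], bool, bool]:
    registry = SketchWaitRegistry()
    emitted: List[str] = []

    async def ingest() -> None:
        emitted.append("permission-request")            # event shown to the UI
        decision = await registry.register("req-1")     # suspend until write-back
        emitted.append(f"resumed:{decision['allow']}")  # continue with the answer

    task = asyncio.create_task(ingest())
    await asyncio.sleep(0)  # let ingest run until it suspends
    unknown = registry.resolve("missing", {"allow": True})
    known = registry.resolve("req-1", {"allow": True})
    await task
    return emitted, unknown, known


emitted, unknown, known = asyncio.run(demo())
print(emitted, unknown, known)
```

Returning False for an unknown request id is what lets a route answer with 404, as the permission handler in the example above does.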
