SSE thin-layer SDK: LangchainAgentEvent → standard SSE for langchain_agentx_stream_ui

Project description

langchain-agentx-stream-ui-backend

SSE thin-layer SDK, UI line only: it serializes LangchainAgentEvent (the out-of-band / artifact projection produced by the SDK's LangGraphToLangchainAgentEventAdapter) into the frontend SSE protocol.

It does not expose raw LangGraph dicts, and it does not push ToolMessage.content to the browser (that is the model path, intended for the LLM only).
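To make "serializes into the SSE protocol" concrete, here is a minimal sketch of how a single event can be written as a text/event-stream frame. It follows the generic SSE wire format only; the function name format_sse_frame and the payload shape are assumptions for illustration, not this package's actual serializer or schema.

```python
import json
from typing import Any, Optional


def format_sse_frame(event: str, data: dict[str, Any], event_id: Optional[str] = None) -> str:
    """Serialize one event as a generic SSE frame (hypothetical helper, not the SDK's API)."""
    lines = []
    if event_id is not None:
        # The id line is what a browser echoes back in Last-Event-ID on reconnect.
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    # json.dumps escapes newlines, so the payload always fits in one data: field.
    payload = json.dumps(data, ensure_ascii=False, separators=(",", ":"))
    lines.append(f"data: {payload}")
    # A blank line terminates the frame.
    return "\n".join(lines) + "\n\n"


frame = format_sse_frame("tool-result", {"display": {"value": "done"}}, event_id="7")
print(frame, end="")
```

On the browser side, an EventSource listener registered for the event name receives the JSON string from the data field.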

Quick start

from fastapi import FastAPI, Request
from langchain_agentx_stream_ui_backend import stream_to_sse, wrap_langgraph

app = FastAPI()

@app.get("/wiki/build")
async def wiki_build(task_id: str, request: Request):
    graph = build_wiki_workflow(task_id)
    # adapter_config may be omitted: wrap_langgraph has DEFAULT_UI_ADAPTER_CONFIG built in
    events = wrap_langgraph(graph, {"task_id": task_id}, graph_name="WikiBuilder")
    return stream_to_sse(events, request=request, session_id=f"wiki-{task_id}")

Test organization (aligned with the T10 pyramid)

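| Level | Location | Description |
| --- | --- | --- |
| L0 unit tests | langchain_agentx_stream_ui_backend/test_*.py, tests/test_*.py | Function/class-level behavior |
| L1 integration | tests/integration/ | FastAPI routes + SSE / buffer / permission, end to end |
| Smoke | tests/smoke/ | Fast CI regression (-m smoke, about 7 tests) |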

The examples/ directory has been merged into the integration tests under tests/integration/sse/.

cd backend
python -m pip install -e ".[dev]"

# All tests (103 in total)
pytest -v

# L0 unit tests only
pytest langchain_agentx_stream_ui_backend/ tests/test_*.py -v

# L1 integration tests only
pytest tests/integration/ -v -m integration

# Smoke (pre-release / fast CI gate)
pytest tests/smoke/ -v -m smoke

Integration test coverage

| Module | File | Coverage |
| --- | --- | --- |
| Wiki / LangGraph | test_wiki_workflow_integration.py | wrap_langgraph + SSE |
| AgentSession | test_agent_session_integration.py | wrap_agent_session + SSE |
| EventBuffer | test_buffer_multi_subscriber_integration.py | Multiple subscribers; a disconnect does not tear down ingest |
| Last-Event-ID | test_last_event_id_integration.py | SSE id: resumption, 400 validation |
| Middleware | test_middleware_http_integration.py | HTTP route-level middleware |
| Permission raise | test_permission_raise_integration.py | ingest suspends → POST write-back → resumes |
| Protocol invariants | test_sse_protocol_integration.py | meta fields, bypass/raise modes |
| Disconnect | test_stream_disconnect_integration.py | aclose triggered within 2 seconds |
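The Last-Event-ID row covers both resumption and rejecting a bad header with HTTP 400. The sketch below shows one way such a check can work. It assumes ids are decimal sequence numbers starting at 1, which this README does not state; parse_last_event_id, events_after, and InvalidLastEventId are hypothetical names, not the package's LastEventIdValidator.

```python
from typing import Optional


class InvalidLastEventId(ValueError):
    """Raised when the header cannot serve as a resume cursor (map this to HTTP 400)."""


def parse_last_event_id(header: Optional[str], latest_id: int) -> Optional[int]:
    """Return the resume cursor, or None when the client starts from the beginning."""
    if header is None or header == "":
        return None
    # Assumption: ids are ASCII decimal sequence numbers assigned by the server.
    if not header.isascii() or not header.isdigit():
        raise InvalidLastEventId(f"not a sequence number: {header!r}")
    cursor = int(header)
    if cursor > latest_id:
        raise InvalidLastEventId(f"id {cursor} is ahead of the stream ({latest_id})")
    return cursor


def events_after(events: list[str], cursor: Optional[int]) -> list[tuple[int, str]]:
    """Replay (id, event) pairs whose id is strictly greater than the cursor."""
    start = 0 if cursor is None else cursor
    return [(i + 1, e) for i, e in enumerate(events) if i + 1 > start]


log = ["start", "delta", "end"]
cursor = parse_last_event_id("2", latest_id=len(log))
print(events_after(log, cursor))
```

In a FastAPI route, catching InvalidLastEventId and raising HTTPException(400) would give the rejection behavior the test name suggests.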

Public API

  • stream_to_sse(events, *, request, session_id, ...) → StreamingResponse (SseStreamingService facade)
  • stream_to_sse(..., assign_event_ids=True): Form A, SSE id lines within a single connection (off by default for backward compatibility)
  • stream_to_sse_from_buffer: automatically reads the Last-Event-ID request header and writes SSE id: lines
  • LastEventIdValidator / TaggedAgentEvent: resumption validation and the id-tagged event payload
  • EventBuffer / ingest_stream_to_buffer: for long-running tasks, "POST triggers ingest + GET subscribes"
  • stream_to_sse(..., middlewares=[]): optional event pipeline applied before serialization (the default empty chain matches the MVP behavior)
  • permission_mode="raise" + create_permission_raise_setup / PermissionWaitRegistry: SDK permission suspension → permission-request event (requires agentx ≥ 0.7.9)
  • EventMiddleware / EventMiddlewarePipeline / apply_middlewares: redaction, trace injection, debug filtering
  • wrap_langgraph(graph, input_data, ...) → LangGraphEventStream (DEFAULT_UI_ADAPTER_CONFIG built in)
  • wrap_agent_session(session_stream, *, cancel=None) → AgentSessionEventStream (cancel is an optional SessionCancelHandle)
  • safe_aclose_events / EventSourceLifecycle: upstream release (used in the disconnect finally block)
  • DEFAULT_UI_ADAPTER_CONFIG / merge_ui_adapter_config(): UI adapter defaults (UiAdapterConfigMerger)
  • PROTOCOL_VERSION: currently "1"
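The "POST triggers ingest + GET subscribes" pattern behind EventBuffer / ingest_stream_to_buffer can be pictured as an append-only log that a background task fills while any number of readers follow it. The sketch below is an illustration under that assumption; SketchBuffer and ingest are hypothetical stand-ins and say nothing about how the real EventBuffer is implemented.

```python
import asyncio
from typing import AsyncIterator


class SketchBuffer:
    """Append-only event log that subscribers read independently (hypothetical)."""

    def __init__(self) -> None:
        self._events: list[str] = []
        self._closed = False
        self._changed = asyncio.Condition()

    async def append(self, event: str) -> None:
        async with self._changed:
            self._events.append(event)
            self._changed.notify_all()

    async def close(self) -> None:
        async with self._changed:
            self._closed = True
            self._changed.notify_all()

    async def subscribe(self, start: int = 0) -> AsyncIterator[str]:
        index = start
        while True:
            async with self._changed:
                # Wait until there is something new to read or the producer is done.
                await self._changed.wait_for(
                    lambda: index < len(self._events) or self._closed
                )
                if index >= len(self._events):
                    return
                event = self._events[index]
            index += 1
            yield event  # yielded outside the lock so slow readers do not block ingest


async def ingest(source: AsyncIterator[str], buffer: SketchBuffer) -> None:
    """Drain the producer into the buffer, independently of any subscriber."""
    try:
        async for event in source:
            await buffer.append(event)
    finally:
        await buffer.close()


async def demo() -> tuple[list[str], list[str]]:
    async def producer() -> AsyncIterator[str]:
        for name in ("start", "delta", "end"):
            await asyncio.sleep(0)
            yield name

    buffer = SketchBuffer()
    task = asyncio.create_task(ingest(producer(), buffer))
    early = [e async for e in buffer.subscribe()]
    await task
    # A late subscriber replays from index 1, as a resumed connection would.
    late = [e async for e in buffer.subscribe(start=1)]
    return early, late


early, late = asyncio.run(demo())
print(early, late)
```

Because ingest runs in its own task, a subscriber that goes away does not stop the producer; a later GET can still replay what was buffered.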

AgentSession cancellation semantics

When the client disconnects or stream_to_sse finishes, the thin layer calls events.aclose() in a finally block. For the AgentSession path:

  1. If cancel=SessionCancelHandle is passed, aclose() prefers await cancel.cancel()
  2. Otherwise it falls back to await session_stream.aclose()
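The two-step rule above can be sketched as a small wrapper. SketchSessionStream and CancelHandle are hypothetical names used only to show the cancel-first ordering; the real AgentSessionEventStream may differ in detail.

```python
import asyncio
from typing import Any, Optional, Protocol


class CancelHandle(Protocol):
    async def cancel(self) -> Any: ...


class SketchSessionStream:
    """Wraps an async generator and releases it with the cancel-first rule."""

    def __init__(self, stream, cancel: Optional[CancelHandle] = None) -> None:
        self._stream = stream
        self._cancel = cancel
        self._closed = False

    def __aiter__(self):
        return self._stream.__aiter__()

    async def aclose(self) -> None:
        if self._closed:  # repeated aclose() calls are harmless
            return
        self._closed = True
        if self._cancel is not None:
            await self._cancel.cancel()  # step 1: the explicit handle wins
        else:
            await self._stream.aclose()  # step 2: fall back to the stream itself


class RecordingSession:
    """Stand-in session that records whether cancel() was awaited."""

    def __init__(self) -> None:
        self.cancelled = False

    async def astream(self):
        yield "token"

    async def cancel(self) -> None:
        self.cancelled = True


async def demo() -> bool:
    session = RecordingSession()
    events = SketchSessionStream(session.astream(), cancel=session)
    try:
        async for _ in events:
            break  # simulate a client disconnect after the first event
    finally:
        await events.aclose()
    return session.cancelled


cancelled = asyncio.run(demo())
print(cancelled)
```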

We recommend passing an explicit cancel from application code so that the semantics stay stable across SDK upgrades:

events = wrap_agent_session(session.astream(question), cancel=session)
return stream_to_sse(events, request=request)

EventMiddleware example (tool-result redaction)

import dataclasses
from langchain_agentx.observability.events.langchain_agentx_event_adapter import (
    LangchainAgentEvent,
    LangchainAgentEventType,
)

def redact_tool_result(event: LangchainAgentEvent) -> LangchainAgentEvent | None:
    if event.event_type != LangchainAgentEventType.TOOL_RESULT:
        return event
    data = dict(event.data)
    data["content"] = "[REDACTED]"
    if isinstance(data.get("display"), dict):
        display = dict(data["display"])
        display["value"] = "[REDACTED]"
        data["display"] = display
    return dataclasses.replace(event, data=data)

return stream_to_sse(events, request=request, middlewares=[redact_tool_result])

A middleware that returns None drops the event; if it raises, the error is converted into a thin_layer error terminal frame.
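The drop-on-None and error-on-raise rules can be sketched as a small pipeline runner. This is a synchronous simplification with hypothetical names (SketchEvent, run_pipeline), and the error payload shape is an assumption; the real EventMiddlewarePipeline operates on LangchainAgentEvent streams.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterable, Iterator, Optional


@dataclass(frozen=True)
class SketchEvent:
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


Middleware = Callable[[SketchEvent], Optional[SketchEvent]]


def run_pipeline(
    events: Iterable[SketchEvent], middlewares: list[Middleware]
) -> Iterator[SketchEvent]:
    for event in events:
        current: Optional[SketchEvent] = event
        try:
            for middleware in middlewares:
                current = middleware(current)
                if current is None:  # dropped: skip the remaining middlewares
                    break
        except Exception as exc:
            # Assumed shape of the terminal frame; the real payload may differ.
            yield SketchEvent("error", {"source": "thin_layer", "message": str(exc)})
            return  # terminal: nothing is emitted after the error
        if current is not None:
            yield current


def drop_debug(event: SketchEvent) -> Optional[SketchEvent]:
    return None if event.event_type == "debug" else event


def fail_on_bad(event: SketchEvent) -> Optional[SketchEvent]:
    if event.event_type == "bad":
        raise ValueError("boom")
    return event


out = list(
    run_pipeline(
        [SketchEvent("text"), SketchEvent("debug"), SketchEvent("bad"), SketchEvent("text")],
        [drop_debug, fail_on_bad],
    )
)
print([e.event_type for e in out])
```

Running middlewares in list order means a filter placed first keeps dropped events away from later, possibly more expensive, steps.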

permission_mode="raise" (V2-D, feature flag)

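The default permission_mode="bypass" matches the MVP behavior. Coding-style scenarios can enable raise, which turns an SDK PermissionPromptHandler suspension into the documented extension event permission-request and waits for the application to write back a decision before continuing.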

Requires langchain-agentx >= 0.7.9 (L3 PermissionPromptHandler).

import asyncio

from fastapi import FastAPI, HTTPException, Request
from langchain_agentx_stream_ui_backend import (
    EventBuffer,
    PermissionWaitRegistry,
    create_permission_raise_setup,
    ingest_stream_to_buffer,
    stream_to_sse_from_buffer,
    wrap_langgraph,
)

app = FastAPI()
_BUFFERS: dict[str, EventBuffer] = {}
_PERMISSIONS: dict[str, PermissionWaitRegistry] = {}

@app.post("/wiki/{task_id}/start")
async def start(task_id: str):
    perm = create_permission_raise_setup(session_id=task_id)
    _PERMISSIONS[task_id] = perm.registry
    buf = EventBuffer()
    _BUFFERS[task_id] = buf
    graph = build_graph(services={"prompt_handler": perm.handler})  # inject into the agentx graph
    events = wrap_langgraph(graph, {"task_id": task_id}, permission_broker=perm.broker)
    asyncio.create_task(ingest_stream_to_buffer(perm.wrap_stream(events), buf))
    return {"task_id": task_id}

@app.post("/wiki/{task_id}/permission")
async def permission(task_id: str, body: dict):
    registry = _PERMISSIONS[task_id]
    ok = await registry.resolve(body["request_id"], body)
    if not ok:
        raise HTTPException(404)
    return {"ok": True}

@app.get("/wiki/{task_id}/stream")
async def stream(task_id: str, request: Request):
    return stream_to_sse_from_buffer(
        _BUFFERS[task_id],
        request=request,
        permission_mode="raise",
        session_id=task_id,
    )

To fall back to bypass: do not pass prompt_handler, and call stream_to_sse(..., permission_mode="bypass") (the default).
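The suspend, write-back, and resume flow in raise mode can be pictured with asyncio futures: the ingest side awaits a future keyed by request id, and the POST handler resolves it. SketchPermissionRegistry and wait_for_decision are hypothetical names; only the resolve(request_id, body) call returning a truthy value on success mirrors the example above.

```python
import asyncio
from typing import Any


class SketchPermissionRegistry:
    """Maps pending request ids to futures (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}

    async def wait_for_decision(self, request_id: str) -> dict[str, Any]:
        # Called on the ingest side: suspend until the application writes back.
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            return await future
        finally:
            self._pending.pop(request_id, None)

    async def resolve(self, request_id: str, decision: dict[str, Any]) -> bool:
        # Called from the POST handler: False lets the route answer 404.
        future = self._pending.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(decision)
        return True


async def demo():
    registry = SketchPermissionRegistry()
    waiter = asyncio.create_task(registry.wait_for_decision("req-1"))
    await asyncio.sleep(0)  # let the waiter register its future
    unknown = await registry.resolve("nope", {"allow": False})
    known = await registry.resolve("req-1", {"allow": True})
    decision = await waiter
    return unknown, known, decision


unknown, known, decision = asyncio.run(demo())
print(unknown, known, decision)
```

In a long-lived service it is also worth keeping a reference to the task created with asyncio.create_task, since the event loop holds only a weak reference to running tasks.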

Contract fixtures

SSOT path: backend/tests/fixtures/sse/*.events.json. See the README.md in that directory for how the fixtures are generated.
