langgraph-stream-parser
Universal parser for LangGraph streaming outputs. Normalizes complex, variable output shapes from graph.stream() and graph.astream() into consistent, typed event objects.
Installation
pip install langgraph-stream-parser
Quick Start
from langgraph_stream_parser import StreamParser
from langgraph_stream_parser.events import ContentEvent, ToolCallStartEvent, InterruptEvent
parser = StreamParser()
for event in parser.parse(graph.stream(input_data, stream_mode="updates")):
    match event:
        case ContentEvent(content=text):
            print(text, end="")
        case ToolCallStartEvent(name=name):
            print(f"\nCalling {name}...")
        case InterruptEvent(action_requests=actions):
            # Handle human-in-the-loop
            decision = get_user_decision(actions)
            # Resume with create_resume_input()
Features
- Typed Events: All stream outputs normalized to dataclass events with full type hints
- Tool Lifecycle Tracking: Automatic tracking of tool calls from start to completion
- Interrupt Handling: Parse and resume from human-in-the-loop interrupts
- Extensible Extractors: Register custom extractors for domain-specific tools
- Async Support: Both sync and async parsing via parse() and aparse()
- Zero Dependencies: LangGraph/LangChain imported dynamically only when needed
- Backward Compatible: Legacy dict-based API available for gradual migration
Event Types
| Event | Description |
|---|---|
| ContentEvent | Text content from AI messages. Includes agent_name when from a deep agent subagent. |
| ToolCallStartEvent | Tool call initiated by AI |
| ToolCallEndEvent | Tool call completed with result |
| ToolExtractedEvent | Special content extracted from tool (e.g., reflections, todos) |
| InterruptEvent | Human-in-the-loop interrupt requiring decision |
| StateUpdateEvent | Non-message state updates (opt-in) |
| UsageEvent | Token usage metadata (input/output/total tokens) |
| CustomEvent | Custom data emitted via get_stream_writer() |
| CompleteEvent | Stream finished successfully |
| ErrorEvent | Error during streaming |
All events (except CompleteEvent and ErrorEvent) carry a namespace field that identifies which subgraph produced the event — None for the parent graph, or a tuple like ("researcher:abc123",) for subgraphs.
All events have a to_dict() method for JSON serialization. Use event_to_dict(event) for a convenient conversion function.
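As a small illustration of working with the namespace field, the sketch below turns a namespace value into a display label. It assumes each tuple entry has the name:id shape shown in the ("researcher:abc123",) example; namespace_label is a hypothetical helper and not part of the package.

```python
from typing import Optional, Tuple


def namespace_label(namespace: Optional[Tuple[str, ...]]) -> str:
    """Return a readable label for an event namespace.

    Hypothetical helper: assumes entries look like "name:id"; entries
    without a colon are kept whole.
    """
    if not namespace:
        # None (or an empty tuple) is treated as the parent graph.
        return "parent"
    # Keep only the part before the first colon of each entry.
    names = [entry.split(":", 1)[0] for entry in namespace]
    return " > ".join(names)


print(namespace_label(None))
print(namespace_label(("researcher:abc123",)))
```

A label like this can prefix printed content so that output from nested subgraphs stays distinguishable from the parent graph's output.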
Usage Examples
Basic Parsing
from langgraph_stream_parser import StreamParser
parser = StreamParser()
for event in parser.parse(graph.stream({"messages": [...]}, stream_mode="updates")):
    print(event)
Pattern Matching (Python 3.10+)
from langgraph_stream_parser import StreamParser
from langgraph_stream_parser.events import *
parser = StreamParser()
for event in parser.parse(stream):
    match event:
        case ContentEvent(content=text, node=node):
            print(f"[{node}] {text}", end="")
        case ToolCallStartEvent(name=name, args=args):
            print(f"\n⏳ Calling {name}...")
        case ToolCallEndEvent(name=name, status="success"):
            print(f"✅ {name} completed")
        case ToolCallEndEvent(name=name, status="error", error_message=err):
            print(f"❌ {name} failed: {err}")
        case InterruptEvent() as interrupt:
            if interrupt.needs_approval:
                handle_approval(interrupt.action_requests)
        case CompleteEvent():
            print("\n✓ Done")
        case ErrorEvent(error=err):
            print(f"⚠️ Error: {err}")
Handling Interrupts
from langgraph_stream_parser import StreamParser
from langgraph_stream_parser.events import InterruptEvent
parser = StreamParser()
config = {"configurable": {"thread_id": "my-thread"}}
for event in parser.parse(graph.stream(input_data, config=config)):
    if isinstance(event, InterruptEvent):
        # Show user the pending actions
        for action in event.action_requests:
            print(f"Tool: {action['tool']}")
            print(f"Args: {action['args']}")
        # Check allowed decisions
        print(f"Allowed: {event.allowed_decisions}")
        # Get user decision and resume
        decision = "approve" if input("Approve? (y/n): ") == "y" else "reject"
        resume_input = event.create_resume(decision)
        for resume_event in parser.parse(graph.stream(resume_input, config=config)):
            handle_event(resume_event)
        break
Custom Tool Extractors
from langgraph_stream_parser import StreamParser, ToolExtractor
from langgraph_stream_parser.events import ToolExtractedEvent
class CanvasExtractor:
    tool_name = "add_to_canvas"
    extracted_type = "canvas_item"

    def extract(self, content):
        if isinstance(content, dict):
            return content
        return {"type": "text", "data": str(content)}

parser = StreamParser()
parser.register_extractor(CanvasExtractor())
for event in parser.parse(stream):
    if isinstance(event, ToolExtractedEvent) and event.extracted_type == "canvas_item":
        add_to_canvas_ui(event.data)
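The extractor above needs only three members: tool_name, extracted_type, and extract(content). Because of that, an extractor can be exercised on its own before it is registered. The sketch below is a hypothetical variant, JsonCanvasExtractor, that also decodes JSON strings; whether a tool's content ever arrives as a JSON string is an assumption made for illustration.

```python
import json
from typing import Any, Dict


class JsonCanvasExtractor:
    """Hypothetical variant of CanvasExtractor that also decodes JSON strings."""

    tool_name = "add_to_canvas"
    extracted_type = "canvas_item"

    def extract(self, content: Any) -> Dict[str, Any]:
        if isinstance(content, dict):
            return content
        if isinstance(content, str):
            try:
                decoded = json.loads(content)
            except json.JSONDecodeError:
                decoded = None
            # Only accept JSON objects; lists and scalars fall through.
            if isinstance(decoded, dict):
                return decoded
        # Fall back to wrapping the raw content as text.
        return {"type": "text", "data": str(content)}


extractor = JsonCanvasExtractor()
print(extractor.extract('{"type": "image", "data": "x"}'))
print(extractor.extract("plain text"))
```

Checking extract() in isolation like this keeps parsing mistakes out of the streaming loop, where they are harder to trace.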
Async Support
from langgraph_stream_parser import StreamParser
parser = StreamParser()
async def stream_agent():
    async for event in parser.aparse(graph.astream(input_data)):
        handle_event(event)
Dual Stream Mode (Token-Level Streaming)
For real-time token streaming alongside full tool lifecycle, use dual mode:
parser = StreamParser(stream_mode=["updates", "messages"])
stream = graph.stream(
    input_data, config=config,
    stream_mode=["updates", "messages"],
)
for event in parser.parse(stream):
    match event:
        case ContentEvent(content=text):
            # Token-by-token from "messages" mode
            print(text, end="", flush=True)
        case ToolCallStartEvent(name=name):
            # Complete tool calls from "updates" mode
            print(f"\nCalling {name}...")
The parser automatically deduplicates: ContentEvent comes from "messages" (token-level), while tool calls, interrupts, and state updates come from "updates".
You can also use stream_mode="auto" to auto-detect the format from the first chunk.
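To make the idea of detecting the format from a first chunk concrete, here is a rough sketch based on the chunk shapes LangGraph produces: a single "updates" mode yields a dict keyed by node name, a single "messages" mode yields a (message_chunk, metadata) tuple, and a list of modes yields (mode, data) tuples. This is not the package's detection logic. guess_stream_format and KNOWN_MODES are hypothetical names, and the sketch ignores subgraphs=True, which prepends a namespace to each chunk.

```python
from typing import Any

# Mode names that can appear as the first item of a multi-mode chunk.
KNOWN_MODES = {"updates", "messages", "custom", "values", "debug"}


def guess_stream_format(first_chunk: Any) -> str:
    """Guess the stream format from the first chunk (illustrative only)."""
    if isinstance(first_chunk, dict):
        # Single "updates" mode: {node_name: state_update}
        return "updates"
    if isinstance(first_chunk, tuple) and len(first_chunk) == 2:
        head, tail = first_chunk
        if isinstance(head, str) and head in KNOWN_MODES:
            # List of modes: (mode_name, data)
            return "multi"
        if isinstance(tail, dict):
            # Single "messages" mode: (message_chunk, metadata_dict)
            return "messages"
    return "unknown"


print(guess_stream_format({"agent": {"messages": []}}))
print(guess_stream_format(("updates", {"agent": {}})))
```

A dict is ambiguous in practice, since "values" mode also yields dicts, so passing an explicit stream_mode is the safer choice whenever the mode is known in advance.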
Subgraph & Deep Agent Support
When streaming with subgraphs=True, events carry a namespace identifying which subgraph produced them:
parser = StreamParser(stream_mode=["updates", "messages"])
stream = graph.stream(
    input_data, config=config,
    stream_mode=["updates", "messages"],
    subgraphs=True,
)
for event in parser.parse(stream):
    if isinstance(event, ContentEvent):
        if event.namespace:
            print(f"[subagent] {event.content}", end="")
        else:
            print(event.content, end="")
For LangChain deep agents, ContentEvent.agent_name is extracted from lc_agent_name metadata:
case ContentEvent(content=text, agent_name=name):
    label = f"[{name}] " if name else ""
    print(f"{label}{text}", end="")
Custom Stream Mode
Handle custom data from get_stream_writer():
parser = StreamParser(stream_mode=["updates", "messages", "custom"])
for event in parser.parse(stream):
    match event:
        case CustomEvent(data=data):
            print(f"Progress: {data}")
Configuration Options
parser = StreamParser(
    # Stream format to expect (default: "updates")
    stream_mode="updates",  # or "messages", "custom", "auto", or a list
    # Track tool call lifecycle (start -> end)
    track_tool_lifecycle=True,
    # Skip these tools entirely (no events emitted)
    skip_tools=["internal_tool"],
    # Include StateUpdateEvent for non-message state keys
    include_state_updates=False,
)
Legacy Dict-Based API
For backward compatibility or simpler use cases:
from langgraph_stream_parser import stream_graph_updates, resume_graph_from_interrupt
for update in stream_graph_updates(agent, input_data, config=config):
    if update.get("status") == "interrupt":
        interrupt = update["interrupt"]
        # Handle interrupt...
    elif "chunk" in update:
        print(update["chunk"], end="")
    elif "tool_calls" in update:
        print(f"Calling tools: {update['tool_calls']}")
    elif update.get("status") == "complete":
        break
# Resume from interrupt
for update in resume_graph_from_interrupt(agent, decisions=[{"type": "approve"}], config=config):
    handle_update(update)
Display Adapters
Pre-built adapters for rendering stream events in different environments:
CLIAdapter - Styled Terminal Output
from langgraph_stream_parser.adapters import CLIAdapter
adapter = CLIAdapter()
adapter.run(
    graph=agent,
    input_data={"messages": [("user", "Hello")]},
    config={"configurable": {"thread_id": "my-thread"}}
)
Features:
- ANSI color formatting
- Spinner animation during tool execution
- Interactive arrow-key interrupt handling
PrintAdapter - Plain Text Output
from langgraph_stream_parser.adapters import PrintAdapter
adapter = PrintAdapter()
adapter.run(graph=agent, input_data=input_data, config=config)
Universal output that works in any Python environment without dependencies.
FastAPIAdapter - WebSocket / SSE Streaming
Stream events to a web client over WebSocket or Server-Sent Events. The adapter is stateless — conversation state lives in LangGraph's checkpointer, keyed by session_id (used as thread_id).
from fastapi import FastAPI, WebSocket
from langgraph_stream_parser.adapters import FastAPIAdapter
app = FastAPI()
adapter = FastAPIAdapter(graph=agent) # agent must be compiled with a checkpointer
@app.websocket("/chat/{session_id}")
async def chat(ws: WebSocket, session_id: str):
    await adapter.handle_websocket(ws, session_id)
Reconnecting with the same session_id resumes the conversation — LangGraph's checkpointer rehydrates history automatically.
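The link between session_id and the checkpointer can be pictured as the run config used elsewhere in this README, with the session ID placed in the thread_id slot. The sketch below only illustrates that mapping; config_for_session is a hypothetical helper, and how the adapter builds its config internally is not shown here.

```python
from typing import Any, Dict


def config_for_session(session_id: str) -> Dict[str, Any]:
    """Build a LangGraph run config that uses session_id as the thread_id."""
    return {"configurable": {"thread_id": session_id}}


print(config_for_session("my-thread"))
```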
WebSocket message protocol (client ↔ server)
Client → Server:
{"type": "message", "content": "Hello"}
{"type": "decision", "decisions": [{"type": "approve"}]}
{"type": "decision", "decisions": [{"type": "edit", "args": {...}}]}
{"type": "cancel"}
Server → Client: every event's to_dict() output (e.g. {"type": "content", ...}, {"type": "tool_start", ...}, {"type": "interrupt", ...}, {"type": "complete"}), plus protocol-level messages:
{"type": "ack", "ref": "message|decision|cancel"}
{"type": "error", "error": "..."}
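The message shapes above can be wrapped in a few small helpers on the client side. The sketch below builds the three client messages and summarizes a server frame by its "type" field. All function names are hypothetical, and it assumes each WebSocket frame carries exactly one JSON object.

```python
import json
from typing import Any, Dict, List


def user_message(content: str) -> str:
    """Encode a chat message frame."""
    return json.dumps({"type": "message", "content": content})


def decision_message(decisions: List[Dict[str, Any]]) -> str:
    """Encode a decision frame answering an interrupt."""
    return json.dumps({"type": "decision", "decisions": decisions})


def cancel_message() -> str:
    """Encode a cancel frame."""
    return json.dumps({"type": "cancel"})


def describe_server_message(raw: str) -> str:
    """Summarize one server frame by its "type" field."""
    msg = json.loads(raw)
    kind = msg.get("type")
    if kind == "ack":
        return f"ack for {msg.get('ref')}"
    if kind == "error":
        return f"error: {msg.get('error')}"
    if kind == "complete":
        return "stream complete"
    # Everything else is treated as a parser event (content, tool_start, ...).
    return f"event: {kind}"


print(user_message("Hello"))
print(describe_server_message('{"type": "ack", "ref": "message"}'))
```

Dispatching on "type" first keeps the protocol-level frames (ack, error) separate from the event frames that come from to_dict().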
Server-Sent Events
from fastapi.responses import StreamingResponse
from langgraph_stream_parser import prepare_agent_input
@app.post("/chat/{session_id}")
async def chat(session_id: str, body: dict):
    input_data = prepare_agent_input(message=body["message"])
    return StreamingResponse(
        adapter.sse_stream(session_id, input_data),
        media_type="text/event-stream",
    )

@app.post("/chat/{session_id}/resume")
async def resume(session_id: str, body: dict):
    return StreamingResponse(
        adapter.resume(session_id, body["decisions"]),
        media_type="text/event-stream",
    )
Requires: pip install langgraph-stream-parser[fastapi]
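On the receiving end of the SSE endpoints, a client has to split the text/event-stream body back into events. The sketch below follows the standard SSE framing, where data: lines are grouped and a blank line ends each event. It assumes every event's data is a single JSON object, which is an assumption about the adapter's output rather than something documented here. iter_sse_json is a hypothetical helper.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def iter_sse_json(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one decoded JSON object per SSE event."""
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            # Per the SSE format, one optional space after the colon is dropped.
            value = line[5:]
            if value.startswith(" "):
                value = value[1:]
            data_parts.append(value)
        elif line == "" and data_parts:
            # A blank line terminates the current event.
            yield json.loads("\n".join(data_parts))
            data_parts = []
        # Other fields (event:, id:, comment lines) are ignored in this sketch.


frames = [
    'data: {"type": "content", "content": "Hi"}\n',
    "\n",
    'data: {"type": "complete"}\n',
    "\n",
]
for event in iter_sse_json(frames):
    print(event["type"])
```

An event that is still incomplete when the stream ends is dropped, which matches how SSE clients treat a connection that closes mid-event.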
JupyterDisplay - Rich Notebook Display
from langgraph_stream_parser.adapters.jupyter import JupyterDisplay
display = JupyterDisplay()
display.run(graph=agent, input_data=input_data, config=config)
Requires: pip install langgraph-stream-parser[jupyter]
Adapter Options
All adapters support:
adapter = CLIAdapter(
    show_tool_args=True,            # Show tool arguments
    max_content_preview=200,        # Max chars for extracted content
    reflection_types={"thinking"},  # Custom reflection type names
    todo_types={"tasks"},           # Custom todo type names
)
Custom Adapters
Extend BaseAdapter for custom rendering:
from langgraph_stream_parser.adapters import BaseAdapter
class MyAdapter(BaseAdapter):
    def render(self):
        # Implement your rendering logic
        pass

    def prompt_interrupt(self, event):
        # Handle interrupt prompts
        return [{"type": "approve"}]
Built-in Extractors
The package includes extractors for common LangGraph tools:
- ThinkToolExtractor: Extracts reflections from think_tool
- TodoExtractor: Extracts todo lists from write_todos
Examples
FastAPI WebSocket Streaming
See examples/fastapi_websocket.py for a complete example of streaming LangGraph events to a web client via WebSockets.
# Install dependencies
pip install fastapi uvicorn websockets
# Run the example
uvicorn examples.fastapi_websocket:app --reload
# Open http://localhost:8000 in your browser
Development
# Install with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Run tests with coverage
pytest --cov=langgraph_stream_parser
License
MIT