# ai-sdk-stream-python
A Python library for building Vercel AI SDK v6 UIMessageStream-compatible streaming backends.
Inspired by `llama-index-workflows` — the same idea of a `Context` object that holds shared state and can write events to a stream, applied to the Vercel AI SDK wire protocol.
## Concept
Normally you have to manually yield raw SSE strings and track protocol ordering yourself:
```python
# Before — error-prone, no type safety, no shared state
yield "data: " + json.dumps({"type": "start", "messageId": id}) + "\n\n"
yield "data: " + json.dumps({"type": "start-step"}) + "\n\n"
yield "data: " + json.dumps({"type": "text-start", "id": part_id}) + "\n\n"
for chunk in llm.stream():
    yield "data: " + json.dumps({"type": "text-delta", "id": part_id, "delta": chunk}) + "\n\n"
# ... remember to close every part and step ...
yield "data: [DONE]\n\n"
```
With `StreamContext` and `ctx.run()`:
```python
# After — typed, lifecycle-safe, state-sharing
ctx = StreamContext()
await ctx.run(lambda ctx: my_work(ctx))
return StreamingResponse(ctx.stream(), media_type="text/event-stream", headers=ctx.response_headers)


async def my_work(ctx):
    await ctx.write_text("Hello world!")  # auto-emits start/start-step/text-start
    # ctx.run() auto-calls finish() and handles errors — no try/finally needed
```
## Key features
| Feature | Detail |
|---|---|
| Typed events | All 16 v6 protocol events as Pydantic models |
| Lifecycle auto-management | `start`, `start-step`, `text-start`, etc. are emitted automatically |
| Shared state | `ctx.store.get/set()` — dot-path key-value store shared across modules |
| Custom information | `ctx.info` — typed, read-only Pydantic model for request-scoped metadata |
| Pass as parameter | `ctx` flows through your services like a logger or DB session |
| Stream collection | `collect=True` records all emitted content into `ctx.record` for DB persistence |
| Low-level escape hatch | `ctx.write(event)` / `ctx.write_event_to_stream(ev)` for raw control |
| Abort support | `ctx.abort()` terminates the stream safely on errors |
## Installation

```bash
uv add ai-sdk-stream-python
# or
pip install ai-sdk-stream-python
```
## Quick start (FastAPI)
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from ai_sdk_stream_python import StreamContext

app = FastAPI()


@app.post("/chat")
async def chat():
    ctx = StreamContext()

    async def _work(ctx: StreamContext) -> None:
        await ctx.write_text("Hello ")
        await ctx.write_text("world!")

    await ctx.run(_work)
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )
```
`ctx.run()` wraps your work function with automatic error handling and stream finalization — no `try/finally` or manual `asyncio.create_task()` needed. See `ctx.run()` below.
## `StreamContext` API
### State store
```python
await ctx.store.set("user.name", "Alice")               # dot-path write
name = await ctx.store.get("user.name")                 # → "Alice"
plan = await ctx.store.get("user.plan", default="free") # with default
```
The store uses an `asyncio.Lock` — safe to use across concurrent coroutines.
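As a mental model, the dot-path semantics can be sketched in a few lines of plain Python. This is an illustrative stand-in, not the library's implementation; `DotPathStore` is a hypothetical name:

```python
import asyncio

class DotPathStore:
    """Sketch of dot-path get/set: "user.name" walks nested dicts,
    creating intermediate levels on set; a single asyncio.Lock
    serializes concurrent access."""

    def __init__(self):
        self._data: dict = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, value) -> None:
        async with self._lock:
            node = self._data
            *parents, leaf = key.split(".")
            for part in parents:
                node = node.setdefault(part, {})
            node[leaf] = value

    async def get(self, key: str, default=None):
        async with self._lock:
            node = self._data
            for part in key.split("."):
                if not isinstance(node, dict) or part not in node:
                    return default
                node = node[part]
            return node


async def main():
    store = DotPathStore()
    await store.set("user.name", "Alice")
    print(await store.get("user.name"))          # → Alice
    print(await store.get("user.plan", "free"))  # → free

asyncio.run(main())
```

The lock makes each `set`/`get` atomic, which is why services running concurrently can share one store without clobbering each other.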
### Custom information (`ctx.info`)

Pass a Pydantic model at construction time to carry static, read-only, request-scoped data (e.g. `user_id`, `rate_limit`, `tenant_id`) through every service layer without threading extra arguments:
```python
from pydantic import BaseModel

from ai_sdk_stream_python import StreamContext


class RequestInfo(BaseModel):
    user_id: str
    rate_limit: int


# Typed constructor — IDE infers ctx.info as RequestInfo
ctx: StreamContext[RequestInfo] = StreamContext(
    custom_information=RequestInfo(user_id="u_42", rate_limit=100)
)

# In any service layer that receives ctx:
if ctx.info is not None:
    print(ctx.info.user_id)     # "u_42"
    print(ctx.info.rate_limit)  # 100
```
- `ctx.info` is read-only (no setter). For mutable runtime state use `ctx.store`.
- Defaults to `None` when `custom_information` is not passed.
- `StreamContext` is generic — annotate as `StreamContext[YourModel]` for full IDE support.
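The generic, read-only pattern behind `ctx.info` can be sketched in isolation (illustrative only; `Carrier` is a hypothetical name, not the library's class):

```python
from typing import Generic, Optional, TypeVar

InfoT = TypeVar("InfoT")

class Carrier(Generic[InfoT]):
    """Sketch: a TypeVar-parameterized holder exposing the model via a
    property with no setter, so `carrier.info = ...` raises AttributeError
    and type checkers infer the concrete model type from the annotation."""

    def __init__(self, custom_information: Optional[InfoT] = None):
        self._info = custom_information

    @property
    def info(self) -> Optional[InfoT]:
        return self._info


c = Carrier(custom_information={"user_id": "u_42"})
print(c.info["user_id"])  # u_42
print(Carrier().info)     # None
```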
### Writing to the stream
```python
# Reasoning (chain-of-thought)
await ctx.write_reasoning("Let me think…")

# Text answer
await ctx.write_text("Here is the answer…")

# Tool calls
handle = await ctx.begin_tool_call("searchDocs", {"query": "hello"})
result = await my_search(query)
await ctx.complete_tool_call(handle.toolCallId, result)
# or on error:
await ctx.fail_tool_call(handle.toolCallId, "timeout")

# Source citations
await ctx.write_source("doc-1", "https://example.com/doc", "My Doc")

# Finish (closes all open parts/steps, emits finish + [DONE])
await ctx.finish(finish_reason="stop")
```
### `new_step()` — when to use it

`new_step()` closes any open text/reasoning part and the current step (`finish-step`), then immediately opens a new step (`start-step`). Use it when you want an explicit step boundary in the stream — for example in a multi-turn agentic flow:
```python
# Step 1: reasoning
await ctx.write_reasoning("Let me think…")

# Step 2: tool call
await ctx.new_step()
handle = await ctx.begin_tool_call("search", {"q": query})
await ctx.complete_tool_call(handle.toolCallId, results)

# Step 3: final answer
await ctx.new_step()
await ctx.write_text("Based on the results…")
```
You don't need `new_step()` for simple responses. The high-level helpers (`write_text`, `write_reasoning`, `begin_tool_call`) already auto-close each other within the same step, and `finish()` closes everything. Only call `new_step()` when you want the frontend to see distinct steps — e.g. separate reasoning, tool-use, and answer phases.
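The lifecycle just described can be pictured as the raw events a `new_step()` boundary emits while a reasoning part is open (illustrative dicts, not output captured from the library; the `id` value is made up):

```python
# Close the open part, close the current step, open the next step.
boundary = [
    {"type": "reasoning-end", "id": "r1"},  # close the open reasoning part
    {"type": "finish-step"},                # close the current step
    {"type": "start-step"},                 # immediately open the next step
]
print([e["type"] for e in boundary])
```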
### Low-level / raw events
```python
from ai_sdk_stream_python import TextDeltaEvent

# Sync push (like LlamaIndex's write_event_to_stream)
ctx.write_event_to_stream(TextDeltaEvent(id=ctx.current_text_id, delta="!"))

# Async push (auto-ensures 'start' was emitted first)
await ctx.write(TextDeltaEvent(id="my-id", delta="raw"))
```
### Stream collection

Pass `collect=True` to record all emitted content into a `StreamRecord` for database persistence or audit logging:
```python
ctx = StreamContext(collect=True)

async def _work(ctx: StreamContext) -> None:
    await ctx.write_reasoning("Let me think…")
    handle = await ctx.begin_tool_call("search", {"q": "hello"})
    await ctx.complete_tool_call(handle.toolCallId, {"results": [...]})
    await ctx.write_text("Here is the answer.")
    await ctx.write_source("s1", "https://example.com", "My Doc")

await ctx.run(_work)
response = StreamingResponse(
    ctx.stream(), media_type="text/event-stream", headers=ctx.response_headers
)

# After finish(), ctx.record is fully populated:
record = ctx.record
# record.text          → "Here is the answer."
# record.reasoning     → "Let me think…"
# record.tool_calls    → [ToolCallRecord(tool_name="search", input={...}, output={...})]
# record.sources       → [SourceRecord(source_id="s1", url="https://example.com", title="My Doc")]
# record.finish_reason → "stop"
# record.step_count    → 1

# Serialize to a plain dict for DB storage:
await db.insert(record.to_dict())
```
`ctx.record` is `None` when `collect=False` (the default). The record is built incrementally as events are emitted and is fully available after `finish()` completes.
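The incremental build-up can be pictured with a toy accumulator. This is a sketch of the idea only; `MiniRecord` and `on_event` are hypothetical names, not the library's `StreamRecord` API:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MiniRecord:
    """Sketch: deltas append to running buffers; `finish` seals the record."""
    text: str = ""
    reasoning: str = ""
    finish_reason: Optional[str] = None

    def on_event(self, event: dict) -> None:
        kind = event["type"]
        if kind == "text-delta":
            self.text += event["delta"]
        elif kind == "reasoning-delta":
            self.reasoning += event["delta"]
        elif kind == "finish":
            self.finish_reason = event["finishReason"]


record = MiniRecord()
for ev in [
    {"type": "reasoning-delta", "id": "r1", "delta": "Let me think…"},
    {"type": "text-delta", "id": "t1", "delta": "Here is "},
    {"type": "text-delta", "id": "t1", "delta": "the answer."},
    {"type": "finish", "finishReason": "stop"},
]:
    record.on_event(ev)

print(record.text)  # → Here is the answer.
```

Building the record as events pass through is what makes it free when `collect=True`: no second pass over the stream is needed.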
### `ctx.run()` — safe task runner

`ctx.run()` is the recommended way to launch your streaming work function. It provides three safety guarantees that eliminate an entire class of bugs:
| Guarantee | What it does |
|---|---|
| Auto-finish | Calls `ctx.finish()` in a `finally` block — the stream is always closed, even if your function returns early |
| Auto-error | Catches unhandled exceptions and emits `ctx.error()` — the frontend gets a proper error event instead of a silent hang |
| Task GC prevention | Stores the background task on the context — Python's garbage collector cannot silently discard it |
```python
@router.post("/chat")
async def chat(request: ChatRequest) -> StreamingResponse:
    ctx = StreamContext()
    await ctx.run(lambda ctx: my_service.chat(request, ctx=ctx))
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )
```
Without `ctx.run()`, you'd need to manually manage `try/finally`, `asyncio.create_task()`, and a task-reference set — all of which are easy to get wrong and produce hard-to-debug hung streams.
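For comparison, here is roughly the boilerplate being replaced, sketched with plain asyncio (the names `launch`, `_wrapped`, `work`, `finish`, and `on_error` are hypothetical, not the library's internals):

```python
import asyncio

# The three manual obligations: try/except/finally, create_task, and a
# strong task reference so the garbage collector cannot discard the task.
_background_tasks: set = set()

async def _wrapped(work, finish, on_error):
    try:
        await work()
    except Exception as exc:
        await on_error(exc)  # emit an error event to the stream
    finally:
        await finish()       # always close the stream

def launch(work, finish, on_error) -> asyncio.Task:
    task = asyncio.get_running_loop().create_task(
        _wrapped(work, finish, on_error)
    )
    _background_tasks.add(task)  # keep a strong reference
    task.add_done_callback(_background_tasks.discard)
    return task
```

The strong-reference set matters because `asyncio` only keeps a weak reference to running tasks; without it, a task can be garbage-collected mid-flight and the stream hangs silently.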
### Abort

```python
await ctx.abort()  # terminates the stream immediately, without a finish event
```
### Properties

```python
ctx.message_id            # the message ID in the start event
ctx.current_text_id       # ID of the open text part, or None
ctx.current_reasoning_id  # ID of the open reasoning part, or None
ctx.is_finished           # True after finish()/abort()
ctx.info                  # custom_information model, or None
ctx.response_headers      # dict with x-vercel-ai-ui-message-stream: v1
```
### Passing `ctx` across modules

The key pattern — `ctx` flows as a parameter to any module that needs to write events:
```python
# routes/chat.py
@app.post("/chat")
async def chat(req: ChatRequest):
    ctx = StreamContext()

    async def _work(ctx: StreamContext) -> None:
        # db_service writes reasoning + stores user data in ctx.store
        user = await db_service.load_user(req.user_id, ctx=ctx)

        # search_service emits tool call events via ctx
        await ctx.new_step()
        docs = await search_service.search(req.query, ctx=ctx)

        # llm_service reads ctx.store + streams text via ctx
        await ctx.new_step()
        await llm_service.generate(req.query, docs, ctx=ctx)

    await ctx.run(_work)  # auto-handles finish, errors, and task GC
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )
```

```python
# services/db_service.py
async def load_user(user_id: str, *, ctx: StreamContext) -> dict:
    await ctx.write_reasoning(f"Loading user {user_id}…")
    user = await db.get(user_id)
    await ctx.store.set("user.name", user["name"])  # share with downstream
    return user
```

```python
# services/llm_service.py
async def generate(query: str, docs: list, *, ctx: StreamContext) -> None:
    name = await ctx.store.get("user.name", default="there")  # read from store
    async for chunk in llm.stream(query, docs):
        await ctx.write_text(chunk)
```
See `example/` for a full runnable example.
## Wire protocol
The library targets the Vercel AI SDK v6 UIMessageStream protocol — SSE events with typed JSON payloads:
```text
data: {"type":"start","messageId":"..."}
data: {"type":"start-step"}
data: {"type":"reasoning-start","id":"..."}
data: {"type":"reasoning-delta","id":"...","delta":"thinking…"}
data: {"type":"reasoning-end","id":"..."}
data: {"type":"finish-step"}
data: {"type":"start-step"}
data: {"type":"tool-input-start","toolCallId":"...","toolName":"search"}
data: {"type":"tool-input-available","toolCallId":"...","input":{...}}
data: {"type":"tool-output-available","toolCallId":"...","output":{...}}
data: {"type":"finish-step"}
data: {"type":"start-step"}
data: {"type":"text-start","id":"..."}
data: {"type":"text-delta","id":"...","delta":"Hello "}
data: {"type":"text-end","id":"..."}
data: {"type":"source-url","sourceId":"s1","url":"https://...","title":"Doc"}
data: {"type":"finish-step"}
data: {"type":"finish","finishReason":"stop"}
data: [DONE]
```
Response header: `x-vercel-ai-ui-message-stream: v1`
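The framing itself is simple to state in code. This sketch shows the mapping from event payloads to SSE frames (`to_sse` is a hypothetical helper written for illustration, not part of the library):

```python
import json

def to_sse(event: dict) -> str:
    """One event = one "data:" line with a compact JSON payload,
    terminated by a blank line."""
    return "data: " + json.dumps(event, separators=(",", ":")) + "\n\n"

# The stream ends with a literal sentinel, not JSON:
DONE = "data: [DONE]\n\n"

frames = [
    to_sse({"type": "start", "messageId": "msg_1"}),
    to_sse({"type": "start-step"}),
    to_sse({"type": "text-start", "id": "t1"}),
    to_sse({"type": "text-delta", "id": "t1", "delta": "Hello "}),
    to_sse({"type": "text-end", "id": "t1"}),
    to_sse({"type": "finish-step"}),
    to_sse({"type": "finish", "finishReason": "stop"}),
    DONE,
]
print(frames[3])  # data: {"type":"text-delta","id":"t1","delta":"Hello "}
```

Note that `[DONE]` is why clients cannot blindly `json.loads` every frame: the final sentinel must be special-cased.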
## Example app

`example/` contains a complete runnable demo:
```text
example/
├── backend/   # FastAPI app — shows ctx passed across 3 service modules
│   ├── main.py
│   ├── routes/chat.py
│   └── services/
│       ├── db_service.py      # writes reasoning + stores data in ctx.store
│       ├── llm_service.py     # reads ctx.store + streams text
│       └── search_service.py
└── frontend/  # Next.js + AI SDK v6 + ai-elements chat UI
    ├── app/
    │   ├── page.tsx           # Conversation/Message/PromptInput from ai-elements
    │   └── api/chat/route.ts  # proxies useChat → Python backend
    └── package.json
```
### Run the backend

```bash
cd example/backend
uv pip install fastapi uvicorn
uv pip install -e ../../  # install ai-sdk-stream-python from source
uvicorn main:app --reload --port 8000
```
### Run the frontend

```bash
cd example/frontend
npm install
# Install ai-elements components (shadcn/ui registry):
npx ai-elements@latest add conversation message prompt-input
npm run dev
# Open http://localhost:3000
```
## Tests

```bash
uv run pytest tests/ -v
```
56 tests cover the basic lifecycle, reasoning ↔ text transitions, tool calls, multi-step flows, source events, edge cases (double finish, abort, write after finish), `StateStore` integration, stream collection (`collect=True`), and custom information (`ctx.info`).
## All event types

| Class | `type` field | Description |
|---|---|---|
| `StartEvent` | `start` | Message begins |
| `StartStepEvent` | `start-step` | Step begins |
| `ReasoningStartEvent` | `reasoning-start` | Reasoning part opens |
| `ReasoningDeltaEvent` | `reasoning-delta` | Reasoning chunk |
| `ReasoningEndEvent` | `reasoning-end` | Reasoning part closes |
| `TextStartEvent` | `text-start` | Text part opens |
| `TextDeltaEvent` | `text-delta` | Text chunk |
| `TextEndEvent` | `text-end` | Text part closes |
| `ToolInputStartEvent` | `tool-input-start` | Tool call begins |
| `ToolInputDeltaEvent` | `tool-input-delta` | Streaming tool input |
| `ToolInputAvailableEvent` | `tool-input-available` | Full tool input ready |
| `ToolOutputAvailableEvent` | `tool-output-available` | Tool result |
| `ToolOutputErrorEvent` | `tool-output-error` | Tool failure |
| `SourceUrlEvent` | `source-url` | Citation / source |
| `FinishStepEvent` | `finish-step` | Step closes |
| `FinishEvent` | `finish` | Message ends |