A Python package for AI SDK streaming utilities

Project description

ai-sdk-stream-python

A Python library for building Vercel AI SDK v6 UIMessageStream-compatible streaming backends.

Installation

pip install ai-sdk-stream-python

Inspired by llama-index-workflows — the same idea of a Context object that holds shared state and can write events to a stream, applied to the Vercel AI SDK wire protocol.


Concept

Normally you have to manually yield raw SSE strings and track protocol ordering yourself:

# Before — error-prone, no type safety, no shared state
yield "data: " + json.dumps({"type": "start", "messageId": id}) + "\n\n"
yield "data: " + json.dumps({"type": "start-step"}) + "\n\n"
yield "data: " + json.dumps({"type": "text-start", "id": part_id}) + "\n\n"
for chunk in llm.stream():
    yield "data: " + json.dumps({"type": "text-delta", "id": part_id, "delta": chunk}) + "\n\n"
# ... remember to close every part and step ...
yield "data: [DONE]\n\n"

With StreamContext and ctx.run():

# After — typed, lifecycle-safe, state-sharing
ctx = StreamContext()
await ctx.run(lambda ctx: my_work(ctx))
return StreamingResponse(ctx.stream(), media_type="text/event-stream", headers=ctx.response_headers)

async def my_work(ctx):
    await ctx.write_text("Hello world!")   # auto-emits start/start-step/text-start
    # ctx.run() auto-calls finish() and handles errors — no try/finally needed

Key features

Feature                      Detail
Typed events                 All 16 v6 protocol events as Pydantic models
Lifecycle auto-management    start, start-step, text-start etc. are emitted automatically
Shared state                 ctx.store.get/set() — dot-path key-value store shared across modules
Custom information           ctx.info — typed, read-only Pydantic model for request-scoped metadata
Pass as parameter            ctx flows through your services like a logger or DB session
Stream collection            collect=True records all emitted content into ctx.record for DB persistence
Low-level escape hatch       ctx.write(event) / ctx.write_event_to_stream(ev) for raw control
Abort support                ctx.abort() terminates the stream safely on errors

Installation

uv add ai-sdk-stream-python
# or
pip install ai-sdk-stream-python

Quick start (FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from ai_sdk_stream_python import StreamContext

app = FastAPI()

@app.post("/chat")
async def chat():
    ctx = StreamContext()

    async def _work(ctx: StreamContext) -> None:
        await ctx.write_text("Hello ")
        await ctx.write_text("world!")

    await ctx.run(_work)
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )

ctx.run() wraps your work function with automatic error handling and stream finalization — no try/finally or manual asyncio.create_task() needed. See ctx.run() below.


StreamContext API

State store

await ctx.store.set("user.name", "Alice")        # dot-path write
name = await ctx.store.get("user.name")          # → "Alice"
plan = await ctx.store.get("user.plan", default="free")  # with default

The store uses an asyncio.Lock — safe to use across concurrent coroutines.
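
For example, two services can populate the store concurrently without clobbering each other; a minimal sketch, assuming hypothetical fetch_profile/fetch_plan helpers:

import asyncio

async def load_profile(ctx: StreamContext) -> None:
    profile = await fetch_profile()                 # hypothetical I/O call
    await ctx.store.set("user.profile", profile)

async def load_plan(ctx: StreamContext) -> None:
    plan = await fetch_plan()                       # hypothetical I/O call
    await ctx.store.set("user.plan", plan)

# Both writes go through the store's asyncio.Lock, so they can run side by side.
await asyncio.gather(load_profile(ctx), load_plan(ctx))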

Custom information (ctx.info)

Pass a Pydantic model at construction time to carry static, read-only request-scoped data (e.g. user_id, rate_limit, tenant_id) through every service layer without threading extra arguments:

from pydantic import BaseModel
from ai_sdk_stream_python import StreamContext

class RequestInfo(BaseModel):
    user_id: str
    rate_limit: int

# Typed constructor — IDE infers ctx.info as RequestInfo
ctx: StreamContext[RequestInfo] = StreamContext(
    custom_information=RequestInfo(user_id="u_42", rate_limit=100)
)

# In any service layer that receives ctx:
if ctx.info is not None:
    print(ctx.info.user_id)    # "u_42"
    print(ctx.info.rate_limit) # 100

  • ctx.info is read-only (no setter). For mutable runtime state use ctx.store (see the sketch after this list).
  • Defaults to None when custom_information is not passed.
  • StreamContext is generic — annotate as StreamContext[YourModel] for full IDE support.
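
For instance, a service might combine both; a minimal sketch reusing the RequestInfo model from above (check_quota and the quota.used key are illustrative, not part of the library):

async def check_quota(ctx: StreamContext[RequestInfo]) -> bool:
    limit = ctx.info.rate_limit if ctx.info else 0          # static, read-only metadata
    used = await ctx.store.get("quota.used", default=0)     # mutable runtime state
    await ctx.store.set("quota.used", used + 1)
    return used < limit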

Writing to the stream

# Reasoning (chain-of-thought)
await ctx.write_reasoning("Let me think…")

# Text answer
await ctx.write_text("Here is the answer…")

# Tool calls
handle = await ctx.begin_tool_call("searchDocs", {"query": "hello"})
result = await my_search("hello")
await ctx.complete_tool_call(handle.toolCallId, result)
# or on error:
await ctx.fail_tool_call(handle.toolCallId, "timeout")

# Source citations
await ctx.write_source("doc-1", "https://example.com/doc", "My Doc")

# Finish (closes all open parts/steps, emits finish + [DONE])
await ctx.finish(finish_reason="stop")

new_step() — when to use it

new_step() closes any open text/reasoning part and the current step (finish-step), then immediately opens a new step (start-step). Use it when you want an explicit step boundary in the stream — for example in a multi-turn agentic flow:

# Step 1: reasoning
await ctx.write_reasoning("Let me think…")

# Step 2: tool call
await ctx.new_step()
handle = await ctx.begin_tool_call("search", {"q": query})
await ctx.complete_tool_call(handle.toolCallId, results)

# Step 3: final answer
await ctx.new_step()
await ctx.write_text("Based on the results…")

You don't need new_step() for simple responses. The high-level helpers (write_text, write_reasoning, begin_tool_call) already auto-close each other within the same step, and finish() closes everything. Only call new_step() when you want the frontend to see distinct steps — e.g. separate reasoning, tool-use, and answer phases.

Low-level / raw events

from ai_sdk_stream_python import TextDeltaEvent

# Sync push (like LlamaIndex's write_event_to_stream)
ctx.write_event_to_stream(TextDeltaEvent(id=ctx.current_text_id, delta="!"))

# Async push (auto-ensures 'start' was emitted first)
await ctx.write(TextDeltaEvent(id="my-id", delta="raw"))

Stream collection

Pass collect=True to record all emitted content into a StreamRecord for database persistence or audit logging:

ctx = StreamContext(collect=True)

async def _work(ctx: StreamContext) -> None:
    await ctx.write_reasoning("Let me think…")
    handle = await ctx.begin_tool_call("search", {"q": "hello"})
    await ctx.complete_tool_call(handle.toolCallId, {"results": [...]})
    await ctx.write_text("Here is the answer.")
    await ctx.write_source("s1", "https://example.com", "My Doc")

await ctx.run(_work)
response = StreamingResponse(
    ctx.stream(), media_type="text/event-stream", headers=ctx.response_headers
)

# After finish(), ctx.record is fully populated:
record = ctx.record
# record.text        → "Here is the answer."
# record.reasoning   → "Let me think…"
# record.tool_calls  → [ToolCallRecord(tool_name="search", input={...}, output={...})]
# record.sources     → [SourceRecord(source_id="s1", url="https://example.com", title="My Doc")]
# record.finish_reason → "stop"
# record.step_count  → 1

# Serialize to a plain dict for DB storage:
await db.insert(record.to_dict())

ctx.record is None when collect=False (the default). The record is built incrementally as events are emitted, and is fully available after finish() completes.

ctx.run() — safe task runner

ctx.run() is the recommended way to launch your streaming work function. It provides three safety guarantees that eliminate an entire class of bugs:

Guarantee             What it does
Auto-finish           Calls ctx.finish() in a finally block — the stream is always closed, even if your function returns early
Auto-error            Catches unhandled exceptions and emits ctx.error() — the frontend gets a proper error event instead of a silent hang
Task GC prevention    Stores the background task on the context — Python's garbage collector cannot silently discard it

@router.post("/chat")
async def chat(request: ChatRequest) -> StreamingResponse:
    ctx = StreamContext()
    await ctx.run(lambda ctx: my_service.chat(request, ctx=ctx))
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )

Without ctx.run(), you'd need to manually manage try/finally, asyncio.create_task(), and a task reference set — all of which are easy to get wrong and produce hard-to-debug hung streams.
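
For comparison, a rough sketch of the manual version this replaces (shown only for illustration; the error-reporting call is assumed to mirror what ctx.run() does internally):

import asyncio

background_tasks: set[asyncio.Task] = set()   # keep strong references so the GC can't drop tasks

async def _manual_work(ctx: StreamContext) -> None:
    try:
        await my_service.chat(request, ctx=ctx)
    except Exception as exc:
        await ctx.error(str(exc))             # assumption: emit an error event to the frontend
    finally:
        await ctx.finish()                    # always close the stream

task = asyncio.create_task(_manual_work(ctx))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)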

Abort

await ctx.abort()  # terminates stream immediately without finish event
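
A typical use is stopping early when the client goes away; a sketch assuming a FastAPI Request object and an llm client are in scope:

async def _work(ctx: StreamContext) -> None:
    async for chunk in llm.stream(prompt):
        if await request.is_disconnected():   # Starlette/FastAPI disconnect check
            await ctx.abort()                 # stop immediately, no finish/[DONE]
            return
        await ctx.write_text(chunk)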

Properties

ctx.message_id           # the message ID in the start event
ctx.current_text_id      # ID of open text part, or None
ctx.current_reasoning_id # ID of open reasoning part, or None
ctx.is_finished          # True after finish()/abort()
ctx.info                 # custom_information model, or None
ctx.response_headers     # dict with x-vercel-ai-ui-message-stream: v1
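
A small sketch of how these read in practice, e.g. guarding a late write from a callback (on_late_result is illustrative):

async def on_late_result(ctx: StreamContext, text: str) -> None:
    if ctx.is_finished:        # stream already closed via finish() or abort()
        return
    await ctx.write_text(text)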

Passing ctx across modules

The key pattern — ctx flows as a parameter to any module that needs to write events:

# routes/chat.py
@app.post("/chat")
async def chat(req: ChatRequest):
    ctx = StreamContext()

    async def _work(ctx: StreamContext) -> None:
        # db_service writes reasoning + stores user data in ctx.store
        user = await db_service.load_user(req.user_id, ctx=ctx)

        # search_service emits tool call events via ctx
        await ctx.new_step()
        docs = await search_service.search(req.query, ctx=ctx)

        # llm_service reads ctx.store + streams text via ctx
        await ctx.new_step()
        await llm_service.generate(req.query, docs, ctx=ctx)

    await ctx.run(_work)  # auto-handles finish, errors, and task GC
    return StreamingResponse(
        ctx.stream(),
        media_type="text/event-stream",
        headers=ctx.response_headers,
    )

# services/db_service.py
async def load_user(user_id: str, *, ctx: StreamContext) -> dict:
    await ctx.write_reasoning(f"Loading user {user_id}…")
    user = await db.get(user_id)
    await ctx.store.set("user.name", user["name"])  # share with downstream
    return user

# services/llm_service.py
async def generate(query: str, docs: list, *, ctx: StreamContext) -> None:
    name = await ctx.store.get("user.name", default="there")  # read from store
    async for chunk in llm.stream(query, docs):
        await ctx.write_text(chunk)
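
# services/search_service.py (sketch): the real example/ code may differ;
# doc_index.search is a hypothetical search backend.
async def search(query: str, *, ctx: StreamContext) -> list:
    handle = await ctx.begin_tool_call("searchDocs", {"query": query})
    try:
        docs = await doc_index.search(query)
        await ctx.complete_tool_call(handle.toolCallId, {"results": docs})
        return docs
    except Exception as exc:
        await ctx.fail_tool_call(handle.toolCallId, str(exc))
        return []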

See example/ for a full runnable example.


Wire protocol

The library targets the Vercel AI SDK v6 UIMessageStream protocol — SSE events with typed JSON payloads:

data: {"type":"start","messageId":"..."}

data: {"type":"start-step"}

data: {"type":"reasoning-start","id":"..."}
data: {"type":"reasoning-delta","id":"...","delta":"thinking…"}
data: {"type":"reasoning-end","id":"..."}

data: {"type":"finish-step"}

data: {"type":"start-step"}

data: {"type":"tool-input-start","toolCallId":"...","toolName":"search"}
data: {"type":"tool-input-available","toolCallId":"...","input":{...}}
data: {"type":"tool-output-available","toolCallId":"...","output":{...}}

data: {"type":"finish-step"}

data: {"type":"start-step"}

data: {"type":"text-start","id":"..."}
data: {"type":"text-delta","id":"...","delta":"Hello "}
data: {"type":"text-end","id":"..."}

data: {"type":"source-url","sourceId":"s1","url":"https://...","title":"Doc"}

data: {"type":"finish-step"}

data: {"type":"finish","finishReason":"stop"}

data: [DONE]

Response header: x-vercel-ai-ui-message-stream: v1
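
For debugging, you can consume the raw stream with any SSE-capable HTTP client; a minimal sketch using httpx against the Quick start app (the URL and port are assumptions):

import asyncio
import httpx

async def dump_stream() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", "http://localhost:8000/chat") as resp:
            assert resp.headers["x-vercel-ai-ui-message-stream"] == "v1"
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])   # one JSON event per line, or [DONE]

asyncio.run(dump_stream())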


Example app

example/ contains a complete runnable demo:

example/
├── backend/          # FastAPI app — shows ctx passed across 3 service modules
│   ├── main.py
│   ├── routes/chat.py
│   └── services/
│       ├── db_service.py     # writes reasoning + stores data in ctx.store
│       ├── llm_service.py    # reads ctx.store + streams text
│       └── search_service.py
└── frontend/         # Next.js + AI SDK v6 + ai-elements chat UI
    ├── app/
    │   ├── page.tsx           # Conversation/Message/PromptInput from ai-elements
    │   └── api/chat/route.ts  # proxies useChat → Python backend
    └── package.json

Run the backend

cd example/backend
uv pip install fastapi uvicorn
uv pip install -e ../../   # install ai-sdk-stream-python from source
uvicorn main:app --reload --port 8000

Run the frontend

cd example/frontend
npm install
# Install ai-elements components (shadcn/ui registry):
npx ai-elements@latest add conversation message prompt-input
npm run dev
# Open http://localhost:3000

Tests

uv run pytest tests/ -v

56 tests covering: basic lifecycle, reasoning ↔ text transitions, tool calls, multi-step flows, source events, edge cases (double finish, abort, write after finish), StateStore integration, stream collection (collect=True), and custom information (ctx.info).
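
A typical test shape, sketched with pytest-asyncio and collect=True (the test name and assertions here are illustrative, not copied from the suite):

import pytest
from ai_sdk_stream_python import StreamContext

@pytest.mark.asyncio
async def test_collects_text():
    ctx = StreamContext(collect=True)

    async def _work(ctx: StreamContext) -> None:
        await ctx.write_text("Hello world!")

    await ctx.run(_work)
    _ = [chunk async for chunk in ctx.stream()]   # drain the SSE generator to completion

    assert ctx.record is not None
    assert ctx.record.text == "Hello world!"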


All event types

Class                      type field              Description
StartEvent                 start                   Message begins
StartStepEvent             start-step              Step begins
ReasoningStartEvent        reasoning-start         Reasoning part opens
ReasoningDeltaEvent        reasoning-delta         Reasoning chunk
ReasoningEndEvent          reasoning-end           Reasoning part closes
TextStartEvent             text-start              Text part opens
TextDeltaEvent             text-delta              Text chunk
TextEndEvent               text-end                Text part closes
ToolInputStartEvent        tool-input-start        Tool call begins
ToolInputDeltaEvent        tool-input-delta        Streaming tool input
ToolInputAvailableEvent    tool-input-available    Full tool input ready
ToolOutputAvailableEvent   tool-output-available   Tool result
ToolOutputErrorEvent       tool-output-error       Tool failure
SourceUrlEvent             source-url              Citation / source
FinishStepEvent            finish-step             Step closes
FinishEvent                finish                  Message ends

Project details


Download files

Source Distribution

ai_sdk_stream_python-0.2.2.tar.gz (25.9 kB)

Built Distribution

ai_sdk_stream_python-0.2.2-py3-none-any.whl (30.2 kB)

File details

Details for the file ai_sdk_stream_python-0.2.2.tar.gz.

File metadata

  • Download URL: ai_sdk_stream_python-0.2.2.tar.gz
  • Size: 25.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for ai_sdk_stream_python-0.2.2.tar.gz
Algorithm Hash digest
SHA256 d6bf1de241cffd5e828f75d08ba9af233f626cb3d0954c781d91339c33210523
MD5 73d8df525f943f23d7e51d42a1832bf7
BLAKE2b-256 d3c5cf0b61d783cab28611806315582271c2d04bfc741eb3d8b6f1da23f5c3c9


Provenance

The following attestation bundles were made for ai_sdk_stream_python-0.2.2.tar.gz:

Publisher: release.yml on shloimy-wiesel/ai-sdk-stream-python


File details

Details for the file ai_sdk_stream_python-0.2.2-py3-none-any.whl.

File hashes

Hashes for ai_sdk_stream_python-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 6bb6c30d59a270951e66362c02abed05a7623fa8408aa7cbb1ff12247537c728
MD5 27bf81354c7c260251b1f60570fb50f9
BLAKE2b-256 a4461bb6ca3f9a22332aa1a9b2d6b1e93b98be863180563066e1f421d8b789ca


Provenance

The following attestation bundles were made for ai_sdk_stream_python-0.2.2-py3-none-any.whl:

Publisher: release.yml on shloimy-wiesel/ai-sdk-stream-python

