Skip to main content

Production runtime for pydantic-ai agents with structured event streaming, session persistence, and cancellation

Project description

pydantic-ai-stream

Production runtime for pydantic-ai agents. Provides structured event streaming via Redis Streams, session persistence, and cancellation support.

img

Install

pip install pydantic-ai-stream

Quick Start

from dataclasses import dataclass
from redis.asyncio import Redis
from pydantic_ai import Agent

from pydantic_ai_stream import Deps, Session, run

# 1. Define your deps (includes Redis client)
@dataclass
class MyDeps(Deps):
    def get_scope_id(self) -> int:
        return 1

# 2. Implement session persistence
@dataclass
class MySession(Session):
    session_id: str

    async def load(self) -> None:
        pass  # Load from your storage

    async def save(self) -> None:
        pass  # Save to your storage

# 3. Create agent and run
agent = Agent("openai:gpt-4o-mini", deps_type=MyDeps)
redis = Redis.from_url("redis://localhost:6379")

async def main():
    deps = MyDeps(redis=redis, user_id=1, session_id="session-1")
    await run(
        MySession(session_id="session-1"),
        agent,
        "Hello, world!",
        deps=deps,
    )

# 4. Stream events (in another coroutine/process)
async def consume():
    deps = MyDeps(redis=redis, user_id=1, session_id="session-1")
    async for event in deps.listen():
        print(event)

Protocol Reference

Stream Format

Events are stored in Redis Streams with three fields:

Field Type Description
type string Event type
origin string Event source
body JSON Event payload

Event Types

type origin Usage
begin pydantic-ai-stream Session start
event pydantic-ai LLM interaction events
error developer / custom Error during execution
info developer / custom Informational
end pydantic-ai-stream Session complete

Event Body Schema (type=event)

Field Type When
idx int Always — node index
event str llm-begin, llm-end, part_start, part_delta, answer
event_idx int Part events — part index
part_kind str text, thinking, tool-call, tool-return
content str Start events — full content
content_delta str Delta events — incremental
tool_name str Tool call/return
tool_call_id str Tool correlation
args dict Tool call — emitted at part end

Configuration

Configure the Redis key prefix via settings:

from pydantic_ai_stream import settings

settings.set_redis_prefix("myapp")  # default: "pyaix"

Key Patterns

{prefix}:{scope_id}:{user_id}:{session_id}       # stream
{prefix}:{scope_id}:{user_id}:{session_id}:live  # live flag

API Reference

Core

async def run(session, agent, user_prompt, deps, **kwargs) -> None

Execute agent with streaming. Wraps Agent.iter(), emits events, handles cancellation.

class AgxCanceledError(Exception)

Raised when execution is cancelled via deps.cancel().

Session

class Session(ABC):
    msgs: list[ModelMessage]

    async def load(self) -> None: ...       # Load from storage
    async def save(self) -> None: ...       # Save to storage
    def msgs_to_json(self) -> bytes         # Serialize messages
    def msgs_from_json(self, data: bytes)   # Deserialize messages
    def get_user_prompt(self) -> str        # Extract initial prompt
    @staticmethod
    def nodes_from_msgs(msgs) -> list       # Reconstruct node structure

Deps

@dataclass
class Deps(ABC):
    redis: AsyncRedis
    user_id: int
    session_id: str

    @abstractmethod
    def get_scope_id(self) -> int: ...

    # Stream operations
    async def start(self) -> None
    async def stop(self, grace_period: int = 5) -> None
    async def is_live(self) -> bool
    async def listen(self, *, wait=3, timeout=60, serialize=True) -> AsyncGenerator
    async def cancel(self) -> bool

    # Event emission
    async def add(self, *, type: str, origin: str, body: dict | None = None) -> None
    async def add_error(self, body: dict, origin: str = "developer") -> None
    async def add_info(self, body: dict, origin: str = "developer") -> None

    # Node tracking (called by run())
    async def add_node_begin(self, node) -> None
    async def add_node_end(self) -> None
    async def add_node_event(self, event) -> None

Query Active Sessions

async def q(redis, scope_id, user_id) -> AsyncGenerator[tuple[int, int, str], None]

Scan for active sessions (those with live flag set).

Example: FastAPI SSE

See examples/fastapi_sse.py for a complete example with:

  • SSE streaming endpoint
  • Cancellation support
  • Tool usage
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic_ai_stream import Deps, Session, run

@app.post("/chat")
async def chat(prompt: str, session_id: str):
    deps = MyDeps(redis=redis, user_id=1, session_id=session_id)

    # Start agent in background
    asyncio.create_task(run(MySession(...), agent, prompt, deps=deps))

    # Stream events via SSE
    async def stream():
        async for event in deps.listen():
            yield f"data: {event}\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

@app.post("/chat/{session_id}/cancel")
async def cancel_chat(session_id: str):
    deps = MyDeps(redis=redis, user_id=1, session_id=session_id)
    return {"cancelled": await deps.cancel()}

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pydantic_ai_stream-0.1.1.tar.gz (113.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pydantic_ai_stream-0.1.1-py3-none-any.whl (7.7 kB view details)

Uploaded Python 3

File details

Details for the file pydantic_ai_stream-0.1.1.tar.gz.

File metadata

  • Download URL: pydantic_ai_stream-0.1.1.tar.gz
  • Upload date:
  • Size: 113.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.8.18

File hashes

Hashes for pydantic_ai_stream-0.1.1.tar.gz
Algorithm Hash digest
SHA256 9e4f54ac6781485386c08fc9f07e3d3837a6801e42c9bd3e1115a4d114db2a36
MD5 a273ea248e75d4e462f0a67e592f174a
BLAKE2b-256 b8fc638cd25f74c8b81bce78b9af79fc8bdaefb747178813fc33ea20a3e88b0e

See more details on using hashes here.

File details

Details for the file pydantic_ai_stream-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for pydantic_ai_stream-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 a48e53eee81456f5c36984f3888620f3c0c787a078a7725748464aec24991b26
MD5 46b69f11a9d14b147e591a302c3a45f3
BLAKE2b-256 6fb40039e43cb8433487a02ff5ca97d991dd9f591e55de02cc1bbaf657112833

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page