Production runtime for pydantic-ai agents with structured event streaming, session persistence, and cancellation
Project description
opale-agent-stream
Production runtime for pydantic-ai agents. Provides structured event streaming via Redis Streams, session persistence, and cancellation support.
Install
pip install opale-agent-stream
Quick Start
from dataclasses import dataclass
from redis.asyncio import Redis
from pydantic_ai import Agent
from opale.agx import Config, Deps, Session, configure, run, listen
# 1. Configure Redis
redis = Redis.from_url("redis://localhost:6379")
configure(Config(redis=redis))
# 2. Define your deps
@dataclass
class MyDeps(Deps):
def get_scope_id(self) -> int:
return 1
# 3. Implement session persistence
@dataclass
class MySession(Session):
id: str
async def load(self) -> None:
pass # Load from your storage
async def save(self) -> None:
pass # Save to your storage
# 4. Create agent and run
agent = Agent("openai:gpt-4o-mini", deps_type=MyDeps)
async def main():
await run(
MySession(id="session-1"),
agent,
"Hello, world!",
deps=MyDeps(user_id=1, session_id="session-1"),
)
# 5. Stream events (in another coroutine/process)
async def consume():
async for event in listen(1, 1, "session-1"):
print(event)
Protocol Reference
Stream Format
Events are stored in Redis Streams with three fields:
| Field | Type | Description |
|---|---|---|
type |
string | Event type |
origin |
string | Event source |
body |
JSON | Event payload |
Event Types
| type | origin | Usage |
|---|---|---|
begin |
opale | Session start |
event |
pydantic-ai | LLM interaction events |
error |
opale / custom | Error during execution |
info |
opale / custom | Informational |
end |
opale | Session complete |
Event Body Schema (type=event)
| Field | Type | When |
|---|---|---|
idx |
int | Always — node index |
event |
str | llm-begin, llm-end, part_start, part_delta, answer |
event_idx |
int | Part events — part index |
part_kind |
str | text, thinking, tool-call, tool-return |
content |
str | Start events — full content |
content_delta |
str | Delta events — incremental |
tool_name |
str | Tool call/return |
tool_call_id |
str | Tool correlation |
args |
dict | Tool call — emitted at node end |
Configuration
from dataclasses import dataclass
from redis.asyncio import Redis
from opale.agx import Config, configure
@dataclass
class Config:
redis: Redis # Injected async Redis client
key_prefix: str # Key prefix (default: "agx")
# Initialize once at startup
redis = Redis.from_url("redis://localhost:6379")
configure(Config(redis=redis, key_prefix="myapp"))
Key Patterns
{prefix}:{scope_id}:{user_id}:{session_id} # stream
{prefix}:{scope_id}:{user_id}:{session_id}:live # live flag
API Reference
Core
async def run(session, agent, user_prompt, deps, **kwargs)
Execute agent with streaming. Wraps Agent.iter(), emits events, handles cancellation.
class AgxCanceledError(Exception)
Raised when execution is cancelled via cancel().
Session
class Session(ABC):
msgs: list[ModelMessage]
async def load(self) -> None: ... # Load from storage
async def save(self) -> None: ... # Save to storage
def msgs_to_json(self) -> bytes # Serialize messages
def msgs_from_json(self, data: bytes) # Deserialize messages
def get_user_prompt(self) -> str # Extract initial prompt
@staticmethod
def nodes_from_msgs(msgs) -> list # Reconstruct node structure
Deps
class Deps(ABC):
user_id: int
session_id: str
runtime: Runtime
@abstractmethod
def get_scope_id(self) -> int: ...
async def add_node_begin(self, node) -> None
async def add_node_end(self) -> None
async def add_node_event(self, event) -> None
async def add_error(self, body: dict, origin: str = "opale") -> None
async def add_info(self, body: dict, origin: str = "opale") -> None
Stream Operations
async def start(scope_id, user_id, session_id) -> None
async def stop(scope_id, user_id, session_id, grace_period=5) -> None
async def add(scope_id, user_id, session_id, *, type, origin, body=None) -> None
async def is_live(scope_id, user_id, session_id) -> bool
async def listen(scope_id, user_id, session_id, *, wait=3, timeout=60, serialize=True) -> AsyncGenerator
async def cancel(scope_id, user_id, session_id) -> bool
async def q(scope_id, user_id) -> AsyncGenerator[tuple[int, int, str], None]
Example: FastAPI SSE
See examples/fastapi_sse.py for a complete example with:
- SSE streaming endpoint
- Cancellation support
- Tool usage
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from opale.agx import configure, Config, run, listen, cancel
app = FastAPI()
@app.post("/chat")
async def chat(prompt: str, session_id: str):
# Start agent in background
asyncio.create_task(run(...))
# Stream events via SSE
async def stream():
async for event in listen(1, 1, session_id):
yield f"data: {event}\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
@app.post("/chat/{session_id}/cancel")
async def cancel_chat(session_id: str):
return {"cancelled": await cancel(1, 1, session_id)}
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pydantic_ai_stream-0.0.1a0.tar.gz.
File metadata
- Download URL: pydantic_ai_stream-0.0.1a0.tar.gz
- Upload date:
- Size: 111.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9f4f56c51f0e86d5074663657417cba7e84b70403f328e73849f243bd6f3585e
|
|
| MD5 |
cb607a0e93397751c64afe2d8a47303e
|
|
| BLAKE2b-256 |
12964982569658e0941ff4e6d3bd82531d8ffa2571cbec78815ea85fd293f4f1
|
File details
Details for the file pydantic_ai_stream-0.0.1a0-py3-none-any.whl.
File metadata
- Download URL: pydantic_ai_stream-0.0.1a0-py3-none-any.whl
- Upload date:
- Size: 8.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.8.18
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
20acbeeb011ad93e600b7e4a1b649b825941ce1b0b13e692ed9b31ced048e868
|
|
| MD5 |
9e4fefd25dce65218fb24bede414f264
|
|
| BLAKE2b-256 |
a3abb8466c9c78577a6b6e1702a9abb2442a874dab16b004f2e10fdb2bcc08c3
|