Chat Completion Stream Handler
Project description
chat-cmpl-stream-handler
You've reimplemented the tool call loop for the fifth time. So have I. Never again.
Why
OpenAI Responses API? Still evolving. Agents SDK? Promising — frameworks always are, at first. Chat Completions API? Boring, stable, everywhere.
This library does exactly two things that everyone keeps copy-pasting across projects:
- Stream a chat completion and handle events
- Keep looping tool calls until the model is done
That's it. No magic. No framework. Just the loop.
Installation
pip install chat-cmpl-stream-handler
Quick Start
import asyncio
from openai import AsyncOpenAI
from openai.types.chat.chat_completion_message_tool_call import ChatCompletionMessageToolCall
from chat_cmpl_stream_handler import args_from_tool_call, stream_until_user_input
client = AsyncOpenAI(api_key="...")
GET_WEATHER_TOOL = {
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the current weather for a given city.",
"parameters": {
"type": "object",
"properties": {"city": {"type": "string"}},
"required": ["city"],
"additionalProperties": False,
},
"strict": True,
},
}
async def get_weather(tool_call: ChatCompletionMessageToolCall, context) -> str:
args = args_from_tool_call(tool_call)
return f"The weather in {args['city']} is sunny and 25°C."
async def main():
result = await stream_until_user_input(
messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
model="gpt-4.1-nano",
openai_client=client,
tool_invokers={"get_weather": get_weather},
stream_kwargs={
"tools": [GET_WEATHER_TOOL],
"stream_options": {"include_usage": True},
},
)
# user → assistant (tool_calls) → tool → assistant (final answer)
for msg in result.to_input_list():
print(msg["role"], "->", msg.get("content", ""))
for usage in result.usages:
print(f"total tokens: {usage.total_tokens}")
asyncio.run(main())
Listening to stream events
Subclass ChatCompletionStreamHandler and override whatever you care about:
from chat_cmpl_stream_handler import ChatCompletionStreamHandler
from openai.lib.streaming.chat._events import ContentDeltaEvent, FunctionToolCallArgumentsDoneEvent
class PrintingHandler(ChatCompletionStreamHandler):
async def on_content_delta(self, event: ContentDeltaEvent) -> None:
print(event.delta, end="", flush=True)
async def on_tool_calls_function_arguments_done(
self, event: FunctionToolCallArgumentsDoneEvent
) -> None:
print(f"\n[calling] {event.name}({event.arguments})")
Building tools from MCP servers
If you already expose capabilities through an MCP server, you can turn them into
OpenAI-compatible tools plus tool_invokers in one step:
from chat_cmpl_stream_handler.utils.mcp import MCPServerConfig, build_mcp_tools_and_invokers
mcp_tools, mcp_tool_invokers = await build_mcp_tools_and_invokers(
[
MCPServerConfig(
server_url="https://marketplace-mcp.us-east-1.api.aws/mcp",
server_label="aws",
)
]
)
result = await stream_until_user_input(
messages=[{"role": "user", "content": "Use aws__get_cost_and_usage and summarize it."}],
model="gpt-4.1",
openai_client=client,
tool_invokers=mcp_tool_invokers,
stream_kwargs={"tools": mcp_tools},
)
Notes:
server_label="aws"prefixes discovered tools likeaws__tool_name- if you pass an initialized
ClientSessionintoMCPServerConfig(session=...), tool discovery and tool calls reuse that session without reconnecting - runtime
contextfromstream_until_user_input(..., context=...)is forwarded into MCPmeta["context"]
Building tools from Pydantic models
For local tools with typed inputs, use the Pydantic helpers directly from
chat_cmpl_stream_handler.utils:
from typing import Any
from pydantic import BaseModel
from chat_cmpl_stream_handler.utils.pydantic_to_tool import (
PydanticToolConfig,
build_pydantic_tools_and_invokers,
)
class EchoRequest(BaseModel):
"""Echo text back to the user."""
text: str
async def echo_tool(arguments: EchoRequest, context: Any) -> str:
return f"{context}: {arguments.text}"
pydantic_tools, pydantic_tool_invokers = build_pydantic_tools_and_invokers(
[
PydanticToolConfig(
model=EchoRequest,
invoker=echo_tool,
)
]
)
result = await stream_until_user_input(
messages=[{"role": "user", "content": "Call echo_request with text=hello"}],
model="gpt-4.1",
openai_client=client,
tool_invokers=pydantic_tool_invokers,
stream_kwargs={"tools": pydantic_tools},
context="demo",
)
The generated invoker validates the tool arguments with
model_validate_json(...) before calling your handler.
API Reference
stream_until_user_input
async def stream_until_user_input(
messages: Iterable[ChatCompletionMessageParam],
model: str | ChatModel,
openai_client: AsyncOpenAI,
*,
stream_handler: ChatCompletionStreamHandler[ResponseFormatT] | None = None,
tool_invokers: dict[str, ToolInvokerFn] | None = None,
stream_kwargs: dict[str, Any] | None = None,
context: Any | None = None,
max_iterations: int = 10,
) -> StreamResult
Streams a completion, executes tool calls, feeds results back, repeats — until the model stops asking for tools. Raises MaxIterationsReached if you've somehow ended up in an infinite tool call loop (it happens).
| Parameter | Description |
|---|---|
messages |
Initial message list |
model |
Model name |
openai_client |
AsyncOpenAI instance |
stream_handler |
Receives stream events. Default: a no-op ChatCompletionStreamHandler() |
tool_invokers |
{"tool_name": async_fn} — each fn takes (tool_call: ChatCompletionMessageToolCall, context) and returns str |
stream_kwargs |
Passed directly to chat.completions.create() (e.g. tools, stream_options) |
context |
Forwarded to every tool invoker as-is |
max_iterations |
Safety cap. Default: 10 |
ToolInvokerFn
ToolInvokerFn = Callable[[ChatCompletionMessageToolCall, Any], Awaitable[str]]
Each tool invoker receives the full ChatCompletionMessageToolCall object from the OpenAI response. This gives you access to tool_call.id, tool_call.function.name, and tool_call.function.arguments — useful for tracing, logging, or emitting SSE events with the real tool call id.
args_from_tool_call
def args_from_tool_call(tool_call: ChatCompletionMessageToolCall) -> dict[str, Any]
Convenience helper that parses tool_call.function.arguments into a dictionary. Handles empty arguments gracefully.
StreamResult
| Attribute / Method | Description |
|---|---|
.to_input_list() |
Full message history as a JSON-serializable list, ready for the next round |
.usages |
list[CompletionUsage] — one per iteration, so you can watch the bill grow |
ChatCompletionStreamHandler
All methods are no-ops by default. Override only what you need.
| Method | When it fires |
|---|---|
on_event(event) |
Every event, before more specific hooks |
on_chunk(event) |
Every raw SSE chunk |
on_content_delta(event) |
Each content token |
on_content_done(event) |
Full content string complete |
on_refusal_delta(event) |
Each refusal token |
on_refusal_done(event) |
Full refusal string complete |
on_tool_calls_function_arguments_delta(event) |
Each incremental tool argument fragment |
on_tool_calls_function_arguments_done(event) |
Full tool argument JSON available |
on_logprobs_content_delta(event) |
Each logprobs content token |
on_logprobs_content_done(event) |
All logprobs content tokens done |
on_logprobs_refusal_delta(event) |
Each logprobs refusal token |
on_logprobs_refusal_done(event) |
All logprobs refusal tokens done |
Provider Compatibility
Works with any OpenAI-compatible endpoint. Some providers are more compatible than others.
Anthropic
Anthropic exposes an OpenAI-compatible endpoint — no adapter needed. Use a plain AsyncOpenAI with the Anthropic base URL:
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key="sk-ant-...", base_url="https://api.anthropic.com/v1")
result = await stream_until_user_input(
messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
model="claude-haiku-4-5-20251001",
openai_client=client,
tool_invokers={"get_weather": get_weather},
stream_kwargs={
"tools": [GET_WEATHER_TOOL],
"stream_options": {"include_usage": True},
},
)
Gemini
Gemini's streaming API sends tool_call_delta.index = None, which the OpenAI SDK does not appreciate. Apply the included patch once at startup:
from chat_cmpl_stream_handler._patch_stream_tool_call_index import apply
apply() # safe to call multiple times
Put it at the top of main.py, or in conftest.py if you're testing. This is opt-in — the library won't silently monkey-patch anything on import.
Gemini 3 thought signatures: Gemini 3 models require a thought_signature to be echoed back during multi-turn function calling. stream_until_user_input preserves these signatures automatically — no action needed on your side.
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file chat_cmpl_stream_handler-0.4.2.tar.gz.
File metadata
- Download URL: chat_cmpl_stream_handler-0.4.2.tar.gz
- Upload date:
- Size: 15.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.12.13 Darwin/25.3.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6698f394817cbd7c39f8ac76854fb444a39c86443e6402fb9b369d14c0160bd5
|
|
| MD5 |
50ebddbb05983cd0dabee23a78e28a91
|
|
| BLAKE2b-256 |
88b53e03ecaa08ef78c41bc834f89618bdc641d2f9d3094d70cda6ebfa0ee90a
|
File details
Details for the file chat_cmpl_stream_handler-0.4.2-py3-none-any.whl.
File metadata
- Download URL: chat_cmpl_stream_handler-0.4.2-py3-none-any.whl
- Upload date:
- Size: 16.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.4 CPython/3.12.13 Darwin/25.3.0
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
df5e32e8aaae85abe414433ff0efe489bd2b5fa59074080da0baf3678c622b5c
|
|
| MD5 |
c731b19e2bc1152752c5a01e4eb1f491
|
|
| BLAKE2b-256 |
af3df61ebce9c33c7fc621e85dfd51ccd2cec4484706d78c98ca0ffd2440826b
|