Chat Completion Stream Handler

chat-cmpl-stream-handler

You've reimplemented the tool call loop for the fifth time. So have I. Never again.

Why

OpenAI Responses API? Deprecated vibes. Agents SDK? Lovely — until the third breaking change in a month. Chat Completions API? Still here. Still boring. Still working.

This library does exactly two things that everyone keeps copy-pasting across projects:

  1. Stream a chat completion and handle events
  2. Keep looping tool calls until the model is done

That's it. No magic. No framework. Just the loop.
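For reference, here is roughly what "the loop" looks like when written by hand. This is a minimal stdlib-only sketch, not this library's implementation: `fake_model`, `add`, and `run_tool_loop` are hypothetical names, and `fake_model` stands in for a real streamed completion so the example runs without a network call.

```python
import asyncio
import json


async def fake_model(messages):
    """Hypothetical stand-in for a streamed chat completion (no network)."""
    if messages[-1]["role"] == "tool":
        # A tool result is available, so answer in plain text.
        return {"role": "assistant", "content": f"Done: {messages[-1]['content']}"}
    call = {
        "id": "call_1",
        "type": "function",
        "function": {"name": "add", "arguments": json.dumps({"a": 2, "b": 3})},
    }
    return {"role": "assistant", "content": None, "tool_calls": [call]}


async def add(arguments: str, context) -> str:
    args = json.loads(arguments)
    return str(args["a"] + args["b"])


async def run_tool_loop(messages, tool_invokers, max_iterations=10):
    messages = list(messages)
    for _ in range(max_iterations):
        reply = await fake_model(messages)
        messages.append(reply)
        tool_calls = reply.get("tool_calls") or []
        if not tool_calls:
            return messages  # no tool calls requested: the model is done
        for call in tool_calls:
            fn = tool_invokers[call["function"]["name"]]
            output = await fn(call["function"]["arguments"], None)
            messages.append(
                {"role": "tool", "tool_call_id": call["id"], "content": output}
            )
    raise RuntimeError("tool call loop did not finish within max_iterations")


history = asyncio.run(
    run_tool_loop([{"role": "user", "content": "What is 2 + 3?"}], {"add": add})
)
```

The resulting `history` has the same shape as the Quick Start comment describes: user, assistant (tool_calls), tool, assistant (final answer).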

Installation

pip install chat-cmpl-stream-handler

Quick Start

import asyncio
import json
from openai import AsyncOpenAI
from chat_cmpl_stream_handler import ChatCompletionStreamHandler, stream_until_user_input

client = AsyncOpenAI(api_key="...")

GET_WEATHER_TOOL = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather for a given city.",
        "parameters": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
            "additionalProperties": False,
        },
        "strict": True,
    },
}


async def get_weather(arguments: str, context) -> str:
    args = json.loads(arguments)
    return f"The weather in {args['city']} is sunny and 25°C."


async def main():
    result = await stream_until_user_input(
        messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
        model="gpt-4.1-nano",
        openai_client=client,
        stream_handler=ChatCompletionStreamHandler(),
        tool_invokers={"get_weather": get_weather},
        stream_kwargs={
            "tools": [GET_WEATHER_TOOL],
            "stream_options": {"include_usage": True},
        },
    )

    # user → assistant (tool_calls) → tool → assistant (final answer)
    for msg in result.to_input_list():
        # content is None on assistant messages that only carry tool calls
        print(msg["role"], "->", msg.get("content") or "")

    for usage in result.usages:
        print(f"total tokens: {usage.total_tokens}")


asyncio.run(main())

Listening to stream events

Subclass ChatCompletionStreamHandler and override whatever you care about:

from chat_cmpl_stream_handler import ChatCompletionStreamHandler
from openai.lib.streaming.chat._events import ContentDeltaEvent, FunctionToolCallArgumentsDoneEvent


class PrintingHandler(ChatCompletionStreamHandler):
    async def on_content_delta(self, event: ContentDeltaEvent) -> None:
        print(event.delta, end="", flush=True)

    async def on_tool_calls_function_arguments_done(
        self, event: FunctionToolCallArgumentsDoneEvent
    ) -> None:
        print(f"\n[calling] {event.name}({event.arguments})")

API Reference

stream_until_user_input

async def stream_until_user_input(
    messages: Iterable[ChatCompletionMessageParam],
    model: str | ChatModel,
    openai_client: AsyncOpenAI,
    *,
    stream_handler: ChatCompletionStreamHandler[ResponseFormatT],
    tool_invokers: dict[str, ToolInvokerFn] | None = None,
    stream_kwargs: dict[str, Any] | None = None,
    context: Any | None = None,
    max_iterations: int = 10,
) -> StreamResult

Streams a completion, executes any tool calls, feeds the results back, and repeats until the model stops asking for tools. Raises MaxIterationsReached if the loop is still going after max_iterations rounds, which is how you find out you've ended up in an infinite tool call loop (it happens).

Parameter       Description
messages        Initial message list
model           Model name
openai_client   AsyncOpenAI instance
stream_handler  Receives stream events
tool_invokers   {"tool_name": async_fn}; each fn takes (arguments: str, context) and returns str
stream_kwargs   Passed directly to beta.chat.completions.stream() (e.g. tools, stream_options)
context         Forwarded to every tool invoker as-is
max_iterations  Safety cap. Default: 10
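To make the tool invoker contract concrete, here is a small sketch of an invoker that uses `context` and returns bad-argument errors as text instead of raising. `AppContext` is a hypothetical class made up for this example; the library only promises to forward whatever you pass as `context`. Whether the library itself catches exceptions raised by invokers is not stated here, so handling them inside the invoker is an assumption on the safe side.

```python
import asyncio
import json
from dataclasses import dataclass, field


@dataclass
class AppContext:
    """Hypothetical per-request state shared by every tool invoker."""
    units: str = "C"
    calls: list = field(default_factory=list)


async def get_weather(arguments: str, context: AppContext) -> str:
    # arguments arrives as the raw JSON string produced by the model.
    try:
        args = json.loads(arguments)
        city = args["city"]
    except (json.JSONDecodeError, KeyError, TypeError) as exc:
        # Returning the error as text means the model gets to read it.
        return json.dumps({"error": f"invalid arguments: {exc!r}"})
    context.calls.append(city)
    return json.dumps({"city": city, "temperature": 25, "units": context.units})


ctx = AppContext(units="F")
ok = asyncio.run(get_weather('{"city": "Tokyo"}', ctx))
bad = asyncio.run(get_weather("not json", ctx))
```

With the real library you would pass `context=ctx` and `tool_invokers={"get_weather": get_weather}` to `stream_until_user_input`.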

StreamResult

Attribute / Method  Description
.to_input_list()    Full message history as a JSON-serializable list, ready for the next round
.usages             list[CompletionUsage], one per iteration, so you can watch the bill grow
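Since `.usages` holds one entry per iteration, getting a total for the whole run means summing them yourself. A minimal sketch follows. The `SimpleNamespace` objects are stand-ins with made-up numbers for the `CompletionUsage` objects you would get from `result.usages`, and `summarize_usage` is a hypothetical helper, not part of this library.

```python
from types import SimpleNamespace

# Stand-ins for the CompletionUsage objects in result.usages (made-up numbers).
usages = [
    SimpleNamespace(prompt_tokens=50, completion_tokens=20, total_tokens=70),
    SimpleNamespace(prompt_tokens=90, completion_tokens=15, total_tokens=105),
]


def summarize_usage(usages):
    """Add up token counts across all iterations of the tool loop."""
    return {
        "prompt_tokens": sum(u.prompt_tokens for u in usages),
        "completion_tokens": sum(u.completion_tokens for u in usages),
        "total_tokens": sum(u.total_tokens for u in usages),
    }


totals = summarize_usage(usages)
```

With a real result, `summarize_usage(result.usages)` should work the same way, since `CompletionUsage` exposes the same three fields.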

ChatCompletionStreamHandler

All methods are no-ops by default. Override only what you need.

Method                                         When it fires
on_event(event)                                Every event, before more specific hooks
on_chunk(event)                                Every raw SSE chunk
on_content_delta(event)                        Each content token
on_content_done(event)                         Full content string complete
on_refusal_delta(event)                        Each refusal token
on_refusal_done(event)                         Full refusal string complete
on_tool_calls_function_arguments_delta(event)  Each incremental tool argument fragment
on_tool_calls_function_arguments_done(event)   Full tool argument JSON available
on_logprobs_content_delta(event)               Each logprobs content token
on_logprobs_content_done(event)                All logprobs content tokens done
on_logprobs_refusal_delta(event)               Each logprobs refusal token
on_logprobs_refusal_done(event)                All logprobs refusal tokens done
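The hook names appear to mirror the OpenAI SDK's event type strings (for example `content.delta`) with dots swapped for underscores. Under that assumption, the "on_event first, then the specific hook" ordering could be sketched as below. `SketchHandler` and `dispatch` are hypothetical and only illustrate the ordering; this is not how the library is confirmed to dispatch internally.

```python
import asyncio
from types import SimpleNamespace


class SketchHandler:
    """Hypothetical handler; only mirrors the hook names from the table."""

    def __init__(self):
        self.log = []

    async def on_event(self, event):
        self.log.append(("on_event", event.type))

    async def on_content_delta(self, event):
        self.log.append(("on_content_delta", event.delta))


async def dispatch(handler, event):
    # Generic hook first, then the specific one if the handler defines it.
    await handler.on_event(event)
    hook = getattr(handler, "on_" + event.type.replace(".", "_"), None)
    if hook is not None:
        await hook(event)


async def demo():
    handler = SketchHandler()
    await dispatch(handler, SimpleNamespace(type="content.delta", delta="Hi"))
    await dispatch(handler, SimpleNamespace(type="content.done", content="Hi"))
    return handler.log


log = asyncio.run(demo())
```

Here `content.done` only reaches `on_event`, because the sketch handler does not define `on_content_done`.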

Provider Compatibility

Works with any OpenAI-compatible endpoint. Some providers are more compatible than others.

Anthropic

Anthropic's Messages API is not OpenAI-compatible. Use the included AnthropicOpenAI adapter — a drop-in AsyncOpenAI subclass that translates requests under the hood (no extra dependencies required):

from chat_cmpl_stream_handler import ChatCompletionStreamHandler, stream_until_user_input
from chat_cmpl_stream_handler._anthropic import AnthropicOpenAI

client = AnthropicOpenAI(api_key="sk-ant-...")

# Call this from inside an async function.
# get_weather and GET_WEATHER_TOOL are the ones defined in Quick Start.
result = await stream_until_user_input(
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    model="claude-haiku-4-5-20251001",
    openai_client=client,
    stream_handler=ChatCompletionStreamHandler(),
    tool_invokers={"get_weather": get_weather},
    stream_kwargs={"tools": [GET_WEATHER_TOOL]},
)

A few differences from OpenAI to be aware of:

  • Usage is always returned — no need to pass stream_options: {"include_usage": True}.
  • The strict field in tool definitions is silently ignored (Anthropic doesn't support it).
  • OpenAI-only keys (stream_options, response_format) are stripped before the request is sent.
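To illustrate what "stripped" and "ignored" mean in the list above, here is a sketch of that kind of request clean-up. `strip_openai_only` and `OPENAI_ONLY_KEYS` are hypothetical names written for this example; the adapter's actual code may differ.

```python
import copy

# Keys named in the list above as OpenAI-only.
OPENAI_ONLY_KEYS = {"stream_options", "response_format"}


def strip_openai_only(stream_kwargs: dict) -> dict:
    """Return a copy without OpenAI-only keys and without `strict` on tools."""
    cleaned = {
        key: copy.deepcopy(value)
        for key, value in stream_kwargs.items()
        if key not in OPENAI_ONLY_KEYS
    }
    for tool in cleaned.get("tools", []):
        tool.get("function", {}).pop("strict", None)
    return cleaned


original = {
    "tools": [
        {
            "type": "function",
            "function": {"name": "get_weather", "parameters": {}, "strict": True},
        }
    ],
    "stream_options": {"include_usage": True},
}
cleaned = strip_openai_only(original)
```

The deep copy keeps the caller's dictionaries untouched, so the same `stream_kwargs` can still be reused against an OpenAI endpoint.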

Gemini

Gemini's streaming API sends tool_call_delta.index = None, which the OpenAI SDK does not appreciate. Apply the included patch once at startup:

from chat_cmpl_stream_handler._patch_stream_tool_call_index import apply
apply()  # safe to call multiple times

Put it at the top of main.py, or in conftest.py if you're testing. This is opt-in — the library won't silently monkey-patch anything on import.
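If you are wondering what "safe to call multiple times" takes, the usual trick is to mark the patched function and bail out when the mark is already there. The sketch below shows that pattern on a toy class. `Accumulator` is a hypothetical stand-in for SDK code that needs integer indexes, and this `apply` is written for the example only; it is not the library's patch.

```python
import functools


class Accumulator:
    """Hypothetical stand-in for SDK code that needs integer indexes."""

    def add(self, deltas):
        return [int(d["index"]) for d in deltas]  # int(None) raises TypeError


def apply():
    """Wrap Accumulator.add once so that None indexes become positions."""
    original = Accumulator.add
    if getattr(original, "_index_patched", False):
        return  # already patched: calling apply() again is a no-op

    @functools.wraps(original)
    def patched(self, deltas):
        fixed = [
            {**d, "index": i} if d.get("index") is None else d
            for i, d in enumerate(deltas)
        ]
        return original(self, fixed)

    patched._index_patched = True
    Accumulator.add = patched


apply()
apply()  # second call changes nothing
indexes = Accumulator().add([{"index": None, "id": "a"}, {"index": None, "id": "b"}])
```

Falling back to the position in the list is one plausible way to fill a missing index; treat it as an assumption rather than a description of what the included patch does.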

License

MIT
