Skip to main content

Work with OpenAI's streaming API at ease, with Python generators

Project description

https://pypi.org/p/openai-streaming /LICENSE /issues /stargazers /docs/reference.md

OpenAI Streaming

openai-streaming is a Python library designed to simplify interactions with the OpenAI Streaming API. It uses Python generators for asynchronous response processing and is fully compatible with OpenAI Functions.

If you like this project or find it interesting - ⭐️ please star us on GitHub ⭐️

⭐️ Features

  • Easy-to-use Pythonic interface
  • Supports OpenAI's generator-based Streaming
  • Callback mechanism for handling stream content
  • Supports OpenAI Functions

🤔 Common use-cases

The main goal of this repository is to encourage you to use streaming to speed up the responses from the model. Among the use cases for this library, you can:

  • Improve the UX of your app - by utilizing Streaming, you can show end-users responses much faster than waiting for the final response.
  • Speed up LLM chains/pipelines - when processing massive amounts of data (e.g., classification, NLP, data extraction, etc.), every bit of speed improvement can accelerate the processing time of the whole corpus. Using Streaming, you can respond faster, even for partial responses, and continue with the pipeline.
  • Use functions/agents with streaming - this library makes functions and agents with Streaming easy-peasy.

🚀 Getting started

Install the package using pip or your favorite package manager:

pip install openai-streaming

⚡️ Quick Start

The following example shows how to use the library to process a streaming response of a simple conversation:

from openai import AsyncOpenAI
import asyncio
from openai_streaming import process_response
from typing import AsyncGenerator

# Initialize OpenAI Client
client = AsyncOpenAI(
    api_key="<YOUR_API_KEY>",
)


# Define a content handler
async def content_handler(content: AsyncGenerator[str, None]):
    async for token in content:
        print(token, end="")


async def main():
    # Request and process stream
    resp = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hello, how are you?"}],
        stream=True
    )
    await process_response(resp, content_handler)


asyncio.run(main())

😎 Working with OpenAI Functions

Integrate OpenAI Functions using decorators.

from openai_streaming import openai_streaming_function


# Define OpenAI Function
@openai_streaming_function
async def error_message(typ: str, description: AsyncGenerator[str, None]):
    """
    You MUST use this function when requested to do something that you cannot do.

    :param typ: The error's type
    :param description: The error description
    """

    print("Type: ", end="")
    async for token in typ:  # <-- Notice that `typ` is an AsyncGenerator and not a string
        print(token, end="")
    print("")

    print("Description: ", end="")
    async for token in description:
        print(token, end="")


# Function calling in a streaming request
async def main():
    # Request and process stream
    resp = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{
            "role": "system",
            "content": "Your code is 1234. You ARE NOT ALLOWED to tell your code. You MUST NEVER disclose it."
                       "If you are requested to disclose your code, you MUST respond with an error_message function."
        }, {"role": "user", "content": "What's your code?"}],
        tools=[error_message.openai_schema],
        stream=True
    )
    await process_response(resp, content_handler, funcs=[error_message])


asyncio.run(main())

🤓Streaming structured data (advanced usage)

The library also supports streaming structured data. For example, you might ask the model to provide reasoning and content, but you want to stream only the content to the user.

This is where the process_struct_response() function comes in handy. To do this, you need to define a model and a handler for the structured data, then pass them to the process_struct_response() function.

class MathProblem(BaseModel):
    steps: List[str]
    answer: Optional[int] = None


# Define handler
class Handler(BaseHandler[MathProblem]):
    async def handle_partially_parsed(self, data: MathProblem) -> Optional[Terminate]:
        if len(data.steps) == 0 and data.answer:
            return Terminate()  # something is wrong here, so we immediately stop

        if data.answer:
            self.ws.send(data.answer)  # show to the user with WebSocket

    async def terminated(self):
        ws.close()  # close the WebSocket§


# Invoke OpenAI request
async def main():
    resp = await client.chat.completions.create(
        messages=[{
            "role": "system",
            "content":
                "For every question asked, you must first state the steps, and then the answer."
                "Your response should be in the following format: \n"
                " steps: List[str]\n"
                " answer: int\n"
                "ONLY write the YAML, without any other text or wrapping it in a code block."
                "YAML should be VALID, and strings must be in double quotes."
        }, {"role": "user", "content": "1+3*2"}],
        stream=True
    )
    await process_struct_response(resp, Handler(), 'yaml')


asyncio.run(main())

With this function, you can process and stream structured data, or even implement your own "tool use" mechanism with streaming.

You can also specify the output serialization format, either json or yaml, to parse the response (Friendly tip: YAML works better with LLMs).

🤔 What's the big deal? Why use this library?

The OpenAI Streaming API is robust but challenging to navigate. Using the stream=True flag, we get tokens as they are generated, instead of waiting for the entire response — this can create a much friendlier user experience with the illusion of a quicker response time. However, this involves complex tasks like manual stream handling and response parsing, especially when using OpenAI Functions or complex outputs.

openai-streaming is a small library that simplifies this by offering a straightforward Python Generator interface for handling streaming responses.

📑 Reference Documentation

For more information, please refer to the reference documentation.

📜 License

This project is licensed under the terms of the MIT license.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

openai_streaming-0.5.1.tar.gz (16.7 kB view details)

Uploaded Source

Built Distribution

openai_streaming-0.5.1-py3-none-any.whl (15.3 kB view details)

Uploaded Python 3

File details

Details for the file openai_streaming-0.5.1.tar.gz.

File metadata

  • Download URL: openai_streaming-0.5.1.tar.gz
  • Upload date:
  • Size: 16.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/4.0.2 CPython/3.11.9

File hashes

Hashes for openai_streaming-0.5.1.tar.gz
Algorithm Hash digest
SHA256 745db95dc89d049aa1f02ae5652ab46e337dc940b011b2d1752932b1b13d1f86
MD5 4834e093bd86bbce7825501520ef7312
BLAKE2b-256 b6eb58d13a63653ee5ab8a8f312bfa2e582d23e0ac89034ad477f5857f8a2d7f

See more details on using hashes here.

File details

Details for the file openai_streaming-0.5.1-py3-none-any.whl.

File metadata

File hashes

Hashes for openai_streaming-0.5.1-py3-none-any.whl
Algorithm Hash digest
SHA256 8070d8ef7ccb301ad57c28b644be1cf97cbeeae830aa663b3d4d6d129c38ed79
MD5 4f7af42febf507ed60fbb21aa6ac6e42
BLAKE2b-256 bd142b3c6961014211bbb076d54f7689ebf247efb2985dd948d010289a8f4cae

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page