

fastapi-openai-compat


FastAPI router factory for OpenAI-compatible Chat Completions endpoints.

Provides a configurable APIRouter that exposes /v1/chat/completions and /v1/models endpoints, following the OpenAI API specification, with support for streaming (SSE), non-streaming responses, tool calling, configurable hooks, and custom chunk mapping.

Installation

pip install fastapi-openai-compat

With Haystack StreamingChunk support:

pip install "fastapi-openai-compat[haystack]"

Quick start

Create an OpenAI-compatible Chat Completions server in a few lines. Both sync and async callables are supported -- sync callables are automatically executed in a thread pool so they never block the async event loop.

from fastapi import FastAPI
from fastapi_openai_compat import create_openai_router, CompletionResult

def list_models() -> list[str]:
    return ["my-pipeline"]

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    # Your (potentially blocking) pipeline execution logic here
    return "Hello from Haystack!"

app = FastAPI()
router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
)
app.include_router(router)

Async callables work the same way:

async def list_models() -> list[str]:
    return ["my-pipeline"]

async def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return "Hello from Haystack!"
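The sync/async dispatch described above can be approximated with the standard library. This is a sketch, not the library's code: `call_maybe_async` is a hypothetical helper, and the router may use a different mechanism (for example Starlette's `run_in_threadpool`, which FastAPI itself uses for sync endpoints).

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_maybe_async(func: Callable[..., Any], *args: Any) -> Any:
    """Hypothetical helper: await coroutine functions, offload sync ones to a thread."""
    if inspect.iscoroutinefunction(func):
        return await func(*args)
    # asyncio.to_thread (Python 3.9+) runs the call in the default thread pool,
    # so a blocking sync callable does not stall the event loop.
    return await asyncio.to_thread(func, *args)


def sync_models() -> list[str]:
    return ["my-pipeline"]


async def async_models() -> list[str]:
    return ["my-pipeline"]


async def main() -> tuple[list[str], list[str]]:
    # Both styles produce the same result from the caller's point of view.
    return await call_maybe_async(sync_models), await call_maybe_async(async_models)
```

Offloading keeps the event loop responsive, but a blocking callable still occupies a worker thread for its whole duration, so many long-running sync pipelines can saturate the default pool.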

The run_completion callable

The run_completion callable receives three arguments:

| Argument | Type | Description |
|---|---|---|
| model | str | The model name from the request (e.g. "my-pipeline"). |
| messages | list[dict] | The conversation history in OpenAI format. |
| body | dict | The full request body, including all extra parameters (e.g. temperature, max_tokens, stream, metadata, tools). |

The request model accepts any additional fields beyond model, messages, and stream. These extra parameters are forwarded as-is in the body dict, so you can use them however you need without any library changes.
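Because run_completion is a plain function and body is a plain dict, the handling of extra fields can be exercised without starting a server. A minimal sketch: it returns a str so it needs no library imports, and the hand-built body below assumes the router passes the full request JSON (including model and messages), as the table above describes.

```python
def run_completion(model: str, messages: list[dict], body: dict) -> str:
    # Extra request fields arrive untouched in `body`; read them with defaults.
    temperature = body.get("temperature", 1.0)
    # `metadata` may be absent or explicitly null, so fall back to an empty dict.
    metadata = body.get("metadata") or {}
    tier = metadata.get("user_tier", "standard")
    last_msg = messages[-1]["content"]
    return f"[{tier} @ T={temperature}] {last_msg}"


# Call the function directly, the way the router would, with a hand-built body.
body = {
    "model": "my-pipeline",
    "messages": [{"role": "user", "content": "Hello!"}],
    "temperature": 0.7,
    "metadata": {"request_id": "abc-123", "user_tier": "premium"},
}
reply = run_completion(body["model"], body["messages"], body)
```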

For example, you can access metadata and any other extra field from body:

import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    metadata = body.get("metadata", {})
    temperature = body.get("temperature", 1.0)
    request_id = metadata.get("request_id", "unknown")

    return ChatCompletion(
        id=f"resp-{request_id}",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(role="assistant", content="Hello!"),
                finish_reason="stop",
            )
        ],
        metadata={"request_id": request_id, "temperature_used": temperature},
    )

A client can then send:

curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "my-pipeline",
    "messages": [{"role": "user", "content": "Hello!"}],
    "temperature": 0.7,
    "metadata": {"request_id": "abc-123", "user_tier": "premium"}
  }'

The metadata field in the response works because ChatCompletion also allows extra fields, so you can attach any additional data to the response object.

The return type determines how the response is formatted:

| Return type | Behavior |
|---|---|
| str | Wrapped automatically into a ChatCompletion response. |
| Generator | Each yielded chunk is converted to a chat.completion.chunk SSE message. |
| AsyncGenerator | Same as Generator, but async. |
| ChatCompletion | Returned as-is for full control over the response. |
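The branching in this table can be sketched with the standard `inspect` module. `classify_result` is hypothetical and returns labels instead of responses; the library may test the types differently (for example via `collections.abc`).

```python
import inspect
from typing import Any


def classify_result(result: Any) -> str:
    """Hypothetical sketch of branching on run_completion's return type."""
    if isinstance(result, str):
        return "wrap-as-chat-completion"
    if inspect.isasyncgen(result):
        return "stream-async"
    if inspect.isgenerator(result):
        return "stream-sync"
    # Anything else (e.g. a ChatCompletion object) is passed through as-is.
    return "return-as-is"
```

Note that a generator *function* is not a generator: run_completion must return `stream()`, not `stream`, for the streaming branch to apply.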

Response types

Returning a string

The simplest option -- return a plain string and the library wraps it as a complete ChatCompletion response automatically:

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    last_msg = messages[-1]["content"]
    return f"You said: {last_msg}"

Streaming with a generator

Return a generator to stream responses token by token via SSE. Each yielded string is automatically wrapped into a chat.completion.chunk message -- you only need to yield the text content, the library handles the SSE wire format. A finish_reason="stop" sentinel is appended automatically at the end of the stream.

Your run_completion should check body.get("stream", False) to decide whether to return a generator or a plain string:

from collections.abc import Generator

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    last_msg = messages[-1]["content"]

    if body.get("stream", False):
        def stream() -> Generator[str, None, None]:
            for word in last_msg.split():
                yield word + " "
        return stream()

    return f"You said: {last_msg}"
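To make "the library handles the SSE wire format" concrete, here is a sketch of what such framing typically looks like under the OpenAI streaming convention: one `data:` line per chunk, a blank line between events, a closing chunk carrying finish_reason, and a `data: [DONE]` marker. The helper names are hypothetical, and whether this library emits `[DONE]` is an assumption based on that convention.

```python
import json
import time
from typing import Iterable, Iterator, Optional


def sse_chunk(chunk_id: str, model: str, content: Optional[str] = None,
              finish_reason: Optional[str] = None) -> str:
    """Encode one chat.completion.chunk as a Server-Sent Events message (sketch)."""
    delta = {} if content is None else {"content": content}
    payload = {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
    # Each SSE event is a "data:" line terminated by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


def encode_stream(tokens: Iterable[str], chunk_id: str, model: str) -> Iterator[str]:
    for token in tokens:
        yield sse_chunk(chunk_id, model, content=token)
    # Closing chunk with finish_reason="stop", then the end-of-stream marker.
    yield sse_chunk(chunk_id, model, finish_reason="stop")
    yield "data: [DONE]\n\n"
```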

Async generators work the same way:

from collections.abc import AsyncGenerator

async def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    async def stream() -> AsyncGenerator[str, None]:
        for word in ["Hello", " from", " Haystack", "!"]:
            yield word
    return stream()
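To check an async streaming run_completion before wiring it into the router, you can drain the generator yourself. This is a test-harness sketch: `collect` is a hypothetical helper, and it reproduces only the token stream, not the SSE framing the router adds.

```python
import asyncio
from collections.abc import AsyncGenerator


async def run_completion(model: str, messages: list[dict], body: dict):
    async def stream() -> AsyncGenerator[str, None]:
        for word in ["Hello", " from", " Haystack", "!"]:
            yield word
    return stream()


async def collect(model: str, messages: list[dict], body: dict) -> str:
    # Await the callable to get the generator, then drain it with `async for`.
    stream = await run_completion(model, messages, body)
    return "".join([token async for token in stream])


text = asyncio.run(collect("my-pipeline", [{"role": "user", "content": "Hi"}], {"stream": True}))
```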

Returning a ChatCompletion

For full control over the response (e.g. custom usage, finish_reason, or system_fingerprint), return a ChatCompletion object directly:

import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return ChatCompletion(
        id="resp-1",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(role="assistant", content="Hello!"),
                finish_reason="stop",
            )
        ],
        usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
    )
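For reference, the JSON a non-streaming client receives follows the OpenAI chat.completion shape. The sketch below builds it as a plain dict mirroring the fields of the example above; `chat_completion_dict` is hypothetical, and the exact serialization of ChatCompletion (for instance, whether null fields are included) is not specified here.

```python
import time


def chat_completion_dict(resp_id: str, model: str, content: str,
                         prompt_tokens: int, completion_tokens: int) -> dict:
    """Plain-dict sketch of a non-streaming chat.completion response."""
    return {
        "id": resp_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": content},
            "finish_reason": "stop",
        }],
        # By the OpenAI convention, total_tokens is the sum of the other two counts.
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        },
    }


example = chat_completion_dict("resp-1", "my-pipeline", "Hello!", 10, 5)
```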

Tool calling

Returning ChatCompletion directly

For tool calls and other advanced responses, return a ChatCompletion directly from run_completion for full control over the response structure:

import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    return ChatCompletion(
        id="resp-1",
        object="chat.completion",
        created=int(time.time()),
        model=model,
        choices=[
            Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=None,
                    tool_calls=[{
                        "id": "call_1",
                        "type": "function",
                        "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'},
                    }],
                ),
                finish_reason="tool_calls",
            )
        ],
    )
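In the OpenAI format, `function.arguments` is a JSON-encoded *string*, not an object. Building it with `json.dumps` avoids hand-escaping mistakes, and the client decodes it with `json.loads`. Both helpers are hypothetical; the dict produced has the same shape as the one passed to `Message(tool_calls=...)` above.

```python
import json


def make_tool_call(call_id: str, name: str, arguments: dict) -> dict:
    """Build an OpenAI-style tool call entry; `arguments` is serialized to a JSON string."""
    return {
        "id": call_id,
        "type": "function",
        "function": {"name": name, "arguments": json.dumps(arguments)},
    }


def parse_tool_call(tool_call: dict) -> tuple[str, dict]:
    """Client side: recover the function name and the decoded arguments."""
    fn = tool_call["function"]
    return fn["name"], json.loads(fn["arguments"])


call = make_tool_call("call_1", "get_weather", {"city": "Paris"})
```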

Streaming tool calls work the same way -- yield ChatCompletion chunk objects from your generator and the library serializes them directly as SSE:

import time
from fastapi_openai_compat import ChatCompletion, Choice, Message, CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream():
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=int(time.time()), model=model,
            choices=[Choice(index=0, delta=Message(
                role="assistant",
                tool_calls=[{"index": 0, "id": "call_1", "type": "function",
                             "function": {"name": "get_weather", "arguments": ""}}],
            ))],
        )
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=int(time.time()), model=model,
            choices=[Choice(index=0, delta=Message(
                role="assistant",
                tool_calls=[{"index": 0, "function": {"arguments": '{"city": "Paris"}'}}],
            ))],
        )
        yield ChatCompletion(
            id="resp-1", object="chat.completion.chunk",
            created=int(time.time()), model=model,
            choices=[Choice(index=0, delta=Message(role="assistant"), finish_reason="tool_calls")],
        )
    return stream()
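On the receiving end, streamed tool calls must be reassembled: fragments are grouped by `index`, the id and name arrive once, and the argument string arrives in pieces that are concatenated in order. A sketch over plain dicts (`merge_tool_call_deltas` is hypothetical; many OpenAI client SDKs provide equivalent accumulation), fed with the deltas from the example above:

```python
def merge_tool_call_deltas(deltas: list[list[dict]]) -> list[dict]:
    """Reassemble streamed tool calls from per-chunk `tool_calls` delta lists."""
    calls: dict[int, dict] = {}
    for delta_tool_calls in deltas:
        for tc in delta_tool_calls:
            entry = calls.setdefault(
                tc["index"],
                {"id": None, "type": "function", "function": {"name": "", "arguments": ""}},
            )
            if tc.get("id"):
                entry["id"] = tc["id"]
            fn = tc.get("function", {})
            if fn.get("name"):
                entry["function"]["name"] = fn["name"]
            # Argument fragments are concatenated in arrival order.
            entry["function"]["arguments"] += fn.get("arguments", "")
    return [calls[i] for i in sorted(calls)]


deltas = [
    [{"index": 0, "id": "call_1", "type": "function",
      "function": {"name": "get_weather", "arguments": ""}}],
    [{"index": 0, "function": {"arguments": '{"city": "Paris"}'}}],
]
merged = merge_tool_call_deltas(deltas)
```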

Automatic StreamingChunk support

When using Haystack's StreamingChunk (requires pip install fastapi-openai-compat[haystack]), tool call deltas and finish reasons are handled automatically via duck typing:

from haystack.dataclasses import StreamingChunk
from haystack.dataclasses.streaming_chunk import ToolCallDelta
from fastapi_openai_compat import CompletionResult

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream():
        yield StreamingChunk(
            content="",
            tool_calls=[ToolCallDelta(
                index=0, id="call_1",
                tool_name="get_weather", arguments='{"city": "Paris"}',
            )],
            index=0,
        )
        yield StreamingChunk(content="", finish_reason="tool_calls")
    return stream()

The library automatically:

  • Converts ToolCallDelta objects to OpenAI wire format (tool_calls[].function.name/arguments)
  • Propagates finish_reason from chunks (e.g. "stop", "tool_calls", "length")
  • Only auto-appends finish_reason="stop" if no chunk already carried a finish reason
  • Works via duck typing -- any object with tool_calls and finish_reason attributes is supported
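The conversion rules listed above can be approximated with `getattr`-based duck typing. This is not the library's converter: `FakeToolCallDelta` and `FakeChunk` are hypothetical stand-ins that mirror only the attribute names used in the Haystack example (index, id, tool_name, arguments, content, tool_calls, finish_reason), so the sketch runs without Haystack installed.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional


# Hypothetical stand-ins mirroring the attribute names used in the Haystack example.
@dataclass
class FakeToolCallDelta:
    index: int
    id: Optional[str] = None
    tool_name: Optional[str] = None
    arguments: Optional[str] = None


@dataclass
class FakeChunk:
    content: str = ""
    tool_calls: list = field(default_factory=list)
    finish_reason: Optional[str] = None


def tool_call_to_wire(tc: Any) -> dict:
    """Map a ToolCallDelta-like object to an OpenAI tool_calls[] entry."""
    fn: dict = {"arguments": getattr(tc, "arguments", None) or ""}
    if getattr(tc, "tool_name", None):
        fn["name"] = tc.tool_name
    wire: dict = {"index": tc.index, "function": fn}
    if getattr(tc, "id", None):
        wire["id"] = tc.id
        wire["type"] = "function"
    return wire


def chunk_to_choice(chunk: Any) -> dict:
    """Build a streaming choice from any object exposing the duck-typed attributes."""
    delta: dict = {}
    if getattr(chunk, "content", None):
        delta["content"] = chunk.content
    tool_calls = getattr(chunk, "tool_calls", None) or []
    if tool_calls:
        delta["tool_calls"] = [tool_call_to_wire(tc) for tc in tool_calls]
    return {"index": 0, "delta": delta, "finish_reason": getattr(chunk, "finish_reason", None)}


def needs_stop_sentinel(chunks: Iterable[Any]) -> bool:
    # Append finish_reason="stop" only if no chunk already carried a finish reason.
    return not any(getattr(c, "finish_reason", None) for c in chunks)
```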

Custom SSE events

You can yield custom SSE events alongside regular chat completion chunks. This is useful for sending side-channel data to clients like Open WebUI -- status updates, notifications, source citations, etc.

Any object with a .to_event_dict() method is recognized as a custom event and serialized as data: {"event": {...}} in the SSE stream. Custom events don't interfere with chat completion chunks or the finish_reason tracking.

from collections.abc import Generator
from fastapi_openai_compat import CompletionResult

class StatusEvent:
    def __init__(self, description: str, done: bool = False):
        self.description = description
        self.done = done

    def to_event_dict(self) -> dict:
        return {"type": "status", "data": {"description": self.description, "done": self.done}}

def run_completion(model: str, messages: list[dict], body: dict) -> CompletionResult:
    def stream() -> Generator[str | StatusEvent, None, None]:
        yield StatusEvent("Processing your request...")
        for word in ["Hello", " from", " Haystack", "!"]:
            yield word
        yield StatusEvent("Done", done=True)
    return stream()

This works via duck typing -- any object implementing to_event_dict() -> dict is supported. The protocol is compatible with Hayhooks' Open WebUI events.
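The event framing described above (`data: {"event": {...}}`) can be sketched as follows. `is_custom_event` and `encode_custom_event` are hypothetical helpers; the StatusEvent class repeats the one from the example so the block runs on its own.

```python
import json
from typing import Any


def is_custom_event(item: Any) -> bool:
    # Duck typing: anything with a callable to_event_dict() counts as an event.
    return callable(getattr(item, "to_event_dict", None))


def encode_custom_event(item: Any) -> str:
    # Wrap the event dict under an "event" key and frame it as one SSE message.
    payload = {"event": item.to_event_dict()}
    return f"data: {json.dumps(payload)}\n\n"


class StatusEvent:
    def __init__(self, description: str, done: bool = False):
        self.description = description
        self.done = done

    def to_event_dict(self) -> dict:
        return {"type": "status", "data": {"description": self.description, "done": self.done}}


line = encode_custom_event(StatusEvent("Done", done=True))
```

Because the check is on the method rather than a base class, plain strings and chat chunks are never mistaken for events, which is why events do not disturb finish_reason tracking.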

Hooks

You can inject pre/post hooks to modify requests and results (transformer hooks) or to observe them without modification (observer hooks). Both sync and async hooks are supported.

Transformer hooks

Return a modified value to transform the request or result:

from fastapi_openai_compat import ChatRequest, CompletionResult

async def pre_hook(request: ChatRequest) -> ChatRequest:
    # e.g. inject system prompts, validate, rate-limit
    return request

async def post_hook(result: CompletionResult) -> CompletionResult:
    # e.g. transform, filter
    return result

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    pre_hook=pre_hook,
    post_hook=post_hook,
)

Observer hooks

Return None to observe without modifying (useful for logging, metrics, etc.):

def log_request(request: ChatRequest) -> None:
    print(f"Request for model: {request.model}")

def log_result(result: CompletionResult) -> None:
    print(f"Got result type: {type(result).__name__}")

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    pre_hook=log_request,
    post_hook=log_result,
)
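The transformer/observer distinction boils down to one rule: a hook's non-None return value replaces the input, and None leaves it unchanged. A minimal sketch of that rule (`apply_hook` is hypothetical; unlike the router, it calls sync hooks directly rather than in a thread pool):

```python
import asyncio
import inspect
from typing import Any, Callable, Optional


async def apply_hook(hook: Optional[Callable[[Any], Any]], value: Any) -> Any:
    """Sketch of transformer vs observer semantics for pre/post hooks."""
    if hook is None:
        return value
    result = hook(value)
    if inspect.isawaitable(result):
        result = await result
    # Observer hooks return None, so the original value passes through unchanged.
    return value if result is None else result
```

One consequence of this design: a transformer cannot replace a value *with* None, since that return is indistinguishable from an observer's.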

Custom chunk mapping

By default the router handles plain str chunks and objects with a .content attribute (e.g. Haystack StreamingChunk). If your pipeline streams a different type, provide a chunk_mapper to extract text content:

from dataclasses import dataclass

@dataclass
class MyChunk:
    text: str
    score: float

def my_mapper(chunk: MyChunk) -> str:
    return chunk.text

router = create_openai_router(
    list_models=list_models,
    run_completion=run_completion,
    chunk_mapper=my_mapper,
)

This works with any object -- dataclasses, dicts, Pydantic models, etc.:

def dict_mapper(chunk: dict) -> str:
    return chunk["payload"]
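A custom mapper replaces the default, so if one stream can carry several chunk types, the mapper should cover all of them. The sketch below approximates the documented default (str as-is, otherwise `.content`) and layers a dict case on top; both functions are hypothetical, and raising TypeError for unmappable chunks is an assumed behavior, not the library's documented one.

```python
from typing import Any


def default_like_mapper(chunk: Any) -> str:
    """Approximates the documented default: str as-is, else the .content attribute."""
    if isinstance(chunk, str):
        return chunk
    content = getattr(chunk, "content", None)
    if content is None:
        raise TypeError(f"Cannot map chunk of type {type(chunk).__name__}; pass a chunk_mapper")
    return str(content)


def combined_mapper(chunk: Any) -> str:
    # Hypothetical custom mapper: handle dict payloads first, then fall back.
    if isinstance(chunk, dict):
        return chunk["payload"]
    return default_like_mapper(chunk)
```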

Examples

The examples/ folder contains ready-to-run servers:

  • basic.py -- Minimal echo server, no external API keys required.
  • haystack_chat.py -- Haystack OpenAIChatGenerator with streaming support.

See the examples README for setup and usage instructions.

API reference

This library implements endpoints compatible with the OpenAI Chat Completions API.

create_openai_router

create_openai_router(
    *,
    list_models,
    run_completion,
    pre_hook=None,
    post_hook=None,
    chunk_mapper=default_chunk_mapper,
    owned_by="custom",
    tags=None,
) -> APIRouter

| Parameter | Type | Description |
|---|---|---|
| list_models | Callable -> list[str] | Returns available model/pipeline names. |
| run_completion | Callable -> CompletionResult | Runs a chat completion given (model, messages, body). |
| pre_hook | Callable or None | Called before run_completion. Receives a ChatRequest; returns a modified request (transformer) or None (observer). |
| post_hook | Callable or None | Called after run_completion. Receives a CompletionResult; returns a modified result (transformer) or None (observer). |
| chunk_mapper | Callable[[Any], str] | Converts streamed chunks to strings. The default handles str and objects with a .content attribute. |
| owned_by | str | Value for the owned_by field in model objects. Defaults to "custom". |
| tags | list[str] or None | OpenAPI tags for the generated endpoints. Defaults to ["openai"]. |
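The owned_by parameter ends up in the model objects returned by /v1/models. Under the OpenAI "list models" response shape, the output of list_models() maps roughly as sketched below; `models_payload` is hypothetical, and how the library fills the `created` timestamp is an assumption.

```python
import time


def models_payload(names: list[str], owned_by: str = "custom") -> dict:
    """Sketch of an OpenAI-style model list built from list_models() output."""
    created = int(time.time())
    return {
        "object": "list",
        "data": [
            {"id": name, "object": "model", "created": created, "owned_by": owned_by}
            for name in names
        ],
    }


payload = models_payload(["my-pipeline"])
```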

Endpoints

The router exposes the following endpoints (with and without the /v1 prefix):

| Method | Path | Description |
|---|---|---|
| GET | /v1/models | List available models. |
| POST | /v1/chat/completions | Create a chat completion (streaming or non-streaming). |
| GET | /models | Alias for /v1/models. |
| POST | /chat/completions | Alias for /v1/chat/completions. |



Download files


Source Distribution

fastapi_openai_compat-0.2.0.tar.gz (28.5 kB)

Uploaded Source

Built Distribution


fastapi_openai_compat-0.2.0-py3-none-any.whl (17.8 kB)

Uploaded Python 3

File details

Details for the file fastapi_openai_compat-0.2.0.tar.gz.

File metadata

  • Download URL: fastapi_openai_compat-0.2.0.tar.gz
  • Upload date:
  • Size: 28.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.4

File hashes

Hashes for fastapi_openai_compat-0.2.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | f1069a953f5e0bda0dad5775df531e7216db28c84b8feb7cbcd697b886a152b9 |
| MD5 | 064af50bea1ae78dd2e9f0788917132a |
| BLAKE2b-256 | 9a1afbbfe0bb998494b05f2a9e04b7373021a3bc67dc116ee46d42ecd204f1dd |


File details

Details for the file fastapi_openai_compat-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: fastapi_openai_compat-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 17.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.4

File hashes

Hashes for fastapi_openai_compat-0.2.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 6bddc5abac04904e455c7b5e5be3a4fc235fa4b5cebaaa1b4c4bb459b57de509 |
| MD5 | 2215f429d9f3a3cebebf0baa2c43631a |
| BLAKE2b-256 | 058774afc812b3d08824e957d6b6c9e0f95de4c4d1586cbd5aa49d69ca231657 |

