Skip to main content

Low-overhead latency metrics for SSE streaming responses (TTFT, E2E, throughput).

Project description

streamlat

streamlat is a tiny, zero-dependency utility for measuring latency and throughput of streaming LLM responses.

It is designed for SSE-style streaming and works cleanly with:

  • Raw OpenAI structure streaming
  • SDK abstractions (LangChain, Microsoft Agent Framework)

No SDK internals. No monkey-patching. Just timestamp hooks.


What it measures

streamlat focuses on metrics that actually matter for streaming UX:

  • TTFT (Time to First Token)
    Time from request start until the first text chunk is received.

  • End-to-End Latency (E2E)
    Time from request start until the stream completes.

  • Stream Text Time
    Time between the first and last streamed text chunks.

  • Event Count
    Number of streamed chunks/events received.

  • Text Event Count
    Number of chunks that actually carried text.

  • Estimated Throughput
    Tokens per second (estimated from output length).

All metrics are computed client-side, with O(1) overhead per streamed chunk.


How it works under the hood

streamlat does not intercept network traffic.

Instead, you explicitly mark four points in your streaming loop:

  1. Request start
  2. Each streamed event
  3. Each streamed text chunk
  4. Stream completion

Internally it:

  • Uses time.perf_counter_ns() for high-resolution timing
  • Stores only timestamps and counters
  • Performs no tokenization in the hot path
  • Estimates tokens only once at the end

This makes it safe for:

  • Production services
  • Benchmarks
  • Demos
  • Framework-based SDKs

Install

pip install streamlat

Example 1: LangChain (AzureChatOpenAI streaming)

import asyncio
import os

from langchain_openai import AzureChatOpenAI
from streamlat import StreamMetricsCollector


async def main():
    metrics = StreamMetricsCollector()

    llm = AzureChatOpenAI(
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        azure_deployment="gpt-5",
        api_version="2024-02-15-preview",
        streaming=True,
    )

    metrics.on_request_start()
    output = ""

    async for chunk in llm.astream("Explain SSE streaming in one paragraph."):
        metrics.on_event()
        text = getattr(chunk, "content", "") or ""
        if text:
            metrics.on_text(text)
            output += text
            print(text, end="", flush=True)

    metrics.on_done()

    m = metrics.finalize(output_text=output)
    print("\nMETRICS:")
    for k, v in metrics.to_dict(m).items():
        print(f"{k}: {v}")


if __name__ == "__main__":
    asyncio.run(main())

Example 2: Microsoft Agent Framework (Azure OpenAI Assistants)

import asyncio
from random import randint
from typing import Annotated
import os

from agent_framework.azure import AzureOpenAIAssistantsClient
from pydantic import Field
from streamlat import StreamMetricsCollector


def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."


async def streaming_example() -> None:
    print("=== Streaming Response Example (with metrics) ===")

    async with AzureOpenAIAssistantsClient(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    ).create_agent(
        instructions="You are a helpful weather agent.",
        tools=get_weather,
    ) as agent:
        query = "What's the weather like in Portland?"
        print(f"User: {query}")
        print("Agent: ", end="", flush=True)

        metrics = StreamMetricsCollector()
        metrics.on_request_start()
        output = ""

        async for chunk in agent.run_stream(query):
            metrics.on_event()
            text = getattr(chunk, "text", None)
            if text:
                metrics.on_text(text)
                output += text
                print(text, end="", flush=True)

        metrics.on_done()

        m = metrics.finalize(output_text=output)
        print("\nMETRICS:")
        for k, v in metrics.to_dict(m).items():
            print(f"{k}: {v}")


if __name__ == "__main__":
    asyncio.run(streaming_example())

Notes

  • TTFT is client-observed, not server-reported.

  • LangChain and Agent Framework add a small abstraction delay before streaming begins.

  • Token counts are estimated using chars / 4. For exact counts, tokenize once after the stream completes.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamlat-1.0.1.tar.gz (6.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

streamlat-1.0.1-py3-none-any.whl (5.6 kB view details)

Uploaded Python 3

File details

Details for the file streamlat-1.0.1.tar.gz.

File metadata

  • Download URL: streamlat-1.0.1.tar.gz
  • Upload date:
  • Size: 6.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for streamlat-1.0.1.tar.gz
Algorithm Hash digest
SHA256 330b39a390bceb941db18db71c7af2358f449eba02828a58ed6cc626d3ea1cbf
MD5 7a75495617c2630da11f3f53b2e3b7c7
BLAKE2b-256 42f632e325c0a4448c1c4db650e48f4f235233ccc425fa7cef29c4742294f2c2

See more details on using hashes here.

File details

Details for the file streamlat-1.0.1-py3-none-any.whl.

File metadata

  • Download URL: streamlat-1.0.1-py3-none-any.whl
  • Upload date:
  • Size: 5.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for streamlat-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 0f9710a62cefbb077f6af5eb11fd0726588b2e4df5313dafe2b5d697fdc608a7
MD5 9c25abd7ed23ed91db71d4292d926729
BLAKE2b-256 fea7f489be6dc0c8ca45b17bf634174a93672486676b565ba870cad5b971ff4b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page