Low-overhead latency metrics for SSE streaming responses (TTFT, E2E, throughput).
Project description
streamlat
streamlat is a tiny, zero-dependency utility for measuring latency and throughput of streaming LLM responses.
It is designed for SSE-style streaming and works cleanly with:
- Raw OpenAI-style streaming
- SDK abstractions (LangChain, Microsoft Agent Framework)
No SDK internals. No monkey-patching. Just timestamp hooks.
What it measures
streamlat focuses on metrics that actually matter for streaming UX:
- TTFT (Time to First Token): time from request start until the first text chunk is received.
- End-to-End Latency (E2E): time from request start until the stream completes.
- Stream Text Time: time between the first and last streamed text chunks.
- Event Count: number of streamed chunks/events received.
- Text Event Count: number of chunks that actually carried text.
- Estimated Throughput: tokens per second (estimated from output length).
All metrics are computed client-side, with O(1) overhead per streamed chunk.
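As a worked example of the timing definitions above, the sketch below derives TTFT, E2E, and Stream Text Time from four nanosecond timestamps. The function name `derive_timings`, the dictionary keys, and the sample timestamps are hypothetical and chosen for illustration; they are not streamlat's API or output format.

```python
NS_PER_MS = 1_000_000


def derive_timings(start_ns, first_text_ns, last_text_ns, done_ns):
    """Derive the three timing metrics (in milliseconds) from raw timestamps.

    Hypothetical helper: the names and units are assumptions for illustration.
    """
    has_text = first_text_ns is not None and last_text_ns is not None
    return {
        # Request start -> first text chunk
        "ttft_ms": (first_text_ns - start_ns) / NS_PER_MS if has_text else None,
        # Request start -> stream completion
        "e2e_ms": (done_ns - start_ns) / NS_PER_MS,
        # First text chunk -> last text chunk
        "stream_text_ms": (last_text_ns - first_text_ns) / NS_PER_MS if has_text else None,
    }


# Made-up timestamps: first text after 250 ms, last text at 1150 ms, done at 1200 ms.
timings = derive_timings(
    start_ns=0,
    first_text_ns=250_000_000,
    last_text_ns=1_150_000_000,
    done_ns=1_200_000_000,
)
print(timings)
```

With these sample values, TTFT is 250 ms, E2E is 1200 ms, and Stream Text Time is 900 ms. A stream that never produced text would report `None` for the two text-based timings in this sketch.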
How it works under the hood
streamlat does not intercept network traffic.
Instead, you explicitly mark four points in your streaming loop:
- Request start
- Each streamed event
- Each streamed text chunk
- Stream completion
Internally it:
- Uses time.perf_counter_ns() for high-resolution timing
- Stores only timestamps and counters
- Performs no tokenization in the hot path
- Estimates tokens only once at the end
This makes it safe for:
- Production services
- Benchmarks
- Demos
- Framework-based SDKs
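The approach described in this section (four explicit hooks, timestamps and counters only, no work in the hot path) could be sketched roughly as follows. `TinyStreamTimer` and `fake_stream` are hypothetical stand-ins written for this example. The method names mirror the hooks used in the examples below, but the bodies are an assumption and not streamlat's actual implementation.

```python
import time


class TinyStreamTimer:
    """Hypothetical stand-in for a timestamp-only collector (not streamlat's class)."""

    def __init__(self):
        self.start_ns = None
        self.first_text_ns = None
        self.last_text_ns = None
        self.done_ns = None
        self.event_count = 0
        self.text_event_count = 0

    def on_request_start(self):
        self.start_ns = time.perf_counter_ns()

    def on_event(self):
        # O(1): a single counter increment per streamed event.
        self.event_count += 1

    def on_text(self, text):
        # O(1): the text itself is not inspected or tokenized here.
        now = time.perf_counter_ns()
        if self.first_text_ns is None:
            self.first_text_ns = now
        self.last_text_ns = now
        self.text_event_count += 1

    def on_done(self):
        self.done_ns = time.perf_counter_ns()


def fake_stream():
    """Simulated stream; empty strings stand for events that carry no text."""
    yield from ["", "Hello", ", ", "", "world"]


timer = TinyStreamTimer()
timer.on_request_start()          # 1. request start
for chunk in fake_stream():
    timer.on_event()              # 2. each streamed event
    if chunk:
        timer.on_text(chunk)      # 3. each streamed text chunk
timer.on_done()                   # 4. stream completion

print(timer.event_count, timer.text_event_count)
```

Because each hook only reads a clock or bumps a counter, the per-chunk cost stays constant regardless of how long the response grows.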
Install
pip install streamlat
Example 1: LangChain (AzureChatOpenAI streaming)
import asyncio
import os

from langchain_openai import AzureChatOpenAI
from streamlat import StreamMetricsCollector


async def main():
    metrics = StreamMetricsCollector()
    llm = AzureChatOpenAI(
        azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        azure_deployment="gpt-5",
        api_version="2024-02-15-preview",
        streaming=True,
    )

    # Mark request start immediately before opening the stream.
    metrics.on_request_start()
    output = ""
    async for chunk in llm.astream("Explain SSE streaming in one paragraph."):
        metrics.on_event()  # count every streamed chunk
        text = getattr(chunk, "content", "") or ""
        if text:
            metrics.on_text(text)  # only chunks that carry text
            output += text
            print(text, end="", flush=True)
    metrics.on_done()

    # Token estimation happens once here, outside the streaming loop.
    m = metrics.finalize(output_text=output)
    print("\nMETRICS:")
    for k, v in metrics.to_dict(m).items():
        print(f"{k}: {v}")


if __name__ == "__main__":
    asyncio.run(main())
Example 2: Microsoft Agent Framework (Azure OpenAI Assistants)
import asyncio
import os
from random import randint
from typing import Annotated

from agent_framework.azure import AzureOpenAIAssistantsClient
from pydantic import Field
from streamlat import StreamMetricsCollector


def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Toy tool that returns a random weather report for the given location."""
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {randint(10, 30)}°C."


async def streaming_example() -> None:
    print("=== Streaming Response Example (with metrics) ===")
    async with AzureOpenAIAssistantsClient(
        api_key=os.getenv("AZURE_OPENAI_API_KEY"),
        endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        api_version=os.getenv("AZURE_OPENAI_API_VERSION"),
    ).create_agent(
        instructions="You are a helpful weather agent.",
        tools=get_weather,
    ) as agent:
        query = "What's the weather like in Portland?"
        print(f"User: {query}")
        print("Agent: ", end="", flush=True)

        metrics = StreamMetricsCollector()
        # Mark request start immediately before opening the stream.
        metrics.on_request_start()
        output = ""
        async for chunk in agent.run_stream(query):
            metrics.on_event()  # count every streamed chunk
            text = getattr(chunk, "text", None)
            if text:
                metrics.on_text(text)  # only chunks that carry text
                output += text
                print(text, end="", flush=True)
        metrics.on_done()

        m = metrics.finalize(output_text=output)
        print("\nMETRICS:")
        for k, v in metrics.to_dict(m).items():
            print(f"{k}: {v}")


if __name__ == "__main__":
    asyncio.run(streaming_example())
Notes
- TTFT is client-observed, not server-reported.
- LangChain and Agent Framework add a small abstraction delay before streaming begins.
- Token counts are estimated using chars / 4. For exact counts, tokenize once after the stream completes.
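The chars / 4 estimate from the last note can be illustrated with a short sketch. The helper names `estimate_tokens` and `estimate_throughput` are hypothetical, and which duration streamlat divides by (and whether it rounds the estimate) is not stated here, so the duration is left as a parameter.

```python
def estimate_tokens(text: str) -> float:
    """Rough token estimate: one token per four characters (no tokenizer involved)."""
    return len(text) / 4


def estimate_throughput(text: str, duration_s: float):
    """Estimated tokens per second over a caller-chosen duration.

    Returns None when the duration is not positive, to avoid dividing by zero.
    """
    if duration_s <= 0:
        return None
    return estimate_tokens(text) / duration_s


# Made-up output: 400 characters streamed over 2 seconds.
sample_output = "x" * 400
print(estimate_tokens(sample_output))           # 100.0
print(estimate_throughput(sample_output, 2.0))  # 50.0
```

The estimate runs once on the final output string, which keeps it out of the streaming loop. It is only a heuristic: real token counts depend on the model's tokenizer and on the language of the text.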
Download files
File details
Details for the file streamlat-1.0.1.tar.gz.
File metadata
- Download URL: streamlat-1.0.1.tar.gz
- Upload date:
- Size: 6.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 330b39a390bceb941db18db71c7af2358f449eba02828a58ed6cc626d3ea1cbf |
| MD5 | 7a75495617c2630da11f3f53b2e3b7c7 |
| BLAKE2b-256 | 42f632e325c0a4448c1c4db650e48f4f235233ccc425fa7cef29c4742294f2c2 |
File details
Details for the file streamlat-1.0.1-py3-none-any.whl.
File metadata
- Download URL: streamlat-1.0.1-py3-none-any.whl
- Upload date:
- Size: 5.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0f9710a62cefbb077f6af5eb11fd0726588b2e4df5313dafe2b5d697fdc608a7 |
| MD5 | 9c25abd7ed23ed91db71d4292d926729 |
| BLAKE2b-256 | fea7f489be6dc0c8ca45b17bf634174a93672486676b565ba870cad5b971ff4b |