Usage metering and cost enforcement per tenant for LLM applications.
token-limit
Usage metering and cost enforcement for LLM calls, built for multi-tenant B2B applications.
One call instruments every OpenAI, Anthropic, Google AI, and DeepSeek request, with no changes required at your LLM call sites. OpenRouter clients are instrumented per instance once you register them.
Installation
Install the base package, then add extras only for the providers you use:
# Core package (no provider SDKs included)
pip install token-limit
# With specific provider support
pip install token-limit[openai] # OpenAI + DeepSeek + OpenRouter
pip install token-limit[anthropic] # Anthropic Claude
pip install token-limit[google] # Google AI (Gemini)
# Everything at once
pip install token-limit[all]
Note: Provider extras install the corresponding official SDK as a dependency (openai, anthropic, google-genai). If you already have these SDKs pinned in your project, installing the extras is still safe: they will not downgrade your existing versions.
How it works
token-limit monkey-patches the official provider SDKs at startup. Every LLM call your application makes is intercepted automatically: token usage is extracted from the response, and a lightweight event is queued and sent in batches to your backend ingest endpoint from a background thread. Event delivery never blocks your LLM calls.
Your code → [patched SDK] → LLM provider
↓
LLMEvent captured
↓
EventQueue (in-memory, daemon thread)
↓ (every 5s or 50 events)
POST /v1/ingest → Your backend → Dashboard
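To make the interception step concrete, here is a minimal sketch of class-level monkey-patching. It is not token-limit's actual code: `FakeCompletions`, `Event`, `captured`, and `patch_create` are hypothetical names, and the "provider" is a local stub that returns a usage summary.

```python
import functools
import time
from dataclasses import dataclass
from typing import Any, Callable, List


class FakeCompletions:
    """Hypothetical stand-in for a provider SDK resource class."""

    def create(self, model: str, prompt: str) -> dict:
        # Pretend this is the network call; the response carries a usage summary.
        return {
            "model": model,
            "usage": {"input_tokens": len(prompt.split()), "output_tokens": 3},
        }


@dataclass
class Event:
    model: str
    input_tokens: int
    output_tokens: int
    duration_ms: float


captured: List[Event] = []  # stands in for the in-memory event queue


def patch_create(cls: type) -> Callable[[], None]:
    """Replace cls.create with a recording wrapper and return an undo function."""
    original = cls.create

    @functools.wraps(original)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> dict:
        start = time.perf_counter()
        response = original(self, *args, **kwargs)
        usage = response.get("usage", {})
        captured.append(Event(
            model=response.get("model", ""),
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
            duration_ms=(time.perf_counter() - start) * 1000.0,
        ))
        return response  # the caller receives the untouched response

    cls.create = wrapper

    def restore() -> None:
        cls.create = original

    return restore


client = FakeCompletions()               # created before patching, still covered
restore = patch_create(FakeCompletions)
reply = client.create(model="demo-model", prompt="hello there world")
restore()                                # original method is back in place
client.create(model="demo-model", prompt="not recorded")
```

Because the wrapper is installed on the class, instances created before or after the patch both go through it, and the undo function puts the original method back.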
1. Initialize once at application startup
from token_limit import Meter, MeterConfig
meter = Meter(MeterConfig(
api_key="sk-...", # Your TokenLimit API key
))
meter.patch_all() # patches OpenAI, Anthropic, Google, DeepSeek — all at once
2. Tag requests per tenant
Use the context manager to scope a block of LLM calls to a tenant. Thread-safe and async-safe via contextvars — concurrent requests with different tenants are fully isolated.
from token_limit.exceptions import LimitExceededException

with meter.for_tenant("user5"):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello"}],
        )
    except LimitExceededException:
        show_upgrade_message()
For middleware or request handlers where a context manager isn't convenient:
meter.set_tenant(request.tenant_id) # sets for current thread/async task
Supported providers
| Provider | What gets patched | Tokens captured |
|---|---|---|
| OpenAI | chat.completions.create, responses.create, completions.create (legacy), embeddings.create, audio.transcriptions.create, audio.translations.create, audio.speech.create, images.generate, images.edit (all sync + async) | input, output, cached, plus endpoint-specific fields (character count, image dimensions, audio duration) |
| Anthropic | messages.Messages.create (sync + async) | input, output, cached (cache read), cache_creation |
| Google AI | Models.generate_content, Models.generate_content_stream, AsyncModels.generate_content, AsyncModels.generate_content_stream (all via google.genai) | input, output, total, cached |
| DeepSeek | chat.completions.create, fim.completions.create, beta.chat.completions.create (sync + async); covers both the first-party deepseek SDK and an openai client pointed at api.deepseek.com | input, output, cached (cache hit), cache_miss, reasoning (deepseek-reasoner) |
| OpenRouter | chat.completions.create on registered client instances (sync + async, streaming and non-streaming) | input, output, cached, cost_usd (when billing header enabled), upstream_provider |
All patches are installed/uninstalled cleanly — original methods are always restored on unpatch_all() or process exit.
Cost enforcement
Spend limits are configured per tenant in USD and enforced on every intercepted LLM call.
# Per-month limit (default)
meter.set_limit("tenant-id-456", limit_usd=50.00)
# Per-day limit
meter.set_limit("tenant-id-456", limit_usd=5.00, frequency="per_day")
When a tenant reaches its configured limit, the next intercepted LLM call raises LimitExceededException before any API traffic is sent. Handle it and show an upgrade prompt:
# Inside a request handler
with meter.for_tenant("acme-corp"):
    try:
        response = client.chat.completions.create(...)
    except LimitExceededException:
        return {"detail": "upgrade_plan"}
set_limit() immediately invalidates the local cache for that tenant so the new threshold is honored on the very next call, without waiting for the TTL to expire.
Limit check caching. check_limit() and async_check_limit() are called on every patched SDK call. Results are cached per tenant for limit_check_cache_ttl seconds (default 5 s) to avoid a network round-trip on every LLM call. A local token trip-wire also catches runaway bursts within the TTL window without waiting for the next backend sync.
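The caching behaviour described above can be pictured with a small TTL cache. This is a sketch under stated assumptions, not the library's implementation: `LimitCheckCache`, `fake_backend`, and the injectable clock are hypothetical, and the local token trip-wire is left out.

```python
import time
from typing import Callable, Dict, List, Tuple


class LimitCheckCache:
    """Cache one over-limit verdict per tenant for `ttl` seconds."""

    def __init__(
        self,
        fetch: Callable[[str], bool],
        ttl: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch      # performs the real (network) limit check
        self._ttl = ttl
        self._clock = clock      # injectable so the demo can control time
        self._entries: Dict[str, Tuple[float, bool]] = {}

    def is_over_limit(self, tenant_id: str) -> bool:
        now = self._clock()
        entry = self._entries.get(tenant_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]      # still fresh: no round-trip
        result = self._fetch(tenant_id)
        self._entries[tenant_id] = (now, result)
        return result

    def invalidate(self, tenant_id: str) -> None:
        """Forget the cached verdict, as a limit change would require."""
        self._entries.pop(tenant_id, None)


backend_calls: List[str] = []
fake_now = [0.0]


def fake_backend(tenant_id: str) -> bool:
    backend_calls.append(tenant_id)
    return False


cache = LimitCheckCache(fake_backend, ttl=5.0, clock=lambda: fake_now[0])
cache.is_over_limit("acme")   # miss: asks the backend
cache.is_over_limit("acme")   # hit: served from the cache
fake_now[0] = 6.0
cache.is_over_limit("acme")   # entry expired: asks the backend again
cache.invalidate("acme")
cache.is_over_limit("acme")   # invalidated: asks the backend again
```

Four checks result in three backend calls: only the second one falls inside the TTL window with a valid entry.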
OpenAI patch details
Patches SDK methods at the class level, so every openai.OpenAI / openai.AsyncOpenAI client created before or after patching is automatically covered.
Patched surfaces
chat.completions.create (sync + async)
ChatCompletions for all models. Handles stream=True transparently: forces stream_options={"include_usage": True} so the final chunk carries a usage summary, then proxies the iterator to the caller while capturing that summary in a finally block.
Fields: input_tokens, output_tokens, total_tokens, cached_tokens, request_id, model, stream, duration_ms.
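The streaming path can be sketched as a generator that passes chunks through and records usage in a `finally` block. The chunk dictionaries, `with_usage_option`, `proxy_stream`, and `recorded` below are illustrative assumptions; only the `stream_options={"include_usage": True}` setting comes from the description above.

```python
from typing import Any, Dict, Iterable, Iterator, List

recorded: List[Dict[str, Any]] = []  # stands in for the event queue


def with_usage_option(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the call kwargs with include_usage forced on."""
    merged = dict(kwargs)
    options = dict(merged.get("stream_options") or {})
    options["include_usage"] = True
    merged["stream_options"] = options
    return merged


def proxy_stream(chunks: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield every chunk unchanged and record the usage summary at the end."""
    usage = None
    try:
        for chunk in chunks:
            if chunk.get("usage") is not None:
                usage = chunk["usage"]   # normally only the final chunk has it
            yield chunk
    finally:
        # Runs when the stream is exhausted, closed early, or raises.
        recorded.append(usage or {})


call_kwargs = with_usage_option({"model": "demo-model", "stream": True})

fake_chunks = [
    {"delta": "Hel", "usage": None},
    {"delta": "lo", "usage": None},
    {"delta": "", "usage": {"prompt_tokens": 5, "completion_tokens": 2}},
]
seen = [chunk["delta"] for chunk in proxy_stream(fake_chunks)]
```

The caller iterates exactly as it would over the raw stream; the usage capture is a side effect of finishing the iteration.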
responses.create (sync + async, openai >= 1.30)
Responses API. Also captures five image-billing dimensions when the image_generation tool is active, and image_count for audit.
Fields: input_tokens, output_tokens, total_tokens, cached_tokens, image billing dimensions, image_count.
completions.create (legacy /v1/completions, sync + async)
Legacy text-completion endpoint for models such as gpt-3.5-turbo-instruct. Streaming handled identically to chat completions.
Fields: input_tokens, output_tokens, total_tokens, model, stream, duration_ms.
embeddings.create (sync + async)
Text-embedding endpoint (text-embedding-3-*, ada-002, etc.). output_tokens is always 0.
Fields: input_tokens, output_tokens (0), total_tokens, model, duration_ms.
audio.transcriptions.create (sync + async)
Whisper STT. Two billing modes handled automatically:
- Per-minute models (whisper-1): reads response.duration (requires response_format="verbose_json"; emits a warning if omitted).
- Per-token models (gpt-4o-transcribe, gpt-4o-mini-transcribe): reads usage.input_tokens / usage.output_tokens.
Fields: input_tokens, output_tokens, audio_input_tokens, audio_output_tokens, duration_seconds, duration_unavailable, model, duration_ms.
audio.translations.create (sync + async)
Whisper translation. Identical billing logic to audio.transcriptions; endpoint tag differs.
audio.speech.create (TTS, sync + async)
Two billing modes:
- Per-character models (tts-1, tts-1-hd): no usage object; character_count derived from the caller's input kwarg.
- Per-token models (gpt-4o-mini-tts): reads usage.input_tokens / usage.output_tokens; sets character_count=0 to prevent double-billing.
Fields: input_tokens, output_tokens, character_count, model, duration_ms.
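The two TTS billing modes can be expressed as a single extraction function. This sketch assumes the mode is chosen by whether a usage object is present; `speech_usage` is a hypothetical helper, and the real extractor may decide differently (for example by model name).

```python
from typing import Dict, Optional


def speech_usage(input_text: str, usage: Optional[Dict[str, int]] = None) -> Dict[str, int]:
    """Build the billing fields for one text-to-speech call."""
    if usage is not None:
        # Per-token path: bill on tokens and zero the character count
        # so the same request is not billed twice.
        return {
            "input_tokens": usage.get("input_tokens", 0),
            "output_tokens": usage.get("output_tokens", 0),
            "character_count": 0,
        }
    # Per-character path: no usage object, so count the caller's input text.
    return {
        "input_tokens": 0,
        "output_tokens": 0,
        "character_count": len(input_text),
    }


per_character = speech_usage("Hello, world")
per_token = speech_usage("Hello, world", {"input_tokens": 4, "output_tokens": 120})
```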
images.generate and images.edit (sync + async)
Image generation and editing for gpt-image-* models. Captures five token billing dimensions from usage.input_tokens_details and usage.output_tokens, plus image_count.
Fields: input_text_tokens, cached_input_text_tokens, input_image_tokens, cached_input_image_tokens, output_image_tokens, total_tokens, image_count.
Not patched (OpenAI)
- moderations.create: free endpoint, no per-token cost.
- fine_tuning.jobs.*: billed on a separate training rate; not real-time.
- beta.assistants.* / beta.threads.* / beta.runs.*: usage only available after async run completion; not yet supported.
- uploads.* / beta.vector_stores / files: storage-billed, not token-billed.
- realtime.*: persistent WebSocket; no discrete .create() to wrap.
- audio.transcriptions.create with stream=True: streaming transcription path not yet captured.
Anthropic patch details
Patches Messages.create and AsyncMessages.create at the class level, so all anthropic.Anthropic / anthropic.AsyncAnthropic clients created before or after patching are automatically covered.
Patched surfaces
messages.Messages.create (sync) and messages.AsyncMessages.create (async)
Claude chat/completion for all claude-* models. Both stream=False (default) and stream=True are handled. For streaming, a helper proxies the iterator unchanged while accumulating usage across events (message_start → input tokens; message_delta → output + cache tokens).
Fields: input_tokens, output_tokens, total_tokens, cached_tokens (cache read hits), cache_creation_tokens (cache write), request_id, model, stream, duration_ms, tenant_id, error, input_tokens_details (SDK >= 0.26, model-dependent).
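The accumulation across stream events can be sketched as follows. The events are plain dictionaries in a simplified shape, and `track_usage` is a hypothetical helper; the mapping of event types to fields follows the description above rather than the library's source.

```python
from typing import Any, Dict, Iterable, Iterator


def track_usage(
    events: Iterable[Dict[str, Any]], totals: Dict[str, int]
) -> Iterator[Dict[str, Any]]:
    """Yield events unchanged while filling `totals` from usage-bearing events."""
    for event in events:
        if event["type"] == "message_start":
            usage = event["message"]["usage"]
            totals["input_tokens"] = usage.get("input_tokens", 0)
        elif event["type"] == "message_delta":
            usage = event.get("usage", {})
            # Later deltas overwrite earlier ones; missing keys keep the old value.
            totals["output_tokens"] = usage.get("output_tokens", totals["output_tokens"])
            totals["cached_tokens"] = usage.get(
                "cache_read_input_tokens", totals["cached_tokens"]
            )
            totals["cache_creation_tokens"] = usage.get(
                "cache_creation_input_tokens", totals["cache_creation_tokens"]
            )
        yield event


totals = {
    "input_tokens": 0,
    "output_tokens": 0,
    "cached_tokens": 0,
    "cache_creation_tokens": 0,
}
fake_events = [
    {"type": "message_start", "message": {"usage": {"input_tokens": 25}}},
    {"type": "content_block_delta", "delta": {"text": "Hi"}},
    {"type": "message_delta", "usage": {"output_tokens": 7, "cache_read_input_tokens": 10}},
    {"type": "message_stop"},
]
passed_through = list(track_usage(fake_events, totals))
```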
Not patched (Anthropic)
- beta.messages.batches.*: asynchronous batch completion; results fetched separately from submission. Not yet supported.
- Embeddings: Anthropic does not offer a text-embedding API.
- Audio/TTS: Anthropic does not offer speech endpoints.
- Image generation: Claude is vision-input only; image tokens are already counted inside input_tokens.
Google AI patch details
Patches four methods at the class level on google.genai.models.Models and google.genai.models.AsyncModels. Unlike OpenAI, the google.genai SDK exposes streaming as a separate method rather than a stream=True flag.
Patched surfaces
Models.generate_content (sync, non-streaming)
Usage read from response.usage_metadata directly after the call returns.
Models.generate_content_stream (sync, streaming)
Returns a synchronous iterator of GenerateContentResponse chunks. Usage is only present on the last chunk; the helper tracks last_chunk across the full iteration and reads its usage_metadata in a finally block.
AsyncModels.generate_content (async, non-streaming)
Awaits meter.async_check_limit() to avoid blocking the event loop.
AsyncModels.generate_content_stream (async, streaming)
Handles both coroutine-returning and direct async-iterator forms via inspect.isawaitable.
Fields (all four surfaces): input_tokens (prompt_token_count), output_tokens (candidates_token_count, includes thinking tokens on the direct Gemini API), total_tokens (read from response, not derived), cached_tokens (cached_content_token_count), stream, request_id, duration_ms, tenant_id, error.
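The async streaming case combines two ideas from this section: accepting either a coroutine or a direct async iterator, and reading usage from the last chunk in a `finally` block. The sketch below uses dictionary chunks and hypothetical names (`proxy`, `fake_chunks`, `coroutine_form`, `usage_log`); it does not call the google.genai SDK.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Dict, List, Optional

usage_log: List[Optional[Dict[str, int]]] = []  # stands in for the event queue


async def fake_chunks() -> AsyncIterator[Dict[str, Any]]:
    yield {"text": "Hel", "usage_metadata": None}
    yield {
        "text": "lo",
        "usage_metadata": {"prompt_token_count": 4, "candidates_token_count": 2},
    }


async def coroutine_form() -> AsyncIterator[Dict[str, Any]]:
    # Some call styles hand back a coroutine that resolves to the iterator.
    return fake_chunks()


async def proxy(result: Any) -> AsyncIterator[Dict[str, Any]]:
    """Yield chunks unchanged; record the last chunk's usage when iteration ends."""
    if inspect.isawaitable(result):
        result = await result        # coroutine-returning form
    last_chunk: Optional[Dict[str, Any]] = None
    try:
        async for chunk in result:
            last_chunk = chunk
            yield chunk
    finally:
        usage_log.append((last_chunk or {}).get("usage_metadata"))


async def collect(result: Any) -> List[str]:
    return [chunk["text"] async for chunk in proxy(result)]


direct_text = asyncio.run(collect(fake_chunks()))        # direct async iterator
awaited_text = asyncio.run(collect(coroutine_form()))    # coroutine form
```

Both call styles produce the same chunks and the same usage record, which is the point of the `inspect.isawaitable` check.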
Not patched (Google AI)
- Vertex AI SDK (google.cloud.aiplatform): separate SDK, not yet supported.
- models.embed_content / models.embed_content_batch: not yet supported.
- models.generate_images / models.upscale_image: billed per image, not per token.
- models.generate_videos: billed per second of output, not yet supported.
- live.*: WebSocket-based session; no discrete call to wrap.
DeepSeek patch details
Covers both integration paths: the first-party deepseek package and an openai client pointed at api.deepseek.com. Both paths are attempted independently — a failure in one does not prevent the other from being installed.
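One way to get this "attempt both, fail independently" behaviour is to run each installer inside its own try block. This is a sketch with hypothetical names (`install_all`, the two installer functions); it only illustrates the control flow, not the actual patch code.

```python
from typing import Callable, Dict, List, Tuple


def install_all(
    installers: Dict[str, Callable[[], None]]
) -> Tuple[List[str], Dict[str, str]]:
    """Run every installer; one failure never stops the others."""
    installed: List[str] = []
    failures: Dict[str, str] = {}
    for name, install in installers.items():
        try:
            install()
        except Exception as exc:   # e.g. ImportError when an SDK is not installed
            failures[name] = type(exc).__name__
        else:
            installed.append(name)
    return installed, failures


def install_first_party() -> None:
    # Simulates the first-party package being absent.
    raise ImportError("No module named 'deepseek'")


def install_openai_path() -> None:
    pass  # pretend the openai-client path patched successfully


installed, failures = install_all({
    "deepseek-sdk": install_first_party,
    "openai-client": install_openai_path,
})
```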
Patched surfaces
chat.completions.create (sync + async)
Standard chat completions. Streaming handled identically to OpenAI: forces stream_options={"include_usage": True} and captures usage from the final chunk.
fim.completions.create (sync + async)
DeepSeek-specific fill-in-middle (FIM) endpoint. Records fim_prefix (from kwargs["prompt"] or kwargs["prefix"]) and fim_suffix alongside standard token counts.
beta.chat.completions.create (sync + async)
Beta chat namespace alias present in SDK >= 1.x; uses the same extractor as the main chat surface.
DeepSeek-specific fields
| Event field | Source |
|---|---|
| cached_tokens | usage.prompt_cache_hit_tokens |
| cache_miss_tokens | usage.prompt_cache_miss_tokens |
| reasoning_tokens | usage.completion_tokens_details.reasoning_tokens (deepseek-reasoner only) |
| fim_prefix | kwargs["prompt"] or kwargs["prefix"] |
| fim_suffix | kwargs["suffix"] |
Not patched (DeepSeek)
- models.list: metadata endpoint, no token cost.
- files.*: file upload/management, not billed per token.
OpenRouter patch details
OpenRouter exposes an OpenAI-compatible REST API, so developers typically point a standard openai.OpenAI (or AsyncOpenAI) client at https://openrouter.ai/api/v1. Unlike the other providers, OpenRouter is patched at the instance level rather than the class level — only the specific client instances you register are instrumented, leaving any other OpenAI clients untouched.
Registration
# Pattern 1 — sync factory (recommended)
client = meter.openrouter_client(api_key="sk-or-v1-...")
# Pattern 2 — async factory
client = meter.async_openrouter_client(api_key="sk-or-v1-...")
# Pattern 3 — register an existing client
meter.register_openrouter_client(existing_client)
# Pattern 4 — fully manual
meter.track_manually(provider="openrouter", model="...", input_tokens=..., output_tokens=...)
Patched surfaces
chat.completions.create (sync + async, on registered instances only)
Streaming handled identically to OpenAI: stream_options={"include_usage": True} is injected automatically so the final chunk carries usage. The wrapper is installed directly on client.chat.completions.create and is guarded against double-patching.
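Instance-level patching with a double-patch guard can be sketched like this. The `Client`, `Chat`, and `Completions` classes are local stubs that mimic the attribute path only, and `register` and `_MARKER` are hypothetical names.

```python
from typing import Any, Dict, List


class Completions:
    def create(self, **kwargs: Any) -> Dict[str, Any]:
        return {"usage": {"prompt_tokens": 3, "completion_tokens": 1}}


class Chat:
    def __init__(self) -> None:
        self.completions = Completions()


class Client:
    """Stub with the same attribute path as an OpenAI-style client."""

    def __init__(self) -> None:
        self.chat = Chat()


_MARKER = "_metered"  # hypothetical flag used to detect an existing wrapper


def register(client: Client, sink: List[Dict[str, int]]) -> bool:
    """Wrap create() on this one client; return False if it is already wrapped."""
    completions = client.chat.completions
    if getattr(completions.create, _MARKER, False):
        return False
    original = completions.create          # bound method of this instance

    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        response = original(*args, **kwargs)
        sink.append(response["usage"])
        return response

    setattr(wrapper, _MARKER, True)
    completions.create = wrapper           # instance attribute shadows the class method
    return True


sink: List[Dict[str, int]] = []
registered, other = Client(), Client()
first = register(registered, sink)
second = register(registered, sink)        # guarded: no second wrapper layer
registered.chat.completions.create(model="demo")
other.chat.completions.create(model="demo")  # unregistered client is untouched
```

Only the registered instance records usage, and registering it twice does not stack wrappers.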
OpenRouter-specific fields
| Event field | Source |
|---|---|
| upstream_provider | Portion before the "/" in the model string, e.g. "anthropic" from "anthropic/claude-3-5-sonnet" |
| cost_usd | usage.cost, the actual USD cost, available when the caller passes X-Or-Billing: true |
Fields (all calls): input_tokens (usage.prompt_tokens), output_tokens (usage.completion_tokens), total_tokens, cached_tokens (usage.prompt_tokens_details.cached_tokens), cost_usd, upstream_provider, model, stream, request_id, duration_ms, tenant_id.
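Deriving `upstream_provider` from the model string is a simple split on the first slash. The helper name and the choice to return `None` when no slash is present are assumptions made for this sketch.

```python
from typing import Optional


def upstream_provider(model: str) -> Optional[str]:
    """Return the part of the model string before the first '/', if any."""
    provider, separator, _ = model.partition("/")
    return provider if separator else None


with_prefix = upstream_provider("anthropic/claude-3-5-sonnet")
without_prefix = upstream_provider("gpt-4o-mini")
```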
Configuration reference
All configuration lives in MeterConfig, passed once at startup:
from token_limit import Meter, MeterConfig

meter = Meter(MeterConfig(
    # Required
    api_key="your-api-key",       # authenticates event ingest and limit checks
    url="https://...",            # POST endpoint that receives event batches

    # Batching: tune for your traffic volume
    flush_interval=5.0,           # seconds between background flushes
    max_batch_size=50,            # flush early when queue reaches this size
    max_queue_size=1000,          # drop oldest events if queue overflows

    # Limit checks
    limit_check_cache_ttl=5.0,    # seconds a check_limit() result is cached per tenant

    # Behaviour
    debug=False,                  # log every captured event to stdout
    raise_on_error=False,         # re-raise exceptions from within patches

    # Hooks
    on_event=None,                # Callable[[LLMEvent], None]; called after every capture
    on_flush_error=None,          # Callable[[Exception], None]; called on send failure

    # Which providers to patch (default = all four built-ins)
    patches=["openai", "anthropic", "google", "deepseek"],
))
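To show how the three batching settings interact, here is a simplified buffer. It is a sketch, not the library's EventQueue: `EventBuffer` is a hypothetical class, and the background timer that would call `flush()` every `flush_interval` seconds is left out so the example stays deterministic.

```python
import collections
import threading
from typing import Any, Callable, List


class EventBuffer:
    """Bounded buffer that sends a batch when it reaches max_batch_size."""

    def __init__(
        self,
        send: Callable[[List[Any]], None],
        max_batch_size: int = 50,
        max_queue_size: int = 1000,
    ) -> None:
        self._send = send
        self._max_batch_size = max_batch_size
        # A deque with maxlen silently discards the oldest item on overflow.
        self._events = collections.deque(maxlen=max_queue_size)
        self._lock = threading.Lock()

    def put(self, event: Any) -> None:
        with self._lock:
            self._events.append(event)
            ready = len(self._events) >= self._max_batch_size
        if ready:
            self.flush()

    def flush(self) -> int:
        """Send everything currently buffered and return the batch size."""
        with self._lock:
            batch = list(self._events)
            self._events.clear()
        if batch:
            self._send(batch)
        return len(batch)


sent: List[List[Any]] = []

by_size = EventBuffer(sent.append, max_batch_size=2, max_queue_size=10)
by_size.put("a")
by_size.put("b")          # reaches max_batch_size, so the batch is sent now

overflow = EventBuffer(sent.append, max_batch_size=100, max_queue_size=3)
for n in range(5):
    overflow.put(n)       # events 0 and 1 are dropped as the oldest
overflow.flush()          # what a periodic timer would do on each interval
```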
Event shape
Every captured call produces an LLMEvent. Fields are sourced directly from types.py:
@dataclass
class LLMEvent:
    # identity
    event_id: str                        # UUID, auto-generated
    tenant_id: str                       # set via for_tenant() or set_tenant()
    session_id: Optional[str]

    # provider / model
    provider: str                        # "openai" | "anthropic" | "google" | "deepseek" | "openrouter"
    model: str                           # e.g. "gpt-4o", "claude-3-5-sonnet-20241022"
    endpoint: str                        # e.g. "chat.completions", "messages", "fim.completions"

    # text token usage
    input_tokens: int
    output_tokens: int
    total_tokens: int
    cached_tokens: int                   # OpenAI: sub-field of input_tokens
                                         # Anthropic/Google: separate pool, not in input_tokens

    # latency
    duration_ms: float                   # wall-clock time of the LLM call
    timestamp: float                     # unix timestamp of capture

    # request metadata
    request_id: Optional[str]            # x-request-id from provider response headers
    stream: bool
    error: Optional[str]                 # set if the LLM call raised an exception

    # audio (transcription / translation)
    duration_seconds: Optional[float]    # per-minute path (whisper-1, verbose_json only)
    audio_input_tokens: Optional[int]    # per-token path (gpt-4o-transcribe etc.)
    audio_output_tokens: Optional[int]

    # speech / TTS
    character_count: Optional[int]       # per-character path (tts-1, tts-1-hd)

    # images (gpt-image-* models)
    input_text_tokens: Optional[int]     # text prompt tokens
    cached_input_text_tokens: Optional[int]
    input_image_tokens: Optional[int]    # reference-image tokens (edit only)
    cached_input_image_tokens: Optional[int]
    output_image_tokens: Optional[int]   # generated-image tokens
    image_count: Optional[int]           # number of images returned (audit only)

    # extras
    tags: dict                           # arbitrary metadata you can attach
All Optional fields are omitted from to_dict() when None, keeping ingest payloads lean.
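The "omit None" serialization can be sketched with a cut-down dataclass. `MiniEvent` is a hypothetical stand-in with only a few of the fields listed above; the real `to_dict()` may differ in detail.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class MiniEvent:
    tenant_id: str
    input_tokens: int = 0
    output_tokens: int = 0
    character_count: Optional[int] = None
    image_count: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        # Drop only fields that are None; zero is a real value and is kept.
        return {key: value for key, value in asdict(self).items() if value is not None}


chat_payload = MiniEvent("acme-corp", input_tokens=10, output_tokens=2).to_dict()
tts_payload = MiniEvent("acme-corp", character_count=0).to_dict()
```

Filtering on `is not None` rather than truthiness matters here, since a `character_count` of 0 is meaningful on the per-token TTS path.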
Advanced usage
Selective patching
# Specify providers in MeterConfig
meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    patches=["openai", "anthropic"],  # skip google and deepseek
))
meter.patch_all()
# Or patch / unpatch one provider at a time
meter.patch("deepseek")
meter.unpatch("deepseek")
# Unpatch everything and restore original SDK methods
meter.unpatch_all()
Use as a context manager
Meter supports the context manager protocol — __exit__ calls unpatch_all() and shuts down the background flush queue automatically:
with Meter(MeterConfig(api_key="...", url="...")).patch_all() as meter:
    with meter.for_tenant("acme-corp"):
        client.chat.completions.create(...)
# all patches restored, queue flushed on exit
Manual tracking
For providers not yet patched, or custom logic:
meter.track_manually(
    provider="cohere",
    model="command-r-plus",
    input_tokens=512,
    output_tokens=128,
    tenant_id="acme-corp",
)
Any extra keyword arguments are passed through as event fields (unknown fields are dropped with a debug log when debug=True).
Event hook — real-time logging or custom logic
from token_limit import LLMEvent, Meter, MeterConfig

def my_hook(event: LLMEvent) -> None:
    print(f"[{event.tenant_id}] {event.model}: {event.total_tokens} tokens")

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_event=my_hook,
))
Flush error handling
import sentry_sdk

def on_flush_error(exc: Exception) -> None:
    sentry_sdk.capture_exception(exc)

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_flush_error=on_flush_error,
))
Force a flush
# Useful at the end of a batch job or CLI script
meter._queue.flush_now()
Adding a custom provider patch
All provider patches inherit from BasePatch. Implement _install and an extractor function, then register in PATCH_REGISTRY:
from token_limit.patches._base import BasePatch
from token_limit.patches import PATCH_REGISTRY

def _extract(response, args, kwargs, error):
    return {
        "provider": "cohere",
        "endpoint": "chat",
        "model": kwargs.get("model", ""),
        "input_tokens": getattr(response, "meta", {}).get("billed_units", {}).get("input_tokens", 0),
        "output_tokens": getattr(response, "meta", {}).get("billed_units", {}).get("output_tokens", 0),
    }

class CoherePatch(BasePatch):
    name = "cohere"

    def _install(self):
        import cohere
        self._swap(
            cohere.Client, "chat",
            self._make_sync_wrapper(cohere.Client.chat, _extract),
        )

PATCH_REGISTRY["cohere"] = CoherePatch
meter.patch("cohere")
Project structure
token_limit/
├── __init__.py ← public API: Meter, MeterConfig, LLMEvent
├── meter.py ← Meter class (patch_all, for_tenant, set_limit)
├── config.py ← MeterConfig dataclass
├── types.py ← LLMEvent dataclass
├── exceptions.py ← LimitExceededException exception class
├── patches/
│ ├── _base.py ← BasePatch ABC + sync/async wrapper factories
│ ├── openai_patch.py
│ ├── anthropic_patch.py
│ ├── google_patch.py
│ ├── deepseek_patch.py
│ └── openrouter_patch.py
└── transport/
├── queue.py ← thread-safe EventQueue with background flush
└── http_client.py ← gzip POST, auto-selects httpx/requests/urllib
License
Business Source License 1.1