Usage metering and cost enforcement per tenant for LLM applications.

token-limit

Usage metering and cost enforcement for LLM calls, built for multi-tenant B2B applications.
One call instruments every OpenAI, Anthropic, Google AI, and DeepSeek request, and OpenRouter clients are covered once you register them, with no changes required in your LLM call sites.


Installation

Install the base package, then add extras only for the providers you use:

# Core package (no provider SDKs included)
pip install token-limit

# With specific provider support
pip install token-limit[openai]      # OpenAI + DeepSeek + OpenRouter
pip install token-limit[anthropic]   # Anthropic Claude
pip install token-limit[google]      # Google AI (Gemini)

# Everything at once
pip install token-limit[all]

Note: Provider extras install the corresponding official SDK as a dependency (openai, anthropic, google-genai). If you already have these SDKs pinned in your project, installing the extras is still safe — they will not downgrade your existing versions.


How it works

token-limit monkey-patches the official provider SDKs at startup. Every LLM call your application makes is intercepted, token usage is extracted from the response, and a lightweight event is queued in memory and sent in batches to your backend ingest endpoint by a background thread. Event delivery stays off the request path, so the latency added to a call is limited to usage extraction and the cached per-tenant limit check (see Cost enforcement).

Your code  →  [patched SDK]  →  LLM provider
                    ↓
              LLMEvent captured
                    ↓
              EventQueue (in-memory, daemon thread)
                    ↓  (every 5s or 50 events)
              POST /v1/ingest  →  Your backend  →  Dashboard
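The batching step in the diagram can be illustrated with a small sketch. This is not token-limit's EventQueue: the class name BatchingQueue, the send callback, and the injectable clock are assumptions made for illustration, and the sketch flushes synchronously inside put() instead of on a daemon thread so that its behavior is deterministic.

```python
import time
from collections import deque
from typing import Callable, Deque, List


class BatchingQueue:
    """Hypothetical stand-in for an event queue; not token-limit's EventQueue."""

    def __init__(
        self,
        send: Callable[[List[dict]], None],
        flush_interval: float = 5.0,
        max_batch_size: int = 50,
        max_queue_size: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send = send
        self._flush_interval = flush_interval
        self._max_batch_size = max_batch_size
        # deque(maxlen=...) discards the oldest item when a new one overflows it.
        self._events: Deque[dict] = deque(maxlen=max_queue_size)
        self._clock = clock
        self._last_flush = clock()

    def put(self, event: dict) -> None:
        """Queue an event and flush when the size or time threshold is met."""
        self._events.append(event)
        due = self._clock() - self._last_flush >= self._flush_interval
        if len(self._events) >= self._max_batch_size or due:
            self.flush_now()

    def flush_now(self) -> None:
        """Send everything currently queued as one batch."""
        batch = list(self._events)
        self._events.clear()
        self._last_flush = self._clock()
        if batch:
            self._send(batch)
```

A real implementation would move flush_now() onto a background thread and guard the deque with a lock; the thresholds above mirror the 5 s / 50 event values shown in the diagram.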

1. Initialize once at application startup

from token_limit import Meter, MeterConfig

meter = Meter(MeterConfig(
    api_key="sk-...", # Your TokenLimit API key
))

meter.patch_all()  # patches OpenAI, Anthropic, Google, DeepSeek — all at once

2. Tag requests per tenant

Use the context manager to scope a block of LLM calls to a tenant. Thread-safe and async-safe via contextvars — concurrent requests with different tenants are fully isolated.

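from openai import OpenAI
from token_limit.exceptions import LimitExceededException

client = OpenAI()  # an ordinary SDK client; no wrapper object is needed

with meter.for_tenant("user5"):
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello"}],
        )
    except LimitExceededException:
        show_upgrade_message()  # your own handler for tenants over their limit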

For middleware or request handlers where a context manager isn't convenient:

meter.set_tenant(request.tenant_id)  # sets for current thread/async task

Supported providers

| Provider | What gets patched | Tokens captured |
| --- | --- | --- |
| OpenAI | chat.completions.create, responses.create, completions.create (legacy), embeddings.create, audio.transcriptions.create, audio.translations.create, audio.speech.create, images.generate, images.edit (all sync + async) | input, output, cached, plus endpoint-specific fields (character count, image dimensions, audio duration) |
| Anthropic | messages.Messages.create (sync + async) | input, output, cached (cache read), cache_creation |
| Google AI | Models.generate_content, Models.generate_content_stream, AsyncModels.generate_content, AsyncModels.generate_content_stream (all via google.genai) | input, output, total, cached |
| DeepSeek | chat.completions.create, fim.completions.create, beta.chat.completions.create (sync + async); covers both the first-party deepseek SDK and an openai client pointed at api.deepseek.com | input, output, cached (cache hit), cache_miss, reasoning (deepseek-reasoner) |
| OpenRouter | chat.completions.create on registered client instances (sync + async, streaming and non-streaming) | input, output, cached, cost_usd (when billing header enabled), upstream_provider |

All patches are installed/uninstalled cleanly — original methods are always restored on unpatch_all() or process exit.


Cost enforcement

Spend limits are configured per tenant in USD and enforced on every intercepted LLM call.

# Per-month limit (default)
meter.set_limit("tenant-id-456", limit_usd=50.00)

# Per-day limit
meter.set_limit("tenant-id-456", limit_usd=5.00, frequency="per_day")

When a tenant reaches its configured limit, the next intercepted LLM call raises LimitExceededException before any API traffic is sent. Handle it and show an upgrade prompt:

# Inside a request handler function:
with meter.for_tenant("acme-corp"):
    try:
        response = client.chat.completions.create(...)
    except LimitExceededException:
        return {"detail": "upgrade_plan"}

set_limit() immediately invalidates the local cache for that tenant so the new threshold is honored on the very next call, without waiting for the TTL to expire.

Limit check caching. check_limit() and async_check_limit() are called on every patched SDK call. Results are cached per tenant for limit_check_cache_ttl seconds (default 5 s) to avoid a network round-trip on every LLM call. A local token trip-wire also catches runaway bursts within the TTL window without waiting for the next backend sync.
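The caching behavior described above can be sketched as a small TTL cache with explicit invalidation. This is an illustration under assumptions, not the library's code: LimitCheckCache, the fetch callback (standing in for the backend round-trip), and the injectable clock are all hypothetical names.

```python
import time
from typing import Callable, Dict, Tuple


class LimitCheckCache:
    """Hypothetical per-tenant TTL cache for limit-check results."""

    def __init__(
        self,
        fetch: Callable[[str], bool],  # assumed backend lookup: tenant_id -> allowed?
        ttl: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._ttl = ttl
        self._clock = clock
        self._entries: Dict[str, Tuple[float, bool]] = {}

    def is_allowed(self, tenant_id: str) -> bool:
        """Return the cached answer while it is fresh, else ask the backend."""
        now = self._clock()
        entry = self._entries.get(tenant_id)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]
        allowed = self._fetch(tenant_id)
        self._entries[tenant_id] = (now, allowed)
        return allowed

    def invalidate(self, tenant_id: str) -> None:
        """Drop the cached answer, as a limit change would need to do."""
        self._entries.pop(tenant_id, None)
```

With a 5-second TTL, repeated checks for one tenant inside the window cost a dictionary lookup; calling invalidate() forces the next check to go back to the backend, which corresponds to the set_limit() behavior described earlier.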


OpenAI patch details

Patches SDK methods at the class level, so every openai.OpenAI / openai.AsyncOpenAI client created before or after patching is automatically covered.
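Why class-level patching covers clients created both before and after the patch can be shown with a stand-in class. FakeCompletions, patch_class, and demo are hypothetical names; nothing here touches the real openai package, and the wrapper is far simpler than whatever the library installs.

```python
import functools
from typing import Any, Callable, Dict, List


class FakeCompletions:
    """Stand-in for an SDK resource class; not a real provider SDK."""

    def create(self, model: str) -> Dict[str, Any]:
        return {"model": model, "usage": {"prompt_tokens": 3, "completion_tokens": 5}}


def patch_class(cls: type, name: str, sink: List[Dict[str, int]]) -> Callable[..., Any]:
    """Replace cls.<name> with a wrapper that records usage; return the original."""
    original = getattr(cls, name)

    @functools.wraps(original)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        response = original(self, *args, **kwargs)
        sink.append(response["usage"])
        return response

    setattr(cls, name, wrapper)
    return original


def demo() -> int:
    captured: List[Dict[str, int]] = []
    created_before = FakeCompletions()
    original = patch_class(FakeCompletions, "create", captured)
    created_after = FakeCompletions()
    # Method lookup goes through the class, so both instances hit the wrapper.
    created_before.create(model="m")
    created_after.create(model="m")
    # Restoring the original attribute undoes the patch for every instance.
    setattr(FakeCompletions, "create", original)
    created_before.create(model="m")
    return len(captured)
```

demo() counts two captured calls: one from each instance while the patch was active, and none after the original method was restored.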

Patched surfaces

chat.completions.create (sync + async)
ChatCompletions for all models. Handles stream=True transparently: forces stream_options={"include_usage": True} so the final chunk carries a usage summary, then proxies the iterator to the caller while capturing that summary in a finally block.
Fields: input_tokens, output_tokens, total_tokens, cached_tokens, request_id, model, stream, duration_ms.
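The "proxy the iterator, capture usage in a finally block" pattern can be sketched with a plain generator. The chunks here are dictionaries rather than SDK objects, and proxy_stream and on_usage are hypothetical names, so treat this as an outline of the technique and not the library's wrapper.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, Optional


def proxy_stream(
    chunks: Iterable[Dict[str, Any]],
    on_usage: Callable[[Optional[Dict[str, int]]], None],
) -> Iterator[Dict[str, Any]]:
    """Yield every chunk unchanged and report the last usage summary seen."""
    usage: Optional[Dict[str, int]] = None
    try:
        for chunk in chunks:
            if chunk.get("usage") is not None:
                usage = chunk["usage"]
            yield chunk
    finally:
        # Runs when the stream is exhausted, when it raises, and when the
        # caller stops early and the generator is closed.
        on_usage(usage)
```

Because the report happens in finally, a caller who abandons the stream halfway still produces an event; in that case the usage is None, since the summary chunk was never reached.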

responses.create (sync + async, openai >= 1.66)
Responses API. Also captures five image-billing dimensions when the image_generation tool is active, and image_count for audit.
Fields: input_tokens, output_tokens, total_tokens, cached_tokens, image billing dimensions, image_count.

completions.create (legacy /v1/completions, sync + async)
Legacy text-completion endpoint for models such as gpt-3.5-turbo-instruct. Streaming handled identically to chat completions.
Fields: input_tokens, output_tokens, total_tokens, model, stream, duration_ms.

embeddings.create (sync + async)
Text-embedding endpoint (text-embedding-3-*, ada-002, etc.). output_tokens is always 0.
Fields: input_tokens, output_tokens (0), total_tokens, model, duration_ms.

audio.transcriptions.create (sync + async)
Speech-to-text (Whisper and GPT-4o transcription models). Two billing modes handled automatically:

  • Per-minute models (whisper-1): reads response.duration (requires response_format="verbose_json"; emits a warning if omitted).
  • Per-token models (gpt-4o-transcribe, gpt-4o-mini-transcribe): reads usage.input_tokens / usage.output_tokens.

Fields: input_tokens, output_tokens, audio_input_tokens, audio_output_tokens, duration_seconds, duration_unavailable, model, duration_ms.

audio.translations.create (sync + async)
Whisper translation. Identical billing logic to audio.transcriptions; endpoint tag differs.

audio.speech.create (TTS, sync + async)
Two billing modes:

  • Per-character models (tts-1, tts-1-hd): no usage object; character_count derived from the caller's input kwarg.
  • Per-token models (gpt-4o-mini-tts): reads usage.input_tokens / usage.output_tokens; sets character_count=0 to prevent double-billing.

Fields: input_tokens, output_tokens, character_count, model, duration_ms.
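The two TTS billing modes amount to a branch on whether a usage object is present. The sketch below assumes usage arrives as a dictionary or None and that character_count is simply the length of the input string; both are assumptions for illustration, and tts_billing_fields is a hypothetical helper.

```python
from typing import Dict, Optional


def tts_billing_fields(input_text: str, usage: Optional[Dict[str, int]]) -> Dict[str, int]:
    """Pick per-character or per-token billing fields for a speech request."""
    if usage is None:
        # Per-character path: no usage object, so bill on the input length.
        return {
            "input_tokens": 0,
            "output_tokens": 0,
            "character_count": len(input_text),
        }
    # Per-token path: character_count is zeroed so the call is not billed twice.
    return {
        "input_tokens": usage.get("input_tokens", 0),
        "output_tokens": usage.get("output_tokens", 0),
        "character_count": 0,
    }
```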

images.generate and images.edit (sync + async)
Image generation and editing for gpt-image-* models. Captures five token billing dimensions from usage.input_tokens_details and usage.output_tokens, plus image_count.
Fields: input_text_tokens, cached_input_text_tokens, input_image_tokens, cached_input_image_tokens, output_image_tokens, total_tokens, image_count.

Not patched (OpenAI)

  • moderations.create — free endpoint, no per-token cost.
  • fine_tuning.jobs.* — billed on a separate training rate; not real-time.
  • beta.assistants.* / beta.threads.* / beta.runs.* — usage only available after async run completion; not yet supported.
  • uploads.* / beta.vector_stores / files — storage-billed, not token-billed.
  • realtime.* — persistent WebSocket; no discrete .create() to wrap.
  • audio.transcriptions.create with stream=True — streaming transcription path not yet captured.

Anthropic patch details

Patches Messages.create and AsyncMessages.create at the class level, so all anthropic.Anthropic / anthropic.AsyncAnthropic clients created before or after patching are automatically covered.

Patched surfaces

messages.Messages.create (sync) and messages.AsyncMessages.create (async)
Claude chat/completion for all claude-* models. Both stream=False (default) and stream=True are handled. For streaming, a helper proxies the iterator unchanged while accumulating usage across events (message_start → input tokens; message_delta → output + cache tokens).
Fields: input_tokens, output_tokens, total_tokens, cached_tokens (cache read hits), cache_creation_tokens (cache write), request_id, model, stream, duration_ms, tenant_id, error, input_tokens_details (SDK >= 0.26, model-dependent).
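The accumulation across stream events can be sketched with dictionaries standing in for SDK event objects. The event shapes are simplified, the split of fields between message_start and message_delta follows the description above, and accumulate_usage is a hypothetical helper, not the library's.

```python
from typing import Any, Dict, Iterable


def accumulate_usage(events: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Fold usage from message_start and message_delta events into one record."""
    totals = {
        "input_tokens": 0,
        "output_tokens": 0,
        "cached_tokens": 0,
        "cache_creation_tokens": 0,
    }
    for event in events:
        kind = event.get("type")
        if kind == "message_start":
            usage = event["message"]["usage"]
            totals["input_tokens"] = usage.get("input_tokens", 0)
        elif kind == "message_delta":
            usage = event["usage"]
            # Counts on message_delta are cumulative, so the latest value wins.
            totals["output_tokens"] = usage.get("output_tokens", 0)
            totals["cached_tokens"] = usage.get("cache_read_input_tokens", 0)
            totals["cache_creation_tokens"] = usage.get("cache_creation_input_tokens", 0)
    return totals
```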

Not patched (Anthropic)

  • beta.messages.batches.* — asynchronous batch completion; results fetched separately from submission. Not yet supported.
  • Embeddings — Anthropic does not offer a text-embedding API.
  • Audio/TTS — Anthropic does not offer speech endpoints.
  • Image generation — Claude is vision-input only; image tokens are already counted inside input_tokens.

Google AI patch details

Patches four methods at the class level on google.genai.models.Models and google.genai.models.AsyncModels. Unlike OpenAI, the google.genai SDK exposes streaming as a separate method rather than a stream=True flag.

Patched surfaces

Models.generate_content (sync, non-streaming)
Usage read from response.usage_metadata directly after the call returns.

Models.generate_content_stream (sync, streaming)
Returns a synchronous iterator of GenerateContentResponse chunks. Usage is only present on the last chunk; the helper tracks last_chunk across the full iteration and reads its usage_metadata in a finally block.

AsyncModels.generate_content (async, non-streaming)
Awaits meter.async_check_limit() to avoid blocking the event loop.

AsyncModels.generate_content_stream (async, streaming)
Handles both coroutine-returning and direct async-iterator forms via inspect.isawaitable.
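Handling both call shapes with inspect.isawaitable looks roughly like the following. The functions direct_form, coroutine_form, and collect are hypothetical stand-ins; they only imitate the two ways a streaming method might hand back an async iterator.

```python
import asyncio
import inspect
from typing import AsyncIterator, Callable, List


async def _chunks() -> AsyncIterator[str]:
    for part in ("a", "b"):
        yield part


def direct_form() -> AsyncIterator[str]:
    """Returns the async iterator directly."""
    return _chunks()


async def coroutine_form() -> AsyncIterator[str]:
    """Returns a coroutine that resolves to the async iterator."""
    return _chunks()


async def collect(stream_call: Callable[[], object]) -> List[str]:
    result = stream_call()
    # A coroutine is awaitable; an async generator object is not.
    if inspect.isawaitable(result):
        result = await result
    return [chunk async for chunk in result]
```

Both asyncio.run(collect(direct_form)) and asyncio.run(collect(coroutine_form)) yield the same list of chunks, which is what lets one wrapper serve either form.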

Fields (all four surfaces): input_tokens (prompt_token_count), output_tokens (candidates_token_count, includes thinking tokens on the direct Gemini API), total_tokens (read from response, not derived), cached_tokens (cached_content_token_count), stream, request_id, duration_ms, tenant_id, error.

Not patched (Google AI)

  • Vertex AI SDK (google.cloud.aiplatform) — separate SDK, not yet supported.
  • models.embed_content / models.embed_content_batch — not yet supported.
  • models.generate_images / models.upscale_image — billed per image, not per token.
  • models.generate_videos — billed per second of output, not yet supported.
  • live.* — WebSocket-based session; no discrete call to wrap.

DeepSeek patch details

Covers both integration paths: the first-party deepseek package and an openai client pointed at api.deepseek.com. Both paths are attempted independently — a failure in one does not prevent the other from being installed.

Patched surfaces

chat.completions.create (sync + async)
Standard chat completions. Streaming handled identically to OpenAI: forces stream_options={"include_usage": True} and captures usage from the final chunk.

fim.completions.create (sync + async)
DeepSeek-specific fill-in-middle (FIM) endpoint. Records fim_prefix (from kwargs["prompt"] or kwargs["prefix"]) and fim_suffix alongside standard token counts.

beta.chat.completions.create (sync + async)
Beta chat namespace alias present in SDK >= 1.x; uses the same extractor as the main chat surface.

DeepSeek-specific fields

| Event field | Source |
| --- | --- |
| cached_tokens | usage.prompt_cache_hit_tokens |
| cache_miss_tokens | usage.prompt_cache_miss_tokens |
| reasoning_tokens | usage.completion_tokens_details.reasoning_tokens (deepseek-reasoner only) |
| fim_prefix | kwargs["prompt"] or kwargs["prefix"] |
| fim_suffix | kwargs["suffix"] |
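The field mapping above can be written as a small extractor. This sketch assumes the usage object exposes the listed attributes and, for the base counts, the OpenAI-compatible names prompt_tokens and completion_tokens; extract_deepseek_usage is a hypothetical helper, and missing attributes fall back to 0.

```python
from typing import Any, Dict


def extract_deepseek_usage(usage: Any) -> Dict[str, int]:
    """Map a DeepSeek-style usage object onto event field names."""
    # completion_tokens_details may be absent for non-reasoning models.
    details = getattr(usage, "completion_tokens_details", None)
    return {
        "input_tokens": getattr(usage, "prompt_tokens", 0) or 0,
        "output_tokens": getattr(usage, "completion_tokens", 0) or 0,
        "cached_tokens": getattr(usage, "prompt_cache_hit_tokens", 0) or 0,
        "cache_miss_tokens": getattr(usage, "prompt_cache_miss_tokens", 0) or 0,
        "reasoning_tokens": getattr(details, "reasoning_tokens", 0) or 0,
    }
```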

Not patched (DeepSeek)

  • models.list — metadata endpoint, no token cost.
  • files.* — file upload/management, not billed per token.

OpenRouter patch details

OpenRouter exposes an OpenAI-compatible REST API, so developers typically point a standard openai.OpenAI (or AsyncOpenAI) client at https://openrouter.ai/api/v1. Unlike the other providers, OpenRouter is patched at the instance level rather than the class level — only the specific client instances you register are instrumented, leaving any other OpenAI clients untouched.

Registration

# Pattern 1 — sync factory (recommended)
client = meter.openrouter_client(api_key="sk-or-v1-...")

# Pattern 2 — async factory
client = meter.async_openrouter_client(api_key="sk-or-v1-...")

# Pattern 3 — register an existing client
meter.register_openrouter_client(existing_client)

# Pattern 4 — fully manual
meter.track_manually(provider="openrouter", model="...", input_tokens=..., output_tokens=...)

Patched surfaces

chat.completions.create (sync + async, on registered instances only)
Streaming handled identically to OpenAI: stream_options={"include_usage": True} is injected automatically so the final chunk carries usage. The wrapper is installed directly on client.chat.completions.create and is guarded against double-patching.
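Instance-level patching with a double-patch guard can be sketched as follows. The client here is built from SimpleNamespace and a stub Completions class, and register_client, make_client, and the _metered marker are hypothetical; the library's own guard may work differently.

```python
import functools
from types import SimpleNamespace
from typing import Any, Callable, Dict

_MARKER = "_metered"  # hypothetical attribute used to detect an existing wrapper


class Completions:
    """Stub resource; stands in for client.chat.completions."""

    def create(self, **kwargs: Any) -> Dict[str, Any]:
        return {"usage": {"prompt_tokens": 1, "completion_tokens": 1}}


def make_client() -> SimpleNamespace:
    return SimpleNamespace(chat=SimpleNamespace(completions=Completions()))


def register_client(client: Any, on_call: Callable[[Dict[str, Any]], None]) -> Any:
    """Wrap create() on this one client; registering twice is a no-op."""
    completions = client.chat.completions
    if getattr(completions.create, _MARKER, False):
        return client
    original = completions.create  # bound method of this instance

    @functools.wraps(original)
    def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        response = original(*args, **kwargs)
        on_call(response)
        return response

    setattr(wrapper, _MARKER, True)
    # An instance attribute shadows the class method for this object only.
    completions.create = wrapper
    return client
```

Only the registered client reports calls; a second client built from the same class keeps the untouched method, which is the difference from the class-level patches used for the other providers.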

OpenRouter-specific fields

| Event field | Source |
| --- | --- |
| upstream_provider | Portion before / in the model string, e.g. "anthropic" from "anthropic/claude-3-5-sonnet" |
| cost_usd | usage.cost: actual USD cost when the caller passes X-Or-Billing: true |

Fields (all calls): input_tokens (usage.prompt_tokens), output_tokens (usage.completion_tokens), total_tokens, cached_tokens (usage.prompt_tokens_details.cached_tokens), cost_usd, upstream_provider, model, stream, request_id, duration_ms, tenant_id.
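Deriving upstream_provider from the model string is a one-line split. What the library reports when the model string has no slash is not stated here, so returning None in that case is an assumption of this sketch, as is the helper name.

```python
from typing import Optional


def upstream_provider(model: str) -> Optional[str]:
    """Return the part of an OpenRouter model string before the first slash."""
    provider, separator, _ = model.partition("/")
    # Assumption: no slash (or an empty prefix) means the provider is unknown.
    return provider if separator and provider else None
```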


Configuration reference

All configuration lives in MeterConfig, passed once at startup:

from token_limit import Meter, MeterConfig

meter = Meter(MeterConfig(
    # Required
    api_key="your-api-key",       # authenticates event ingest and limit checks
    url="https://...",             # POST endpoint that receives event batches

    # Batching — tune for your traffic volume
    flush_interval=5.0,           # seconds between background flushes
    max_batch_size=50,            # flush early when queue reaches this size
    max_queue_size=1000,          # drop oldest events if queue overflows

    # Limit checks
    limit_check_cache_ttl=5.0,   # seconds a check_limit() result is cached per tenant

    # Behaviour
    debug=False,                  # log every captured event to stdout
    raise_on_error=False,         # re-raise exceptions from within patches

    # Hooks
    on_event=None,                # Callable[[LLMEvent], None] — called after every capture
    on_flush_error=None,          # Callable[[Exception], None] — called on send failure

    # Which providers to patch (default = all four built-ins)
    patches=["openai", "anthropic", "google", "deepseek"],
))

Event shape

Every captured call produces an LLMEvent. Fields are sourced directly from types.py:

from dataclasses import dataclass
from typing import Optional

@dataclass
class LLMEvent:
    # identity
    event_id: str                        # UUID, auto-generated
    tenant_id: str                       # set via for_tenant() or set_tenant()
    session_id: Optional[str]

    # provider / model
    provider: str                        # "openai" | "anthropic" | "google" | "deepseek" | "openrouter"
    model: str                           # e.g. "gpt-4o", "claude-3-5-sonnet-20241022"
    endpoint: str                        # e.g. "chat.completions", "messages", "fim.completions"

    # text token usage
    input_tokens: int
    output_tokens: int
    total_tokens: int
    cached_tokens: int                   # OpenAI: sub-field of input_tokens
                                         # Anthropic/Google: separate pool, not in input_tokens

    # latency
    duration_ms: float                   # wall-clock time of the LLM call
    timestamp: float                     # unix timestamp of capture

    # request metadata
    request_id: Optional[str]            # x-request-id from provider response headers
    stream: bool
    error: Optional[str]                 # set if the LLM call raised an exception

    # audio (transcription / translation)
    duration_seconds: Optional[float]    # per-minute path (whisper-1, verbose_json only)
    audio_input_tokens: Optional[int]    # per-token path (gpt-4o-transcribe etc.)
    audio_output_tokens: Optional[int]

    # speech / TTS
    character_count: Optional[int]       # per-character path (tts-1, tts-1-hd)

    # images (gpt-image-* models)
    input_text_tokens: Optional[int]           # text prompt tokens
    cached_input_text_tokens: Optional[int]
    input_image_tokens: Optional[int]          # reference-image tokens (edit only)
    cached_input_image_tokens: Optional[int]
    output_image_tokens: Optional[int]         # generated-image tokens
    image_count: Optional[int]                 # number of images returned (audit only)

    # extras
    tags: dict                           # arbitrary metadata you can attach

All Optional fields are omitted from to_dict() when None, keeping ingest payloads lean.
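Dropping None-valued fields during serialization can be sketched with a cut-down dataclass. MiniEvent is a hypothetical reduction of the event shape with only a few fields, and its to_dict is one straightforward way to get the described behavior, not necessarily how LLMEvent does it.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class MiniEvent:
    """Hypothetical reduced event with two optional fields."""

    tenant_id: str
    model: str
    input_tokens: int = 0
    output_tokens: int = 0
    request_id: Optional[str] = None
    character_count: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        # Keep zeros and empty strings; drop only fields that are None.
        return {key: value for key, value in asdict(self).items() if value is not None}
```

Note the filter tests for None specifically, so a legitimate zero such as output_tokens=0 for an embeddings call still appears in the payload.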


Advanced usage

Selective patching

# Specify providers in MeterConfig
meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    patches=["openai", "anthropic"],  # skip google and deepseek
))
meter.patch_all()

# Or patch / unpatch one provider at a time
meter.patch("deepseek")
meter.unpatch("deepseek")

# Unpatch everything and restore original SDK methods
meter.unpatch_all()

Use as a context manager

Meter supports the context manager protocol — __exit__ calls unpatch_all() and shuts down the background flush queue automatically:

with Meter(MeterConfig(api_key="...", url="...")).patch_all() as meter:
    with meter.for_tenant("acme-corp"):
        client.chat.completions.create(...)
# all patches restored, queue flushed on exit
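The with-statement above chains patch_all() before "as meter", which presumably relies on patch_all() returning the meter itself. A minimal sketch of that protocol, using a hypothetical MiniMeter class with flags in place of real patching and queue shutdown:

```python
from typing import Optional, Type


class MiniMeter:
    """Hypothetical meter showing the context manager protocol only."""

    def __init__(self) -> None:
        self.patched = False
        self.closed = False

    def patch_all(self) -> "MiniMeter":
        self.patched = True
        return self  # returning self is what allows chaining inside `with`

    def unpatch_all(self) -> None:
        self.patched = False

    def shutdown(self) -> None:
        self.closed = True

    def __enter__(self) -> "MiniMeter":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: object,
    ) -> bool:
        # Cleanup runs whether or not the block raised.
        self.unpatch_all()
        self.shutdown()
        return False  # do not swallow exceptions from the block
```

Because __exit__ returns False, an exception raised inside the block still propagates to the caller after the patches are removed.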

Manual tracking

For providers not yet patched, or custom logic:

meter.track_manually(
    provider="cohere",
    model="command-r-plus",
    input_tokens=512,
    output_tokens=128,
    tenant_id="acme-corp",
)

Extra keyword arguments that match event fields are passed through to the event. Unknown fields are dropped, and the drop is logged when debug=True.

Event hook — real-time logging or custom logic

from token_limit import LLMEvent, Meter, MeterConfig

def my_hook(event: LLMEvent) -> None:
    print(f"[{event.tenant_id}] {event.model}: {event.total_tokens} tokens")

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_event=my_hook,
))

Flush error handling

import sentry_sdk

def on_flush_error(exc: Exception) -> None:
    sentry_sdk.capture_exception(exc)

meter = Meter(MeterConfig(
    api_key="...",
    url="...",
    on_flush_error=on_flush_error,
))

Force a flush

# Useful at the end of a batch job or CLI script.
# _queue is a private attribute, so this call may change between releases.
meter._queue.flush_now()

Adding a custom provider patch

All provider patches inherit from BasePatch. Implement _install and an extractor function, then register in PATCH_REGISTRY:

from token_limit.patches._base import BasePatch
from token_limit.patches import PATCH_REGISTRY

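def _extract(response, args, kwargs, error):
    # The Cohere SDK exposes billed counts as response.meta.billed_units.
    # Any level can be missing (for example when the call raised), so read defensively.
    meta = getattr(response, "meta", None)
    billed = getattr(meta, "billed_units", None)
    return {
        "provider": "cohere",
        "endpoint": "chat",
        "model": kwargs.get("model", ""),
        "input_tokens": int(getattr(billed, "input_tokens", 0) or 0),
        "output_tokens": int(getattr(billed, "output_tokens", 0) or 0),
    }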

class CoherePatch(BasePatch):
    name = "cohere"

    def _install(self):
        import cohere
        self._swap(
            cohere.Client, "chat",
            self._make_sync_wrapper(cohere.Client.chat, _extract),
        )

PATCH_REGISTRY["cohere"] = CoherePatch

meter.patch("cohere")

Project structure

token_limit/
├── __init__.py              ← public API: Meter, MeterConfig, LLMEvent
├── meter.py                 ← Meter class (patch_all, for_tenant, set_limit)
├── config.py                ← MeterConfig dataclass
├── types.py                 ← LLMEvent dataclass
├── exceptions.py            ← LimitExceededException exception class
├── patches/
│   ├── _base.py             ← BasePatch ABC + sync/async wrapper factories
│   ├── openai_patch.py
│   ├── anthropic_patch.py
│   ├── google_patch.py
│   ├── deepseek_patch.py
│   └── openrouter_patch.py
└── transport/
    ├── queue.py             ← thread-safe EventQueue with background flush
    └── http_client.py       ← gzip POST, auto-selects httpx/requests/urllib

License

Business Source License 1.1
