Token usage visibility and budget enforcement for AI agents
tokencap
Token usage visibility and budget enforcement for AI agents. Works out of the box. Scales to Redis when you need it.
pip install tokencap
What it is
tokencap is a Python library that enables you to track token usage and enforce budgets across your AI agents, giving you full visibility and granular control over what each one is consuming.
Wrap your Anthropic or OpenAI client, or drop one line at the top of your script if you use LangChain, CrewAI, or any other agent framework, and tokencap tracks every token your agents spend. Set limits per session, per tenant, or per pipeline run. When a budget is hit, tokencap warns, degrades to a cheaper model, or blocks the next call before it reaches the provider.
No proxy. No infrastructure. No cloud account. It runs in your process.
# Direct SDK use
client = tokencap.wrap(anthropic.Anthropic(), limit=50_000)
# Agent frameworks (LangChain, CrewAI, AutoGen, LlamaIndex)
tokencap.patch(limit=50_000)
The problem
You deploy an AI agent. A bug causes it to retry in a loop. You find out three days later when the API bill arrives.
These are not edge cases. They happen constantly:
- A research agent entered a retry loop and ran for 11 days. Bill: $47,000.
- A GPT-4o agent retried a failed analysis in a tight loop for 10 minutes. Bill: $187.
- A multi-tenant SaaS product had one runaway session exhaust the entire monthly API budget allocated across all customers.
Provider-level spending caps help, but they are coarse and reactive: they cap your entire account rather than individual agents or tenants, and they do not stop a session mid-flight.
tokencap gives you enforcement in your code. Set a token budget per session, per tenant, per pipeline run, or across any dimension that matters. When the budget is hit, the call is blocked before it reaches the provider, before the tokens are gone.
Quickstart
Set your provider API key the same way you normally would:
export ANTHROPIC_API_KEY=sk-ant-... # Anthropic
export OPENAI_API_KEY=sk-... # OpenAI
Direct client wrapping
Two lines. Works when you control client construction.
import tokencap
import anthropic
client = tokencap.wrap(anthropic.Anthropic())
# [tokencap] session started: session=a3f1c2d4 backend=sqlite:tokencap.db (no limit set)
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize this document."}],
)
status = client.get_status()
print(f"session: {status.dimensions['session'].used} tokens used")
# session: 312 tokens used
Patch mode — for agent frameworks
One line at the top of your script. Works with any framework that uses the Anthropic or OpenAI SDKs internally.
LangChain:
import tokencap
from langchain_anthropic import ChatAnthropic
tokencap.patch(limit=50_000, providers=[tokencap.Provider.ANTHROPIC])
# [tokencap] patched: anthropic
# backend=sqlite:tokencap.db limit=50000 tokens
llm = ChatAnthropic(model="claude-sonnet-4-6")
# ChatAnthropic constructs its own Anthropic client internally.
# tokencap intercepts it automatically.
CrewAI:
import tokencap
from crewai import Agent, Task, Crew
tokencap.patch(limit=100_000) # patches both anthropic + openai by default
researcher = Agent(
    role="Researcher",
    goal="Research the topic",
    llm="anthropic/claude-sonnet-4-6",
)
# All LLM calls made by the crew are tracked and enforced.
In patch mode, status is always via the module-level call:
status = tokencap.get_status()
state = status.dimensions["session"]
print(f"session: {state.used:,} / {state.limit:,} tokens ({state.pct_used:.1%})")
When using patch(), tokencap manages clients internally. Use
tokencap.get_status() for status checks. When using wrap(),
client.get_status() is available directly on the wrapped client.
tokencap.patch() works with any framework that uses the Anthropic or OpenAI
SDKs internally, including LangChain, CrewAI, LlamaIndex, AutoGen, and the
OpenAI Agents SDK.
Call tokencap.unpatch() to reverse all changes when done.
A few things to know about patch() mode:
- Only clients constructed after patch() is called are intercepted.
- isinstance(wrapped_client, anthropic.Anthropic) returns False. .pyi stubs planned for v0.2 will fix type checker compatibility.
- patch() is for application code only. Do not use it in libraries you publish, because it has global side effects.
- Always call tokencap.unpatch() when done, or use a try/finally.
wrap() prints a startup message to stdout so there are no surprises. By default,
tokencap tracks token usage with no enforcement.
Choosing between wrap() and patch()
| | wrap() | patch() |
|---|---|---|
| You control client construction | Yes | Not required |
| Works with LangChain, CrewAI, etc. | Only if you inject the client | Yes, automatically |
| Status call | client.get_status() | tokencap.get_status() |
| Global side effects | No | Yes |
| Recommended for | Direct SDK use, libraries | Framework integration |
With wrap(), you call get_status() on the client object directly. With
patch(), the client is managed by the framework — use tokencap.get_status()
instead.
Add a limit
One argument. No other changes. In patch mode: tokencap.patch(limit=50_000)
client = tokencap.wrap(anthropic.Anthropic(), limit=50_000)
# [tokencap] session started: session=a3f1c2d4 backend=sqlite:tokencap.db limit=50000 tokens
Limits can be loaded from environment variables for dynamic configuration:
import os
client = tokencap.wrap(
    anthropic.Anthropic(),
    limit=int(os.environ.get("TOKENCAP_LIMIT", "50000")),
)
The same pattern works with patch():
tokencap.patch(
    limit=int(os.environ.get("TOKENCAP_LIMIT", "50000")),
    providers=[tokencap.Provider.ANTHROPIC],
)
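A bare int() call fails with an unhelpful traceback if the variable is set to something that is not a number, and it silently accepts zero or negative values. The helper below is one possible way to validate the value before passing it as limit=; read_token_limit is a hypothetical name and is not part of tokencap.

```python
import os


def read_token_limit(name="TOKENCAP_LIMIT", default=50_000):
    """Return a positive integer token limit read from the environment."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        value = int(raw)  # int() also accepts forms like "50_000"
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value <= 0:
        raise ValueError(f"{name} must be positive, got {value}")
    return value


os.environ["TOKENCAP_LIMIT"] = "75000"  # set here only for the demo
limit = read_token_limit()
print(limit)  # 75000
```

The returned value can then be passed as limit= to wrap() or patch().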
Check status at any time:
status = client.get_status()
for dim, state in status.dimensions.items():
    print(f"{dim}: {state.used:,} / {state.limit:,} tokens ({state.pct_used:.1%})")
# session: 31,200 / 50,000 tokens (62.4%)
When the session hits 50,000 tokens, BudgetExceededError is raised before the
next call is made:
try:
    response = client.messages.create(...)
except tokencap.BudgetExceededError as e:
    for dim in e.check_result.violated:
        state = e.check_result.states[dim]
        print(f"{dim} exceeded: {state.used:,} / {state.limit:,} tokens")
# session exceeded: 50,312 / 50,000 tokens
Full policy
For warnings, model degradation, and webhooks before the hard stop, pass a policy.
In patch mode: tokencap.patch(policy=my_policy)
import tokencap
import anthropic
def on_warn(status):
    print(f"Warning: {status.dimensions['session'].pct_used:.0%} used")
client = tokencap.wrap(
    anthropic.Anthropic(),
    policy=tokencap.Policy(
        dimensions={
            "session": tokencap.DimensionPolicy(
                limit=50_000,
                thresholds=[
                    tokencap.Threshold(
                        at_pct=0.8,
                        actions=[tokencap.Action(kind=tokencap.ActionKind.WARN, callback=on_warn)],
                    ),
                    tokencap.Threshold(
                        at_pct=0.9,
                        actions=[tokencap.Action(kind=tokencap.ActionKind.DEGRADE, degrade_to="claude-haiku-4-5")],
                    ),
                    tokencap.Threshold(
                        at_pct=1.0,
                        actions=[tokencap.Action(kind=tokencap.ActionKind.BLOCK)],
                    ),
                ],
            ),
        }
    ),
)
# [tokencap] session started: session=a3f1c2d4 backend=sqlite:tokencap.db limit=50000 tokens
The agent makes many calls. Tokens accumulate. When 80% is crossed, the WARN callback fires once:
Warning: 82% used
After 90%, subsequent calls automatically use claude-haiku-4-5 instead of the
requested model. The calling code never changes.
When the session reaches 100%, the next call raises BudgetExceededError:
try:
    response = client.messages.create(...)
except tokencap.BudgetExceededError as e:
    for dim in e.check_result.violated:
        state = e.check_result.states[dim]
        print(f"{dim} exceeded: {state.used:,} / {state.limit:,} tokens")
# session exceeded: 51,200 / 50,000 tokens
Check the final state:
status = client.get_status()
for dim, state in status.dimensions.items():
    print(f"{dim}: {state.used:,} / {state.limit:,} tokens ({state.pct_used:.1%})")
# session: 51,200 / 50,000 tokens (102.4%)
tokencap.teardown()
limit and policy are mutually exclusive. Passing both raises ConfigurationError.
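To make the sequence above concrete, here is a small stand-alone simulation of a budget with 80% warn, 90% degrade, and 100% block thresholds. It is a sketch of the behaviour described in this section, not tokencap's internals; ToyBudget, BudgetExceeded, and the 8,200 tokens per call figure are assumptions made for the example.

```python
from dataclasses import dataclass, field


class BudgetExceeded(Exception):
    """Raised when a call is attempted at or over the limit."""


@dataclass
class ToyBudget:
    limit: int
    degrade_to: str
    used: int = 0
    warned: bool = False
    events: list = field(default_factory=list)

    def before_call(self, model: str) -> str:
        """Check thresholds before a call and return the model to use."""
        pct = self.used / self.limit
        if pct >= 1.0:
            # BLOCK: raised on every call once the limit is reached
            raise BudgetExceeded(f"{self.used:,} / {self.limit:,} tokens")
        if pct >= 0.8 and not self.warned:
            self.warned = True  # WARN: fires only once
            self.events.append(f"warn at {pct:.0%}")
        if pct >= 0.9:
            return self.degrade_to  # DEGRADE: applies to every later call
        return model

    def after_call(self, tokens: int) -> None:
        """Record the tokens the call actually consumed."""
        self.used += tokens


budget = ToyBudget(limit=50_000, degrade_to="claude-haiku-4-5")
models = []
try:
    for _ in range(10):
        models.append(budget.before_call("claude-sonnet-4-6"))
        budget.after_call(8_200)  # assumed usage per call
except BudgetExceeded as exc:
    print("blocked:", exc)  # blocked: 57,400 / 50,000 tokens

print(budget.events)  # ['warn at 82%']
print(models[-1])     # claude-haiku-4-5
```

Because the check runs before each call and usage is recorded after it, the final total can overshoot the limit by up to one call, which is why the examples above show figures such as 51,200 / 50,000.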
Policy actions
WARN: fire a callback and continue
Fires once when the threshold is crossed. The call proceeds normally.
tokencap.Threshold(
    at_pct=0.8,
    actions=[tokencap.Action(kind=tokencap.ActionKind.WARN, callback=on_warn)],
)
DEGRADE: swap to a cheaper model transparently
From this threshold onward, all calls use the degraded model. The calling code never changes.
tokencap.Threshold(
    at_pct=0.9,
    actions=[tokencap.Action(kind=tokencap.ActionKind.DEGRADE, degrade_to="claude-haiku-4-5")],
)
BLOCK: raise an exception before the call
Fires on every call after the threshold is crossed, not just the first.
tokencap.Threshold(
    at_pct=1.0,
    actions=[tokencap.Action(kind=tokencap.ActionKind.BLOCK)],
)
WEBHOOK: fire an HTTP POST and continue
Fire-and-forget in a background thread. Does not add latency to the call path. The webhook payload includes dimension names and identifiers. Avoid using PII as identifier values if your webhook endpoint is not fully trusted.
tokencap.Threshold(
    at_pct=0.8,
    actions=[tokencap.Action(kind=tokencap.ActionKind.WEBHOOK, webhook_url="https://your-app.com/alerts")],
)
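The fire-and-forget behaviour can be pictured with a plain daemon thread. This is a generic sketch of that pattern under stated assumptions, not tokencap's code: fire_and_forget is a hypothetical helper, the payload fields are invented, and send stands in for whatever performs the HTTP POST.

```python
import json
import threading


def fire_and_forget(send, payload):
    """Run send(body) on a daemon thread and return immediately."""
    body = json.dumps(payload).encode("utf-8")

    def worker():
        try:
            send(body)
        except Exception:
            pass  # a failed alert must never break the calling code

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


received = []  # stands in for the remote endpoint
thread = fire_and_forget(received.append, {"dimension": "session", "pct_used": 0.8})
thread.join(timeout=5)  # only for the demo; a real caller would not wait
print(json.loads(received[0]))  # {'dimension': 'session', 'pct_used': 0.8}
```

The caller gets control back as soon as the thread starts, so a slow or unreachable endpoint does not delay the LLM call.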
String values like "WARN", "BLOCK", "DEGRADE", "WEBHOOK" also work
if you prefer: Action(kind="WARN") is equivalent to Action(kind=ActionKind.WARN).
Checking status
client = tokencap.wrap(anthropic.Anthropic(), limit=50_000)
# [tokencap] session started: session=a3f1c2d4 backend=sqlite:tokencap.db limit=50000 tokens
# ... after some calls ...
status = client.get_status()
for dim, state in status.dimensions.items():
    print(f"{dim}: {state.used:,} / {state.limit:,} tokens ({state.pct_used:.1%})")
# session: 31,200 / 50,000 tokens (62.4%)
tokencap.get_status() also works when the client is not in scope — it reads
from the global Guard singleton created by wrap().
Why tokencap is easy to use
Most budget tools track dollars. The problem is that dollar cost changes every time a provider reprices a model, and different call types (cached tokens, batch API, streaming) cost different amounts. You end up with thresholds that silently mean something different after a pricing update.
tokencap uses token counts directly. You set a limit of 50,000 tokens. That limit means exactly the same thing regardless of which model you use, how the provider prices it, or whether tokens are cached.
Dollar cost tracking is deliberately absent. Provider pricing changes without notice and no machine-readable pricing API exists. A dollar figure derived from a stale table is worse than no figure at all. Token counts are always accurate. They come directly from the provider response.
If you know your task takes roughly 5,000 tokens per call and you want to cap at 10 calls, you set a limit of 50,000. No conversion needed.
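As a worked version of that arithmetic, the snippet below derives the limit and the token counts at which 80%, 90%, and 100% thresholds would be crossed. The per-call estimate and call count are the figures from the paragraph above; the variable names are only illustrative.

```python
tokens_per_call = 5_000  # rough estimate for one task
max_calls = 10
limit = tokens_per_call * max_calls

# Token counts at which the 80%, 90%, and 100% thresholds are crossed.
thresholds = {pct: round(limit * pct) for pct in (0.8, 0.9, 1.0)}

print(limit)       # 50000
print(thresholds)  # {0.8: 40000, 0.9: 45000, 1.0: 50000}
```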
How tokencap fits alongside other tools
Observability platforms. Platforms like LangSmith, Helicone, and infrastructure-level AI monitoring tools give you dashboards, traces, and historical spend analysis. They tell you what happened. tokencap enforces policy before and during calls. Many teams use both: an observability platform for the ops dashboard, tokencap for enforcement in the application code. They connect via tokencap's OTEL emission.
No tool at all. The most common situation. Most teams set a provider-level spending cap and find out about runaway costs from the bill. tokencap is for teams who want enforcement in the code, not reactive alerts after the money is spent.
Try it yourself
scripts/smoke_test.py runs every tokencap feature against your real Anthropic
and OpenAI API keys — wrap mode, patch mode, all four policy actions,
multi-dimensional budgets, async clients, and more.
export ANTHROPIC_API_KEY=sk-ant-...
export OPENAI_API_KEY=sk-...
python scripts/smoke_test.py
67 tests with live output showing exactly what tokencap does at each step. Costs roughly $0.001 in API credits total. Each section is documented and easy to comment out if you only want to test one provider or one feature.
The wrapped client
tokencap.wrap() returns a proxy client. The common call paths work unchanged.
Here is exactly what is intercepted and what passes through.
Intercepted (tokencap tracks and enforces these):
- client.messages.create(): sync
- client.messages.stream(): streaming
- client.messages.create() on async client: awaitable
- client.with_options(...): returns a new wrapped client
- client.with_raw_response(...): returns a new wrapped client
- client.with_streaming_response(...): returns a new wrapped client
Pass-through (tokencap does not see these calls):
- client.models.list() and all non-messages endpoints
- client.beta.messages.create(): beta features, pass through untracked
- client.messages.batch: batch API, passes through untracked
- All attributes: client.api_key, client.base_url, etc.
client = tokencap.wrap(anthropic.Anthropic())
# Tracked and enforced
response = client.messages.create(model="claude-sonnet-4-6", ...)
with client.messages.stream(model="claude-sonnet-4-6", ...) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
# Passes through untracked
models = client.models.list()
api_key = client.api_key
# Async works the same way (the await must run inside an async function)
async_client = tokencap.wrap(anthropic.AsyncAnthropic())
response = await async_client.messages.create(model="claude-sonnet-4-6", ...)
isinstance returns False.
isinstance(wrapped_client, anthropic.Anthropic) is False. This is a known
limitation of the proxy pattern. Stub files (.pyi) are planned for v0.2.
For OpenAI the same rules apply: chat.completions.create() is intercepted,
everything else passes through.
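The intercept, pass-through, and isinstance behaviour described above all follow from the proxy pattern itself. The sketch below shows that pattern on a toy class; RealClient and TrackingProxy are hypothetical names and this is not how tokencap is necessarily written.

```python
class RealClient:
    """Stand-in for an SDK client (hypothetical)."""

    api_key = "sk-example"

    def create(self, prompt):
        return f"response to {prompt!r}"


class TrackingProxy:
    """Forwards attribute access to the wrapped client and counts create() calls."""

    def __init__(self, inner):
        self._inner = inner
        self.calls = 0

    def create(self, prompt):
        self.calls += 1  # intercepted: tracked before delegating
        return self._inner.create(prompt)

    def __getattr__(self, name):
        # Only called for attributes not found on the proxy itself,
        # so everything else passes through untouched.
        return getattr(self._inner, name)


client = TrackingProxy(RealClient())
client.create("hello")
print(client.calls)                    # 1
print(client.api_key)                  # sk-example
print(isinstance(client, RealClient))  # False
```

The proxy does not inherit from the class it wraps, which is why the isinstance check fails even though every call and attribute still works.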
Advanced usage
Multi-agent shared budgets
Multiple agents on the same machine can share a budget by pointing at the same SQLite file:
import anthropic
import openai
from tokencap import Guard, Policy, DimensionPolicy, Threshold, Action, ActionKind
from tokencap.backends.sqlite import SQLiteBackend
policy = Policy(
    dimensions={
        "tenant_daily": DimensionPolicy(
            limit=1_000_000,
            thresholds=[Threshold(at_pct=1.0, actions=[Action(kind=ActionKind.BLOCK)])],
        ),
    }
)
shared = SQLiteBackend(path="/shared/tokencap.db")
shared_ids = {"tenant_daily": "acme:2026-03-27"}
agent_a = Guard(policy=policy, identifiers=shared_ids, backend=shared)
agent_b = Guard(policy=policy, identifiers=shared_ids, backend=shared)
client_a = agent_a.wrap_anthropic(anthropic.Anthropic())
client_b = agent_b.wrap_openai(openai.OpenAI())
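Sharing works because both agents read and write the same counter row in the same file. The sketch below shows the idea with the standard sqlite3 module; the table layout, the key format, and the debit helper are assumptions for illustration and do not describe tokencap's actual schema. It relies on SQLite's upsert syntax, which requires SQLite 3.24 or newer.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "budget.db")


def connect(path):
    conn = sqlite3.connect(path, isolation_level=None)  # autocommit mode
    conn.execute(
        "CREATE TABLE IF NOT EXISTS usage (key TEXT PRIMARY KEY, used INTEGER NOT NULL)"
    )
    return conn


def debit(conn, key, tokens):
    """Add tokens to the counter in one atomic statement, then read the total."""
    conn.execute(
        "INSERT INTO usage (key, used) VALUES (?, ?) "
        "ON CONFLICT(key) DO UPDATE SET used = used + excluded.used",
        (key, tokens),
    )
    return conn.execute("SELECT used FROM usage WHERE key = ?", (key,)).fetchone()[0]


agent_a = connect(db_path)  # two independent connections to the same file
agent_b = connect(db_path)
key = "tenant_daily:acme:2026-03-27"
debit(agent_a, key, 1_200)
total = debit(agent_b, key, 800)
print(total)  # 2000
agent_a.close()
agent_b.close()
```

Doing the increment inside a single SQL statement avoids the lost-update problem that a separate read followed by a write would have.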
Across machines, switch to Redis. The API is identical:
from tokencap.backends.redis import RedisBackend
shared = RedisBackend("redis://redis-host:6379")
In production, read the URL from an environment variable:
import os
shared = RedisBackend(os.environ["REDIS_URL"])
pip install tokencap[redis]
Async agents
tokencap works with async agents. The backend calls inside call_async() are
synchronous — for most agents this is fine. For high-throughput async agents
(hundreds of concurrent calls), use RedisBackend which handles concurrency
better than SQLite.
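If a synchronous backend call does become a bottleneck in your own async code, the usual remedy is to move it onto a worker thread with asyncio.to_thread() (Python 3.9+). The following is a generic sketch of that technique; SlowBackend and debit_async are hypothetical and are not part of tokencap.

```python
import asyncio
import threading
import time


class SlowBackend:
    """Stand-in for a synchronous storage backend (hypothetical)."""

    def __init__(self):
        self.used = 0
        self._lock = threading.Lock()

    def debit(self, tokens):
        time.sleep(0.05)  # simulate blocking I/O
        with self._lock:  # worker threads may run concurrently
            self.used += tokens
            return self.used


async def debit_async(backend, tokens):
    # Run the blocking call on a worker thread so the event loop stays free.
    return await asyncio.to_thread(backend.debit, tokens)


async def main():
    backend = SlowBackend()
    await asyncio.gather(*(debit_async(backend, 100) for _ in range(5)))
    return backend.used


used = asyncio.run(main())
print(used)  # 500
```

The lock matters here: once calls run on several threads at once, the counter update has to be protected.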
Pre-configuring with init()
If you need to set custom identifiers or a non-default backend before wrapping:
import anthropic
import tokencap
from tokencap.backends.redis import RedisBackend
tokencap.init(
    policy=tokencap.Policy(...),
    identifiers={"session": "my-run-id-123"},
    backend=RedisBackend("redis://localhost:6379"),
)
client = tokencap.wrap(anthropic.Anthropic())
In patch() mode, init() can pre-configure identifiers and backend before
the framework constructs its clients:
tokencap.init(
    policy=tokencap.Policy(...),
    identifiers={"session": "my-run-id-123"},
)
tokencap.patch() # framework clients are now intercepted
Development
Running tests
pip install -e ".[dev]"
make test # unit + integration, no external services needed
make redis-up # start local Redis container
make test-live # live tests (mock providers, real Redis)
make redis-down # stop Redis container
Lint
make lint # ruff + mypy --strict
Contributing
See CONTRIBUTING.md for the full guide.
Reporting issues
Bug reports and feature requests are welcome at https://github.com/pykul/tokencap/issues
OTEL integration
tokencap emits OpenTelemetry metrics after every call if opentelemetry-api is
installed. No configuration required.
pip install tokencap[otel]
| Metric | Type | Labels |
|---|---|---|
| tokencap.tokens.used | Counter | provider, model, dimension |
| tokencap.tokens.remaining | Gauge | dimension, identifier |
| tokencap.budget.pct_used | Gauge | dimension, identifier |
| tokencap.policy.action_fired | Counter | action_kind, dimension |
If opentelemetry-api is not installed, all telemetry is a no-op.
Supported providers
| Provider | Install | Token estimation |
|---|---|---|
| Anthropic | pip install tokencap[anthropic] | Anthropic SDK counter |
| OpenAI | pip install tokencap[openai] | tiktoken |
Estimation runs before the call. Actual usage is reconciled after. The delta is debited automatically. You never pay twice.
tokencap works with any model string passed to the provider SDK. Token estimation uses the provider SDK counter where available and falls back to character estimation for unknown models. No configuration is needed to use new or custom model names.
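The estimate-then-reconcile flow can be sketched as a small ledger. This is an illustration of the idea only: Ledger, estimate_tokens, and the four-characters-per-token ratio are assumptions, and whether tokencap debits the estimate up front in exactly this way is not stated above.

```python
class Ledger:
    """Toy token ledger: reserve an estimate, then settle with actual usage."""

    def __init__(self, limit):
        self.limit = limit
        self.used = 0

    def reserve(self, estimate):
        """Debit the estimate before the call; refuse if already at the limit."""
        if self.used >= self.limit:
            raise RuntimeError("budget exceeded")
        self.used += estimate
        return estimate

    def reconcile(self, estimate, actual):
        """Apply only the difference so the call is never counted twice."""
        delta = actual - estimate
        self.used += delta
        return delta


def estimate_tokens(text):
    # Character-based fallback; 4 characters per token is an assumed ratio.
    return max(1, len(text) // 4)


ledger = Ledger(limit=50_000)
prompt = "Summarize this document."
est = ledger.reserve(estimate_tokens(prompt))
delta = ledger.reconcile(est, actual=312)  # actual usage from the provider response
print(est, delta, ledger.used)  # 6 306 312
```

After reconciliation the ledger holds exactly the provider-reported figure, regardless of how rough the estimate was.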
What the defaults are
tokencap never does anything silently. When you call wrap(), these defaults apply:
| Setting | Default value |
|---|---|
| Dimension name | "session" |
| Session identifier | auto-generated UUID (printed when wrap() is called) |
| Backend | SQLite file tokencap.db in the current directory |
| Enforcement | none (tracking only) unless limit= or policy= is passed |
Pass quiet=True to wrap() to suppress the startup message.
API reference
tokencap.wrap(client, limit=None, policy=None, quiet=False)
Wraps an Anthropic or OpenAI client (sync or async). limit is a token count
shorthand for BLOCK at 100%. policy accepts a full Policy object. limit
and policy are mutually exclusive. DimensionPolicy.reset_every is defined
but not yet active in v0.1.
If wrap() is called a second time while a global Guard is already active
(without calling teardown() first), tokencap logs a WARNING and reuses the
existing Guard. The new limit= or policy= argument is ignored. Call
tokencap.teardown() before wrap() to start a fresh session.
The wrapped client has get_status() directly:
client = tokencap.wrap(anthropic.Anthropic())
status = client.get_status() # returns StatusResponse
Module-level functions for when the client is not in scope:
tokencap.get_status() # returns StatusResponse from global Guard
tokencap.teardown() # closes backend connections, resets global Guard
tokencap.init(policy, identifiers=None, backend=None, otel_enabled=True, quiet=False)
Optional. Pre-configures the global Guard before wrap() is called.
tokencap.patch(limit=None, policy=None, quiet=False, providers=None)
Monkey-patches SDK constructors for framework integration. providers defaults
to [Provider.ANTHROPIC, Provider.OPENAI]. Pass a subset like
providers=[Provider.ANTHROPIC] to patch only one SDK. String values also
accepted. unpatch() reverses only what was patched.
Enums
tokencap.ActionKind.WARN # "WARN"
tokencap.ActionKind.BLOCK # "BLOCK"
tokencap.ActionKind.DEGRADE # "DEGRADE"
tokencap.ActionKind.WEBHOOK # "WEBHOOK"
tokencap.Provider.ANTHROPIC # "anthropic"
tokencap.Provider.OPENAI # "openai"
tokencap.ResetPeriod.HOUR # "hour"
tokencap.ResetPeriod.DAY # "day"
All enums inherit from str. String values are accepted everywhere for
backwards compatibility.
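Inheriting from str is what makes the string forms interchangeable with the enum members. The short sketch below demonstrates the mechanism with a stand-in enum; ActionKindSketch is a hypothetical name, not the library's class.

```python
from enum import Enum


class ActionKindSketch(str, Enum):
    WARN = "WARN"
    BLOCK = "BLOCK"


print(ActionKindSketch.WARN == "WARN")                      # True
print(ActionKindSketch("BLOCK") is ActionKindSketch.BLOCK)  # True
```

A member compares equal to its string value, and the string can be converted back to the member by calling the enum.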
ResetPeriod is defined and exported but reset_every is not yet active.
Setting reset_every on a DimensionPolicy has no effect in v0.1. Automatic
period resets are planned for v0.2. To reset a budget manually, call
backend.reset(key) directly.
StatusResponse fields
status = tokencap.get_status()
status.timestamp # str, ISO 8601 UTC
status.dimensions # dict[str, BudgetState]
status.active_policy # str, policy name
status.next_threshold # ThresholdInfo | None
state = status.dimensions["session"]
state.limit # int, tokens
state.used # int, tokens
state.remaining # int, tokens
state.pct_used # float, e.g. 0.624
Exceptions
tokencap.BudgetExceededError # e.check_result.violated: list[str]
# e.check_result.states: dict[str, BudgetState]
tokencap.BackendError # unrecoverable storage failure
tokencap.ConfigurationError # invalid configuration: limit + policy both passed,
# patch() called twice, unknown provider name, etc.
Installation
pip install tokencap
Requires Python 3.9+.
pip install tokencap[anthropic] # Anthropic SDK
pip install tokencap[openai] # OpenAI SDK + tiktoken
pip install tokencap[redis] # Redis backend
pip install tokencap[otel] # OpenTelemetry
pip install tokencap[all] # everything
Roadmap
v0.2:
- Google Gemini, Mistral, and Cohere provider support
- asyncio.to_thread() wrapping for async-safe backend calls
- Periodic budget reset via reset_every
- .pyi stub files for correct type checker behavior with wrap()
- Per-call sub-identifier tagging
License
Apache 2.0