# llmstitch
A provider-agnostic LLM toolkit with tool calling, skills, and parallel execution.
Stitch together Anthropic, OpenAI, Gemini, Groq, and OpenRouter behind one Agent loop. Define tools with a decorator, compose behaviors as skills, and execute tool calls concurrently — all with a tiny, typed core.
## Install

```bash
pip install llmstitch[anthropic]    # just the Anthropic SDK
pip install llmstitch[openai]       # just the OpenAI SDK
pip install llmstitch[gemini]       # just the Gemini SDK
pip install llmstitch[groq]         # just the Groq SDK
pip install llmstitch[openrouter]   # OpenRouter (reuses the openai SDK)
pip install llmstitch[all]          # all five
```

The bare `pip install llmstitch` has zero runtime dependencies — provider SDKs are opt-in extras.
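Because the SDKs are extras, importing an adapter without its SDK installed should fail at import time. A small defensive sketch — the exact failure mode (`ImportError`) is an assumption, not something documented above:

```python
try:
    from llmstitch.providers.anthropic import AnthropicAdapter
except ImportError as exc:  # assumed failure mode when the extra isn't installed
    raise SystemExit(
        "Anthropic SDK not found — run: pip install 'llmstitch[anthropic]'"
    ) from exc
```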
## 30-second example

```python
import asyncio

from llmstitch import Agent, tool
from llmstitch.providers.anthropic import AnthropicAdapter


@tool
def get_weather(city: str) -> str:
    """Return a canned weather report for the given city."""
    return f"{city}: 72°F and sunny"


agent = Agent(
    provider=AnthropicAdapter(),
    model="claude-opus-4-7",
    system="You are a helpful weather assistant.",
)
agent.tools.register(get_weather)

messages = asyncio.run(agent.run("What's the weather in Tokyo?"))
print(messages[-1].content)
```
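Swapping providers is a one-line change. A sketch, assuming the OpenAI adapter's module path mirrors the Anthropic one shown above:

```python
from llmstitch.providers.openai import OpenAIAdapter  # path assumed by analogy

agent = Agent(
    provider=OpenAIAdapter(),  # only the adapter (and model name) change
    model="gpt-4o",            # illustrative model name
    system="You are a helpful weather assistant.",
)
agent.tools.register(get_weather)  # tool definitions carry over unchanged
```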
## Features

- Provider-agnostic — swap `AnthropicAdapter` for `OpenAIAdapter`, `GeminiAdapter`, `GroqAdapter`, or `OpenRouterAdapter` without touching your agent code.
- Typed `@tool` decorator — JSON Schema generated from type hints (`Optional`, `Literal`, defaults, async).
- Parallel tool execution — when a model returns multiple tool calls in one turn, they run concurrently.
- Streaming — `Agent.run_stream()` yields provider-neutral events (`TextDelta`, `ToolUseStart`/`Delta`/`Stop`, `MessageStop`, terminal `StreamDone`) and handles tool execution between turns.
- Retries — opt in with a `RetryPolicy`; exponential backoff with jitter, honors `Retry-After` headers, uses each adapter's own transient-error classes.
- Token counting — `Agent.count_tokens(prompt)` via native provider endpoints (Anthropic, Gemini).
- Usage and cost — `agent.usage` (a `UsageTally`) accumulates tokens, turns, API calls, and retries across a run; `agent.cost()` prices it against a `Pricing` rate card in USD.
- Observability — attach an `EventBus` and subscribe (sync callback or async iterator) to per-turn model / tool / usage / stop events. Zero overhead when unused.
- Cost ceiling — set `cost_ceiling=` (USD) and the run halts mid-loop if accumulated spend crosses it. Retries don't double-charge.
- Non-raising `run_with_result()` — structured `AgentResult` with `stop_reason ∈ {"complete", "max_iterations", "cost_ceiling", "error"}` so service code never has to catch.
- Concurrency-aware tools — `@tool(is_read_only=True, is_concurrency_safe=False)` annotates tools; mixed-safety batches run sequentially, all-safe batches parallelize with `asyncio.gather`.
- Skills — bundle a system prompt with a set of tools; compose with `.extend()` (see the sketch after this list).
- PEP 561 typed — ships with `py.typed`, fully checked under `mypy --strict`.
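A minimal sketch of the Skills feature. The `Skill(...)` constructor keywords here are assumptions — the README only confirms that a skill bundles a system prompt with tools and that skills compose via `.extend()`; see `examples/skills_demo.py` for the real API:

```python
from llmstitch import Skill, tool  # `Skill` import location assumed


@tool
def word_count(text: str) -> str:
    """Count the words in a block of text."""
    return str(len(text.split()))


# Hypothetical keyword names — illustrative only.
editor = Skill(system="You are a terse copy editor.", tools=[word_count])
stylist = Skill(system="Prefer active voice.", tools=[])

# .extend() is the documented composition method; exact merge
# semantics aren't spelled out in this README.
combined = editor.extend(stylist)
```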
## Streaming example

```python
import asyncio

from llmstitch import Agent, TextDelta, StreamDone
from llmstitch.providers.anthropic import AnthropicAdapter


async def main() -> None:
    agent = Agent(provider=AnthropicAdapter(), model="claude-opus-4-7")
    async for event in agent.run_stream("Tell me a haiku about streams."):
        if isinstance(event, TextDelta):
            print(event.text, end="", flush=True)
        elif isinstance(event, StreamDone):
            print(f"\n[stop_reason={event.response.stop_reason}]")


asyncio.run(main())
```
## Retries

```python
from llmstitch import Agent, RetryPolicy
from llmstitch.providers.anthropic import AnthropicAdapter

agent = Agent(
    provider=AnthropicAdapter(),
    model="claude-opus-4-7",
    retry_policy=RetryPolicy(
        max_attempts=3,
        retry_on=AnthropicAdapter.default_retryable(),
    ),
)
```

Transient errors (rate limits, timeouts, connection drops, 5xx) are retried with exponential backoff + jitter; `Retry-After` headers raise the delay floor. Non-retryable exceptions pass through unchanged. Retries cover `Agent.run` (non-streaming) — `run_stream` is not retried in v0.1.3 because deltas may already have been yielded to the caller.
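For intuition, the schedule described above behaves roughly like the sketch below. The base delay, cap, and jitter strategy are assumptions — the README doesn't publish `RetryPolicy`'s internals:

```python
import random


def backoff_delay(
    attempt: int,
    retry_after: float | None = None,
    base: float = 1.0,   # assumed first-retry delay in seconds
    cap: float = 30.0,   # assumed upper bound on any single delay
) -> float:
    """Exponential backoff with jitter; a Retry-After header raises the floor."""
    delay = min(cap, base * (2 ** attempt)) * random.random()  # full jitter
    if retry_after is not None:
        delay = max(delay, retry_after)  # never sleep less than the server asks
    return delay
```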
## Token counting

```python
count = await agent.count_tokens("How many tokens is this?")
print(count.input_tokens)
```

Available natively on `AnthropicAdapter` and `GeminiAdapter`. Other adapters raise `NotImplementedError` — llmstitch doesn't estimate with third-party tokenizers, since the counts can disagree with the provider's own.
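The graceful-fallback pattern from `examples/token_counting.py` can be sketched like this:

```python
from llmstitch import Agent


async def native_count(agent: Agent, prompt: str) -> int | None:
    """Return the provider's own token count, or None when unsupported."""
    try:
        count = await agent.count_tokens(prompt)
    except NotImplementedError:  # adapters without native counting raise this
        return None
    return count.input_tokens
```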
## Usage and cost

```python
from llmstitch import Agent, Pricing
from llmstitch.providers.anthropic import AnthropicAdapter

agent = Agent(
    provider=AnthropicAdapter(),
    model="claude-opus-4-7",
    pricing=Pricing(input_per_mtok=15.00, output_per_mtok=75.00),  # paste from vendor rate card
)

await agent.run("Summarize the Iliad in three sentences.")
print(agent.usage)         # UsageTally(input_tokens=..., output_tokens=..., turns=1, api_calls=1, retries=0)
print(agent.cost().total)  # USD
```

`agent.usage` accumulates across every `run` / `run_stream` on that agent — tokens (fed by adapters that report usage), turns (model responses folded in), `api_calls` (provider invocations), and retries (from the retry policy). Call `agent.usage.reset()` to zero the counters between logical sessions, or `usage.cost(other_pricing)` directly to price the same tally against a different rate card. The default `Pricing(1.00, 2.00)` is a placeholder — pass real vendor rates for accurate costs.
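For example, pricing one tally against two rate cards (the numbers here are illustrative, not real vendor rates):

```python
from llmstitch import Pricing

premium = Pricing(input_per_mtok=15.00, output_per_mtok=75.00)  # illustrative
budget = Pricing(input_per_mtok=0.25, output_per_mtok=1.25)     # illustrative

print(agent.usage.cost(premium).total)  # the run priced on the premium card
print(agent.usage.cost(budget).total)   # the same tokens priced elsewhere
agent.usage.reset()                     # zero the tally for the next session
```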
## Observability

```python
from llmstitch import (
    Agent, EventBus, Event,
    AgentStopped, ToolExecutionCompleted, UsageUpdated,
)
from llmstitch.providers.anthropic import AnthropicAdapter


def on_event(event: Event) -> None:
    if isinstance(event, ToolExecutionCompleted):
        print(f"{event.call.name} -> {event.result.content!r} in {event.duration_s*1000:.0f}ms")
    elif isinstance(event, UsageUpdated) and event.delta is not None:
        print(f"+{event.delta.get('input_tokens', 0)}in / "
              f"+{event.delta.get('output_tokens', 0)}out "
              f"(total {event.usage.total_tokens})")
    elif isinstance(event, AgentStopped):
        print(f"stop: {event.stop_reason} after {event.turns} turns")


bus = EventBus()
bus.subscribe(on_event)  # also supports `async for event in bus.stream()`
agent = Agent(provider=AnthropicAdapter(), model="claude-opus-4-7", event_bus=bus)
```

`EventBus` emits frozen dataclasses for every phase of the run — `AgentStarted`, `TurnStarted`, `ModelRequestSent`, `ModelResponseReceived`, `ToolExecutionStarted`/`Completed`, `UsageUpdated`, `AgentStopped`. Subscriber exceptions are swallowed with a `RuntimeWarning` so observers cannot break the agent loop. Events flow through the bus only — they are not interleaved into `run_stream`'s `StreamEvent` iterator.
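The async-iterator subscription noted in the comment above can run as a background task so events drain while the agent works. A sketch continuing from the setup in the previous block (`bus` and `agent` carry over):

```python
import asyncio


async def watch(bus: EventBus) -> None:
    # bus.stream() is the async-iterator form mentioned above.
    async for event in bus.stream():
        print(type(event).__name__)


async def main() -> None:
    watcher = asyncio.create_task(watch(bus))
    await agent.run("Name three rivers.")
    watcher.cancel()  # stop draining once the run has finished


asyncio.run(main())
```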
## Cost ceiling and non-raising runs

```python
from llmstitch import Agent, AgentResult, CostCeilingExceeded, Pricing
from llmstitch.providers.anthropic import AnthropicAdapter

agent = Agent(
    provider=AnthropicAdapter(),
    model="claude-opus-4-7",
    pricing=Pricing(input_per_mtok=15.00, output_per_mtok=75.00),
    cost_ceiling=0.50,  # USD — run halts if spend crosses this
)

result: AgentResult = await agent.run_with_result("Draft a short reply.")
match result.stop_reason:
    case "complete":
        print(result.text)
    case "cost_ceiling":
        print(f"hit budget: {result.error}")
    case "max_iterations":
        print("loop overran")
    case "error":
        print(f"crashed: {type(result.error).__name__}")
```

`run_with_result()` never raises — it catches `MaxIterationsExceeded`, `CostCeilingExceeded`, and vendor errors into the returned `AgentResult` (with partial message history, usage, and cost). `run_stream_with_result()` is the streaming variant: the same `StreamEvent`s as `run_stream`, then one terminal `AgentResultEvent`.

The `cost_ceiling` check runs after each response is folded into the usage tally and outside the retry wrapper, so retries don't double-charge. `Agent.run()` / `Agent.run_stream()` still raise `CostCeilingExceeded` if you prefer classical error handling.
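Classical handling, for completeness (continuing from the setup above):

```python
try:
    await agent.run("Draft a short reply.")
except CostCeilingExceeded:
    print(f"budget hit at ${agent.cost().total:.2f}")
```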
## More examples

The `examples/` directory has runnable scripts for:

- `basic.py` — minimal agent with one tool.
- `skills_demo.py` — composing two `Skill`s with `.extend()`.
- `streaming.py` — `Agent.run_stream` with rich event handling.
- `providers_gallery.py` — the same agent against every provider.
- `parallel_tools.py` — parallel tool execution with order-preserving results.
- `async_and_timeout.py` — async tools, per-call timeout, captured-exception semantics.
- `retries.py` — `RetryPolicy` with backoff, jitter, and an `on_retry` observability hook.
- `token_counting.py` — `Agent.count_tokens` on Anthropic + Gemini, with graceful fallback on adapters that don't support native counting.
- `observability.py` — `EventBus` with a structured-logging subscriber that covers every event type.
- `cost_ceiling.py` — `cost_ceiling=` plus `run_with_result()`: one scenario completes, one trips the ceiling; both inspect `result.stop_reason`.
- `streaming_with_result.py` — `run_stream_with_result()`: live `TextDelta` rendering plus a terminal `AgentResultEvent`, with `EventBus` side-channel progress.
- `tool_concurrency.py` — `is_read_only` / `is_concurrency_safe` flags, mixed-safety batches going sequential, all-safe batches going parallel, and a planner/executor split via `registry.read_only_subset()` (sketched below).
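A minimal sketch of the concurrency flags behind `tool_concurrency.py`. The decorator arguments come from the feature list above; using each flag on its own is an inference from the combined form shown there:

```python
from llmstitch import tool


@tool(is_read_only=True)  # read-only tools are safe to run in parallel
def lookup_price(symbol: str) -> str:
    """Fetch a (canned) price — no side effects."""
    return f"{symbol}: 101.25"


@tool(is_concurrency_safe=False)  # forces any batch containing this to go sequential
def place_order(symbol: str, qty: int) -> str:
    """Mutating call — must not race with other tool calls."""
    return f"ordered {qty} x {symbol}"
```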
## Guide
See GUIDE.md for a full walkthrough — core concepts, recipes, ten end-to-end agentic application patterns (research assistant, code review agent, support triage, SQL analyst, nested agents, production observability template), best practices, and a full API reference.
## Status
Alpha. MCP support and structured-output helpers are on the roadmap. See CHANGELOG.md for release history and ARCHITECTURE.md for a walkthrough of how the library is put together.
## License
MIT