# CHUK Tool Processor — A Tool Execution Runtime for AI Systems

Reliable tool execution for LLMs — timeouts, retries, caching, rate limits, circuit breakers, and MCP integration — in one composable layer.
## The Missing Runtime Layer
LLMs are good at deciding which tools to call. The hard part is executing those tools reliably.
CHUK Tool Processor is a tool execution runtime — it doesn't plan workflows or decide which tools to call. It executes tool calls reliably, under constraints, as directed by higher-level planners (your agent, LangChain, LlamaIndex, or a custom orchestrator).
What it does:

- Parses tool calls from any model (Anthropic XML, OpenAI `tool_calls`, JSON)
- Executes them with timeouts, retries, caching, rate limits, circuit breakers, and observability
- Runs tools locally, in isolated subprocesses, or remotely via MCP
Works with OpenAI, Anthropic, local models (Ollama/MLX/vLLM), and any framework.
## Architecture

```
LLM Output
    ↓
CHUK Tool Processor
    ↓
┌──────────────┬────────────────────┐
│ Local Tools  │ Remote Tools (MCP) │
└──────────────┴────────────────────┘
```
How it works internally:
```
LLM Output
    ↓
Parsers (XML / OpenAI / JSON)
    ↓
┌─────────────────────────────┐
│  Execution Middleware       │
│  (Applied in this order)    │
│   • Cache                   │
│   • Rate Limit              │
│   • Retry (with backoff)    │
│   • Circuit Breaker         │
│   • Bulkhead                │
└─────────────────────────────┘
    ↓
Execution Strategy
┌──────────────────────┐
│ • InProcess          │ ← Fast, trusted
│ • Isolated/Subprocess│ ← Safe, untrusted
│ • Remote via MCP     │ ← Distributed
└──────────────────────┘
```
## Quick Start

### Installation

```bash
pip install chuk-tool-processor

# Or with uv (recommended)
uv pip install chuk-tool-processor
```
### 60-Second Example

```python
import asyncio

from chuk_tool_processor import ToolProcessor, create_registry

class Calculator:
    async def execute(self, operation: str, a: float, b: float) -> dict:
        ops = {"add": a + b, "multiply": a * b, "subtract": a - b}
        return {"result": ops.get(operation, 0)}

async def main():
    registry = create_registry()
    await registry.register_tool(Calculator, name="math.calculator")  # Dotted name → namespace="math"

    async with ToolProcessor(registry=registry, enable_caching=True, enable_retries=True) as p:
        # Works with OpenAI, Anthropic, or JSON formats
        result = await p.process(
            '<tool name="math.calculator" args=\'{"operation": "multiply", "a": 15, "b": 23}\'/>'
        )
        print(result[0].result)  # {'result': 345}

asyncio.run(main())
```
That's it. You now have production-ready tool execution with timeouts, retries, and caching.
### Dotted Names for Namespacing

Dotted names are auto-parsed into namespace and tool name:

```python
# These are equivalent:
await registry.register_tool(FetchUser, name="web.fetch_user")               # Auto-parsed
await registry.register_tool(FetchUser, name="fetch_user", namespace="web")  # Explicit

# Call using the full dotted name
result = await processor.process([{"tool": "web.fetch_user", "arguments": {"user_id": "123"}}])
```
### Works with Any LLM Format

```python
# Anthropic XML format
anthropic_output = '<tool name="search" args=\'{"query": "Python"}\'/>'

# OpenAI tool_calls format
openai_output = {
    "tool_calls": [{
        "type": "function",
        "function": {"name": "search", "arguments": '{"query": "Python"}'},
    }]
}

# Direct JSON
json_output = [{"tool": "search", "arguments": {"query": "Python"}}]

# All work identically
results = await processor.process(anthropic_output)
results = await processor.process(openai_output)
results = await processor.process(json_output)
```
## Key Features

### Production Reliability
| Feature | Description |
|---|---|
| Timeouts | Every tool execution has proper timeout handling |
| Retries | Automatic retry with exponential backoff and jitter |
| Rate Limiting | Global and per-tool rate limits with sliding windows |
| Caching | Result caching with TTL and SHA256-based idempotency keys |
| Circuit Breakers | Prevent cascading failures with automatic recovery |
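
In practice, retries and caching compose: a transient failure is retried with backoff, and the eventual success is reused for identical calls. Here is a minimal runnable sketch using only the constructor options shown in the Production Configuration section below; the `FlakyWeather` tool and its failure behavior are illustrative, and the cache-hit comment describes expected rather than guaranteed behavior:

```python
import asyncio

from chuk_tool_processor import ToolProcessor, create_registry

class FlakyWeather:
    """Illustrative tool that fails on its first call."""
    calls = 0

    async def execute(self, city: str) -> dict:
        FlakyWeather.calls += 1
        if FlakyWeather.calls == 1:
            raise RuntimeError("upstream hiccup")  # retry is expected to recover
        return {"city": city, "temp_c": 21}

async def main():
    registry = create_registry()
    await registry.register_tool(FlakyWeather, name="weather")
    async with ToolProcessor(
        registry=registry,
        enable_retries=True, max_retries=3,  # retries the transient failure
        enable_caching=True, cache_ttl=600,  # identical calls reuse the result
    ) as p:
        call = [{"tool": "weather", "arguments": {"city": "Oslo"}}]
        first = await p.process(call)   # executes (recovering via retry)
        second = await p.process(call)  # expected to be served from cache
        print(first[0].result, second[0].result)

asyncio.run(main())
```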
### Multi-Tenant & Isolation
| Feature | Description |
|---|---|
| Bulkheads | Per-tool/namespace concurrency limits to prevent resource starvation |
| Pattern Bulkheads | Glob patterns like `"db.*": 3` for grouped concurrency limits |
| Scoped Registries | Isolated registries for multi-tenant apps and testing |
| ExecutionContext | Request-scoped metadata propagation (user, tenant, tracing, deadlines) |
| Isolated Strategy | Subprocess execution for untrusted code (zero crash blast radius) |
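
Scoped registries make that isolation concrete. In the sketch below, each `create_registry()` call is assumed to yield an independent registry, so two tenants can register the same tool name with different implementations; the tenant tool classes are illustrative:

```python
import asyncio

from chuk_tool_processor import ToolProcessor, create_registry

# Illustrative per-tenant tools
class AcmeReport:
    async def execute(self) -> dict:
        return {"tenant": "acme"}

class GlobexReport:
    async def execute(self) -> dict:
        return {"tenant": "globex"}

async def main():
    # Each registry is assumed to be fully independent
    acme_registry = create_registry()
    await acme_registry.register_tool(AcmeReport, name="report")

    globex_registry = create_registry()
    await globex_registry.register_tool(GlobexReport, name="report")

    # Same tool name, different implementations, no cross-tenant visibility
    async with ToolProcessor(registry=acme_registry) as acme:
        results = await acme.process([{"tool": "report", "arguments": {}}])
        print(results[0].result)  # {'tenant': 'acme'}

asyncio.run(main())
```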
### Advanced Scheduling
| Feature | Description |
|---|---|
| Return Order | Choose completion order (fast first) or submission order (deterministic) |
| SchedulerPolicy | DAG-based scheduling with dependencies, deadlines, pool limits |
| GreedyDagScheduler | Built-in scheduler with topological sort and deadline-aware skipping |
### Integration & Observability
| Feature | Description |
|---|---|
| Multi-Format Parsing | XML (Anthropic), OpenAI tool_calls, JSON — all work automatically |
| MCP Integration | Connect to remote tools via HTTP Streamable, STDIO, SSE |
| OpenTelemetry | Distributed tracing with automatic span creation |
| Prometheus | Metrics for error rates, latency, cache hits, circuit breaker state |
| Type Safety | PEP 561 compliant with full mypy support |
## Production Configuration

```python
from chuk_tool_processor import BulkheadConfig, ExecutionContext, ToolProcessor

async with ToolProcessor(
    # Execution settings
    default_timeout=30.0,
    max_concurrency=20,

    # Reliability features
    enable_caching=True,
    cache_ttl=600,
    enable_rate_limiting=True,
    global_rate_limit=100,
    tool_rate_limits={"expensive_api": (5, 60)},  # 5 requests per 60s
    enable_retries=True,
    max_retries=3,
    enable_circuit_breaker=True,
    circuit_breaker_threshold=5,

    # Multi-tenant isolation
    enable_bulkhead=True,
    bulkhead_config=BulkheadConfig(
        default_limit=10,
        tool_limits={"slow_api": 2},
        patterns={"db.*": 3, "mcp.notion.*": 2},  # Pattern-based limits
    ),
) as processor:
    # Execute with request-scoped context
    ctx = ExecutionContext(
        request_id="req-123",
        user_id="user-456",
        tenant_id="acme-corp",
    )
    results = await processor.process(llm_output, context=ctx)
```
## Return Order & Scheduling

Control how results are returned and plan complex execution graphs:

```python
from chuk_tool_processor import ToolProcessor, ReturnOrder

calls = [{"tool": "search", "arguments": {"query": "Python"}}]  # any supported format

async with ToolProcessor() as processor:
    # Results return as tools complete (fast tools first) - default
    results = await processor.process(calls, return_order="completion")

    # Results return in submission order (deterministic)
    results = await processor.process(calls, return_order="submission")
```
### DAG Scheduling with Dependencies

```python
from chuk_tool_processor import (
    GreedyDagScheduler,
    SchedulingConstraints,
    ToolCallSpec,
    ToolMetadata,
)

scheduler = GreedyDagScheduler()

# Define calls with dependencies
calls = [
    ToolCallSpec(call_id="fetch", tool_name="api.fetch",
                 metadata=ToolMetadata(pool="web", est_ms=300)),
    ToolCallSpec(call_id="transform", tool_name="compute.transform",
                 depends_on=("fetch",)),
    ToolCallSpec(call_id="store", tool_name="db.write",
                 depends_on=("transform",)),
]

# Plan execution with constraints
constraints = SchedulingConstraints(
    deadline_ms=5000,
    pool_limits={"web": 2, "db": 1},
)

plan = scheduler.plan(calls, constraints)
# plan.stages: (('fetch',), ('transform',), ('store',))
# plan.skip:   () or low-priority calls that would miss the deadline
```
## MCP Integration

Connect to remote tool servers using the Model Context Protocol:

```python
from chuk_tool_processor.mcp import setup_mcp_http_streamable

token = "..."  # your OAuth bearer token

# Cloud services (Notion, etc.)
processor, manager = await setup_mcp_http_streamable(
    servers=[{
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {token}"},
    }],
    namespace="notion",
    enable_caching=True,
    enable_retries=True,
)

# Use remote tools
results = await processor.process(
    '<tool name="notion.search_pages" args=\'{"query": "docs"}\'/>'
)
```
Transport Options:
| Transport | Use Case | Example |
|---|---|---|
| HTTP Streamable | Cloud SaaS with OAuth | Notion, custom APIs |
| STDIO | Local tools, databases | SQLite, file systems |
| SSE | Legacy MCP servers | Atlassian |
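
As a sketch of the STDIO transport, the snippet below assumes a `setup_mcp_stdio` helper analogous to `setup_mcp_http_streamable`, and the server config keys shown are illustrative; see MCP_INTEGRATION.md for the exact shape:

```python
from chuk_tool_processor.mcp import setup_mcp_stdio  # assumed analogue of setup_mcp_http_streamable

# Illustrative local MCP server launched over STDIO (config keys are assumptions)
processor, manager = await setup_mcp_stdio(
    servers=[{
        "name": "sqlite",
        "command": "uvx",
        "args": ["mcp-server-sqlite", "--db-path", "./app.db"],
    }],
    namespace="db",
)

# Remote tools are called exactly like local ones
results = await processor.process(
    '<tool name="db.read_query" args=\'{"query": "SELECT 1"}\'/>'
)
```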
See MCP_INTEGRATION.md for complete examples with OAuth token refresh.
### MCP Middleware Stack

For production deployments, wrap MCP connections with resilience middleware:

```python
from chuk_tool_processor.mcp.middleware import (
    MiddlewareConfig,
    MiddlewareStack,
    RetrySettings,
    CircuitBreakerSettings,
    RateLimitSettings,
)

# Configure middleware layers
config = MiddlewareConfig(
    retry=RetrySettings(max_retries=3, base_delay=1.0),
    circuit_breaker=CircuitBreakerSettings(failure_threshold=5),
    rate_limiting=RateLimitSettings(enabled=True, global_limit=100),
)

# Wrap a StreamManager (e.g. the manager returned by setup_mcp_http_streamable)
middleware = MiddlewareStack(stream_manager, config=config)

# Execute with automatic retry, circuit breaking, and rate limiting
result = await middleware.call_tool("notion.search", {"query": "docs"})
```
## Observability

One-line setup for production monitoring:

```python
from chuk_tool_processor.observability import setup_observability

setup_observability(
    service_name="my-tool-service",
    enable_tracing=True,   # → OpenTelemetry traces
    enable_metrics=True,   # → Prometheus metrics at :9090/metrics
    metrics_port=9090,
)

# Every tool execution is now automatically traced and metered
```
What you get:
- Distributed traces (Jaeger, Zipkin, any OTLP collector)
- Prometheus metrics (error rate, latency P50/P95/P99, cache hit rate)
- Circuit breaker state monitoring
- Zero code changes to your tools
See OBSERVABILITY.md for complete setup guide.
## Documentation
| Document | Description |
|---|---|
| GETTING_STARTED.md | Creating tools, using the processor, ValidatedTool, StreamingTool |
| CORE_CONCEPTS.md | Registry, strategies, wrappers, parsers, MCP overview |
| PRODUCTION_PATTERNS.md | Bulkheads, scoped registries, ExecutionContext, parallel execution |
| MCP_INTEGRATION.md | HTTP Streamable, STDIO, SSE, OAuth, Middleware Stack |
| ADVANCED_TOPICS.md | Deferred loading, code sandbox, isolated strategy, testing |
| CONFIGURATION.md | All config options and environment variables |
| OBSERVABILITY.md | OpenTelemetry, Prometheus, metrics reference |
| ERRORS.md | Error codes and handling patterns |
## Examples

```bash
# Getting started
python examples/01_getting_started/hello_tool.py

# Hero demo: 8 tools, 5-second deadline, 3 pools (DAG + bulkheads + context)
python examples/02_production_features/hero_runtime_demo.py

# Production patterns (bulkheads, context, scoped registries)
python examples/02_production_features/production_patterns_demo.py

# Runtime features (return order, pattern bulkheads, scheduling)
python examples/02_production_features/runtime_features_demo.py

# Observability demo
python examples/02_production_features/observability_demo.py

# MCP integration
python examples/04_mcp_integration/stdio_echo.py
python examples/04_mcp_integration/notion_oauth.py
python examples/04_mcp_integration/middleware_demo.py
```
See examples/ for 20+ working examples.
## Compatibility
| Component | Supported |
|---|---|
| Python | 3.11, 3.12, 3.13 |
| Platforms | macOS, Linux, Windows |
| LLM Providers | OpenAI, Anthropic, Local models (Ollama, MLX, vLLM) |
| MCP Transports | HTTP Streamable, STDIO, SSE |
| MCP Spec | 2025-11-25, 2025-06-18, 2025-03-26 |
## Installation Options

```bash
# Core package
pip install chuk-tool-processor

# With observability (OpenTelemetry + Prometheus)
pip install chuk-tool-processor[observability]

# With MCP support
pip install chuk-tool-processor[mcp]

# With fast JSON (2-3x faster with orjson)
pip install chuk-tool-processor[fast-json]

# All extras
pip install chuk-tool-processor[all]
```
## When to Use This
Use CHUK Tool Processor when:
- Your LLM calls tools or APIs
- You need retries, timeouts, caching, or rate limits
- You need to run untrusted tools safely
- Your tools are local or remote (MCP)
- You need multi-tenant isolation
- You want production-grade observability
Don't use this if:
- You want an agent framework (this is the execution runtime, not the agent)
- You want conversation flow/memory orchestration
- You need a planner to decide which tools to call
## The Seam: Runtime vs Planner
CHUK Tool Processor deliberately does not plan workflows or decide which tools to call. It executes tool calls reliably, under constraints, as directed by higher-level planners.
```
┌─────────────────────────────────────────────────────┐
│  Your Agent / LangChain / LlamaIndex / Custom       │ ← Decides WHICH tools
└─────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────┐
│  CHUK Tool Processor                                │ ← Executes tools RELIABLY
│  (timeouts, retries, caching, rate limits, etc.)    │
└─────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────┐
│  Local Tools / MCP Servers                          │ ← Does the actual work
└─────────────────────────────────────────────────────┘
```
This separation means you can swap planners without changing execution infrastructure, and vice versa.
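
In code, the seam is just data passing between the two layers: the planner produces tool calls in any supported format, and the processor executes them. A minimal sketch, where `plan_next_calls` is a hypothetical stand-in for your agent or framework:

```python
from chuk_tool_processor import ToolProcessor

async def plan_next_calls(conversation: list) -> list:
    """Hypothetical planner stub: your agent/LLM decides WHICH tools to call."""
    return [{"tool": "search", "arguments": {"query": conversation[-1]}}]

async def run_turn(processor: ToolProcessor, conversation: list) -> list:
    calls = await plan_next_calls(conversation)  # planner layer: WHICH tools
    # runtime layer: HOW they run (timeouts, retries, caching, rate limits)
    return await processor.process(calls)
```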
## Contributing

See CONTRIBUTING.md for development setup and guidelines.

```bash
# Development setup
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
uv pip install -e ".[dev]"

# Run tests
make check
```
## License
MIT License - see LICENSE for details.
## Related Projects
- chuk-mcp - Low-level MCP protocol client
- Model Context Protocol - MCP specification