Skip to main content

Async-native framework for registering, discovering, and executing tools referenced in LLM responses

Project description

CHUK Tool Processor — A Tool Execution Runtime for AI Systems

PyPI Python License Type Checked Wheels OpenTelemetry

Reliable tool execution for LLMs — timeouts, retries, caching, rate limits, circuit breakers, and MCP integration — in one composable layer.


The Missing Runtime Layer

LLMs are good at deciding which tools to call. The hard part is executing those tools reliably.

CHUK Tool Processor is a tool execution runtime — it doesn't plan workflows or decide which tools to call. It executes tool calls reliably, under constraints, as directed by higher-level planners (your agent, LangChain, LlamaIndex, or a custom orchestrator).

What it does:

  • Parses tool calls from any model (Anthropic XML, OpenAI tool_calls, JSON)
  • Executes them with timeouts, retries, caching, rate limits, circuit breaker, observability
  • Runs tools locally, in isolated subprocesses, or remote via MCP

Works with OpenAI, Anthropic, local models (Ollama/MLX/vLLM), and any framework.


Architecture

    LLM Output
        ↓
CHUK Tool Processor
        ↓
 ┌──────────────┬────────────────────┐
 │ Local Tools  │ Remote Tools (MCP) │
 └──────────────┴────────────────────┘

How it works internally:

    LLM Output
        ↓
Parsers (XML / OpenAI / JSON)
        ↓
┌─────────────────────────────┐
│   Execution Middleware      │
│  (Applied in this order)    │
│   • Cache                   │
│   • Rate Limit              │
│   • Retry (with backoff)    │
│   • Circuit Breaker         │
│   • Bulkhead                │
└─────────────────────────────┘
        ↓
   Execution Strategy
   ┌──────────────────────┐
   │ • InProcess          │  ← Fast, trusted
   │ • Isolated/Subprocess│  ← Safe, untrusted
   │ • Remote via MCP     │  ← Distributed
   └──────────────────────┘

Quick Start

Installation

pip install chuk-tool-processor

# Or with uv (recommended)
uv pip install chuk-tool-processor

60-Second Example

import asyncio
from chuk_tool_processor import ToolProcessor, create_registry

class Calculator:
    async def execute(self, operation: str, a: float, b: float) -> dict:
        ops = {"add": a + b, "multiply": a * b, "subtract": a - b}
        return {"result": ops.get(operation, 0)}

async def main():
    registry = create_registry()
    await registry.register_tool(Calculator, name="math.calculator")  # Dotted name → namespace="math"

    async with ToolProcessor(registry=registry, enable_caching=True, enable_retries=True) as p:
        # Works with OpenAI, Anthropic, or JSON formats
        result = await p.process('<tool name="math.calculator" args=\'{"operation": "multiply", "a": 15, "b": 23}\'/>')
        print(result[0].result)  # {'result': 345}

asyncio.run(main())

That's it. You now have production-ready tool execution with timeouts, retries, and caching.

Dotted Names for Namespacing

Dotted names are auto-parsed into namespace and tool name:

# These are equivalent:
await registry.register_tool(FetchUser, name="web.fetch_user")           # Auto-parsed
await registry.register_tool(FetchUser, name="fetch_user", namespace="web")  # Explicit

# Call using the full dotted name
result = await processor.process([{"tool": "web.fetch_user", "arguments": {"user_id": "123"}}])

Works with Any LLM Format

# Anthropic XML format
anthropic_output = '<tool name="search" args=\'{"query": "Python"}\'/>'

# OpenAI tool_calls format
openai_output = {
    "tool_calls": [{
        "type": "function",
        "function": {"name": "search", "arguments": '{"query": "Python"}'}
    }]
}

# Direct JSON
json_output = [{"tool": "search", "arguments": {"query": "Python"}}]

# All work identically
results = await processor.process(anthropic_output)
results = await processor.process(openai_output)
results = await processor.process(json_output)

Key Features

Production Reliability

Feature Description
Timeouts Every tool execution has proper timeout handling
Retries Automatic retry with exponential backoff and jitter
Rate Limiting Global and per-tool rate limits with sliding windows
Caching Result caching with TTL and SHA256-based idempotency keys
Circuit Breakers Prevent cascading failures with automatic recovery
Structured Errors Machine-readable error categories with retry hints for planners

Multi-Tenant & Isolation

Feature Description
Bulkheads Per-tool/namespace concurrency limits to prevent resource starvation
Pattern Bulkheads Glob patterns like "db.*": 3 for grouped concurrency limits
Scoped Registries Isolated registries for multi-tenant apps and testing
ExecutionContext Request-scoped metadata propagation (user, tenant, tracing, deadlines)
Isolated Strategy Subprocess execution for untrusted code (zero crash blast radius)
Redis Registry Distributed tool registry for multi-process/multi-machine deployments

Advanced Scheduling

Feature Description
Return Order Choose completion order (fast first) or submission order (deterministic)
SchedulerPolicy DAG-based scheduling with dependencies, deadlines, pool limits
GreedyDagScheduler Built-in scheduler with topological sort and deadline-aware skipping

Integration & Observability

Feature Description
Multi-Format Parsing XML (Anthropic), OpenAI tool_calls, JSON — all work automatically
MCP Integration Connect to remote tools via HTTP Streamable, STDIO, SSE
OpenTelemetry Distributed tracing with automatic span creation
Prometheus Metrics for error rates, latency, cache hits, circuit breaker state
Type Safety PEP 561 compliant with full mypy support

Production Configuration

async with ToolProcessor(
    # Execution settings
    default_timeout=30.0,
    max_concurrency=20,

    # Reliability features
    enable_caching=True,
    cache_ttl=600,
    enable_rate_limiting=True,
    global_rate_limit=100,
    tool_rate_limits={"expensive_api": (5, 60)},  # 5 req/min
    enable_retries=True,
    max_retries=3,
    enable_circuit_breaker=True,
    circuit_breaker_threshold=5,

    # Multi-tenant isolation
    enable_bulkhead=True,
    bulkhead_config=BulkheadConfig(
        default_limit=10,
        tool_limits={"slow_api": 2},
        patterns={"db.*": 3, "mcp.notion.*": 2},  # Pattern-based limits
    ),
) as processor:
    # Execute with request context
    ctx = ExecutionContext(
        request_id="req-123",
        user_id="user-456",
        tenant_id="acme-corp",
    )
    results = await processor.process(llm_output, context=ctx)

Return Order & Scheduling

Control how results are returned and plan complex execution graphs:

from chuk_tool_processor import ToolProcessor, ReturnOrder

async with ToolProcessor() as processor:
    # Results return as tools complete (fast tools first) - default
    results = await processor.process(calls, return_order="completion")

    # Results return in submission order (deterministic)
    results = await processor.process(calls, return_order="submission")

DAG Scheduling with Dependencies

from chuk_tool_processor import (
    GreedyDagScheduler,
    SchedulingConstraints,
    ToolCallSpec,
    ToolMetadata,
)

scheduler = GreedyDagScheduler()

# Define calls with dependencies
calls = [
    ToolCallSpec(call_id="fetch", tool_name="api.fetch",
                 metadata=ToolMetadata(pool="web", est_ms=300)),
    ToolCallSpec(call_id="transform", tool_name="compute.transform",
                 depends_on=("fetch",)),
    ToolCallSpec(call_id="store", tool_name="db.write",
                 depends_on=("transform",)),
]

# Plan execution with constraints
constraints = SchedulingConstraints(
    deadline_ms=5000,
    pool_limits={"web": 2, "db": 1},
)
plan = scheduler.plan(calls, constraints)

# plan.stages: (('fetch',), ('transform',), ('store',))
# plan.skip: () or low-priority calls that would miss deadline

MCP Integration

Connect to remote tool servers using the Model Context Protocol:

from chuk_tool_processor.mcp import setup_mcp_http_streamable

# Cloud services (Notion, etc.)
processor, manager = await setup_mcp_http_streamable(
    servers=[{
        "name": "notion",
        "url": "https://mcp.notion.com/mcp",
        "headers": {"Authorization": f"Bearer {token}"}
    }],
    namespace="notion",
    enable_caching=True,
    enable_retries=True
)

# Use remote tools
results = await processor.process(
    '<tool name="notion.search_pages" args=\'{"query": "docs"}\'/>'
)

Transport Options:

Transport Use Case Example
HTTP Streamable Cloud SaaS with OAuth Notion, custom APIs
STDIO Local tools, databases SQLite, file systems
SSE Legacy MCP servers Atlassian

See MCP_INTEGRATION.md for complete examples with OAuth token refresh.

MCP Middleware Stack

For production deployments, wrap MCP connections with resilience middleware:

from chuk_tool_processor.mcp.middleware import (
    MiddlewareConfig,
    MiddlewareStack,
    RetrySettings,
    CircuitBreakerSettings,
    RateLimitSettings,
)

# Configure middleware layers
config = MiddlewareConfig(
    retry=RetrySettings(max_retries=3, base_delay=1.0),
    circuit_breaker=CircuitBreakerSettings(failure_threshold=5),
    rate_limiting=RateLimitSettings(enabled=True, global_limit=100),
)

# Wrap StreamManager with middleware
middleware = MiddlewareStack(stream_manager, config=config)

# Execute with automatic retry, circuit breaking, and rate limiting
result = await middleware.call_tool("notion.search", {"query": "docs"})

Distributed Deployments (Redis)

For multi-process or multi-machine deployments, configure Redis backends via environment variables:

# Enable Redis for everything
export CHUK_REGISTRY_BACKEND=redis
export CHUK_RESILIENCE_BACKEND=redis
export CHUK_REDIS_URL=redis://localhost:6379/0

# Enable resilience features
export CHUK_CIRCUIT_BREAKER_ENABLED=true
export CHUK_RATE_LIMIT_ENABLED=true
export CHUK_RATE_LIMIT_GLOBAL=100
from chuk_tool_processor import ProcessorConfig

# Load from environment and create fully-configured processor
config = ProcessorConfig.from_env()
processor = await config.create_processor()

async with processor:
    results = await processor.process(llm_output)

Or configure programmatically:

from chuk_tool_processor import ProcessorConfig, RegistryConfig, BackendType
from chuk_tool_processor.config import CircuitBreakerConfig, RateLimitConfig

config = ProcessorConfig(
    # Registry and resilience use Redis
    registry=RegistryConfig(backend=BackendType.REDIS),
    resilience_backend=BackendType.REDIS,
    redis_url="redis://localhost:6379/0",

    # Enable features
    circuit_breaker=CircuitBreakerConfig(enabled=True, failure_threshold=5),
    rate_limit=RateLimitConfig(enabled=True, global_limit=100),
)

processor = await config.create_processor()

Key features:

  • Distributed registry: Tool metadata shared across processes
  • Distributed circuit breaker: Failure counts shared (prevents cascading failures across instances)
  • Distributed rate limiting: Global limits enforced across all instances
  • Multi-tenant isolation: Key prefixes isolate data per tenant

Installation:

pip install chuk-tool-processor[redis]  # or: uv add chuk-tool-processor[redis]

See examples/02_production_features/distributed_config_demo.py for a complete example.


Observability

One-line setup for production monitoring:

from chuk_tool_processor.observability import setup_observability

setup_observability(
    service_name="my-tool-service",
    enable_tracing=True,     # → OpenTelemetry traces
    enable_metrics=True,     # → Prometheus metrics at :9090/metrics
    metrics_port=9090
)
# Every tool execution is now automatically traced and metered

What you get:

  • Distributed traces (Jaeger, Zipkin, any OTLP collector)
  • Prometheus metrics (error rate, latency P50/P95/P99, cache hit rate)
  • Circuit breaker state monitoring
  • Zero code changes to your tools

See OBSERVABILITY.md for complete setup guide.


Structured Error Handling

Errors include machine-readable categories and retry hints for planner decision-making:

from chuk_tool_processor.core.exceptions import ErrorCategory

results = await processor.process(llm_output)
for result in results:
    if result.error_info:
        match result.error_info.category:
            case ErrorCategory.RATE_LIMIT:
                await asyncio.sleep(result.retry_after_ms / 1000)
                return await retry()
            case ErrorCategory.CIRCUIT_OPEN:
                return await use_fallback_tool()
            case _ if not result.retryable:
                return await report_permanent_failure()

See ERRORS.md for complete error taxonomy.


Documentation

Document Description
GETTING_STARTED.md Creating tools, using the processor, ValidatedTool, StreamingTool
CORE_CONCEPTS.md Registry, strategies, wrappers, parsers, MCP overview
PRODUCTION_PATTERNS.md Bulkheads, scoped registries, ExecutionContext, parallel execution
MCP_INTEGRATION.md HTTP Streamable, STDIO, SSE, OAuth, Middleware Stack
ADVANCED_TOPICS.md Deferred loading, code sandbox, isolated strategy, testing
CONFIGURATION.md All config options and environment variables
OBSERVABILITY.md OpenTelemetry, Prometheus, metrics reference
ERRORS.md Error codes and handling patterns

Examples

# Getting started
python examples/01_getting_started/hello_tool.py

# Hero demo: 8 tools, 5-second deadline, 3 pools (DAG + bulkheads + context)
python examples/02_production_features/hero_runtime_demo.py

# Production patterns (bulkheads, context, scoped registries)
python examples/02_production_features/production_patterns_demo.py

# Runtime features (return order, pattern bulkheads, scheduling)
python examples/02_production_features/runtime_features_demo.py

# Structured error handling for planners
python examples/02_production_features/structured_errors_demo.py

# Redis registry for distributed deployments
python examples/02_production_features/redis_registry_demo.py

# Distributed configuration (Redis registry + resilience)
python examples/02_production_features/distributed_config_demo.py

# Observability demo
python examples/02_production_features/observability_demo.py

# MCP integration
python examples/04_mcp_integration/stdio_echo.py
python examples/04_mcp_integration/notion_oauth.py
python examples/04_mcp_integration/middleware_demo.py

See examples/ for 20+ working examples.


Compatibility

Component Supported
Python 3.11, 3.12, 3.13
Platforms macOS, Linux, Windows
LLM Providers OpenAI, Anthropic, Local models (Ollama, MLX, vLLM)
MCP Transports HTTP Streamable, STDIO, SSE
MCP Spec 2025-11-25, 2025-06-18, 2025-03-26

Installation Options

# Core package
pip install chuk-tool-processor

# With observability (OpenTelemetry + Prometheus)
pip install chuk-tool-processor[observability]

# With MCP support
pip install chuk-tool-processor[mcp]

# With Redis registry (distributed deployments)
pip install chuk-tool-processor[redis]

# With fast JSON (2-3x faster with orjson)
pip install chuk-tool-processor[fast-json]

# All extras
pip install chuk-tool-processor[all]

When to Use This

Use CHUK Tool Processor when:

  • Your LLM calls tools or APIs
  • You need retries, timeouts, caching, or rate limits
  • You need to run untrusted tools safely
  • Your tools are local or remote (MCP)
  • You need multi-tenant isolation
  • You want production-grade observability

Don't use this if:

  • You want an agent framework (this is the execution runtime, not the agent)
  • You want conversation flow/memory orchestration
  • You need a planner to decide which tools to call

The Seam: Runtime vs Planner

CHUK Tool Processor deliberately does not plan workflows or decide which tools to call. It executes tool calls reliably, under constraints, as directed by higher-level planners.

┌─────────────────────────────────────────────────────┐
│  Your Agent / LangChain / LlamaIndex / Custom       │  ← Decides WHICH tools
└─────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────┐
│            CHUK Tool Processor                      │  ← Executes tools RELIABLY
│  (timeouts, retries, caching, rate limits, etc.)   │
└─────────────────────────────────────────────────────┘
                          ↓
┌─────────────────────────────────────────────────────┐
│          Local Tools / MCP Servers                  │  ← Does the actual work
└─────────────────────────────────────────────────────┘

This separation means you can swap planners without changing execution infrastructure, and vice versa.


Contributing

See CONTRIBUTING.md for development setup and guidelines.

# Development setup
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
uv pip install -e ".[dev]"

# Run tests
make check

License

MIT License - see LICENSE for details.


Related Projects

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

chuk_tool_processor-0.17.tar.gz (184.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

chuk_tool_processor-0.17-py3-none-any.whl (217.4 kB view details)

Uploaded Python 3

File details

Details for the file chuk_tool_processor-0.17.tar.gz.

File metadata

  • Download URL: chuk_tool_processor-0.17.tar.gz
  • Upload date:
  • Size: 184.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.12.2

File hashes

Hashes for chuk_tool_processor-0.17.tar.gz
Algorithm Hash digest
SHA256 248733adc680d9c503ce4dedc77e526c2df4d4b66ed471e3ea535ffb501776bb
MD5 14697280b3e5b251e39580942ea5e830
BLAKE2b-256 2d67b10de68d7705917bae29f59399e55c7aacfab5c0fe940552a5092f14ff77

See more details on using hashes here.

File details

Details for the file chuk_tool_processor-0.17-py3-none-any.whl.

File metadata

File hashes

Hashes for chuk_tool_processor-0.17-py3-none-any.whl
Algorithm Hash digest
SHA256 5a5755dd816780ebd453c53a7569cb71ebf265257a524168e6b9a2ba58664673
MD5 d7cedd4ca1dea478c0c6c143c8b823f6
BLAKE2b-256 2f47be2d73a58e52f90d6de32779557a1f9ec5b044fc7854f1df0cf98b1f9d9d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page