CHUK Tool Processor
An async-native framework for registering, discovering, and executing tools referenced in LLM responses. Built from the ground up for production use with comprehensive error handling, monitoring, and scalability features.
🚀 Quick Start
Installation
# From source (recommended for development)
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
pip install -e .
# Or install from PyPI
pip install chuk-tool-processor
Your First Tool in 60 Seconds
import asyncio
from chuk_tool_processor import ToolProcessor, register_tool, initialize
# 1. Create a tool
@register_tool(name="calculator", description="Perform basic math operations")
class Calculator:
async def execute(self, operation: str, a: float, b: float) -> dict:
operations = {
"add": a + b,
"subtract": a - b,
"multiply": a * b,
"divide": a / b if b != 0 else None
}
if operation not in operations:
raise ValueError(f"Unknown operation: {operation}")
result = operations[operation]
if result is None:
raise ValueError("Cannot divide by zero")
return {
"operation": operation,
"operands": [a, b],
"result": result
}
async def main():
# 2. Initialize the system
await initialize()
# 3. Create processor
processor = ToolProcessor()
# 4. Process LLM output containing tool calls
llm_response = '''
I'll calculate 15 * 23 for you.
<tool name="calculator" args='{"operation": "multiply", "a": 15, "b": 23}'/>
The result is 345.
'''
# 5. Execute the tools
results = await processor.process(llm_response)
# 6. Handle results
for result in results:
if result.error:
print(f"❌ Tool '{result.tool}' failed: {result.error}")
else:
print(f"✅ Tool '{result.tool}' result: {result.result}")
print(f"⏱️ Executed in {result.duration:.3f}s")
if __name__ == "__main__":
asyncio.run(main())
Output:
✅ Tool 'calculator' result: {'operation': 'multiply', 'operands': [15, 23], 'result': 345}
⏱️ Executed in 0.001s
🎯 Key Features
- 🔄 Async-Native: Built for async/await from the ground up
- 🛡️ Production Ready: Comprehensive error handling, timeouts, retries
- 📦 Multiple Execution: In-process and subprocess strategies
- 🚀 High Performance: Caching, rate limiting, and concurrency control
- 📊 Monitoring: Structured logging and metrics collection
- 🔗 MCP Integration: Full Model Context Protocol support
- 📡 Streaming: Real-time incremental results
- 🔧 Extensible: Plugin system for custom parsers and strategies
📖 Getting Started Guide
Environment Setup
Create a .env file or set environment variables:
# Optional: Registry provider (default: memory)
export CHUK_TOOL_REGISTRY_PROVIDER=memory
# Optional: Default timeout for tool execution (default: 30.0)
export CHUK_DEFAULT_TIMEOUT=30.0
# Optional: Logging level (default: INFO)
export CHUK_LOG_LEVEL=INFO
# Optional: Enable structured JSON logging (default: true)
export CHUK_STRUCTURED_LOGGING=true
# MCP Integration (if using MCP servers)
export MCP_BEARER_TOKEN=your_bearer_token_here
export MCP_CONFIG_FILE=/path/to/mcp_config.json
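If you prefer keeping these settings in a .env file, one way to load them before starting the application is python-dotenv. This is only a sketch: python-dotenv is not a dependency of chuk-tool-processor, and any mechanism that exports the variables into the process environment works just as well.
import os

from dotenv import load_dotenv  # assumption: the optional python-dotenv package is installed

load_dotenv()  # copies the .env entries into os.environ before the library reads them

print(os.getenv("CHUK_DEFAULT_TIMEOUT", "30.0"))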
Basic Tool Development
1. Simple Function-Based Tool
from chuk_tool_processor.registry.auto_register import register_fn_tool
async def get_current_time(timezone: str = "UTC") -> str:
"""Get the current time in the specified timezone."""
from datetime import datetime
import pytz
tz = pytz.timezone(timezone)
current_time = datetime.now(tz)
return current_time.strftime("%Y-%m-%d %H:%M:%S %Z")
# Register the function as a tool
await register_fn_tool(get_current_time, namespace="utilities")
2. Class-Based Tool with Validation
from chuk_tool_processor.models.validated_tool import ValidatedTool
from pydantic import BaseModel, Field
@register_tool(name="weather", namespace="api")
class WeatherTool(ValidatedTool):
"""Get weather information for a location."""
class Arguments(BaseModel):
location: str = Field(..., description="City name or coordinates")
units: str = Field("metric", description="Temperature units: metric, imperial, kelvin")
include_forecast: bool = Field(False, description="Include 5-day forecast")
class Result(BaseModel):
location: str
temperature: float
conditions: str
humidity: int
forecast: list[dict] = Field(default_factory=list)
async def _execute(self, location: str, units: str, include_forecast: bool) -> Result:
# Simulate API call
await asyncio.sleep(0.1)
return self.Result(
location=location,
temperature=22.5,
conditions="Partly cloudy",
humidity=65,
forecast=[{"day": "Tomorrow", "temp": 24, "conditions": "Sunny"}] if include_forecast else []
)
3. Streaming Tool
from chuk_tool_processor.models.streaming_tool import StreamingTool
import asyncio
@register_tool(name="file_processor")
class FileProcessorTool(StreamingTool):
"""Process a large file line by line."""
class Arguments(BaseModel):
file_path: str
operation: str = "count_lines"
class Result(BaseModel):
line_number: int
content: str
processed_at: str
async def _stream_execute(self, file_path: str, operation: str):
"""Stream results as each line is processed."""
from datetime import datetime
# Simulate processing a large file
total_lines = 100
for i in range(1, total_lines + 1):
await asyncio.sleep(0.01) # Simulate processing time
yield self.Result(
line_number=i,
content=f"Processed line {i}",
processed_at=datetime.now().isoformat()
)
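To watch the incremental results without going through the processor, you can drive the async generator above directly. A minimal sketch, assuming FileProcessorTool can be constructed with no arguments; everything else comes from the example itself:
async def demo_stream():
    tool = FileProcessorTool()
    async for chunk in tool._stream_execute(file_path="data.txt", operation="count_lines"):
        print(f"line {chunk.line_number}: {chunk.content} ({chunk.processed_at})")

asyncio.run(demo_stream())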
Advanced Configuration
Production-Ready Processor Setup
from chuk_tool_processor import ToolProcessor
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy
from chuk_tool_processor.registry import get_default_registry  # assumed import path for the default registry helper
async def create_production_processor():
"""Create a production-ready processor with all features enabled."""
processor = ToolProcessor(
# Execution settings
default_timeout=30.0,
max_concurrency=10,
# Use subprocess strategy for isolation
strategy=SubprocessStrategy(
registry=await get_default_registry(),
max_workers=4,
default_timeout=30.0
),
# Enable caching for performance
enable_caching=True,
cache_ttl=300, # 5 minutes
# Rate limiting to prevent abuse
enable_rate_limiting=True,
global_rate_limit=100, # 100 requests per minute globally
tool_rate_limits={
"expensive_api": (10, 60), # 10 per minute
"file_processor": (5, 60), # 5 per minute
"weather": (50, 60) # 50 per minute
},
# Automatic retries for reliability
enable_retries=True,
max_retries=3,
# Specify which parsers to use
parser_plugins=["xml_tool", "openai_tool", "json_tool", "function_call"]
)
await processor.initialize()
return processor
Custom Tool with All Features
import asyncio
import random
import time

from chuk_tool_processor.execution.wrappers.caching import cacheable
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited
from chuk_tool_processor.execution.wrappers.retry import retryable
@register_tool(name="advanced_api", namespace="external")
@cacheable(ttl=600) # Cache for 10 minutes
@rate_limited(limit=20, period=60.0) # 20 calls per minute
@retryable(max_retries=3, base_delay=1.0) # Retry on failures
class AdvancedApiTool(ValidatedTool):
"""Example tool with all production features."""
class Arguments(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
format: str = Field("json", regex="^(json|xml|csv)$")
timeout: float = Field(10.0, gt=0, le=30)
class Result(BaseModel):
data: dict
format: str
processing_time: float
cached: bool = False
async def _execute(self, query: str, format: str, timeout: float) -> Result:
start_time = time.time()
# Simulate expensive API call
await asyncio.sleep(0.5)
# Simulate potential failure (for retry testing)
if random.random() < 0.1: # 10% failure rate
raise Exception("Temporary API failure")
processing_time = time.time() - start_time
return self.Result(
data={"query": query, "results": ["result1", "result2", "result3"]},
format=format,
processing_time=processing_time
)
Working with LLM Responses
Supported Input Formats
The processor automatically detects and parses multiple formats:
# 1. XML Tool Tags (most common)
xml_response = """
Let me search for information about Python.
<tool name="search" args='{"query": "Python programming", "limit": 5}'/>
I'll also get the current time.
<tool name="get_current_time" args='{"timezone": "UTC"}'/>
"""
# 2. OpenAI Chat Completions Format
openai_response = {
"tool_calls": [
{
"id": "call_123",
"type": "function",
"function": {
"name": "search",
"arguments": '{"query": "Python programming", "limit": 5}'
}
}
]
}
# 3. Direct ToolCall objects
tool_calls = [
{
"tool": "search",
"arguments": {"query": "Python programming", "limit": 5}
},
{
"tool": "get_current_time",
"arguments": {"timezone": "UTC"}
}
]
# Process any format
processor = ToolProcessor()
results1 = await processor.process(xml_response)
results2 = await processor.process(openai_response)
results3 = await processor.process(tool_calls)
Error Handling Best Practices
from chuk_tool_processor.logging import get_logger

logger = get_logger("tool_processing")

async def robust_tool_processing(llm_response: str):
"""Example of robust error handling."""
processor = ToolProcessor(
default_timeout=30.0,
enable_retries=True,
max_retries=3
)
try:
results = await processor.process(llm_response, timeout=60.0)
successful_results = []
failed_results = []
for result in results:
if result.error:
failed_results.append(result)
logger.error(
f"Tool {result.tool} failed: {result.error}",
extra={
"tool": result.tool,
"duration": result.duration,
"attempts": getattr(result, "attempts", 1),
"machine": result.machine
}
)
else:
successful_results.append(result)
logger.info(
f"Tool {result.tool} succeeded",
extra={
"tool": result.tool,
"duration": result.duration,
"cached": getattr(result, "cached", False)
}
)
return {
"successful": successful_results,
"failed": failed_results,
"total": len(results)
}
except Exception as e:
logger.exception("Failed to process LLM response")
raise
MCP (Model Context Protocol) Integration
Quick MCP Setup with SSE
from chuk_tool_processor.mcp import setup_mcp_sse
import os
async def setup_mcp_tools():
"""Set up MCP tools from external servers."""
# Configure MCP servers
servers = [
{
"name": "weather-service",
"url": "https://weather-mcp.example.com",
"api_key": os.getenv("WEATHER_API_KEY")
},
{
"name": "database-service",
"url": "https://db-mcp.example.com",
"api_key": os.getenv("DB_API_KEY")
}
]
# Initialize MCP with full configuration
processor, stream_manager = await setup_mcp_sse(
servers=servers,
namespace="mcp", # Tools available as mcp.tool_name
default_timeout=30.0,
max_concurrency=5,
enable_caching=True,
cache_ttl=300,
enable_rate_limiting=True,
global_rate_limit=100,
enable_retries=True,
max_retries=3
)
return processor, stream_manager
# Use MCP tools
processor, manager = await setup_mcp_tools()
# Tools are now available in the processor
results = await processor.process('''
<tool name="mcp.weather" args='{"location": "London"}'/>
<tool name="mcp.database_query" args='{"sql": "SELECT COUNT(*) FROM users"}'/>
''')
MCP with Stdio Transport
from chuk_tool_processor.mcp import setup_mcp_stdio
# Create MCP config file (mcp_config.json)
mcp_config = {
"weather": {
"command": "python",
"args": ["-m", "weather_mcp_server"],
"env": {"API_KEY": "your_weather_key"}
},
"calculator": {
"command": "node",
"args": ["calculator-server.js"]
}
}
# Write the config to disk so the stdio transport can read it
import json
with open("mcp_config.json", "w") as f:
    json.dump(mcp_config, f, indent=2)

# Setup MCP with stdio
processor, stream_manager = await setup_mcp_stdio(
config_file="mcp_config.json",
servers=["weather", "calculator"],
namespace="tools"
)
# Use the tools
results = await processor.process('<tool name="tools.weather" args=\'{"city": "Paris"}\'/>')
Monitoring and Observability
Structured Logging Setup
from chuk_tool_processor.logging import setup_logging, get_logger, log_context_span
import logging
# Setup logging
await setup_logging(
level=logging.INFO,
structured=True, # JSON output
log_file="tool_processor.log" # Optional file output
)
# Use structured logging in your application
logger = get_logger("my_app")
async def process_user_request(user_id: str, request: str):
"""Example of using structured logging with context."""
async with log_context_span("user_request", {"user_id": user_id}):
logger.info("Processing user request", extra={
"request_length": len(request),
"user_id": user_id
})
try:
results = await processor.process(request)
logger.info("Request processed successfully", extra={
"num_tools": len(results),
"user_id": user_id
})
return results
except Exception as e:
logger.error("Request processing failed", extra={
"error": str(e),
"user_id": user_id
})
raise
Metrics Collection
from chuk_tool_processor.logging import metrics
# Metrics are automatically collected for:
# - Tool execution success/failure rates
# - Execution durations
# - Cache hit/miss rates
# - Parser performance
# - Registry operations
# Access metrics programmatically
async def get_tool_stats():
"""Get statistics about tool usage."""
# Example: Get cache statistics
if hasattr(processor.executor, 'cache'):
cache_stats = await processor.executor.cache.get_stats()
print(f"Cache hit rate: {cache_stats['hit_rate']:.2%}")
print(f"Total entries: {cache_stats['entry_count']}")
# Custom metrics can be logged
await metrics.log_tool_execution(
tool="custom_metric",
success=True,
duration=1.5,
cached=False
)
Testing Your Tools
Unit Testing
import pytest
from chuk_tool_processor import ToolProcessor, register_tool, initialize
@pytest.mark.asyncio
async def test_calculator_tool():
"""Test the calculator tool."""
# Setup
await initialize()
processor = ToolProcessor()
# Test successful operation
results = await processor.process(
'<tool name="calculator" args=\'{"operation": "add", "a": 5, "b": 3}\'/>'
)
assert len(results) == 1
result = results[0]
assert result.error is None
assert result.result["result"] == 8
assert result.result["operation"] == "add"
@pytest.mark.asyncio
async def test_calculator_error_handling():
"""Test calculator error handling."""
await initialize()
processor = ToolProcessor()
# Test division by zero
results = await processor.process(
'<tool name="calculator" args=\'{"operation": "divide", "a": 5, "b": 0}\'/>'
)
assert len(results) == 1
result = results[0]
assert result.error is not None
assert "Cannot divide by zero" in result.error
Integration Testing
@pytest.mark.asyncio
async def test_full_workflow():
"""Test a complete workflow with multiple tools."""
# Register additional test tools
@register_tool(name="formatter")
class FormatterTool:
async def execute(self, text: str, format: str) -> str:
if format == "upper":
return text.upper()
elif format == "lower":
return text.lower()
return text
await initialize()
processor = ToolProcessor(enable_caching=True)
# Test multiple tool calls
llm_response = """
<tool name="calculator" args='{"operation": "multiply", "a": 6, "b": 7}'/>
<tool name="formatter" args='{"text": "Hello World", "format": "upper"}'/>
"""
results = await processor.process(llm_response)
assert len(results) == 2
# Check calculator result
calc_result = next(r for r in results if r.tool == "calculator")
assert calc_result.result["result"] == 42
# Check formatter result
format_result = next(r for r in results if r.tool == "formatter")
assert format_result.result == "HELLO WORLD"
Performance Optimization
Concurrent Execution
# Configure for high-throughput scenarios
processor = ToolProcessor(
max_concurrency=20, # Allow 20 concurrent tool executions
default_timeout=60.0, # Longer timeout for complex operations
enable_caching=True, # Cache frequently used results
cache_ttl=900, # 15-minute cache
enable_rate_limiting=True,
global_rate_limit=500 # 500 requests per minute
)
# Process multiple requests concurrently
async def process_batch(requests: list[str]):
"""Process multiple LLM responses concurrently."""
tasks = [processor.process(request) for request in requests]
all_results = await asyncio.gather(*tasks, return_exceptions=True)
successful = []
failed = []
for i, result in enumerate(all_results):
if isinstance(result, Exception):
failed.append({"request_index": i, "error": str(result)})
else:
successful.append({"request_index": i, "results": result})
return {"successful": successful, "failed": failed}
Memory Management
# For long-running applications, periodically clear caches
async def maintenance_task():
"""Periodic maintenance for long-running applications."""
while True:
await asyncio.sleep(3600) # Every hour
# Clear old cache entries
if hasattr(processor.executor, 'cache'):
# Clear entire cache or implement LRU eviction
stats_before = await processor.executor.cache.get_stats()
await processor.executor.cache.clear()
logger.info("Cache cleared", extra={
"entries_cleared": stats_before.get("entry_count", 0)
})
# Run maintenance in background
asyncio.create_task(maintenance_task())
🔧 Configuration Reference
Environment Variables
| Variable | Default | Description |
|---|---|---|
| CHUK_TOOL_REGISTRY_PROVIDER | memory | Registry backend (memory, redis, etc.) |
| CHUK_DEFAULT_TIMEOUT | 30.0 | Default tool execution timeout (seconds) |
| CHUK_LOG_LEVEL | INFO | Logging level (DEBUG, INFO, WARNING, ERROR) |
| CHUK_STRUCTURED_LOGGING | true | Enable JSON structured logging |
| CHUK_MAX_CONCURRENCY | 10 | Default max concurrent executions |
| MCP_BEARER_TOKEN | - | Bearer token for MCP SSE authentication |
| MCP_CONFIG_FILE | - | Path to MCP configuration file |
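These variables are picked up by the library itself. If you prefer explicit configuration, the same values can be passed straight to the constructor; a sketch reusing only arguments shown elsewhere in this README:
import os

from chuk_tool_processor import ToolProcessor

processor = ToolProcessor(
    default_timeout=float(os.getenv("CHUK_DEFAULT_TIMEOUT", "30.0")),
    max_concurrency=int(os.getenv("CHUK_MAX_CONCURRENCY", "10")),
)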
ToolProcessor Configuration
processor = ToolProcessor(
# Execution
default_timeout=30.0, # Default timeout per tool
max_concurrency=10, # Max concurrent executions
# Strategy (choose one)
strategy=InProcessStrategy(...), # Fast, shared memory
# strategy=SubprocessStrategy(...), # Isolated, safer
# Caching
enable_caching=True, # Enable result caching
cache_ttl=300, # Cache TTL in seconds
# Rate limiting
enable_rate_limiting=False, # Enable rate limiting
global_rate_limit=100, # Global requests per minute
tool_rate_limits={ # Per-tool limits
"expensive_tool": (10, 60), # 10 per minute
},
# Retries
enable_retries=True, # Enable automatic retries
max_retries=3, # Max retry attempts
# Parsing
parser_plugins=[ # Enabled parsers
"xml_tool",
"openai_tool",
"json_tool",
"function_call"
]
)
🤝 Contributing
We welcome contributions! Please see our Contributing Guide for details.
Development Setup
# Clone and setup development environment
git clone https://github.com/chrishayuk/chuk-tool-processor.git
cd chuk-tool-processor
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install in development mode with all extras
pip install -e ".[dev,test,mcp,all]"
# Run tests
pytest
# Run with coverage
pytest --cov=chuk_tool_processor
# Format code
black chuk_tool_processor
isort chuk_tool_processor
# Type checking
mypy chuk_tool_processor
Adding New Features
- New Tool Types: Extend ValidatedTool or StreamingTool
- New Parsers: Implement the ParserPlugin interface
- New Strategies: Implement the ExecutionStrategy interface
- New Wrappers: Create execution wrappers for cross-cutting concerns (see the sketch after this list)
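As a rough illustration of the wrapper idea, here is a plain-Python class decorator that times every execute() call. This is not the library's wrapper API (the built-in wrappers such as @cacheable hook into the execution pipeline); it only shows the kind of cross-cutting concern a wrapper can carry:
import functools
import time

def timed(cls):
    """Hypothetical cross-cutting wrapper: log how long each execute() call takes."""
    original = cls.execute

    @functools.wraps(original)
    async def execute(self, *args, **kwargs):
        start = time.perf_counter()
        try:
            return await original(self, *args, **kwargs)
        finally:
            print(f"{cls.__name__}.execute took {time.perf_counter() - start:.3f}s")

    cls.execute = execute
    return cls

# Usage: stack @timed directly above a tool class that defines async execute(),
# e.g. the Calculator from the quick start.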
📚 Documentation
- API Reference
- Architecture Guide
- Plugin Development
- MCP Integration Guide
- Performance Tuning
- Deployment Guide
🆘 Support
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
Made with ❤️ by the CHUK team