CHUK Tool Processor
An async-native framework for registering, discovering, and executing tools referenced in LLM responses.
Quick Start
Installation
# Clone the repository
git clone https://github.com/your-org/chuk-tool-processor.git
cd chuk-tool-processor
# Install with pip
pip install -e .
Basic Usage
import asyncio

from chuk_tool_processor.registry import register_tool, initialize, get_default_registry
from chuk_tool_processor.models.tool_call import ToolCall
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy
from chuk_tool_processor.execution.tool_executor import ToolExecutor

# Register a simple tool
@register_tool(name="calculator", description="Perform basic calculations")
class CalculatorTool:
    async def execute(self, operation: str, x: float, y: float) -> dict:
        if operation == "add":
            result = x + y
        elif operation == "multiply":
            result = x * y
        else:
            raise ValueError(f"Unknown operation: {operation}")
        return {
            "operation": operation,
            "x": x,
            "y": y,
            "result": result
        }

# Setup and execute tools
async def main():
    # Initialize registry
    await initialize()
    # Get the default registry
    registry = await get_default_registry()
    # Create execution strategy and executor
    strategy = InProcessStrategy(registry)
    executor = ToolExecutor(registry=registry, strategy=strategy)
    # Create a tool call
    call = ToolCall(
        tool="calculator",
        arguments={
            "operation": "multiply",
            "x": 5,
            "y": 7
        }
    )
    # Execute tool
    results = await executor.execute([call])
    # Display result
    result = results[0]
    if not result.error:
        print(f"Result: {result.result}")
    else:
        print(f"Error: {result.error}")

if __name__ == "__main__":
    asyncio.run(main())
Core Features
Async-Native Architecture
The entire framework is built with native async/await support, allowing for:
- Non-blocking execution of tools
- True concurrency with controlled parallelism
- Task-local context tracking across async boundaries
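The two ideas in the list above, bounded parallelism and task-local context, can be illustrated with the standard library alone. The following is a minimal sketch and not the framework's implementation: `run_limited`, `tagged_work`, `demo`, and the `request_id` variable are hypothetical names chosen for this example.

```python
import asyncio
import contextvars

# Hypothetical task-local variable; not part of chuk_tool_processor.
request_id = contextvars.ContextVar("request_id", default="-")


async def run_limited(coros, max_concurrency):
    """Await all coroutines, letting at most max_concurrency run at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(coro):
        async with semaphore:
            return await coro

    # gather() wraps each coroutine in its own task, and every task
    # runs in its own copy of the current context.
    return await asyncio.gather(*(guarded(c) for c in coros))


async def tagged_work(tag, counters):
    request_id.set(tag)  # only visible inside the current task
    counters["active"] += 1
    counters["peak"] = max(counters["peak"], counters["active"])
    await asyncio.sleep(0.01)  # yields to the event loop instead of blocking it
    counters["active"] -= 1
    return request_id.get()  # still this task's own tag


async def demo():
    counters = {"active": 0, "peak": 0}
    tags = ["a", "b", "c", "d", "e"]
    results = await run_limited(
        [tagged_work(tag, counters) for tag in tags], max_concurrency=2
    )
    return results, counters["peak"]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each task reads back the tag it set, even though the tasks interleave, and the semaphore keeps the number of simultaneously running tasks at or below the limit.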
Tool Registry
Tools are registered in a central registry with optional namespaces:
# Register with default parameters
@register_tool()
class SimpleGreeter:
    async def execute(self, name: str) -> str:
        return f"Hello, {name}!"

# Register with custom name and namespace
@register_tool(name="weather", namespace="api", description="Get weather info")
class WeatherTool:
    async def execute(self, location: str, units: str = "metric") -> dict:
        # Implementation...
        return {"temperature": 23.5, "conditions": "Sunny"}
Initialize and access the registry:
# Initialize registry (required at startup)
await initialize()
# Get registry
registry = await get_default_registry()
# Look up a tool
tool = await registry.get_tool("weather", namespace="api")
# List registered tools
tools = await registry.list_tools()
# Get tool metadata
metadata = await registry.get_metadata("weather", namespace="api")
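To make the idea of a namespaced registry concrete, here is a minimal synchronous sketch of a decorator that stores classes under a (namespace, name) key. It is an illustration only: `register`, `get_tool`, `list_tools`, the `"default"` namespace, and the fallback to the class name are assumptions of this example and may not match how `register_tool` behaves.

```python
from typing import Dict, Optional, Tuple

# Maps (namespace, name) to the registered class.
_TOOLS: Dict[Tuple[str, str], type] = {}


def register(name: Optional[str] = None, namespace: str = "default"):
    """Class decorator that records the class under (namespace, name)."""
    def decorator(cls):
        # Assumption for this sketch: fall back to the class name.
        _TOOLS[(namespace, name or cls.__name__)] = cls
        return cls
    return decorator


def get_tool(name: str, namespace: str = "default"):
    """Return the registered class, or None if nothing matches."""
    return _TOOLS.get((namespace, name))


def list_tools():
    """Return all (namespace, name) keys in sorted order."""
    return sorted(_TOOLS)


@register()
class SimpleGreeter:
    pass


@register(name="weather", namespace="api")
class WeatherTool:
    pass
```

Keying on the pair rather than the name alone lets two tools share a name as long as they live in different namespaces.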
Execution Strategies
Two execution strategies are provided:
1. InProcessStrategy
Executes tools in the same process with optional concurrency control:
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy

strategy = InProcessStrategy(
    registry,
    default_timeout=10.0,  # Timeout for tool execution
    max_concurrency=5      # Maximum concurrent executions
)
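A per-call timeout such as `default_timeout` can be pictured with `asyncio.wait_for`. This is a minimal sketch under that assumption, not the strategy's actual code; `call_with_timeout` and `slow_tool` are hypothetical names.

```python
import asyncio


async def call_with_timeout(coro, timeout):
    """Return (result, error); a timeout becomes an error string, not an exception."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout), None
    except asyncio.TimeoutError:
        # wait_for has already cancelled the underlying task at this point.
        return None, f"timed out after {timeout}s"


async def slow_tool(delay):
    """Stand-in for a tool whose execution takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "done"
```

A call that finishes in time yields `("done", None)`, while one that exceeds the limit is cancelled and reported as an error.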
2. SubprocessStrategy
Executes tools in separate processes for true isolation:
from chuk_tool_processor.execution.strategies.subprocess_strategy import SubprocessStrategy

strategy = SubprocessStrategy(
    registry,
    max_workers=4,        # Maximum worker processes
    default_timeout=30.0  # Timeout for tool execution
)
Execution Wrappers
Enhance tool execution with optional wrappers:
Retry Logic
from chuk_tool_processor.execution.wrappers.retry import RetryConfig, RetryableToolExecutor

# Use as a wrapper
retry_executor = RetryableToolExecutor(
    executor=base_executor,
    default_config=RetryConfig(
        max_retries=3,
        base_delay=0.5,
        jitter=True
    )
)

# Or as a decorator
from chuk_tool_processor.execution.wrappers.retry import retryable

@retryable(max_retries=3, base_delay=0.5)
@register_tool(name="flaky_api")
class FlakyApiTool:
    async def execute(self, query: str) -> dict:
        # Implementation with potential failures
        pass
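The parameters `max_retries`, `base_delay`, and `jitter` suggest retrying with exponential backoff. The sketch below shows that general technique with the standard library; it is an assumption about the behavior, not the wrapper's source, and `retry_async` with its injectable `sleep` argument is a hypothetical helper.

```python
import asyncio
import random


async def retry_async(func, max_retries=3, base_delay=0.5, jitter=True,
                      sleep=asyncio.sleep):
    """Call func() until it succeeds or max_retries retries are used up."""
    for attempt in range(max_retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            delay = base_delay * (2 ** attempt)  # doubles on every retry
            if jitter:
                # Randomize the wait so simultaneous callers do not retry in lockstep.
                delay *= random.uniform(0.5, 1.0)
            await sleep(delay)
```

Passing `sleep` as a parameter keeps the helper testable: a test can record the requested delays instead of actually waiting.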
Caching
from chuk_tool_processor.execution.wrappers.caching import InMemoryCache, CachingToolExecutor

# Use as a wrapper
cache = InMemoryCache(default_ttl=60)  # 60 second TTL
cache_executor = CachingToolExecutor(
    executor=base_executor,
    cache=cache
)

# Or as a decorator
from chuk_tool_processor.execution.wrappers.caching import cacheable

@cacheable(ttl=300)  # Cache for 5 minutes
@register_tool(name="expensive_operation")
class ExpensiveOperationTool:
    async def execute(self, input_value: int) -> dict:
        # Expensive computation
        pass
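A TTL cache for tool results needs two things: a stable key derived from the tool name and its arguments, and an expiry time per entry. The following is a minimal sketch of that idea, not the `InMemoryCache` implementation; the `TTLCache` class, its `make_key` helper, and the injectable `clock` are hypothetical.

```python
import json
import time


class TTLCache:
    """In-memory cache whose entries expire after a time-to-live in seconds."""

    def __init__(self, default_ttl, clock=time.monotonic):
        self._default_ttl = default_ttl
        self._clock = clock
        self._entries = {}  # key -> (expires_at, value)

    @staticmethod
    def make_key(tool, arguments):
        # sort_keys makes the key independent of argument order.
        return f"{tool}:{json.dumps(arguments, sort_keys=True)}"

    def set(self, key, value, ttl=None):
        lifetime = self._default_ttl if ttl is None else ttl
        self._entries[key] = (self._clock() + lifetime, value)

    def get(self, key):
        """Return (hit, value); expired entries are dropped and count as misses."""
        entry = self._entries.get(key)
        if entry is None:
            return False, None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]
            return False, None
        return True, value
```

Returning a `(hit, value)` pair lets callers distinguish a miss from a cached `None` result.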
Rate Limiting
from chuk_tool_processor.execution.wrappers.rate_limiting import RateLimiter, RateLimitedToolExecutor

# Use as a wrapper
limiter = RateLimiter(global_limit=100, global_period=60.0)  # 100 requests per minute
rate_limited_executor = RateLimitedToolExecutor(
    executor=base_executor,
    limiter=limiter
)

# Or as a decorator
from chuk_tool_processor.execution.wrappers.rate_limiting import rate_limited

@rate_limited(limit=5, period=60.0)  # 5 requests per minute
@register_tool(name="external_api")
class ExternalApiTool:
    async def execute(self, query: str) -> dict:
        # Call to rate-limited external API
        pass
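A limit of N requests per period can be enforced with a sliding window of timestamps. The sketch below shows one way to do this with the standard library. Whether `RateLimiter` uses a sliding window is an assumption; `SlidingWindowLimiter` and its injectable `clock` and `sleep` arguments are hypothetical.

```python
import asyncio
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `limit` acquisitions within any `period` seconds."""

    def __init__(self, limit, period, clock=time.monotonic, sleep=asyncio.sleep):
        self._limit = limit
        self._period = period
        self._clock = clock
        self._sleep = sleep
        self._stamps = deque()  # acquisition times, oldest first

    async def acquire(self):
        while True:
            now = self._clock()
            # Forget acquisitions that have left the window.
            while self._stamps and now - self._stamps[0] >= self._period:
                self._stamps.popleft()
            if len(self._stamps) < self._limit:
                self._stamps.append(now)
                return
            # Wait until the oldest acquisition falls out of the window.
            await self._sleep(self._period - (now - self._stamps[0]))
```

A caller awaits `limiter.acquire()` before each request. When the window is full, the call sleeps until the oldest request is old enough to be dropped.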
Streaming Tool Support
Tools can stream results incrementally:
import asyncio
from datetime import datetime
from typing import AsyncIterator

from pydantic import BaseModel, Field
from chuk_tool_processor.models.streaming_tool import StreamingTool

@register_tool(name="counter")
class CounterTool(StreamingTool):
    """Stream incremental counts."""

    class Arguments(BaseModel):
        start: int = Field(1, description="Starting value")
        end: int = Field(10, description="Ending value")
        delay: float = Field(0.5, description="Delay between items")

    class Result(BaseModel):
        value: int
        timestamp: str

    async def _stream_execute(self, start: int, end: int, delay: float) -> AsyncIterator[Result]:
        """Stream each count with a delay."""
        for i in range(start, end + 1):
            await asyncio.sleep(delay)
            yield self.Result(
                value=i,
                timestamp=datetime.now().isoformat()
            )

# Stream results
async for result in executor.stream_execute([tool_call]):
    print(f"Received: {result.result.value}")
Comprehensive Error Handling
Errors are captured in the result objects rather than raised as exceptions:
# Execute tool calls
results = await executor.execute([call1, call2, call3])
for result in results:
    if result.error:
        print(f"Tool {result.tool} failed: {result.error}")
        print(f"Duration: {(result.end_time - result.start_time).total_seconds()}s")
    else:
        print(f"Tool {result.tool} succeeded: {result.result}")
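The pattern of returning errors inside a result object can be sketched as follows. This is an illustration, not the framework's result model: `CallResult`, `run_captured`, and `divide` are hypothetical, and only the field names `tool`, `result`, `error`, `start_time`, and `end_time` are taken from the usage above.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class CallResult:
    """Hypothetical stand-in for the framework's result object."""
    tool: str
    result: Any
    error: Optional[str]
    start_time: datetime
    end_time: datetime


async def run_captured(tool, func, **arguments):
    """Run func(**arguments) and report any exception as a field, not a raise."""
    start = datetime.now(timezone.utc)
    try:
        value, error = await func(**arguments), None
    except Exception as exc:
        value, error = None, f"{type(exc).__name__}: {exc}"
    return CallResult(tool, value, error, start, datetime.now(timezone.utc))


async def divide(x, y):
    """Example tool body that fails when y is zero."""
    return x / y
```

Because a failing call still produces a result, a batch of calls can run to completion and the caller can inspect each outcome afterwards.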
Validation
Validate tool arguments and results:
from pydantic import BaseModel
from chuk_tool_processor.models.validated_tool import ValidatedTool

@register_tool(name="validate_data", namespace="utils")
class ValidatedDataTool(ValidatedTool):
    class Arguments(BaseModel):
        username: str
        age: int
        email: str

    class Result(BaseModel):
        is_valid: bool
        errors: list[str] = []

    async def _execute(self, username: str, age: int, email: str) -> Result:
        errors = []
        if len(username) < 3:
            errors.append("Username too short")
        if age < 18:
            errors.append("Must be 18 or older")
        if "@" not in email:
            errors.append("Invalid email")
        return self.Result(
            is_valid=len(errors) == 0,
            errors=errors
        )
Processing LLM Responses
The ToolProcessor helps extract and execute tool calls from LLM responses:
from chuk_tool_processor.core.processor import ToolProcessor
# Create processor
processor = ToolProcessor()
await processor.initialize()
# Process text with tool calls
llm_output = """
I'll help calculate that for you.
<tool name="calculator" args='{"operation": "multiply", "x": 5, "y": 7}'/>
The result should be 35.
"""
# Extract and execute tool calls
results = await processor.process_text(llm_output)
# Process results
for result in results:
    print(f"Tool: {result.tool}")
    print(f"Result: {result.result}")
    print(f"Error: {result.error}")
The processor recognizes several tool-call formats:
# XML format
<tool name="calculator" args='{"operation": "add", "x": 5, "y": 3}'/>
# OpenAI function call format
{
  "function_call": {
    "name": "calculator",
    "arguments": "{\"operation\": \"add\", \"x\": 5, \"y\": 3}"
  }
}
# JSON format
{
  "tool_calls": [
    {
      "id": "call_123",
      "type": "function",
      "function": {
        "name": "calculator",
        "arguments": "{\"operation\": \"add\", \"x\": 5, \"y\": 3}"
      }
    }
  ]
}
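To show how the three shapes above can be reduced to a common (tool, arguments) form, here is a minimal parsing sketch using only `re` and `json`. It is not the processor's parser: `parse_xml_tool_tags` and `parse_json_tool_calls` are hypothetical helpers, and the regular expression only covers the exact attribute layout shown in the examples.

```python
import json
import re

# Matches <tool name="..." args='...'/> with a single-quoted JSON payload.
TOOL_TAG = re.compile(
    r"""<tool\s+name="(?P<name>[^"]+)"\s+args='(?P<args>[^']*)'\s*/>"""
)


def parse_xml_tool_tags(text):
    """Extract tool calls written as XML-style tags inside free text."""
    return [
        {"tool": m["name"], "arguments": json.loads(m["args"])}
        for m in TOOL_TAG.finditer(text)
    ]


def parse_json_tool_calls(payload):
    """Extract tool calls from a function_call or tool_calls payload."""
    calls = []
    function_call = payload.get("function_call")
    if function_call:
        calls.append(function_call)
    for entry in payload.get("tool_calls", []):
        calls.append(entry["function"])
    # In both shapes the arguments arrive as a JSON-encoded string.
    return [
        {"tool": c["name"], "arguments": json.loads(c["arguments"])}
        for c in calls
    ]
```

Both helpers return the same list-of-dicts shape, so the code that executes the calls does not need to know which format the model produced.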
License
This project is licensed under the MIT License - see the LICENSE file for details.