pig-agent-core
Agent runtime with tool calling and state management.
Production-ready agent framework with tool calling, resilience, and observability.
Features
- 🤖 Master Loop: Async agent runtime with resilient LLM calls and automatic tool execution
- 🔧 Enhanced Tool System: Handler-based tools with fallback mapping, confirmation gates, and parallel/sequential execution
- 🛡️ Resilience: API key rotation with per-failure-type cooldowns, retry logic, and fallback models
- 📊 Observability: Event emission, billing hooks, tool audit logging, and performance metrics
- 💾 Memory Protocols: Pluggable conversation history storage (in-memory, Redis, database)
- 🔄 Context Management: 3-level compression strategy (truncate → summarize → LLM-compress)
- 🎯 Extension Protocols: MemoryProvider, SystemPromptBuilder, BillingHook, ContextLoader
- 🧮 Token Counting: Accurate token estimation with tiktoken and character-based fallback
- ⚡ Async First: Full async/await support with streaming and cancellation
Installation
pip install pig-agent-core
Quick Start
Basic Agent
from pig_agent_core import Agent
from pig_llm import LLM
# Create agent with enhanced features
agent = Agent(
    name="Assistant",
    llm=LLM(provider="openai"),
    system_prompt="You are a helpful assistant.",
    max_iterations=10,  # Maximum tool-calling rounds per request
    verbose=True,       # Enable logging
)
# Run agent (async)
response = await agent.respond("Hello, how are you?")
print(response.content)
Agent with Resilience
from pig_agent_core import Agent, ProfileManager
from pig_llm import LLM
# Create profile manager for API key rotation
profile_manager = ProfileManager.from_env(
    env_prefix="OPENAI_API_KEY",  # Looks for OPENAI_API_KEY_1, _2, etc.
    model="gpt-4",
    fallback_models=["gpt-3.5-turbo"],
)
# Create agent with resilience
agent = Agent(
    name="ResilientAgent",
    llm=LLM(provider="openai"),
    profile_manager=profile_manager,  # Automatic key rotation
    max_iterations=15,
)
# Agent automatically handles rate limits and failures
response = await agent.respond("Complex task requiring multiple API calls")
Agent with Tools
from pig_agent_core.tools import TOOL_SCHEMAS, HANDLERS
from pig_agent_core.tools.registry import ToolRegistry
# Create registry and register core tools (convenience method)
registry = ToolRegistry()
registry.register_package(TOOL_SCHEMAS, HANDLERS, is_core=True)
# Or register tools individually
# for schema in TOOL_SCHEMAS:
#     tool_name = schema["function"]["name"]
#     handler = HANDLERS.get(tool_name)
#     if handler:
#         registry.register(
#             name=tool_name,
#             handler=handler,
#             schema=schema,
#             is_core=True,
#         )
# Create agent with tools
agent = Agent(
    name="ThinkingAgent",
    llm=LLM(provider="openai"),
)
agent.registry = registry
# Agent can now use think, plan, and discover_tools
response = await agent.respond("Help me plan a project")
Architecture
Master Loop
The agent uses a streaming master loop that:
- Receives user message
- Calls LLM with streaming
- Detects tool calls in stream
- Executes tools automatically
- Continues until no more tool calls
- Returns final response
User Message → LLM Stream → Tool Calls? → Execute Tools → LLM Stream → ... → Response
                   ↑                            │
                   └────── (repeat until done) ─┘
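In code, the loop is roughly equivalent to the simplified sketch below. It is illustrative only: the real implementation in the Agent class also handles streaming chunks, cancellation, events, and compression, and call_llm / execute_tool are placeholder callables standing in for the actual LLM call and ToolRegistry.execute.

async def master_loop(call_llm, execute_tool, messages: list[dict], max_iterations: int = 10) -> str:
    """Simplified sketch of the master loop (not the packaged implementation)."""
    content = ""
    for _ in range(max_iterations):
        # call_llm is a placeholder for the streaming LLM call; it returns the
        # assistant text plus any tool calls detected in the stream.
        content, tool_calls = await call_llm(messages)
        messages.append({"role": "assistant", "content": content})
        if not tool_calls:
            break  # No tool calls left: content is the final response.
        for call in tool_calls:
            # execute_tool is a placeholder for ToolRegistry.execute(...).
            result = await execute_tool(call)
            messages.append({"role": "tool", "content": result})
    return content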
Tool System
Tools are async handlers registered with schemas:
import asyncio
from typing import Any

from pig_agent_core.tools.base import ToolResult

async def handle_search(
    args: dict[str, Any],
    user_id: str | None = None,
    meta: dict[str, Any] | None = None,
    cancel: asyncio.Event | None = None,
) -> ToolResult:
    query = args.get("query", "")
    # Perform search...
    results = [f"Result for {query}"]  # placeholder result
    return ToolResult(ok=True, data=results)
# Register tool
registry.register(
    name="search",
    handler=handle_search,
    schema={
        "type": "function",
        "function": {
            "name": "search",
            "description": "Search for information",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "Search query"}
                },
                "required": ["query"],
            },
        },
    },
)
Resilience System
API Key Rotation
from pig_agent_core.resilience.profile import ProfileManager
# Create profile manager with multiple API keys
profile_manager = ProfileManager.from_env(
    env_prefix="OPENAI_API_KEY",  # Looks for OPENAI_API_KEY_1, _2, etc.
    model="gpt-4",
    fallback_models=["gpt-3.5-turbo"],
)

# Create agent with resilience
agent = Agent(
    name="ResilientAgent",
    llm=LLM(provider="openai"),
    profile_manager=profile_manager,
)
# Agent automatically rotates keys on rate limits
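Conceptually, rotation with per-failure-type cooldowns can be pictured like the sketch below. It is purely illustrative: the cooldown durations, class names, and failure-type labels are assumptions, not the ProfileManager internals.

import time

# Assumed cooldown durations per failure type, in seconds (illustrative values only).
COOLDOWNS = {"rate_limit": 60.0, "auth_error": 600.0, "server_error": 30.0}

class KeyState:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.cooldown_until = 0.0

    def mark_failure(self, failure_type: str) -> None:
        # Park the key for a duration that depends on why it failed.
        self.cooldown_until = time.time() + COOLDOWNS.get(failure_type, 30.0)

    def available(self) -> bool:
        return time.time() >= self.cooldown_until

def next_available(keys: list[KeyState]) -> KeyState | None:
    # Pick the first key that is not on cooldown (round-robin in spirit).
    for state in keys:
        if state.available():
            return state
    return None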
Retry Logic
from pig_agent_core.resilience.retry import resilient_streaming_call
# Resilient LLM call with automatic retry
async for chunk in resilient_streaming_call(
    llm=llm,
    messages=messages,
    profile_manager=profile_manager,
    max_retries=3,
):
    print(chunk.content, end="")
Context Compression
from pig_agent_core import Agent, Message

def compress_messages(messages: list[Message]) -> list[Message]:
    """Compress messages when context is too long."""
    # Keep system prompt and recent messages
    if len(messages) <= 10:
        return messages
    return [messages[0]] + messages[-9:]

agent = Agent(
    name="Agent",
    llm=llm,
    compress_fn=compress_messages,
)
# Agent automatically compresses on context overflow
Observability
from pig_agent_core.observability.events import AgentEvent, AgentEventType
def event_callback(event: AgentEvent):
    """Handle agent events."""
    if event.type == AgentEventType.TOOL_START:
        print(f"Tool started: {event.data.get('tool_name')}")
    elif event.type == AgentEventType.TOOL_END:
        print(f"Tool finished: {event.data.get('result')}")

agent = Agent(
    name="ObservableAgent",
    llm=llm,
    event_callback=event_callback,
)
# Events are emitted automatically during execution
Extension Protocols
pig-agent-core provides protocol-based extension points for customizing agent behavior. All protocols use Python's Protocol for structural typing, so downstream applications can supply their own implementations without subclassing anything from this package.
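For orientation, a structural protocol of this kind looks roughly like the following. This is an illustrative reconstruction inferred from the MemoryProvider example below (the class name MemoryProviderLike is made up here), not the package's exact definition:

from typing import Protocol

from pig_agent_core import Message  # message type used throughout the agent

class MemoryProviderLike(Protocol):
    """Any class providing these methods satisfies the protocol (no inheritance needed)."""
    async def get_messages(self, session_id: str) -> list[Message]: ...
    async def add_message(self, session_id: str, message: Message) -> None: ...
    async def clear_messages(self, session_id: str) -> None: ...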
MemoryProvider
Customize conversation history storage:
import json

from pig_agent_core import MemoryProvider, Message

class RedisMemoryProvider:
    """Store conversation history in Redis."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def get_messages(self, session_id: str) -> list[Message]:
        """Load messages from Redis."""
        data = await self.redis.get(f"session:{session_id}")
        if not data:
            return []
        return [Message(**msg) for msg in json.loads(data)]

    async def add_message(self, session_id: str, message: Message) -> None:
        """Save message to Redis."""
        messages = await self.get_messages(session_id)
        messages.append(message)
        await self.redis.set(
            f"session:{session_id}",
            json.dumps([msg.model_dump() for msg in messages]),
        )

    async def clear_messages(self, session_id: str) -> None:
        """Clear session history."""
        await self.redis.delete(f"session:{session_id}")

# Use custom memory provider
agent = Agent(
    llm=llm,
    memory_provider=RedisMemoryProvider(redis_client),
)
SystemPromptBuilder
Dynamically build system prompts with context:
from pig_agent_core import SystemPromptBuilder
class BrandedPromptBuilder:
    """Build prompts with brand context."""

    def __init__(self, brand_db):
        self.brand_db = brand_db

    def build_prompt(self, base_prompt: str, context: dict) -> str:
        """Add brand voice and guidelines."""
        brand = self.brand_db.get(context.get("brand_id"))
        return f"""{base_prompt}

Brand Voice: {brand.voice}
Guidelines: {brand.guidelines}
Target Audience: {brand.audience}
"""

agent = Agent(
    llm=llm,
    system_prompt="You are a helpful assistant.",
    system_prompt_builder=BrandedPromptBuilder(brand_db),
)
BillingHook
Track LLM and tool usage for cost monitoring:
from pig_agent_core import BillingHook
class CostTracker:
    """Track costs per user."""

    def __init__(self):
        self.costs = {}
        self.pricing = {
            "gpt-4": {"input": 0.03, "output": 0.06},  # per 1K tokens
            "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
        }

    def on_llm_call(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        user_id: str | None = None,
        metadata: dict | None = None,
    ) -> None:
        """Track LLM call costs."""
        if model in self.pricing:
            cost = (
                input_tokens / 1000 * self.pricing[model]["input"]
                + output_tokens / 1000 * self.pricing[model]["output"]
            )
            user_id = user_id or "default"
            self.costs[user_id] = self.costs.get(user_id, 0) + cost

    def on_tool_call(
        self,
        tool_name: str,
        user_id: str | None = None,
        metadata: dict | None = None,
    ) -> None:
        """Track tool usage."""
        # Could add tool-specific costs here
        pass

    def get_usage_summary(self, user_id: str | None = None) -> dict:
        """Get cost summary."""
        if user_id:
            return {"user_id": user_id, "total_cost": self.costs.get(user_id, 0)}
        return {"total_cost": sum(self.costs.values()), "by_user": self.costs}
tracker = CostTracker()
agent = Agent(
    llm=llm,
    billing_hook=tracker,
)
# After execution
summary = tracker.get_usage_summary()
print(f"Total cost: ${summary['total_cost']:.4f}")
ContextLoader
Load user/brand context dynamically:
from pig_agent_core import ContextLoader
class DatabaseContextLoader:
    """Load context from database."""

    def __init__(self, db):
        self.db = db

    async def load_context(self, user_id: str) -> dict:
        """Load user preferences and history."""
        user = await self.db.users.find_one({"id": user_id})
        return {
            "user_name": user["name"],
            "preferences": user["preferences"],
            "recent_topics": user["recent_topics"],
            "language": user["language"],
        }

# Context is loaded and injected into system prompt
agent = Agent(
    llm=llm,
    context_loader=DatabaseContextLoader(db),
)
Compression Functions
Provide custom context compression strategies:
# Use the built-in 3-level compression
from pig_agent_core import CompressionConfig
from pig_agent_core.context import compress_messages
agent = Agent(
    llm=llm,
    compress_fn=lambda msgs: compress_messages(
        msgs,
        max_tokens=8000,
        model="gpt-4",
        config=CompressionConfig(
            level1_threshold=0.7,  # Truncate tool results at 70%
            level2_threshold=0.8,  # Summarize at 80%
            level3_threshold=0.9,  # LLM-compress at 90%
        ),
    ),
)

# Or provide custom compression
def custom_compress(messages: list[Message]) -> list[Message]:
    """Keep only system prompt and last 5 messages."""
    if len(messages) <= 6:
        return messages
    return [messages[0]] + messages[-5:]

agent = Agent(llm=llm, compress_fn=custom_compress)
Tool Audit and Metrics
Track tool usage patterns and performance:
from pig_agent_core import ToolAuditLog, ToolMetricsCollector
# Audit logging
audit_log = ToolAuditLog(max_entries=10000)
audit_log.log(
    tool_name="search_web",
    user_id="user123",
    args={"query": "python"},
    success=True,
    duration=2.5,
    result_size=1024,
)
# Get failed executions
failed = audit_log.get_failed_entries(limit=10)
# Export to JSON
audit_log.export_json("audit.json")
# Metrics collection
metrics = ToolMetricsCollector()
metrics.record("search_web", success=True, duration=2.5, result_size=1024)
# Get statistics
summary = metrics.get_summary()
print(f"Success rate: {summary['success_rate']:.1f}%")
print(f"Calls per second: {summary['calls_per_second']:.2f}")
# Get top tools by usage
top_tools = metrics.get_top_tools(limit=5, by="calls")
for tool_metrics in top_tools:
    print(f"{tool_metrics.tool_name}: {tool_metrics.total_calls} calls")
API Reference
Agent
class Agent:
    def __init__(
        self,
        name: str = "Agent",
        llm: LLM | None = None,
        tools: list[Tool] | None = None,
        system_prompt: str | None = None,
        max_iterations: int = 10,
        on_tool_start: Callable | None = None,
        on_tool_end: Callable | None = None,
        verbose: bool = False,
        profile_manager: ProfileManager | None = None,
        event_callback: AgentEventCallback | None = None,
        compress_fn: Callable[[list[Message]], list[Message]] | None = None,
    )

    async def respond(
        self,
        user_message: str,
        cancel: asyncio.Event | None = None,
    ) -> Response:
        """Get non-streaming response."""

    async def respond_stream(
        self,
        user_message: str,
        cancel: asyncio.Event | None = None,
    ) -> AsyncIterator[StreamChunk]:
        """Get streaming response."""
ToolRegistry
class ToolRegistry:
    def register(
        self,
        name: str,
        handler: Callable,
        schema: dict[str, Any],
        *,
        is_core: bool = False,
        timeout: float = 30.0,
        max_retries: int = 0,
    ) -> None:
        """Register a tool."""

    async def execute(
        self,
        tool_call: Any,
        user_id: str,
        meta: dict[str, Any],
        cancel: asyncio.Event | None = None,
    ) -> ToolResult:
        """Execute a tool."""

    def activate_tools(self, names: list[str]) -> list[str]:
        """Activate deferred tools (lazy loading)."""
ToolResult
@dataclass
class ToolResult:
    ok: bool
    data: Any = None
    error: str | None = None
    meta: dict[str, Any] = field(default_factory=dict)

    def serialize(self, max_chars: int = 4000) -> str:
        """Serialize with structure-aware truncation."""
ProfileManager
class ProfileManager:
    @classmethod
    def from_env(
        cls,
        env_prefix: str = "OPENAI_API_KEY",
        model: str = "gpt-4",
        fallback_models: list[str] | None = None,
    ) -> ProfileManager:
        """Load profiles from environment variables."""

    def get_next_profile(self) -> APIProfile | None:
        """Get next available profile (round-robin with cooldown)."""
Migration Guide
From v0.0.x to v0.1.0
Tool System Changes
Before (v0.0.x):
from pig_agent_core import Agent, tool

@tool(description="Get weather")
def get_weather(location: str) -> str:
    return f"Weather in {location}"

agent = Agent(tools=[get_weather])
After (v0.1.0):
from pig_agent_core import Agent
from pig_agent_core.tools.base import ToolResult
from pig_agent_core.tools.registry import ToolRegistry
async def handle_weather(args, user_id=None, meta=None, cancel=None):
    location = args.get("location", "")
    return ToolResult(ok=True, data=f"Weather in {location}")

registry = ToolRegistry()
registry.register(
    name="get_weather",
    handler=handle_weather,
    schema={...},  # OpenAI function calling schema
)

agent = Agent()
agent.registry = registry
Agent Methods
Before (v0.0.x):
response = agent.run("Hello") # Synchronous
After (v0.1.0):
response = await agent.respond("Hello") # Async
# or
async for chunk in agent.respond_stream("Hello"):  # Streaming
    print(chunk.content)
State Management
State management API remains compatible, but now supports new subsystems:
# Save state (includes resilience and observability config)
state = agent.get_state()
agent.save_state("state.json")
# Restore state
agent = Agent.from_state("state.json")
Core Tools
pig-agent-core includes three core tools:
- think: Internal reasoning tool for agent planning
- plan: Create and validate execution plans
- discover_tools: Find available tools by description
These tools are automatically available when using the new ToolRegistry.
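Continuing the registry wiring from Quick Start, a minimal check of what the core package exposes might look like this; the printed names are inferred from the list above, so treat it as a sketch rather than guaranteed output:

from pig_agent_core.tools import TOOL_SCHEMAS, HANDLERS
from pig_agent_core.tools.registry import ToolRegistry

registry = ToolRegistry()
registry.register_package(TOOL_SCHEMAS, HANDLERS, is_core=True)

# The core schemas should include think, plan, and discover_tools,
# so any agent wired to this registry can call them during its loop.
core_names = [schema["function"]["name"] for schema in TOOL_SCHEMAS]
print(core_names)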
Examples
Complete Agent with All Features
import asyncio
from pig_agent_core import Agent
from pig_llm import LLM
from pig_agent_core.resilience.profile import ProfileManager
from pig_agent_core.observability.events import AgentEvent
# Setup resilience
profile_manager = ProfileManager.from_env(
    env_prefix="OPENAI_API_KEY",
    model="gpt-4",
    fallback_models=["gpt-3.5-turbo"],
)

# Setup observability
def log_events(event: AgentEvent):
    print(f"[{event.type}] {event.data}")

# Setup context compression
def compress(messages):
    return [messages[0]] + messages[-9:] if len(messages) > 10 else messages

# Create agent
agent = Agent(
    name="ProductionAgent",
    llm=LLM(provider="openai"),
    system_prompt="You are a helpful assistant.",
    profile_manager=profile_manager,
    event_callback=log_events,
    compress_fn=compress,
    max_iterations=10,
    verbose=True,
)

# Run agent
async def main():
    async for chunk in agent.respond_stream("Help me plan a project"):
        if chunk.type == "text":
            print(chunk.content, end="", flush=True)

asyncio.run(main())
Development
# Install in development mode
uv pip install -e .
# Install with dev dependencies
uv pip install -e ".[dev]"
# Run tests
pytest tests/
# Run with coverage
pytest tests/ --cov=pig_agent_core --cov-report=html
License
MIT
File details
Details for the file pig_agent_core-0.0.4.tar.gz.
File metadata
- Download URL: pig_agent_core-0.0.4.tar.gz
- Size: 102.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 4c076f381673ddbc1e5d47e48da80c314a292d080cd38445b62690573838fedc |
| MD5 | 65e0db560b5519d6e218b56caba51e3c |
| BLAKE2b-256 | 3a26dc2c251b5a0c98efabfb19a98ba62c76a36d74d36ea2bcfe799b6b35c7ca |
File details
Details for the file pig_agent_core-0.0.4-py3-none-any.whl.
File metadata
- Download URL: pig_agent_core-0.0.4-py3-none-any.whl
- Size: 69.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b0bde0f6b626f14e9ddde119c9d63e7836c4fc77e2adf9092a49d1e927907d92 |
| MD5 | a334be581317e9bdb33c1dd5254f754c |
| BLAKE2b-256 | c8af7fcb898209fbdef7a93c0e57e6aeb895688c84982b406ccbc4c61b30392d |