pi-agent-core
A stateful LLM agent framework for Python with tool execution, event streaming, steering/follow-up message queuing, and proxy transport.
Installation
uv add pi-agent-core
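Or, with pip:

pip install pi-agent-core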
Overview
pi-agent-core provides a minimal, LLM-agnostic agent loop that handles the orchestration between your application and any LLM provider. You bring your own streaming function — the library handles state management, tool execution, event dispatch, mid-turn steering, and follow-up queuing.
Key Features
- LLM-agnostic — works with any provider via the StreamFn protocol
- Real-time event streaming — two-level event system for agent lifecycle and LLM streaming primitives
- Tool execution — define tools with JSON Schema parameters and async execute functions
- Steering & follow-up queues — interrupt mid-turn or queue messages for after completion
- Cancellation — cooperative cancellation via asyncio.Event
- Proxy transport — built-in SSE proxy client for routing through a backend server
- Fully typed — Pydantic models throughout with py.typed marker
Quick Start
import asyncio

from pi_agent_core import (
    Agent,
    AgentOptions,
    AgentEvent,
    AgentTool,
    AgentToolSchema,
    AgentToolResult,
    Model,
    TextContent,
)

# 1. Define your tools
async def greet(tool_call_id, params, cancel_event=None, on_update=None):
    name = params.get("name", "world")
    return AgentToolResult(content=[TextContent(text=f"Hello, {name}!")])

greet_tool = AgentTool(
    name="greet",
    description="Greet someone by name",
    parameters=AgentToolSchema(
        properties={"name": {"type": "string", "description": "Name to greet"}},
        required=["name"],
    ),
    execute=greet,
)

# 2. Implement a StreamFn for your LLM provider
# (see "Implementing a StreamFn" section below)
async def my_stream_fn(model, context, options):
    ...

# 3. Create and run the agent
agent = Agent(AgentOptions(stream_fn=my_stream_fn))
agent.set_model(Model(api="anthropic", provider="anthropic", id="claude-sonnet-4-20250514"))
agent.set_system_prompt("You are a helpful assistant.")
agent.set_tools([greet_tool])

# Subscribe to events
def on_event(event: AgentEvent):
    print(f"Event: {event.type}")

agent.subscribe(on_event)

# Send a prompt
asyncio.run(agent.prompt("Say hello to Alice"))
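Tool execute functions also receive a cancel_event (the asyncio.Event used for cooperative cancellation) and an on_update callback, as in the greet signature above. Below is a minimal sketch of a longer-running tool that checks the event so it can stop early; count_slowly is an illustrative tool, not part of the library, and the on_update progress callback is left unused here.

import asyncio

from pi_agent_core import AgentTool, AgentToolResult, AgentToolSchema, TextContent

async def count_slowly(tool_call_id, params, cancel_event=None, on_update=None):
    limit = int(params.get("limit", 5))
    counted = 0
    for _ in range(limit):
        # Stop early if the agent loop has requested cancellation.
        if cancel_event is not None and cancel_event.is_set():
            break
        await asyncio.sleep(1)
        counted += 1
    return AgentToolResult(content=[TextContent(text=f"Counted to {counted} of {limit}")])

count_tool = AgentTool(
    name="count_slowly",
    description="Count upward, one number per second",
    parameters=AgentToolSchema(
        properties={"limit": {"type": "integer", "description": "How high to count"}},
        required=["limit"],
    ),
    execute=count_slowly,
)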
Architecture
Agent ← High-level stateful wrapper, subscriptions, queues
↓
agent_loop() ← Core orchestration: prompt → stream → tools → steering loop
↓
StreamFn (user-provided) ← You implement LLM streaming integration
or
stream_proxy() ← Built-in SSE proxy client as a StreamFn
Modules
| Module | Responsibility |
|---|---|
| types.py | All Pydantic models: content blocks, messages, events, tools, config, state, and the StreamResult protocol |
| agent_loop.py | agent_loop() and agent_loop_continue() async generators — streaming, tool execution, steering, follow-ups |
| agent.py | Agent class wrapping the loop with state management, event subscriptions, abort/reset, and queue management |
| proxy.py | stream_proxy() SSE client using httpx — reconstructs partial messages from server-stripped delta events |
Implementing a StreamFn
The library is LLM-agnostic. You provide a stream_fn(model, context, options) that returns a StreamResult — an async iterator of AssistantMessageEvent objects with an async result() method.
from pi_agent_core import (
    AssistantMessage,
    AssistantMessageEvent,
    StreamResult,
    StreamStartEvent,
    StreamTextStartEvent,
    StreamTextDeltaEvent,
    StreamTextEndEvent,
    StreamDoneEvent,
    TextContent,
    Model,
    AgentContext,
    SimpleStreamOptions,
)

class MyStream:
    """Implements the StreamResult protocol."""

    def __init__(self):
        self._events = []
        self._final = None

    def __aiter__(self):
        return self

    async def __anext__(self) -> AssistantMessageEvent:
        ...  # yield events from your LLM provider

    async def result(self) -> AssistantMessage:
        return self._final

async def my_stream_fn(
    model: Model,
    context: AgentContext,
    options: SimpleStreamOptions,
) -> StreamResult:
    stream = MyStream()
    # Start your LLM call, push events into the stream
    return stream
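One way to satisfy that contract without blocking the agent loop is to bridge your provider's streaming callbacks onto an asyncio.Queue. The sketch below is illustrative rather than part of the library: QueueStream and queue_stream_fn are hypothetical names, and the provider call itself is elided inside produce().

import asyncio

from pi_agent_core import (
    AgentContext,
    AssistantMessage,
    AssistantMessageEvent,
    Model,
    SimpleStreamOptions,
)

class QueueStream:
    """A StreamResult backed by an asyncio.Queue.

    A producer task pushes events with push() while the agent loop iterates,
    then hands the fully assembled message to finish().
    """

    _DONE = object()  # sentinel that ends the event stream

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._final: asyncio.Future = asyncio.get_running_loop().create_future()

    def push(self, event: AssistantMessageEvent) -> None:
        self._queue.put_nowait(event)

    def finish(self, message: AssistantMessage) -> None:
        self._final.set_result(message)
        self._queue.put_nowait(self._DONE)

    def __aiter__(self) -> "QueueStream":
        return self

    async def __anext__(self) -> AssistantMessageEvent:
        item = await self._queue.get()
        if item is self._DONE:
            raise StopAsyncIteration
        return item

    async def result(self) -> AssistantMessage:
        return await self._final

async def queue_stream_fn(
    model: Model,
    context: AgentContext,
    options: SimpleStreamOptions,
) -> QueueStream:
    stream = QueueStream()

    async def produce() -> None:
        # Call your provider's streaming API here, translate each chunk into
        # the corresponding AssistantMessageEvent via stream.push(), and
        # finally call stream.finish() with the complete AssistantMessage.
        ...

    asyncio.create_task(produce())
    return stream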
Event System
Agent Events (10 types)
Covers agent lifecycle, turns, messages, and tool execution:
agent_start, agent_end, turn_start, turn_end, message_start, message_update, message_end, tool_execution_start, tool_execution_update, tool_execution_end
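A subscriber can branch on the type discriminator to track what the agent is doing. This is a sketch; the per-event payload fields are defined in types.py.

from pi_agent_core import AgentEvent

def log_tool_activity(event: AgentEvent) -> None:
    # Every agent event carries a string `type` discriminator.
    if event.type == "tool_execution_start":
        print("tool started")
    elif event.type == "tool_execution_end":
        print("tool finished")
    elif event.type == "turn_end":
        print("turn complete")

# Register alongside any other subscribers:
# agent.subscribe(log_tool_activity)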
Assistant Message Events (12 types)
Covers LLM streaming primitives consumed internally by the loop:
start, text_start, text_delta, text_end, thinking_start, thinking_delta, thinking_end, toolcall_start, toolcall_delta, toolcall_end, done, error
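For a text-only reply, a StreamFn typically emits start, text_start, one or more text_delta events, text_end, and finally done; thinking and tool-call content follows the same pattern with the thinking_* and toolcall_* events. Consult types.py for the authoritative event payloads.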
Steering & Follow-up Queues
Steering messages interrupt the agent mid-turn (skipping remaining tool calls):
agent.steer(UserMessage(content=[TextContent(text="Actually, use a different approach")]))
Follow-up messages trigger new turns after the current run completes:
agent.follow_up(UserMessage(content=[TextContent(text="Now summarize the results")]))
Both support "one-at-a-time" (default) or "all" dequeue modes.
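For example, while a prompt runs as a background task you can inject both kinds of messages from the same coroutine. This sketch uses only the calls shown above; the sleep is just a stand-in for "some point mid-run".

import asyncio

from pi_agent_core import TextContent, UserMessage

async def run_with_steering(agent):
    # Run the prompt as a background task so we can interact with it mid-run.
    task = asyncio.create_task(agent.prompt("Refactor the data layer"))

    await asyncio.sleep(2)  # stand-in for "some point mid-run"
    agent.steer(UserMessage(content=[TextContent(text="Only touch the repository classes")]))
    agent.follow_up(UserMessage(content=[TextContent(text="Now summarize the changes")]))

    await task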
Proxy Transport
For apps that route LLM calls through a backend server:
from pi_agent_core import Agent, AgentOptions, stream_proxy, ProxyStreamOptions
agent = Agent(AgentOptions(
    stream_fn=lambda model, context, options: stream_proxy(
        model, context,
        ProxyStreamOptions(
            **options.model_dump(),
            auth_token="your-auth-token",
            proxy_url="https://your-proxy.example.com",
        ),
    ),
))
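The same wiring can be expressed as a small factory that closes over the proxy settings; this is an equivalent formulation, not a separate API, and make_proxy_stream_fn is an illustrative name.

from pi_agent_core import Agent, AgentOptions, ProxyStreamOptions, stream_proxy

def make_proxy_stream_fn(proxy_url: str, auth_token: str):
    def stream_fn(model, context, options):
        return stream_proxy(
            model,
            context,
            ProxyStreamOptions(
                **options.model_dump(),  # forward the per-call options from the loop
                auth_token=auth_token,
                proxy_url=proxy_url,
            ),
        )
    return stream_fn

agent = Agent(AgentOptions(
    stream_fn=make_proxy_stream_fn("https://your-proxy.example.com", "your-auth-token"),
))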
Development
uv sync # Install dependencies
uv run pytest # Run all tests
uv run pytest -v --tb=short # Verbose with short tracebacks
uv run ruff check . # Lint
uv run ruff format . # Format
Credits
This is a Python port of the TypeScript @mariozechner/pi-agent-core package from the pi-mono repository. The original TypeScript implementation by Mario Zechner provides the architecture, abstractions, and design that this library faithfully mirrors.
License