Pythonic async-native agent framework
CubePi
Docs: https://cubepi.pages.dev — Getting Started · API Reference · Recipes
A Pythonic, async-native agent framework — a leaner, more readable take on agent runtimes like langgraph.
Why CubePi
| | langgraph | CubePi |
|---|---|---|
| Abstraction | Graph nodes + edges + channels — you model your agent as a state machine | Plain async functions — run_agent_loop is a while loop you can read in 5 minutes |
| Streaming | Callback-based, multiple handler types | async for event in stream — one pattern everywhere |
| Checkpointing | Full snapshot per step — serializes entire message list on every channel change | Append-only — writes only new messages, O(1) DB I/O regardless of conversation length |
| Dependencies | Pulls in langchain-core, langgraph-sdk, and transitive deps | 3 core deps: pydantic, anthropic, openai |
| Tool execution | Tools are graph nodes with manual wiring | Declare tools as functions, framework handles routing and parallel execution |
| Multi-provider | Via langchain chat model adapters | Native Provider protocol — Anthropic, OpenAI built in, add your own with one class |
| Middleware | Graph-level middleware on node entry/exit | Agent-level middleware with 5 typed hooks and declarative composition rules |
| Observability | LangSmith / Langfuse integration, full trace visualization | Native OpenTelemetry — Tracer, Meter, GenAI semconv, OTLP / JSONL exporters built in |
Install
pip install cubepi
# Optional extras
# Quote extras so shells such as zsh do not treat the brackets as a glob
pip install "cubepi[sqlite]" # SQLite checkpointer
pip install "cubepi[postgres]" # Postgres checkpointer
pip install "cubepi[mcp]" # MCP tool loaders
pip install "cubepi[tracing]" # OpenTelemetry tracing + metrics
pip install "cubepi[tracing-otlp]" # Adds the OTLP/HTTP span exporter
Or with uv:
uv add cubepi
uv add "cubepi[sqlite,postgres,mcp,tracing]"
Quick Start
import asyncio
from pydantic import BaseModel
from cubepi import Agent, AgentTool, Model
from cubepi.agent.types import AgentToolResult
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.base import TextContent
provider = AnthropicProvider(api_key="sk-...")
class GetWeatherParams(BaseModel):
    city: str

async def get_weather(tool_call_id, params: GetWeatherParams, *, signal=None, on_update=None):
    return AgentToolResult(
        content=[TextContent(text=f"72°F and sunny in {params.city}")]
    )

agent = Agent(
    provider=provider,
    model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
    tools=[
        AgentTool(
            name="get_weather",
            description="Get current weather for a city",
            parameters=GetWeatherParams,
            execute=get_weather,
        ),
    ],
    system_prompt="You are a helpful weather assistant.",
)

def on_event(event, signal=None):
    if event.type == "text_delta":
        print(event.delta, end="", flush=True)

agent.subscribe(on_event)
asyncio.run(agent.prompt("What's the weather in Tokyo?"))
Architecture
cubepi/
├── providers/ # LLM provider abstraction
│ ├── base.py # Provider protocol, message types, MessageStream
│ ├── anthropic.py # Anthropic provider
│ ├── openai.py # OpenAI Chat Completions provider
│ ├── openai_responses.py # OpenAI Responses provider
│ └── faux.py # Test utility — pre-configured responses with realistic streaming
├── agent/ # Agent runtime
│ ├── agent.py # Stateful Agent class
│ ├── loop.py # Stateless core loop (the actual algorithm)
│ ├── tools.py # Tool execution engine (sequential + parallel)
│ └── types.py # Events, AgentTool, AgentContext, hook types
├── middleware/ # Composable middleware protocol
│ └── base.py # 5 hooks with distinct composition rules
├── checkpointer/ # Persistence
│ ├── base.py # Checkpointer protocol
│ ├── memory.py # In-memory (dev/test)
│ ├── sqlite.py # SQLite (lightweight persistence)
│ └── postgres/ # Postgres (production persistence)
├── mcp/ # MCP tool loaders (HTTP + stdio transports)
└── tracing/ # OpenTelemetry tracing + metrics (optional extra)
    ├── tracer.py # Tracer entry point — TracerProvider + recorder wiring
    ├── recorder.py # Maps agent + provider events to OTel spans
    ├── meter.py # gen_ai.* histograms (duration, TTFC, token usage)
    ├── context.py # tracing_context(tags=…, metadata=…) per-run tagging
    ├── schema.py # OTel GenAI semconv attribute names
    └── exporters/ # JsonlSpanExporter + helpers (OTLP via opentelemetry-sdk)
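To make the "stateless core loop" idea concrete, here is a minimal sketch of an agent loop written as a plain `while` loop. It is not CubePi's `run_agent_loop`; `fake_model`, `run_loop`, `TOOLS`, and the dictionary-shaped messages are hypothetical stand-ins that use only the standard library.

```python
import asyncio


async def fake_model(messages):
    """Hypothetical stand-in for an LLM call: asks for a tool once, then answers."""
    if not any(m["role"] == "tool" for m in messages):
        return {
            "role": "assistant",
            "text": "",
            "tool_calls": [{"name": "get_weather", "args": {"city": "Tokyo"}}],
        }
    return {"role": "assistant", "text": "It is sunny in Tokyo.", "tool_calls": []}


async def get_weather(city):
    return f"72°F and sunny in {city}"


TOOLS = {"get_weather": get_weather}  # hypothetical tool registry


async def run_loop(messages, max_turns=8):
    """Call the model, run requested tools, repeat until no tools are requested."""
    messages = list(messages)  # stateless: never mutate the caller's list
    turns = 0
    while turns < max_turns:
        turns += 1
        reply = await fake_model(messages)
        messages.append(reply)
        if not reply["tool_calls"]:
            break  # the model produced a final answer
        # Run every tool call from this turn concurrently.
        results = await asyncio.gather(
            *(TOOLS[call["name"]](**call["args"]) for call in reply["tool_calls"])
        )
        for call, result in zip(reply["tool_calls"], results):
            messages.append({"role": "tool", "name": call["name"], "text": result})
    return messages


history = asyncio.run(run_loop([{"role": "user", "text": "Weather in Tokyo?"}]))
```

Because the loop takes a message list and returns a new one, state lives entirely with the caller, which is roughly the split the tree above draws between `loop.py` and the stateful `Agent` class.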
Core Concepts
Providers
Abstract LLM interaction behind a Provider protocol. All providers return MessageStream — an async iterator of StreamEvents.
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
from cubepi.providers import FauxProvider
# Real providers
anthropic = AnthropicProvider(api_key="...")
openai = OpenAIProvider(api_key="...")
# Test provider — no API calls, fully deterministic
faux = FauxProvider()
faux.set_responses(["Hello!", "How can I help?"])
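The "protocol plus async iterator" shape can be sketched with the standard library alone. The names below (`ToyEvent`, `ToyProvider`, `EchoProvider`, `collect`) are hypothetical and do not mirror CubePi's real `Provider` or `MessageStream` signatures; the sketch only shows how a single class with one async generator method can satisfy such a protocol and be consumed with `async for`.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Protocol


@dataclass
class ToyEvent:
    type: str  # "text_delta" or "done" in this sketch
    delta: str = ""


class ToyProvider(Protocol):
    """Structural type: any object with a matching stream() method qualifies."""

    def stream(self, prompt: str) -> AsyncIterator[ToyEvent]: ...


class EchoProvider:
    """A toy provider that streams the prompt back word by word."""

    async def stream(self, prompt: str) -> AsyncIterator[ToyEvent]:
        for word in prompt.split():
            await asyncio.sleep(0)  # yield control, as a network read would
            yield ToyEvent(type="text_delta", delta=word + " ")
        yield ToyEvent(type="done")


async def collect(provider: ToyProvider, prompt: str) -> str:
    """Consume the stream with the same async-for pattern used everywhere else."""
    parts = []
    async for event in provider.stream(prompt):
        if event.type == "text_delta":
            parts.append(event.delta)
    return "".join(parts).strip()


text = asyncio.run(collect(EchoProvider(), "hello from a custom provider"))
```

`EchoProvider` never inherits from `ToyProvider`; structural typing is what lets a new backend be added as one standalone class.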
Tools
Declare tools with a name, a Pydantic model for parameters, and an async execute returning AgentToolResult. The framework handles JSON Schema derivation, argument parsing, parallel execution, and error wrapping.
from pydantic import BaseModel
from cubepi import AgentTool
from cubepi.agent.types import AgentToolResult
from cubepi.providers.base import TextContent
class SearchParams(BaseModel):
    query: str

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    return AgentToolResult(content=[TextContent(text=f"Results for: {params.query}")])

tool = AgentTool(
    name="search",
    description="Search the web",
    parameters=SearchParams,
    execute=execute,
    execution_mode="parallel", # or "sequential"
)
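As a rough illustration of what "parallel execution and error wrapping" can mean, the sketch below runs several tool coroutines concurrently with `asyncio.gather` and turns exceptions into error results instead of letting them abort the batch. This is an assumption about the general technique, not CubePi's tool engine; `run_wrapped`, `run_parallel`, and the result dictionaries are hypothetical.

```python
import asyncio


async def slow_search(query):
    await asyncio.sleep(0.01)  # simulate I/O latency
    return f"Results for: {query}"


async def failing_tool(query):
    raise ValueError("backend unavailable")


async def run_wrapped(tool, **kwargs):
    """Run one tool; convert any exception into an error result."""
    try:
        return {"ok": True, "content": await tool(**kwargs)}
    except Exception as exc:
        return {"ok": False, "content": f"{type(exc).__name__}: {exc}"}


async def run_parallel(calls):
    """Run all (tool, kwargs) pairs concurrently; results keep the input order."""
    return await asyncio.gather(*(run_wrapped(tool, **kw) for tool, kw in calls))


calls = [
    (slow_search, {"query": "python"}),
    (slow_search, {"query": "asyncio"}),
    (failing_tool, {"query": "x"}),
]
results = asyncio.run(run_parallel(calls))
```

Wrapping each call individually means one failing tool still yields a result the model can read, while the other calls complete normally.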
Middleware
Composable hooks that modify behavior without touching the core loop:
from cubepi import Middleware, compose_middleware
# Assumed import path: types.py is described above as holding the hook types
from cubepi.agent.types import BeforeToolCallResult

class LoggingMiddleware(Middleware):
    async def transform_context(self, messages, *, signal=None):
        print(f"Context has {len(messages)} messages")
        return messages

class SafetyMiddleware(Middleware):
    async def before_tool_call(self, ctx, *, signal=None):
        if ctx.tool_call.name == "dangerous_tool":
            return BeforeToolCallResult(block=True, content="Blocked by policy")
        return None

hooks = compose_middleware([LoggingMiddleware(), SafetyMiddleware()])
Composition rules:
| Hook | Rule |
|---|---|
| transform_context | Chained: each receives the previous result |
| convert_to_llm | Last implementation wins |
| before_tool_call | Any block stops execution |
| after_tool_call | Later overrides earlier |
| should_stop_after_turn | Any true stops |
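Three of the rules in the table (chained, any-block, any-true) can be sketched in plain Python as follows. This is not how `compose_middleware` is implemented; the `Base` class, the simplified hook signatures, and the helper functions are hypothetical and exist only to show the composition semantics.

```python
import asyncio


class Base:
    """No-op defaults, so each middleware overrides only the hooks it needs."""

    async def transform_context(self, messages):
        return messages

    async def before_tool_call(self, tool_name):
        return None  # None means "no objection"

    async def should_stop_after_turn(self, turn):
        return False


class Trim(Base):
    async def transform_context(self, messages):
        return messages[-2:]


class Tag(Base):
    async def transform_context(self, messages):
        return messages + ["<tagged>"]


class Policy(Base):
    async def before_tool_call(self, tool_name):
        return "Blocked by policy" if tool_name == "dangerous_tool" else None


class TurnLimit(Base):
    async def should_stop_after_turn(self, turn):
        return turn >= 3


async def chained_transform(middlewares, messages):
    """Chained: each middleware receives the previous one's result."""
    for mw in middlewares:
        messages = await mw.transform_context(messages)
    return messages


async def first_block(middlewares, tool_name):
    """Any block stops execution: return the first non-None reason."""
    for mw in middlewares:
        reason = await mw.before_tool_call(tool_name)
        if reason is not None:
            return reason
    return None


async def any_stop(middlewares, turn):
    """Any true stops: a single True is enough to end the run."""
    for mw in middlewares:
        if await mw.should_stop_after_turn(turn):
            return True
    return False


async def demo():
    stack = [Trim(), Tag(), Policy(), TurnLimit()]
    context = await chained_transform(stack, ["a", "b", "c"])
    blocked = await first_block(stack, "dangerous_tool")
    allowed = await first_block(stack, "search")
    stops = [await any_stop(stack, turn) for turn in (1, 3)]
    return context, blocked, allowed, stops


context, blocked, allowed, stops = asyncio.run(demo())
```

Order matters only for the chained hook: swapping `Trim` and `Tag` would change `context`, while the block and stop results would stay the same.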
Checkpointer
Persist conversation state with append-only semantics:
from cubepi.checkpointer import MemoryCheckpointer, SQLiteCheckpointer, PostgresCheckpointer

# In-memory for dev/test
cp = MemoryCheckpointer()

# SQLite for lightweight persistence
async with SQLiteCheckpointer("agent.db") as cp:
    agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")

# Postgres for production
async with PostgresCheckpointer("postgresql://...") as cp:
    agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")
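The append-only idea can be illustrated with the standard library's synchronous `sqlite3` module. This is a sketch under assumptions, not CubePi's schema or its `SQLiteCheckpointer`; the `AppendOnlyLog` class, the `messages` table, and its columns are hypothetical.

```python
import json
import sqlite3


class AppendOnlyLog:
    """Stores each message as one row; saving never rewrites earlier rows."""

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS messages ("
            "thread_id TEXT NOT NULL, seq INTEGER NOT NULL, body TEXT NOT NULL, "
            "PRIMARY KEY (thread_id, seq))"
        )

    def count(self, thread_id):
        row = self.db.execute(
            "SELECT COUNT(*) FROM messages WHERE thread_id = ?", (thread_id,)
        ).fetchone()
        return row[0]

    def append(self, thread_id, new_messages):
        """Insert only the new messages; return the number of rows written."""
        start = self.count(thread_id)
        rows = [
            (thread_id, start + i, json.dumps(msg))
            for i, msg in enumerate(new_messages)
        ]
        with self.db:  # commit on success, roll back on error
            self.db.executemany("INSERT INTO messages VALUES (?, ?, ?)", rows)
        return len(rows)

    def load(self, thread_id):
        cur = self.db.execute(
            "SELECT body FROM messages WHERE thread_id = ? ORDER BY seq",
            (thread_id,),
        )
        return [json.loads(body) for (body,) in cur]


log = AppendOnlyLog()
first = log.append("conv-1", [{"role": "user", "text": "hi"},
                              {"role": "assistant", "text": "hello"}])
second = log.append("conv-1", [{"role": "user", "text": "weather?"}])
restored = log.load("conv-1")
```

The second `append` writes one row even though the thread already holds two, so the write cost of a step depends on how many messages that step produced rather than on the length of the conversation.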
FauxProvider for Testing
Ship your agent tests without API keys:
from cubepi import Agent, Model
from cubepi.providers import FauxProvider, faux_text, faux_tool_call, faux_assistant_message

provider = FauxProvider()
provider.set_responses([
    faux_assistant_message([
        faux_tool_call("search", {"query": "python"}),
    ]),
    faux_assistant_message("Here are the results..."),
])

# search_tool is an AgentTool named "search", such as `tool` from the Tools section
agent = Agent(provider=provider, model=Model(id="test", provider="faux"), tools=[search_tool])
agent.subscribe(lambda event, signal=None: None) # subscribe before prompt to receive events

# Run this inside an async function or an async test
await agent.prompt("Search for python")
# Streams realistic deltas: content_block_start, text_delta, etc.
Tracing
Attach a Tracer and every agent run produces OpenTelemetry spans
aligned with the GenAI Semantic Conventions —
ingestible by Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, or any
OTLP-compatible backend without custom instrumentation:
from cubepi.tracing import Tracer, tracing_context
from cubepi.tracing.exporters import JsonlSpanExporter
async with (
    Tracer(
        service_name="my-bot",
        agent_name="assistant",
        exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
    ) as tracer,
    tracer.attached(agent),
):
    with tracing_context(tags=["beta-arm"], metadata={"user_id": "u-42"}):
        await agent.prompt("Hello.")
# On exit: detach (closes any cancelled-run spans + flush) + tracer shutdown.
Span tree per run:
invoke_agent <agent_name> [INTERNAL]
└── cubepi.turn [INTERNAL]
    ├── chat <model> [CLIENT] ← the LLM call itself
    └── execute_tool <tool_name> [INTERNAL] ← each tool invocation
        └── tools/call <tool_name> [CLIENT] ← MCP-backed tools only
No prompts / model outputs are recorded by default. Opt in with
Tracer(record_content=True) plus a redact callback for PII. Pair
with Meter(...) for gen_ai.client.operation.duration / TTFC /
token-usage histograms. Full guide: https://cubepi.pages.dev/guides/tracing/overview
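A redaction callback for recorded content might look like the sketch below. The source does not give the signature `Tracer` expects for `redact`, so the `str -> str` shape used here is an assumption, and the two patterns are deliberately simple illustrations rather than a complete PII filter.

```python
import re

# Deliberately simple patterns; real PII detection needs far more care.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def redact(text: str) -> str:
    """Replace email addresses and phone-like digit runs with placeholders."""
    text = EMAIL.sub("[EMAIL]", text)
    return PHONE.sub("[PHONE]", text)


cleaned = redact("Mail jane.doe@example.com or call +1 (555) 010-2030.")
```

If the assumed signature holds, such a function would be passed alongside `record_content=True`; check the tracing guide linked above for the actual contract before relying on it.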
Requirements
- Python >= 3.11
- Core: pydantic, anthropic, openai
- Optional: aiosqlite ([sqlite]), asyncpg + sqlalchemy + msgpack ([postgres]), mcp ([mcp]), opentelemetry-sdk ([tracing]), opentelemetry-exporter-otlp-proto-http ([tracing-otlp])
Credits
Architecture inspired by pi-agent-core (TypeScript); CubePi is an independent Python reimplementation with Pydantic v2, asyncio-native primitives, and built-in checkpointing.
License
MIT