Pythonic async-native agent framework

CubePi

Docs: https://cubepi.pages.dev — Getting Started · API Reference · Recipes

A Pythonic, async-native agent framework — a leaner, more readable take on agent runtimes like langgraph.

Why CubePi

Aspect | langgraph | CubePi
Abstraction | Graph nodes + edges + channels; you model your agent as a state machine | Plain async functions; run_agent_loop is a while loop you can read in 5 minutes
Streaming | Callback-based, multiple handler types | async for event in stream; one pattern everywhere
Checkpointing | Full snapshot per step; serializes the entire message list on every channel change | Append-only; writes only new messages, O(1) DB I/O regardless of conversation length
Dependencies | Pulls in langchain-core, langgraph-sdk, and transitive deps | 3 core deps: pydantic, anthropic, openai
Tool execution | Tools are graph nodes with manual wiring | Declare tools as functions; the framework handles routing and parallel execution
Multi-provider | Via langchain chat model adapters | Native Provider protocol; Anthropic and OpenAI built in, add your own with one class
Middleware | Graph-level middleware on node entry/exit | Agent-level middleware with 5 typed hooks and declarative composition rules
Observability | LangSmith / Langfuse integration, full trace visualization | Native OpenTelemetry; Tracer, Meter, GenAI semconv, OTLP / JSONL exporters built in
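
To make the "plain async functions" and "async for event in stream" rows concrete, here is a minimal stdlib-only sketch of an agent loop written as a while loop that yields events. It is not CubePi's run_agent_loop: fake_model, run_loop, TOOLS, and the event dictionaries are hypothetical names used only for illustration.

```python
import asyncio

async def fake_model(messages):
    """Hypothetical stand-in for an LLM call: asks for one tool, then answers."""
    if not any(m["role"] == "tool" for m in messages):
        return {"role": "assistant", "tool_call": {"name": "add", "args": {"a": 2, "b": 3}}}
    return {"role": "assistant", "text": f"The sum is {messages[-1]['content']}."}

async def add(a, b):
    return a + b

TOOLS = {"add": add}  # hypothetical tool registry

async def run_loop(prompt):
    """Yield events until the model replies without requesting a tool."""
    messages = [{"role": "user", "content": prompt}]
    while True:
        reply = await fake_model(messages)
        messages.append(reply)
        call = reply.get("tool_call")
        if call is None:
            yield {"type": "text", "text": reply["text"]}
            return
        yield {"type": "tool_call", "name": call["name"]}
        result = await TOOLS[call["name"]](**call["args"])
        messages.append({"role": "tool", "content": result})

async def collect(prompt):
    # The consumer side: one `async for` over the event stream.
    return [event async for event in run_loop(prompt)]

events = asyncio.run(collect("What is 2 + 3?"))
```

The whole control flow is one loop: call the model, run any requested tool, append the result, repeat.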

Install

pip install cubepi

# Optional extras (quoted so shells such as zsh do not glob the brackets)
pip install "cubepi[sqlite]"        # SQLite checkpointer
pip install "cubepi[postgres]"      # Postgres checkpointer
pip install "cubepi[mcp]"           # MCP tool loaders
pip install "cubepi[tracing]"       # OpenTelemetry tracing + metrics
pip install "cubepi[tracing-otlp]"  # Adds the OTLP/HTTP span exporter

Or with uv:

uv add cubepi
uv add "cubepi[sqlite,postgres,mcp,tracing]"

Quick Start

import asyncio
from pydantic import BaseModel
from cubepi import Agent, AgentTool, Model
from cubepi.agent.types import AgentToolResult
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.base import TextContent

provider = AnthropicProvider(api_key="sk-...")

class GetWeatherParams(BaseModel):
    city: str

async def get_weather(tool_call_id, params: GetWeatherParams, *, signal=None, on_update=None):
    return AgentToolResult(
        content=[TextContent(text=f"72°F and sunny in {params.city}")]
    )

agent = Agent(
    provider=provider,
    model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
    tools=[
        AgentTool(
            name="get_weather",
            description="Get current weather for a city",
            parameters=GetWeatherParams,
            execute=get_weather,
        ),
    ],
    system_prompt="You are a helpful weather assistant.",
)

def on_event(event, signal=None):
    if event.type == "text_delta":
        print(event.delta, end="", flush=True)

agent.subscribe(on_event)
asyncio.run(agent.prompt("What's the weather in Tokyo?"))

Architecture

cubepi/
├── providers/        # LLM provider abstraction
│   ├── base.py             # Provider protocol, message types, MessageStream
│   ├── anthropic.py        # Anthropic provider
│   ├── openai.py           # OpenAI Chat Completions provider
│   ├── openai_responses.py # OpenAI Responses provider
│   └── faux.py             # Test utility — pre-configured responses with realistic streaming
├── agent/            # Agent runtime
│   ├── agent.py      # Stateful Agent class
│   ├── loop.py       # Stateless core loop (the actual algorithm)
│   ├── tools.py      # Tool execution engine (sequential + parallel)
│   └── types.py      # Events, AgentTool, AgentContext, hook types
├── middleware/       # Composable middleware protocol
│   └── base.py       # 5 hooks with distinct composition rules
├── checkpointer/     # Persistence
│   ├── base.py       # Checkpointer protocol
│   ├── memory.py     # In-memory (dev/test)
│   ├── sqlite.py     # SQLite (lightweight persistence)
│   └── postgres/     # Postgres (production persistence)
├── mcp/              # MCP tool loaders (HTTP + stdio transports)
└── tracing/          # OpenTelemetry tracing + metrics (optional extra)
    ├── tracer.py     # Tracer entry point — TracerProvider + recorder wiring
    ├── recorder.py   # Maps agent + provider events to OTel spans
    ├── meter.py      # gen_ai.* histograms (duration, TTFC, token usage)
    ├── context.py    # tracing_context(tags=…, metadata=…) per-run tagging
    ├── schema.py     # OTel GenAI semconv attribute names
    └── exporters/    # JsonlSpanExporter + helpers (OTLP via opentelemetry-sdk)

Core Concepts

Providers

Abstract LLM interaction behind a Provider protocol. All providers return MessageStream — an async iterator of StreamEvents.

from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
from cubepi.providers import FauxProvider

# Real providers
anthropic = AnthropicProvider(api_key="...")
openai = OpenAIProvider(api_key="...")

# Test provider — no API calls, fully deterministic
faux = FauxProvider()
faux.set_responses(["Hello!", "How can I help?"])
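
As a rough illustration of the protocol idea, the stdlib-only sketch below shows how a class can satisfy a provider interface structurally and be consumed as an async iterator of events. ProviderLike, EchoProvider, and this StreamEvent dataclass are hypothetical names; CubePi's real Provider protocol and MessageStream live in cubepi/providers/base.py and may be shaped differently.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Protocol, runtime_checkable

@dataclass
class StreamEvent:
    """Hypothetical event shape for this sketch."""
    type: str
    delta: str = ""

@runtime_checkable
class ProviderLike(Protocol):
    """Anything with a matching `stream` method counts as a provider."""
    def stream(self, prompt: str) -> AsyncIterator[StreamEvent]: ...

class EchoProvider:
    """Satisfies ProviderLike structurally; no base class is needed."""
    async def stream(self, prompt: str) -> AsyncIterator[StreamEvent]:
        for word in prompt.split():
            yield StreamEvent(type="text_delta", delta=word + " ")
        yield StreamEvent(type="done")

async def read_all(provider: ProviderLike, prompt: str) -> str:
    parts = []
    async for event in provider.stream(prompt):
        if event.type == "text_delta":
            parts.append(event.delta)
    return "".join(parts).strip()

provider_ok = isinstance(EchoProvider(), ProviderLike)
text = asyncio.run(read_all(EchoProvider(), "hello from a fake provider"))
```

Because the check is structural, swapping one provider for another should not require changes in the consuming code.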

Tools

Declare tools with a name, a Pydantic model for parameters, and an async execute returning AgentToolResult. The framework handles JSON Schema derivation, argument parsing, parallel execution, and error wrapping.

from pydantic import BaseModel
from cubepi import AgentTool
from cubepi.agent.types import AgentToolResult
from cubepi.providers.base import TextContent

class SearchParams(BaseModel):
    query: str

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    return AgentToolResult(content=[TextContent(text=f"Results for: {params.query}")])

tool = AgentTool(
    name="search",
    description="Search the web",
    parameters=SearchParams,
    execute=execute,
    execution_mode="parallel",  # or "sequential"
)
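
The parallel execution and error wrapping mentioned above can be pictured with a small stdlib-only sketch. This is one plausible way to do it with asyncio.gather, not CubePi's tool engine; run_tool, run_parallel, and the result dictionaries are hypothetical.

```python
import asyncio

async def slow_search(query):
    await asyncio.sleep(0.01)  # simulate I/O latency
    return f"Results for: {query}"

async def failing_tool(query):
    raise ValueError("backend unavailable")

async def run_tool(name, fn, args):
    """Run one tool and turn any exception into an error result."""
    try:
        return {"tool": name, "ok": True, "content": await fn(**args)}
    except Exception as exc:
        return {"tool": name, "ok": False, "content": f"{type(exc).__name__}: {exc}"}

async def run_parallel(calls):
    # gather runs the coroutines concurrently and returns results in input order.
    return await asyncio.gather(*(run_tool(n, f, a) for n, f, a in calls))

results = asyncio.run(run_parallel([
    ("search", slow_search, {"query": "python"}),
    ("broken", failing_tool, {"query": "x"}),
]))
```

Wrapping each call separately means one failing tool yields an error result for the model to read instead of cancelling its siblings.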

Middleware

Composable hooks that modify behavior without touching the core loop:

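from cubepi import Middleware, compose_middleware
from cubepi.agent.types import BeforeToolCallResult  # assumed location: hook types live in agent/types.py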

class LoggingMiddleware(Middleware):
    async def transform_context(self, messages, *, signal=None):
        print(f"Context has {len(messages)} messages")
        return messages

class SafetyMiddleware(Middleware):
    async def before_tool_call(self, ctx, *, signal=None):
        if ctx.tool_call.name == "dangerous_tool":
            return BeforeToolCallResult(block=True, content="Blocked by policy")
        return None

hooks = compose_middleware([LoggingMiddleware(), SafetyMiddleware()])

Composition rules:

Hook | Rule
transform_context | Chained: each receives the previous result
convert_to_llm | Last implementation wins
before_tool_call | Any block stops execution
after_tool_call | Later overrides earlier
should_stop_after_turn | Any true stops
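
One reading of these five rules, written as plain synchronous functions over lists of hooks, is sketched below. CubePi's hooks are async and compose_middleware may differ in detail; every function name here is hypothetical, and the merge behaviour for after_tool_call in particular is an assumption.

```python
def compose_transform_context(hooks, messages):
    # Chained: each hook receives the previous hook's result.
    for hook in hooks:
        messages = hook(messages)
    return messages

def compose_convert_to_llm(hooks, messages):
    # Last implementation wins: only the final hook is used.
    return hooks[-1](messages) if hooks else messages

def compose_before_tool_call(hooks, call):
    # Any block stops execution: return the first blocking result.
    for hook in hooks:
        result = hook(call)
        if result is not None and result.get("block"):
            return result
    return None

def compose_after_tool_call(hooks, result):
    # Later overrides earlier: apply overrides in order (assumed merge semantics).
    merged = dict(result)
    for hook in hooks:
        override = hook(merged)
        if override:
            merged.update(override)
    return merged

def compose_should_stop(hooks, turn):
    # Any true stops.
    return any(hook(turn) for hook in hooks)

chained = compose_transform_context(
    [lambda m: m + ["a"], lambda m: m + ["b"]], ["start"])
converted = compose_convert_to_llm([lambda m: "first", lambda m: "last"], [])
blocked = compose_before_tool_call(
    [lambda c: None, lambda c: {"block": True, "content": "Blocked by policy"}],
    {"name": "dangerous_tool"})
after = compose_after_tool_call(
    [lambda r: {"content": "v1"}, lambda r: {"content": "v2"}],
    {"content": "raw", "is_error": False})
stop = compose_should_stop([lambda t: False, lambda t: True], turn=3)
```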

Checkpointer

Persist conversation state with append-only semantics:

import asyncio
from cubepi import Agent
from cubepi.checkpointer import MemoryCheckpointer, SQLiteCheckpointer, PostgresCheckpointer

# `provider` and `model` are built as in Quick Start.

# In-memory for dev/test
cp = MemoryCheckpointer()

async def main():
    # SQLite for lightweight persistence
    async with SQLiteCheckpointer("agent.db") as cp:
        agent = Agent(provider=provider, model=model, checkpointer=cp, thread_id="conv-1")

    # Postgres for production
    async with PostgresCheckpointer("postgresql://...") as cp:
        agent = Agent(provider=provider, model=model, checkpointer=cp, thread_id="conv-1")

asyncio.run(main())
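
To show what append-only persistence means in practice, here is a minimal sketch using the standard library's sqlite3 module. It does not reproduce CubePi's schema or its SQLiteCheckpointer; the messages table, append_messages, and load_thread are hypothetical.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    " thread_id TEXT NOT NULL,"
    " seq INTEGER NOT NULL,"
    " payload TEXT NOT NULL,"
    " PRIMARY KEY (thread_id, seq))"
)

def append_messages(thread_id, new_messages):
    """Insert only the new messages; existing rows are never rewritten."""
    (next_seq,) = conn.execute(
        "SELECT COALESCE(MAX(seq) + 1, 0) FROM messages WHERE thread_id = ?",
        (thread_id,),
    ).fetchone()
    rows = [(thread_id, next_seq + i, json.dumps(m)) for i, m in enumerate(new_messages)]
    conn.executemany("INSERT INTO messages VALUES (?, ?, ?)", rows)
    conn.commit()
    return len(rows)  # rows written this step

def load_thread(thread_id):
    cur = conn.execute(
        "SELECT payload FROM messages WHERE thread_id = ? ORDER BY seq", (thread_id,))
    return [json.loads(p) for (p,) in cur]

written_first = append_messages("conv-1", [{"role": "user", "content": "hi"}])
written_second = append_messages("conv-1", [
    {"role": "assistant", "content": "hello"},
    {"role": "user", "content": "bye"},
])
history = load_thread("conv-1")
```

The number of rows written per step depends only on how many messages are new, not on how long the thread already is; a full-snapshot design would rewrite the whole history each time.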

FauxProvider for Testing

Ship your agent tests without API keys:

import asyncio
from cubepi import Agent, Model
from cubepi.providers import FauxProvider, faux_text, faux_tool_call, faux_assistant_message

provider = FauxProvider()
provider.set_responses([
    faux_assistant_message([
        faux_tool_call("search", {"query": "python"}),
    ]),
    faux_assistant_message("Here are the results..."),
])

# `search_tool` is an AgentTool such as `tool` from the Tools section.
agent = Agent(provider=provider, model=Model(id="test", provider="faux"), tools=[search_tool])
agent.subscribe(lambda event, signal=None: None)  # subscribe before prompt to receive events
asyncio.run(agent.prompt("Search for python"))
# Streams realistic deltas: content_block_start, text_delta, etc.
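
The idea behind a scripted test provider can be sketched without any dependencies: queue up replies, replay each one as small deltas, and assert on what the consumer reassembles. ScriptedProvider below is a hypothetical stand-in, not FauxProvider itself, and the content_block_stop event name is an assumption.

```python
import asyncio
from collections import deque

class ScriptedProvider:
    """Hypothetical test double: replays queued replies as small text deltas."""

    def __init__(self, responses, chunk_size=4):
        self._responses = deque(responses)
        self._chunk_size = chunk_size

    async def stream(self):
        text = self._responses.popleft()  # raises IndexError if the script runs out
        yield {"type": "content_block_start"}
        for i in range(0, len(text), self._chunk_size):
            await asyncio.sleep(0)  # hand control back to the loop, as a network stream would
            yield {"type": "text_delta", "delta": text[i:i + self._chunk_size]}
        yield {"type": "content_block_stop"}

async def run_script():
    provider = ScriptedProvider(["Hello!", "How can I help?"])
    replies, event_types = [], []
    for _ in range(2):
        parts = []
        async for event in provider.stream():
            event_types.append(event["type"])
            if event["type"] == "text_delta":
                parts.append(event["delta"])
        replies.append("".join(parts))
    return replies, event_types

replies, event_types = asyncio.run(run_script())
```

Since the replies are fixed in advance, the same test produces the same event sequence on every run and needs no network access.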

Tracing

Attach a Tracer and every agent run produces OpenTelemetry spans aligned with the GenAI Semantic Conventions — ingestible by Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, or any OTLP-compatible backend without custom instrumentation:

import asyncio
from cubepi.tracing import Tracer, tracing_context
from cubepi.tracing.exporters import JsonlSpanExporter

async def main():
    # `agent` is the Agent built in Quick Start.
    async with (
        Tracer(
            service_name="my-bot",
            agent_name="assistant",
            exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
        ) as tracer,
        tracer.attached(agent),
    ):
        with tracing_context(tags=["beta-arm"], metadata={"user_id": "u-42"}):
            await agent.prompt("Hello.")
    # On exit: detach (closes any cancelled-run spans + flush) + tracer shutdown.

asyncio.run(main())

Span tree per run:

invoke_agent <agent_name>              [INTERNAL]
└── cubepi.turn                        [INTERNAL]
    ├── chat <model>                   [CLIENT]   ← the LLM call itself
    └── execute_tool <tool_name>       [INTERNAL] ← each tool invocation
        └── tools/call <tool_name>     [CLIENT]   ← MCP-backed tools only

No prompts / model outputs are recorded by default. Opt in with Tracer(record_content=True) plus a redact callback for PII. Pair with Meter(...) for gen_ai.client.operation.duration / TTFC / token-usage histograms. Full guide: https://cubepi.pages.dev/guides/tracing/overview
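
A redact callback could look roughly like the sketch below, assuming the callback takes a string and returns a string. That signature is an assumption; check the tracing guide for the shape CubePi actually expects. The patterns are deliberately simple and will not catch every form of PII.

```python
import re

# Simple illustrative patterns; real PII detection needs more care.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def redact(text: str) -> str:
    """Replace email addresses and phone-like numbers with placeholders."""
    text = EMAIL.sub("[EMAIL]", text)
    return PHONE.sub("[PHONE]", text)

sample = redact("Contact jane.doe@example.com or +1 (555) 010-9999 about order 42.")
```

Short numbers such as the order id are left alone because the phone pattern requires at least nine characters.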

Requirements

  • Python >= 3.11
  • Core: pydantic, anthropic, openai
  • Optional: aiosqlite ([sqlite]), asyncpg + sqlalchemy + msgpack ([postgres]), mcp ([mcp]), opentelemetry-sdk ([tracing]), opentelemetry-exporter-otlp-proto-http ([tracing-otlp])

Credits

Architecture inspired by pi-agent-core (TypeScript); CubePi is an independent Python reimplementation with Pydantic v2, asyncio-native primitives, and built-in checkpointing.

License

MIT
