Pythonic async-native agent framework

CubePi

Docs: https://cubepi.pages.dev — Getting Started · API Reference · Recipes

A Pythonic, async-native agent framework — a leaner, more readable take on agent runtimes like langgraph.

Why CubePi

| | langgraph | CubePi |
|---|---|---|
| Abstraction | Graph nodes + edges + channels: you model your agent as a state machine | Plain async functions: run_agent_loop is a while loop you can read in 5 minutes |
| Streaming | Callback-based, multiple handler types | async for event in stream: one pattern everywhere |
| Checkpointing | Full snapshot per step: serializes entire message list on every channel change | Append-only: writes only new messages, O(1) DB I/O regardless of conversation length |
| Dependencies | Pulls in langchain-core, langgraph-sdk, and transitive deps | 3 core deps: pydantic, anthropic, openai |
| Tool execution | Tools are graph nodes with manual wiring | Declare tools as functions, framework handles routing and parallel execution |
| Multi-provider | Via langchain chat model adapters | Native Provider protocol: Anthropic, OpenAI built in, add your own with one class |
| Middleware | Graph-level middleware on node entry/exit | Agent-level middleware with 5 typed hooks and declarative composition rules |
| Observability | LangSmith / Langfuse integration, full trace visualization | Native OpenTelemetry: Tracer, Meter, GenAI semconv, OTLP / JSONL exporters built in |

Install

pip install cubepi

# Optional extras
pip install cubepi[sqlite]     # SQLite checkpointer
pip install cubepi[postgres]   # Postgres checkpointer
pip install cubepi[mcp]        # MCP tool loaders
pip install cubepi[tracing]    # OpenTelemetry tracing + metrics
pip install cubepi[tracing-otlp]  # Adds the OTLP/HTTP span exporter

Or with uv:

uv add cubepi
uv add cubepi[sqlite,postgres,mcp,tracing]

Quick Start

import asyncio
from pydantic import BaseModel
from cubepi import Agent, AgentTool, Model
from cubepi.agent.types import AgentToolResult
from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.base import TextContent

provider = AnthropicProvider(api_key="sk-...")

class GetWeatherParams(BaseModel):
    city: str

async def get_weather(tool_call_id, params: GetWeatherParams, *, signal=None, on_update=None):
    return AgentToolResult(
        content=[TextContent(text=f"72°F and sunny in {params.city}")]
    )

agent = Agent(
    provider=provider,
    model=Model(id="claude-sonnet-4-5-20250929", provider="anthropic"),
    tools=[
        AgentTool(
            name="get_weather",
            description="Get current weather for a city",
            parameters=GetWeatherParams,
            execute=get_weather,
        ),
    ],
    system_prompt="You are a helpful weather assistant.",
)

def on_event(event, signal=None):
    if event.type == "text_delta":
        print(event.delta, end="", flush=True)

agent.subscribe(on_event)
asyncio.run(agent.prompt("What's the weather in Tokyo?"))

Architecture

cubepi/
├── providers/        # LLM provider abstraction
│   ├── base.py             # Provider protocol, message types, MessageStream
│   ├── anthropic.py        # Anthropic provider
│   ├── openai.py           # OpenAI Chat Completions provider
│   ├── openai_responses.py # OpenAI Responses provider
│   └── faux.py             # Test utility — pre-configured responses with realistic streaming
├── agent/            # Agent runtime
│   ├── agent.py      # Stateful Agent class
│   ├── loop.py       # Stateless core loop (the actual algorithm)
│   ├── tools.py      # Tool execution engine (sequential + parallel)
│   └── types.py      # Events, AgentTool, AgentContext, hook types
├── middleware/       # Composable middleware protocol
│   └── base.py       # 5 hooks with distinct composition rules
├── checkpointer/     # Persistence
│   ├── base.py       # Checkpointer protocol
│   ├── memory.py     # In-memory (dev/test)
│   ├── sqlite.py     # SQLite (lightweight persistence)
│   └── postgres/     # Postgres (production persistence)
├── mcp/              # MCP tool loaders (HTTP + stdio transports)
└── tracing/          # OpenTelemetry tracing + metrics (optional extra)
    ├── tracer.py     # Tracer entry point — TracerProvider + recorder wiring
    ├── recorder.py   # Maps agent + provider events to OTel spans
    ├── meter.py      # gen_ai.* histograms (duration, TTFC, token usage)
    ├── context.py    # tracing_context(tags=…, metadata=…) per-run tagging
    ├── schema.py     # OTel GenAI semconv attribute names
    └── exporters/    # JsonlSpanExporter + helpers (OTLP via opentelemetry-sdk)

Core Concepts

Providers

Abstract LLM interaction behind a Provider protocol. All providers return MessageStream — an async iterator of StreamEvents.

from cubepi.providers.anthropic import AnthropicProvider
from cubepi.providers.openai import OpenAIProvider
from cubepi.providers import FauxProvider

# Real providers
anthropic = AnthropicProvider(api_key="...")
openai = OpenAIProvider(api_key="...")

# Test provider — no API calls, fully deterministic
faux = FauxProvider()
faux.set_responses(["Hello!", "How can I help?"])
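
To show the "async iterator of events" shape in isolation, here is a minimal standard-library sketch. The StreamEvent dataclass, the fake_stream generator, and the "done" event name are hypothetical stand-ins, not CubePi's actual types; only the text_delta event type is taken from the Quick Start.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class StreamEvent:
    # Hypothetical stand-in for a provider stream event.
    type: str
    delta: str = ""


async def fake_stream(text: str) -> AsyncIterator[StreamEvent]:
    # Yield one text_delta per word, then a final (hypothetical) "done" event.
    for word in text.split():
        await asyncio.sleep(0)  # hand control back to the event loop
        yield StreamEvent(type="text_delta", delta=word + " ")
    yield StreamEvent(type="done")


async def collect_text(stream: AsyncIterator[StreamEvent]) -> str:
    # Consume the stream with a single `async for` loop.
    parts = []
    async for event in stream:
        if event.type == "text_delta":
            parts.append(event.delta)
    return "".join(parts).strip()
```

Calling asyncio.run(collect_text(fake_stream("Hello from a stream"))) reassembles the original sentence from its deltas.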

Tools

Declare tools with a name, a Pydantic model for parameters, and an async execute returning AgentToolResult. The framework handles JSON Schema derivation, argument parsing, parallel execution, and error wrapping.

from pydantic import BaseModel
from cubepi import AgentTool
from cubepi.agent.types import AgentToolResult
from cubepi.providers.base import TextContent

class SearchParams(BaseModel):
    query: str

async def execute(tool_call_id, params: SearchParams, *, signal=None, on_update=None):
    return AgentToolResult(content=[TextContent(text=f"Results for: {params.query}")])

tool = AgentTool(
    name="search",
    description="Search the web",
    parameters=SearchParams,
    execute=execute,
    execution_mode="parallel",  # or "sequential"
)
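
The difference between the two execution modes can be sketched with plain asyncio. This is an illustration of the general technique, not CubePi's tool engine; slow_tool, run_sequential, and run_parallel are hypothetical names.

```python
import asyncio


async def slow_tool(name: str, delay: float) -> str:
    # Hypothetical tool body: pretend to do I/O for `delay` seconds.
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_sequential(calls):
    # Await each call in turn; total time is roughly the sum of the delays.
    return [await slow_tool(name, delay) for name, delay in calls]


async def run_parallel(calls):
    # Start all calls together; gather returns results in input order,
    # and total time is roughly the longest single delay.
    return await asyncio.gather(*(slow_tool(n, d) for n, d in calls))
```

Both helpers return the same results in the same order; only the wall-clock time differs, which is why independent, I/O-bound tools are natural candidates for the parallel mode.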

Middleware

Composable hooks that modify behavior without touching the core loop:

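from cubepi import Middleware, compose_middleware
# Assumed import path: the Architecture tree places hook types in agent/types.py.
from cubepi.agent.types import BeforeToolCallResult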

class LoggingMiddleware(Middleware):
    async def transform_context(self, messages, *, signal=None):
        print(f"Context has {len(messages)} messages")
        return messages

class SafetyMiddleware(Middleware):
    async def before_tool_call(self, ctx, *, signal=None):
        if ctx.tool_call.name == "dangerous_tool":
            return BeforeToolCallResult(block=True, content="Blocked by policy")
        return None

hooks = compose_middleware([LoggingMiddleware(), SafetyMiddleware()])

Composition rules:

| Hook | Rule |
|---|---|
| transform_context | Chained: each hook receives the previous result |
| convert_to_llm | Last implementation wins |
| before_tool_call | Any block stops execution |
| after_tool_call | Later overrides earlier |
| should_stop_after_turn | Any true stops |
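
Two of these rules ("chained" and "any block stops") can be sketched in a few lines of plain Python. The classes and runner functions below are hypothetical and simplified (no signal argument, string verdicts instead of result objects); they illustrate the composition idea and are not CubePi's compose_middleware.

```python
import asyncio


class Middleware:
    # Hypothetical base class: hooks default to pass-through / no objection.
    async def transform_context(self, messages):
        return messages

    async def before_tool_call(self, tool_name):
        return None  # None means "no objection"


class Truncate(Middleware):
    async def transform_context(self, messages):
        return messages[-2:]  # keep only the last two messages


class Tag(Middleware):
    async def transform_context(self, messages):
        return [f"[seen] {m}" for m in messages]


class Policy(Middleware):
    async def before_tool_call(self, tool_name):
        return "blocked" if tool_name == "dangerous_tool" else None


async def run_transform_context(middlewares, messages):
    # Chained: each hook receives the previous hook's result.
    for mw in middlewares:
        messages = await mw.transform_context(messages)
    return messages


async def run_before_tool_call(middlewares, tool_name):
    # Any block stops execution: return the first non-None verdict.
    for mw in middlewares:
        verdict = await mw.before_tool_call(tool_name)
        if verdict is not None:
            return verdict
    return None
```

With the chain [Truncate(), Tag(), Policy()], order matters for the chained hook: truncation runs first, so only the surviving messages are tagged.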

Checkpointer

Persist conversation state with append-only semantics:

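from cubepi import Agent
from cubepi.checkpointer import MemoryCheckpointer, SQLiteCheckpointer, PostgresCheckpointer

# `model` below is a Model instance, built as in Quick Start.
# The `async with` blocks must run inside an async function.

# In-memory for dev/test
cp = MemoryCheckpointer()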

# SQLite for lightweight persistence
async with SQLiteCheckpointer("agent.db") as cp:
    agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")

# Postgres for production
async with PostgresCheckpointer("postgresql://...") as cp:
    agent = Agent(model=model, checkpointer=cp, thread_id="conv-1")
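
The append-only idea can be sketched with the standard-library sqlite3 module. The schema and function names below are assumptions made for illustration and are not CubePi's actual storage layout. The point is that each step writes only the messages past a saved high-water mark, so the write cost tracks the number of new messages rather than the conversation length.

```python
import json
import sqlite3


def open_db() -> sqlite3.Connection:
    # In-memory database with one row per message (hypothetical schema).
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE messages (thread_id TEXT, seq INTEGER, body TEXT, "
        "PRIMARY KEY (thread_id, seq))"
    )
    return db


def append_messages(db, thread_id, messages, already_saved):
    # Write only the messages past the saved high-water mark.
    new = messages[already_saved:]
    db.executemany(
        "INSERT INTO messages VALUES (?, ?, ?)",
        [(thread_id, already_saved + i, json.dumps(m)) for i, m in enumerate(new)],
    )
    db.commit()
    return len(new)  # rows written this step


def load_messages(db, thread_id):
    # Rebuild the conversation by reading rows back in sequence order.
    rows = db.execute(
        "SELECT body FROM messages WHERE thread_id = ? ORDER BY seq", (thread_id,)
    )
    return [json.loads(body) for (body,) in rows]
```

A full-snapshot design would instead rewrite the whole list on every step, so its write volume would grow with the conversation.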

FauxProvider for Testing

Ship your agent tests without API keys:

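from cubepi import Agent, Model
from cubepi.providers import FauxProvider, faux_text, faux_tool_call, faux_assistant_message

# `search_tool` is an AgentTool, such as the "search" tool from the Tools section.
# Run this inside an async function (note the `await` below).
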
provider = FauxProvider()
provider.set_responses([
    faux_assistant_message([
        faux_tool_call("search", {"query": "python"}),
    ]),
    faux_assistant_message("Here are the results..."),
])

agent = Agent(provider=provider, model=Model(id="test", provider="faux"), tools=[search_tool])
agent.subscribe(lambda event, signal=None: None)  # subscribe before prompt to receive events
await agent.prompt("Search for python")
# Streams realistic deltas — content_block_start, text_delta, etc.

Tracing

Attach a Tracer and every agent run produces OpenTelemetry spans aligned with the GenAI Semantic Conventions — ingestible by Jaeger, Tempo, Honeycomb, Datadog, AWS X-Ray, or any OTLP-compatible backend without custom instrumentation:

from cubepi.tracing import Tracer, tracing_context
from cubepi.tracing.exporters import JsonlSpanExporter

async with (
    Tracer(
        service_name="my-bot",
        agent_name="assistant",
        exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
    ) as tracer,
    tracer.attached(agent),
):
    with tracing_context(tags=["beta-arm"], metadata={"user_id": "u-42"}):
        await agent.prompt("Hello.")
# On exit: detach (closes any cancelled-run spans + flush) + tracer shutdown.

Span tree per run:

invoke_agent <agent_name>              [INTERNAL]
└── cubepi.turn                        [INTERNAL]
    ├── chat <model>                   [CLIENT]   ← the LLM call itself
    └── execute_tool <tool_name>       [INTERNAL] ← each tool invocation
        └── tools/call <tool_name>     [CLIENT]   ← MCP-backed tools only

No prompts / model outputs are recorded by default. Opt in with Tracer(record_content=True) plus a redact callback for PII. Pair with Meter(...) for gen_ai.client.operation.duration / TTFC / token-usage histograms. Full guide: https://cubepi.pages.dev/docs/guides/tracing/overview
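
The redact callback's signature is not shown in this README. The sketch below assumes a plain str -> str function and only illustrates the kind of scrubbing such a callback might do; check the tracing guide for the real contract before wiring it into Tracer.

```python
import re

# Simple patterns for illustration; real PII detection needs more care.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
LONG_DIGITS = re.compile(r"\b\d{9,}\b")


def redact(text: str) -> str:
    # Hypothetical callback shape: take recorded text, return a scrubbed copy.
    text = EMAIL.sub("[EMAIL]", text)
    return LONG_DIGITS.sub("[NUMBER]", text)
```

For example, redact("mail bob@example.com, acct 1234567890") replaces the address and the account number with placeholders and leaves the rest of the text intact.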

Inspecting traces from the terminal

With JsonlSpanExporter writing to ./cubepi-traces, inspect runs with the cubepi trace CLI (install the extra: pip install cubepi[trace-cli]). All subcommands take --dir (default ./cubepi-traces):

cubepi trace ls                 # recent runs, newest first; the `input`
                                #   column shows the user message + `status`
cubepi trace view <run_id>      # render a run as a tree; errors print inline
                                #   under the failing span (no flag needed).
                                #   A unique run-id PREFIX is enough.
cubepi trace view <run> --content   # also expand prompts / tool args / results
cubepi trace view <run> -v          # expand ALL span attributes (verbose)
cubepi trace follow <run_id>    # stream spans live as they complete
cubepi trace stats --by model   # token / latency / error aggregates
cubepi trace stats --by tool --since 2026-01-01

Typical debugging flow: run ls to find the run by its input, then view <prefix> and read the inline error: line under any ERROR span. Prompt and tool content is available only for runs recorded with Tracer(record_content=True).

Token / cache fields. The recorder reconciles token counts to the GenAI semconv, so gen_ai.usage.input_tokens is the inclusive prompt total (input + cache_read + cache_creation) and gen_ai.usage.cache_read.input_tokens is a subset of it. From trace fields, the cache hit rate is cache_read / input_tokens (at most 100%); do not add cache_read to the denominator.
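
As a worked example of that arithmetic, the sketch below computes the hit rate from a span's attributes. Representing the attributes as a flat dict is an assumption about how you load the trace; the two attribute names are the ones given above.

```python
def cache_hit_rate(attrs: dict) -> float:
    # input_tokens already includes cache reads, so it is the full denominator.
    total = attrs.get("gen_ai.usage.input_tokens", 0)
    cached = attrs.get("gen_ai.usage.cache_read.input_tokens", 0)
    return cached / total if total else 0.0


# Hypothetical span: 1000 prompt tokens in total, 750 of them served from cache.
span_attrs = {
    "gen_ai.usage.input_tokens": 1000,
    "gen_ai.usage.cache_read.input_tokens": 750,
}
```

Here cache_hit_rate(span_attrs) is 750 / 1000 = 0.75. Adding cache_read to the denominator would give 750 / 1750, which understates the rate.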

Coding agents debugging cubepi/consumer apps can install the bundled cubepi-trace skill: npx skills add https://github.com/cubeplexai/cubepi/tree/main/skills/cubepi-trace -a claude-code.

Requirements

  • Python >= 3.11
  • Core: pydantic, anthropic, openai
  • Optional: aiosqlite ([sqlite]), asyncpg + sqlalchemy + msgpack ([postgres]), mcp ([mcp]), opentelemetry-sdk ([tracing]), opentelemetry-exporter-otlp-proto-http ([tracing-otlp])

Credits

Architecture inspired by pi-agent-core (TypeScript); CubePi is an independent Python reimplementation with Pydantic v2, asyncio-native primitives, and built-in checkpointing.

License

MIT
