A framework built by AI, for AI: durable, secure, full-stack Python.
Reason this release was yanked:
buggy
Bao
A framework built by AI, for AI: the runtime an AI agent reaches for to build, verify, and operate a full-stack app, because an agent can set it up correctly and run it safely.
Python, server-side templating (Jinja2 + Alpine.js + HTMX), secure and bounded by default.
The distribution name on PyPI is bao-framework; the import name is bao.
Quick start
python -m venv .venv
.venv\Scripts\activate # Windows
source .venv/bin/activate # Linux / macOS
# Install from PyPI
pip install bao-framework
# pip install "bao-framework[vec]" # add sqlite-vec for accelerated vector search
# Or develop on Bao itself from a checkout:
# pip install -e .
# pip install -e ".[vec]"
bao init --name "My App" # scaffold a new project here
bao run # dev server on http://127.0.0.1:8000
bao serve --host 0.0.0.0 # production server (uvicorn workers, no reload)
bao check # smoke-load the app, list routes
bao describe # structured description (seed of the agent/MCP surface)
bao config # print resolved configuration (secrets redacted)
bao test # run the test suite (pytest, zero-config)
bao make:migration "create posts" && bao migrate # data layer ready
bao make:crud Task name:str count:int done:bool due:datetime # typed vertical slice
bao make:auth # email + password auth (User model, login/register/logout, argon2)
bao make:api Note # JSON CRUD API for an existing model (mounted at /api/v1/<plural>)
bao tokens:issue USER_ID # mint an API bearer token; shown exactly once
bao tokens:list USER_ID # list a user's tokens
bao tokens:revoke TOKEN_ID # revoke a token by id or plaintext
# /openapi.json + /docs are auto-mounted: an OpenAPI 3.1 spec and a Swagger UI page.
bao make:dockerfile # scaffold a Dockerfile + .dockerignore
make:crud accepts name[:type] per field. Supported types: str (default), int,
float, bool, datetime. Scaffolded controllers coerce form input, scaffolded views
pick the right <input> element, and a smoke test is generated alongside.
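The name[:type] convention and the form coercion described above can be illustrated with a small standalone sketch. This is not Bao's generated code: parse_field, coerce_form, and the set of truthy strings accepted for bool are assumptions made for the example, and the three-part ref form (author_id:ref:User) is deliberately left out.

```python
from datetime import datetime


def _to_bool(raw: str) -> bool:
    # Assumption: HTML checkboxes post "on"; a few other truthy spellings are accepted.
    return raw.strip().lower() in {"1", "true", "on", "yes"}


# Hypothetical mapping from the documented type names to coercion functions.
COERCERS = {
    "str": str,
    "int": int,
    "float": float,
    "bool": _to_bool,
    "datetime": datetime.fromisoformat,
}


def parse_field(spec: str) -> tuple[str, str]:
    """Split 'name[:type]' into (name, type); the type defaults to 'str'."""
    name, _, type_name = spec.partition(":")
    type_name = type_name or "str"
    if not name or type_name not in COERCERS:
        raise ValueError(f"unsupported field spec: {spec!r}")
    return name, type_name


def coerce_form(specs: list[str], form: dict[str, str]) -> dict[str, object]:
    """Coerce raw form strings to the Python types named in the field specs."""
    fields = dict(parse_field(spec) for spec in specs)
    return {name: COERCERS[t](form[name]) for name, t in fields.items() if name in form}
```

Calling coerce_form(["count:int"], {"count": "3"}) yields {"count": 3}; an unknown type name raises ValueError instead of silently falling back to a string.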
What's in the box
Major subsystems shipped in v0.1.0. Each links to the deeper section in
llms.txt and the source module that implements it.
| Subsystem | One line | Source |
|---|---|---|
| Data layer | Bao.Model on sync SQLAlchemy 2.0 + Alembic | src/bao/db.py |
| Auth | email + password, sessions, CSRF, API tokens | src/bao/auth.py, src/bao/tokens.py |
| Tenant scoping | per-model opt-in, default-deny when scoped | src/bao/db.py, src/bao/tenants.py |
| Workflows | @workflow / step(), retries, signals, replay | src/bao/workflows.py |
| Tasks | @task / background() / @every, bounded pools | src/bao/tasks.py |
| Fault tolerance | @retry, CircuitBreaker, DLQ | src/bao/fault_tolerance.py |
| Observability | request id, JSON logs, audit log | src/bao/observability.py |
| Production endpoints | /healthz, /readyz, /metrics, graceful shutdown | src/bao/health.py, src/bao/metrics.py |
| AI router | ai.ask, ai.embed, ai.complete, ai.stream + budgets | src/bao/ai/ |
| AI agents | @agent, @tool, FSM + native tool-use, HITL approvals | src/bao/agents.py |
| Memory + RAG | short-term + vector (sqlite-vec accel), RAG pipeline | src/bao/memory.py, src/bao/rag.py |
| API resources | @api + OpenAPI / Swagger UI auto-emitted | src/bao/api.py, src/bao/openapi.py |
| Plugins | capability-gated extensions, lifecycle hooks | src/bao/plugins.py |
| Live channels | SSE + WebSocket dispatch + workflow / agent built-ins | src/bao/channels.py |
| Admin dashboard | server-rendered ops UI at /_bao/admin | src/bao/admin.py |
| MCP server | stdio bao mcp:serve for AI agents | src/bao/mcp_server.py |
| Demo apps | example/ (tutorial) + examples/support_triage/ (E2E) | top of repo |
Data layer
Models live in app/models/ and use SQLAlchemy 2.0 with Eloquent-style ergonomics on a
Model base. SQLite by default (zero config), swap to PostgreSQL with one env var:
BAO_DATABASE_URL=postgresql+psycopg://user:pass@host/db
from bao import Model
from sqlalchemy.orm import Mapped, mapped_column

class Post(Model):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    body: Mapped[str]

Post.create(title="Hi", body="...")
posts = Post.all()
Relationships ship as two helpers (belongs_to, has_many) on top of SQLAlchemy
2.0's relationship():
from bao import Model, belongs_to, has_many
from sqlalchemy.orm import Mapped, mapped_column

class Post(Model):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    comments = has_many("Comment", back_populates="post")

class Comment(Model):
    __tablename__ = "comments"
    id: Mapped[int] = mapped_column(primary_key=True)
    body: Mapped[str]
    post_id, post = belongs_to(Post, back_populates="comments")
bao make:crud Note title:str author_id:ref:User emits the FK column +
belongs_to(User) wiring.
Migrations (Alembic under the hood, no alembic.ini to manage):
bao make:migration "create posts" # autogenerate from current models
bao migrate # apply pending migrations
bao migrate:rollback # roll back the last migration
Deploy
bao serve # production server: uvicorn workers, no auto-reload
bao make:dockerfile # scaffold a Dockerfile + .dockerignore
Publishing to PyPI
The package is built with hatchling. Releases ship as a wheel + sdist; the
operator runs the final twine upload step manually, so an automated pipeline
cannot publish by accident.
One-time setup:
pip install -e ".[dev]" # adds build + twine to the venv
# Register on https://pypi.org, generate an API token, and store it:
$env:TWINE_USERNAME = "__token__"
$env:TWINE_PASSWORD = "pypi-<your-token>"
Release flow:
1. Bump the version in pyproject.toml and src/bao/__init__.py.
2. Add a changelog entry to CHANGELOG.md.
3. Commit, then tag: git tag v0.1.0 && git push --tags.
4. Build + verify locally:
   ./scripts/publish.ps1 # Windows
   ./scripts/publish.sh # Linux / macOS
   The script cleans dist/, runs python -m build, and runs twine check. It then prints the next command instead of running it.
5. Upload manually:
   python -m twine upload dist/*
The build emits both bao_framework-<ver>-py3-none-any.whl and
bao_framework-<ver>.tar.gz (the file names follow the distribution name, not the
import name); the wheel includes the admin templates under src/bao/templates/admin/.
OpenAPI security
/openapi.json declares two ways to authenticate under
components.securitySchemes:
- bearer (HTTP bearer, bearerFormat: bao-token): API tokens minted with bao tokens:issue USER_ID, sent as Authorization: Bearer <token>.
- cookieSession (API key in the bao_session cookie): Starlette's signed session cookie set after a browser login.
Each operation also carries tags=[ResourceClassName] so generated SDKs cluster
the actions together. The framework decides which operations need auth from
your existing decorators:
from bao import Resource, requires_login
from bao.api import api

# Fully protected: every operation lists both schemes under `security`.
@api("/api/v1/secret_posts", requires_login=True)
class SecretPostsApi(Resource):
    model = Post

# Mixed: only the per-method @requires_login actions list `security`.
@api("/api/v1/mixed_posts")
class MixedPostsApi(Resource):
    model = Post

    async def index(self, request):
        return await Resource.index(self, request)

    @requires_login
    async def create(self, request):
        return await Resource.create(self, request)
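For orientation, the resulting OpenAPI fragments might look like the sketch below. The scheme names, bearerFormat, and cookie name come from the description above; the helper build_operation and the exact layout Bao emits are assumptions. In OpenAPI, a `security` list with several single-key entries means that any one of them is sufficient.

```python
import json

# components.securitySchemes as described in this section.
SECURITY_SCHEMES = {
    "bearer": {"type": "http", "scheme": "bearer", "bearerFormat": "bao-token"},
    "cookieSession": {"type": "apiKey", "in": "cookie", "name": "bao_session"},
}


def build_operation(resource_class_name: str, protected: bool) -> dict:
    """Hypothetical helper: one OpenAPI operation object for a resource action."""
    operation: dict = {"tags": [resource_class_name]}
    if protected:
        # Either scheme satisfies the requirement (alternatives, not a conjunction).
        operation["security"] = [{name: []} for name in SECURITY_SCHEMES]
    return operation


if __name__ == "__main__":
    print(json.dumps(build_operation("MixedPostsApi", protected=True), indent=2))
```

Unprotected operations simply omit the `security` key, which matches the "only protected actions list security" behaviour described above.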
Testing (built in)
Tests live in tests/. The built-in client exercises the full request path (routing, DI,
views, middleware) without a server, and dependency injection makes mocking trivial:
from bao.testing import TestClient
from main import application

def test_home():
    TestClient(application).get("/").assert_ok().assert_see("Welcome")
AI (multi-model router)
A single bao.ai API hides provider differences and supports fallback +
bounded token / cost budgets.
from bao import ai
text = ai.ask("explain backpressure in one sentence")
vectors = ai.embed(["alpha", "beta"])
App() installs an Echo-only router by default (deterministic test stand-in,
returns the prompt reversed). Production projects swap in real providers:
from bao import ai
from bao.ai import Router, AnthropicProvider, OpenAIProvider

ai.configure(Router(
    providers=[AnthropicProvider(), OpenAIProvider()],
    default="anthropic",
))
Bound an agent's spend with a budget:
from bao import ai
from bao.ai import with_budget

with with_budget(max_tokens=2000, max_cost_usd=0.10):
    ai.ask("summarise the docs") # raises BudgetExceeded on overflow
Optional SDKs for real providers:
pip install anthropic # AnthropicProvider
pip install openai # OpenAIProvider
# OllamaProvider needs only httpx (already a Bao dep)
CLI: bao ai:ask "prompt", bao ai:stream "prompt", bao ai:providers.
Streaming completions
ai.stream(prompt) returns an iterator of text chunks; ai.stream_messages(messages)
takes a full chat. The router picks the first provider whose stream() is
implemented and doesn't fail before the first chunk:
for chunk in ai.stream("write three taglines"):
    print(chunk, end="", flush=True)
Echo yields one character per chunk (deterministic for tests); Anthropic / OpenAI / Ollama wrap their native streaming APIs. Budgets deduct when the iterator closes.
AI agents (@agent + @tool + HITL)
An agent is a Python class with tools. Bao runs a bounded FSM that lets the
model emit CALL: tool(...) and FINAL: ... directives, executes the tool,
and feeds the result back. Bounded by max_steps, optionally by a token /
cost budget, and gated by per-tool approval flags.
from bao.agents import agent, tool

@agent("ResearchBot", max_steps=10, max_tokens=5000)
class ResearchBot:
    @tool()
    def search(self, q: str) -> str:
        return "..."

    @tool(requires_approval=True, scope="write")
    def post_to_slack(self, channel: str, text: str) -> str:
        return slack.post(channel, text)
A @tool(requires_approval=True) writes an AgentApprovalRequest row and the
loop raises AwaitingApproval. An operator decides via bao agents:approve ID / bao agents:deny ID; calling the agent again with the same run_id
consumes the approval and proceeds.
bao agents:list # registered agents
bao agents:run AGENT "msg" # run in the foreground
bao agents:pending # list awaiting approvals
bao agents:approve REQUEST_ID
bao agents:deny REQUEST_ID
Migration: bao make:migration "create agent approvals" && bao migrate.
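The bounded CALL: / FINAL: loop described at the top of this section can be sketched in a few lines. This is an illustration under assumptions, not Bao's parser: it assumes the text after CALL: is a Python-style call with literal arguments, and parse_directive, run_agent, and the RESULT: feedback format are hypothetical.

```python
import ast


def parse_directive(line: str):
    """Return ("final", text) or ("call", tool_name, args, kwargs)."""
    line = line.strip()
    if line.startswith("FINAL:"):
        return ("final", line[len("FINAL:"):].strip())
    if line.startswith("CALL:"):
        node = ast.parse(line[len("CALL:"):].strip(), mode="eval").body
        if not isinstance(node, ast.Call) or not isinstance(node.func, ast.Name):
            raise ValueError("expected tool(...)")
        if any(kw.arg is None for kw in node.keywords):
            raise ValueError("**kwargs unpacking is not allowed")
        # literal_eval only accepts literals, so no model-supplied code is executed.
        args = [ast.literal_eval(arg) for arg in node.args]
        kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
        return ("call", node.func.id, args, kwargs)
    raise ValueError(f"unrecognised directive: {line!r}")


def run_agent(model, tools: dict, message: str, max_steps: int = 10) -> str:
    """Drive the loop: ask the model, run the requested tool, feed the result back."""
    transcript = [message]
    for _ in range(max_steps):
        directive = parse_directive(model(transcript))
        if directive[0] == "final":
            return directive[1]
        _, name, args, kwargs = directive
        transcript.append(f"RESULT: {tools[name](*args, **kwargs)}")
    raise RuntimeError("max_steps exhausted without a FINAL directive")
```

Here model is any callable that takes the transcript and returns the next directive line, which makes the loop easy to test with a scripted stand-in.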
Native tool-use (transparent)
When the active router contains a provider that declares supports_native_tools = True (Anthropic, OpenAI, or the test-only MockNativeProvider), the agent
loop swaps the CALL: / FINAL: FSM for the provider's native tool-use
protocol. @agent and @tool declarations are unchanged; the dispatch happens
under the hood, HITL approvals still fire, and the budget bookkeeping is
identical. Echo, Ollama, and ScriptedProvider keep the FSM path.
Tool input_schema is inferred from each tool's Python signature
(str / int / float / bool only). Anything else falls back to string and
should be supplied explicitly via @tool(input_schema=...) if you need
richer validation; real-world tool surfaces tend to take string identifiers
and a small handful of scalars, so this covers the common case. Install the
SDK (pip install anthropic / pip install openai) and an API key to use the
real providers; without them the FSM still runs against Echo or Scripted.
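Signature-based schema inference of this kind can be sketched with the standard inspect module. The function below is an illustration, not Bao's code: infer_input_schema is a hypothetical name, and treating parameters without defaults as required is an assumption.

```python
import inspect

# The four scalar annotations named above, mapped to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def infer_input_schema(fn) -> dict:
    """Build a JSON Schema object from a tool function's parameters."""
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        if name == "self":
            continue
        # Any annotation outside the scalar set falls back to "string".
        properties[name] = {"type": _JSON_TYPES.get(param.annotation, "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}
```

Note that the lookup is by exact type, so a bool annotation maps to "boolean" even though bool is a subclass of int.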
Memory (short-term + vector)
from bao import remember, recall, VectorMemory
remember("user asked about retries") # short-term ring buffer (max 50)
notes = recall(10)
vm = VectorMemory("knowledge_base", dim=32)
vm.add("Bao bounds every background task by default.")
hits = vm.search("how are background tasks bounded?", k=3)
Optional sqlite-vec acceleration: pip install sqlite-vec (or pip install "bao-framework[vec]"). When the extension is loadable and the engine is SQLite,
VectorMemory and bao.rag.retrieve route through a vec0 virtual table
for nearest-neighbour search; the pure-Python cosine walk remains as the
fallback. Production projects with large collections should still consider
pgvector / qdrant.
CLI: bao memory:add NAME "text", bao memory:search NAME "query" --k 5,
bao memory:prune NAME --max-age 30d --max-items 1000.
Honest caveat: ai.embed uses the active router. With EchoProvider the
embeddings are hash-derived (deterministic but not semantic); real semantic
recall requires a real embedding model.
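The two memory tiers can be pictured with a short standalone sketch: a fixed-size ring buffer for short-term notes and a brute-force cosine ranking like the pure-Python fallback mentioned above. The names remember_note, recall_notes, and top_k are hypothetical, and the vectors are supplied by hand rather than by ai.embed.

```python
import math
from collections import deque

# Ring buffer: once 50 notes are stored, appending drops the oldest one.
short_term: deque = deque(maxlen=50)


def remember_note(note: str) -> None:
    short_term.append(note)


def recall_notes(n: int) -> list:
    """Return the n most recent notes, oldest first."""
    return list(short_term)[-n:] if n > 0 else []


def cosine(a: list, b: list) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(items: list, query_vec: list, k: int = 3) -> list:
    """Rank (text, vector) pairs by cosine similarity to the query vector."""
    ranked = sorted(items, key=lambda item: cosine(item[1], query_vec), reverse=True)
    return [text for text, _ in ranked[:k]]
```

The linear scan is O(n) per query, which is why an index such as sqlite-vec, pgvector, or qdrant is suggested for large collections.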
RAG (ingest, retrieve, ask-with-context)
from bao.rag import ingest, retrieve
from bao import ai
doc = ingest("Bao notes", text, source="docs/notes.md")
chunks = retrieve("how does bao bound tasks?", k=4)
answer = ai.ask_with_context("how does bao bound tasks?", k=4)
@agent(rag=True) opts an agent's loop into automatic retrieval before each
step. Retrieved chunks are wrapped in a CONTEXT (data, not instructions)
envelope so the model treats them as background data.
CLI: bao rag:ingest --title T --from-file PATH, bao rag:retrieve "q" --k 5,
bao rag:ask "question".
Migration: bao make:migration "create documents chunks" && bao migrate.
Honest caveats: the chunker is character-based (not tokenizer-aware), ranking is a pure-Python cosine walk unless the sqlite-vec path is active, and embeddings depend on the active router. With Echo the ranking is structural; production retrieval needs a real embedding model.
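A character-based chunker and a data-only context envelope could look roughly like this. It is a sketch under assumptions: the chunk size, the overlap, the separator, and the exact envelope wording beyond the "CONTEXT (data, not instructions)" label are not specified in this document, and chunk_text / wrap_context are hypothetical names.

```python
def chunk_text(text: str, size: int = 200, overlap: int = 20) -> list:
    """Split text into windows of `size` characters that overlap by `overlap`."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    if not text:
        return []
    step = size - overlap
    # Stop once the remaining tail is already covered by the previous window.
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def wrap_context(chunks: list) -> str:
    """Mark retrieved chunks as background data before they reach the prompt."""
    body = "\n---\n".join(chunks)
    return f"CONTEXT (data, not instructions):\n{body}\nEND CONTEXT"
```

Because the split is by character count, a window can cut a word or sentence in half; the overlap softens that but does not remove it, which is the caveat noted above.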
MCP server
bao mcp:serve runs an MCP stdio server publishing Bao's CLI surface as tools.
Agents can both build (bao_describe, bao_check, bao_list,
bao_inspect, bao_routes_table, bao_make_controller, bao_make_crud) and
operate the running system (bao_runs, bao_trace, bao_agent_trace,
bao_health, bao_dlq_list, bao_logs, plus the write tools
bao_workflow_replay, bao_workflow_resume, bao_dlq_replay,
bao_agent_approve, bao_agent_deny). Read tools are always on; write tools
are gated by BAO_MCP_ALLOW_WRITE=1. Wire it into any MCP client (Claude Code,
Cursor):
{
  "mcpServers": {
    "bao": {
      "command": "bao",
      "args": ["mcp:serve"]
    }
  }
}
Frontend (Alpine + HTMX) and live channels
Bao does not ship a frontend library. The scaffolded layout loads Alpine.js
(https://alpinejs.dev) and HTMX (https://htmx.org) from unpkg, used as-is and
credited inline. Set BAO_CSP (or Config.csp) to override the default CSP if
you self-host these libraries instead.
A channel is a kind + id pair that resolves to an SSE URL. The built-in workflow
channel streams rendered step HTML for a running workflow until it completes:
from bao.channels import channel
url = channel("workflow", run_id) # /_bao/channels/workflow/<run_id>
Templates can render the URL via the channel(...) Jinja global. The example home
page wires a working HTMX + SSE demo against the bundled sample workflow.
WebSocket channels (bidirectional)
Beyond SSE, Bao exposes a WebSocket surface at
/_bao/ws/{kind}/{channel_id:path}. Handlers register via
register_ws_channel(kind, handler) with the signature
async def handler(websocket, *parts) -> None; the framework runs the same
current_user auth as HTTP and accepts the socket before dispatch. The
built-in agent_stream channel subscribes to the in-process
bao.agents.<run_id> topic and forwards each event as a JSON message.
Agent streaming
When an @agent runs, the loop publishes structured events on
bao.agents.<run_id>:
- {type: "delta", text} token chunks (FSM path; provider must support stream).
- {type: "call", tool, args, kwargs} and {type: "tool_result", tool, result} before / after each tool invocation.
- {type: "final", text} then {type: "done", status} when the run finishes.
Tail the events live with bao agents:run NAME "msg" --stream (subscribes
locally), or open the agent_stream WS channel from a browser. Native
tool-use paths emit boundary events but not token deltas; the FSM path
streams both.
from bao.channels import register_ws_channel

async def echo(ws, *parts):
    msg = await ws.receive_text()
    await ws.send_text(msg)

register_ws_channel("echo", echo)
Fault tolerance
from bao import retry, CircuitBreaker, dead_letter

@retry(attempts=3, backoff=0.5)
def fetch_user(uid): ...

cb = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)
with cb:
    call_payments(...)

dead_letter("emails", {"to": "alice"}, "smtp failed", attempts=3)
bao dlq:list [--queue X] [--tail N] # tail the dead-letter queue
bao dlq:replay ID # mark replayed and print the payload
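To make the semantics of the two primitives concrete, here is a standalone sketch of a retry decorator and a circuit breaker. It is not Bao's implementation: retry_with_backoff, SketchBreaker, and CircuitOpen are hypothetical names, exponential doubling of the backoff is an assumption, and the injectable sleep / clock parameters exist only to keep the example testable.

```python
import time
from functools import wraps


def retry_with_backoff(attempts: int = 3, backoff: float = 0.5, sleep=time.sleep):
    """Retry a function, waiting backoff * 2**n between attempts (assumed policy)."""
    def decorate(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    sleep(backoff * 2 ** (attempt - 1))
        return wrapper
    return decorate


class CircuitOpen(RuntimeError):
    """Raised instead of calling a dependency that keeps failing."""


class SketchBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None

    def __enter__(self):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpen("circuit is open")
            # Timeout elapsed: half-open, allow one trial call through.
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.failures = 0
            self.opened_at = None  # success closes the circuit
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
        return False  # never swallow the caller's exception
```

The breaker fails fast while open, so a struggling dependency is not hit with further calls until the recovery timeout has passed.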
Background tasks
Three primitives, all in-process, bounded by default:
- @task(kind="io" | "cpu", max_concurrency=None) submits to a managed thread pool (IO) or process pool (CPU) and returns a Future.
- background(fn, *args, **kwargs) is fire-and-forget on the IO pool; failures are logged and never re-raised.
- @every("30s" | "5m" | "1h") or @every("every day at 06:30") / @every("monday at 9am") registers a recurring schedule; turn the daemon scheduler on with BAO_ENABLE_SCHEDULER=1 or via bao scheduler:run.
Each pool is bounded by Config.max_inflight_tasks (default 500). @task calls
raise bao.tasks.QueueFull on overflow; background() logs and drops.
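The "bounded by default" behaviour can be approximated with a semaphore in front of a standard executor. This is a minimal sketch, not Bao's pool: BoundedPool and its constructor parameters are hypothetical, and only the reject-on-overflow path of @task is modelled (not the log-and-drop path of background()).

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class QueueFull(RuntimeError):
    """Raised when the in-flight limit is reached."""


class BoundedPool:
    def __init__(self, max_inflight: int = 500, workers: int = 4):
        self._slots = threading.BoundedSemaphore(max_inflight)
        self._pool = ThreadPoolExecutor(max_workers=workers)

    def submit(self, fn, *args, **kwargs) -> Future:
        # Non-blocking acquire: reject immediately rather than queue without bound.
        if not self._slots.acquire(blocking=False):
            raise QueueFull("too many in-flight tasks")

        def run():
            try:
                return fn(*args, **kwargs)
            finally:
                self._slots.release()  # free the slot before the future resolves

        try:
            return self._pool.submit(run)
        except BaseException:
            self._slots.release()
            raise

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

Rejecting at submission time pushes backpressure to the caller, which can then retry, shed load, or route the payload to a dead-letter queue.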
bao tasks:list # enumerate @task / @every entries
bao tasks:run NAME # invoke a task synchronously
bao scheduler:run [--once] # run the @every scheduler
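How a scheduler might interpret two of the @every forms is sketched below. The accepted grammar is an assumption limited to the "30s" / "5m" / "1h" and "every day at HH:MM" examples shown earlier; parse_interval and next_daily are hypothetical helpers, and weekday forms such as "monday at 9am" are not covered.

```python
import re
from datetime import datetime, timedelta

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_interval(spec: str) -> int:
    """Convert '30s', '5m', or '1h' into a number of seconds."""
    match = re.fullmatch(r"(\d+)([smh])", spec.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {spec!r}")
    return int(match.group(1)) * _UNIT_SECONDS[match.group(2)]


def next_daily(now: datetime, hhmm: str) -> datetime:
    """Next occurrence of a daily HH:MM time strictly after `now`."""
    hour, minute = (int(part) for part in hhmm.split(":"))
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    return candidate if candidate > now else candidate + timedelta(days=1)
```

A scheduler loop would sleep until the earliest next-run time across all registered entries, run the due tasks, and recompute.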
Plugins
Plugins extend Bao via a capability-gated, lifecycle-managed object: a Plugin
subclass declares its scope (network hosts, env-var secrets, db, routes, services)
in a Capabilities dataclass, then registers routes / DI bindings during
app.use(plugin). Optional async startup() / shutdown() fire from the ASGI
lifespan. Use bao plugins:audit to see what each plugin has declared.
Admin dashboard
A server-rendered operator dashboard ships at /_bao/admin (login-required by
default). Pages cover an overview (recent runs, pending approvals, DLQ size,
audit log size, registered routes / tasks / agents / plugins), a paginated
workflow list, pending agent approvals (with approve / deny buttons),
the dead-letter queue, and the audit log tail.
Templates live under src/bao/templates/admin/ and mount via a Jinja
ChoiceLoader, so a project can shadow any page by writing a file at the
same relative path (e.g. app/views/admin/index.html).
Disable the surface entirely with Config.admin_enabled = False (env
BAO_ADMIN_ENABLED=0).
Workflow visualization and replay
Each WorkflowRun has a detail page at /_bao/workflows/<run_id> that shows
the step graph (ord, name, status, attempts, error, JSON result preview) and
subscribes to the existing workflow SSE channel so step status updates
land live. Failed runs get a "Replay" button that re-invokes the workflow via
bao.workflows.replay(run_id); cached step results short-circuit completed
steps, so only failed / missing ones run again.
replay(..., deterministic_check=True) re-runs every cached step in a
throwaway sandbox and logs a WARNING on divergence, useful for catching
workflows that read non-step data from time.time() or random.
CLI: bao workflows:replay RUN_ID [--deterministic-check].
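The short-circuit behaviour of replay can be illustrated with a tiny step cache. This is a conceptual sketch only: StepCache and checkout are hypothetical, results are kept in memory rather than in WorkflowRun rows, and retries and signals are ignored.

```python
class StepCache:
    """Stores each completed step's result so a replay can skip it."""

    def __init__(self):
        self.results = {}

    def step(self, name, fn, *args):
        if name in self.results:
            return self.results[name]  # completed earlier: do not run again
        value = fn(*args)              # a raised exception leaves no cached result
        self.results[name] = value
        return value


def checkout(run: StepCache, calls: list, mail_ok: bool) -> str:
    """Toy workflow: charge once, then send a confirmation email."""
    def charge():
        calls.append("charge")
        return 42

    def email(amount):
        calls.append("email")
        if not mail_ok:
            raise RuntimeError("smtp failed")
        return f"sent {amount}"

    amount = run.step("charge", charge)
    return run.step("email", email, amount)
```

Running checkout a second time against the same StepCache re-executes only the email step, which is why steps with side effects (such as charging a card) are safe to replay as long as they completed and were recorded.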
Production endpoints
Three operator-facing endpoints ship mounted by default:
- GET /healthz is the liveness probe: 200 {"ok": true} until the ASGI shutdown begins, then 503 {"ok": false, "reason": "shutting_down"}.
- GET /readyz is the readiness probe: pings the DB with SELECT 1 and reports any plugin whose startup() raised. 200 or 503 with a checks dict naming which dependency tripped.
- GET /metrics is the Prometheus text-format scrape (Config.metrics_enabled toggles it off). Built-in counters cover HTTP requests, workflow runs and steps, agent runs, plus an in-flight task gauge and an HTTP latency histogram.
Graceful shutdown on ASGI lifespan stop: a ShutdownShedMiddleware returns
503 for new HTTP requests (with /healthz and /readyz allowlisted so
operators can read the reason), bao.tasks.drain_pools waits up to
Config.shutdown_drain_timeout seconds for in-flight work, the @every
scheduler is signalled to exit, and every plugin's async shutdown() runs
in reverse registration order.
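A readiness probe of the kind described above boils down to running each dependency check and aggregating the outcome. The sketch below uses the standard sqlite3 module for the SELECT 1 ping; readiness, db_ping, and the exact shape of the checks payload are assumptions, not Bao's response format.

```python
import sqlite3
from typing import Callable


def readiness(checks: dict) -> tuple:
    """Run every check; return (status_code, body) like a /readyz handler might."""
    results = {}
    for name, check in checks.items():
        try:
            check()
            results[name] = "ok"
        except Exception as exc:
            results[name] = f"failed: {exc}"
    ok = all(value == "ok" for value in results.values())
    return (200 if ok else 503), {"ok": ok, "checks": results}


def db_ping(conn: sqlite3.Connection) -> Callable[[], object]:
    """Build a check that proves the database answers a trivial query."""
    return lambda: conn.execute("SELECT 1").fetchone()
```

Keeping liveness and readiness separate lets an orchestrator stop routing traffic to an instance with a broken dependency without restarting a process that is otherwise healthy.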
Security defaults
Bao ships secure by default:
- Security headers on every response (CSP, X-Frame-Options, X-Content-Type-Options, Referrer-Policy).
- CSRF enforced on unsafe-method form posts. Scaffolded forms include <input type="hidden" name="_csrf" value="{{ csrf_token }}">; templates auto-receive csrf_token from the active request's session.
- Signed session cookies (bao_session) keyed by BAO_SECRET_KEY.
- Jinja2 autoescape on by default; parameterized queries via SQLAlchemy 2.0.
- Secrets sourced from environment (BAO_* vars), never hard-coded.
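The CSRF rule in the list above follows the common session-token pattern, sketched here in isolation. The _csrf form field name comes from the scaffolded input shown above; storing the token under the same key in the session, and the helper names issue_csrf_token and verify_csrf, are assumptions for the example.

```python
import hmac
import secrets

SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}


def issue_csrf_token(session: dict) -> str:
    """Create the per-session token once and reuse it for every rendered form."""
    return session.setdefault("_csrf", secrets.token_urlsafe(32))


def verify_csrf(session: dict, method: str, form: dict) -> bool:
    """Allow safe methods; otherwise require the form token to match the session."""
    if method.upper() in SAFE_METHODS:
        return True
    expected = session.get("_csrf", "")
    supplied = form.get("_csrf", "")
    # Constant-time comparison on bytes avoids leaking how many characters matched.
    return bool(expected) and hmac.compare_digest(expected.encode(), supplied.encode())
```

A cross-site page can make the browser send the session cookie, but it cannot read the token embedded in the form, so its forged POST fails the comparison.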
Conventions (Phase 0)
app/
├── controllers/ # @controller("/base") classes; methods via @get("/") / @post("/")
├── models/ # Model subclasses (see Data layer)
└── views/ # Jinja2 templates, autoescaped; render with view("name.html", **ctx)
main.py # exposes `application = App(...)`
Type-annotated handler params are resolved from the service container (dependency injection).
File details
Details for the file bao_framework-0.1.1.tar.gz.
File metadata
- Download URL: bao_framework-0.1.1.tar.gz
- Upload date:
- Size: 168.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d557b71150995c0ee46c7551ed566d3e22510dc45e0ceecad2db1c40adec92eb |
| MD5 | b22af49b7732b512840b5c8420916ed5 |
| BLAKE2b-256 | d1af9fcda001214f9fbd6ede6b17b6c3d648a57ec171f2ec74b88b494d0610ff |
File details
Details for the file bao_framework-0.1.1-py3-none-any.whl.
File metadata
- Download URL: bao_framework-0.1.1-py3-none-any.whl
- Upload date:
- Size: 169.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.9
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 75743d3707af0f87a08b2d7843acfe27bcd4bfc61b814b891cbada28616fcdcf |
| MD5 | a6043db73ac30a6c0fb29017e75d7fb8 |
| BLAKE2b-256 | 778226ba827003ce5da9e6b9edffc7e323c22307ed25d0295c4b60bcd4c9db29 |