Nebo

AI-native observability for Python pipelines and agentic workflows.
Lightweight observability for Python programs. Decorate your functions with @nb.fn(), and nebo automatically infers a DAG from your call graph, captures logs, metrics, inspections, and errors -- all queryable in real time via CLI, MCP tools, or a Rich terminal dashboard.
Installation
```bash
pip install nebo
```
The CLI entry point is `nb`:

```bash
nb --help
```
Quick Start
```python
import nebo as nb

@nb.fn()
def load_data(path: str = "data.csv") -> list[dict]:
    """Load records from a file."""
    records = [{"id": i, "value": i * 0.5} for i in range(100)]
    nb.log(f"Loaded {len(records)} records from {path}")
    return records

@nb.fn()
def transform(records: list[dict]) -> list[dict]:
    """Normalize values."""
    out = []
    for r in nb.track(records, name="transforming"):
        out.append({**r, "value": r["value"] / 50.0})
    nb.log(f"Transformed {len(out)} records")
    nb.log_metric("record_count", float(len(out)))
    return out

@nb.fn()
def run():
    """Main pipeline entry point."""
    records = load_data()
    result = transform(records)
    nb.log(f"Pipeline complete: {len(result)} records")
    return result

if __name__ == "__main__":
    run()
```
Running this produces a Rich terminal display showing the DAG, node execution counts, logs, and progress bars. The DAG edges (run -> load_data, load_data -> transform) are inferred automatically from data flow -- no manual wiring required.
Core Concepts
@nb.fn() -- Register a function as a DAG node
Every function decorated with @nb.fn() becomes a node in the pipeline DAG. Edges are inferred from data flow: when a node's return value is passed as an argument to another node, an edge is created from the producer to the consumer.
```python
@nb.fn()
def load_data():
    return [1, 2, 3]

@nb.fn()
def transform(data):
    return [x * 2 for x in data]

@nb.fn()
def run():
    records = load_data()        # edge: run -> load_data (no data dependency)
    result = transform(records)  # edge: load_data -> transform (data flows from load_data)
    return result
```
When a child node receives no node-produced arguments, the edge falls back to the calling parent node.
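This inference, including the parent fallback, can be pictured with a minimal sketch. It is not nebo's actual implementation, just an illustration of the idea: remember which node produced each return value (by object identity), and when a value reappears as an argument, draw an edge from its producer; otherwise fall back to the calling node.

```python
# Hypothetical sketch of data-flow edge inference (not nebo's real code).
# Object identity is used to match return values to later arguments; a
# production version would need to handle garbage collection and id reuse.
import functools

_producers: dict[int, str] = {}    # id(value) -> producing node name
_call_stack: list[str] = []        # currently executing nodes
edges: set[tuple[str, str]] = set()

def fn(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        name = func.__name__
        sources = {_producers[id(a)] for a in args if id(a) in _producers}
        if sources:
            # data dependency: edge from each producer to this consumer
            edges.update((src, name) for src in sources)
        elif _call_stack:
            # fallback: edge from the calling parent node
            edges.add((_call_stack[-1], name))
        _call_stack.append(name)
        try:
            result = func(*args, **kwargs)
        finally:
            _call_stack.pop()
        _producers[id(result)] = name
        return result
    return wrapper

@fn
def load_data():
    return [1, 2, 3]

@fn
def transform(data):
    return [x * 2 for x in data]

@fn
def run():
    return transform(load_data())

run()
print(sorted(edges))  # [('load_data', 'transform'), ('run', 'load_data')]
```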
You can use it in several ways:
```python
@nb.fn                          # bare decorator
@nb.fn()                        # with parentheses
@nb.fn(depends_on=[other_fn])   # with explicit dependencies
@nb.fn(ui={"collapsed": True})  # with per-node UI hints
```
Class Decoration
@nb.fn() can be applied to classes. All methods are wrapped with scope tracking, and the class name becomes a visual group in the DAG:
```python
@nb.fn()
class Agent:
    def think(self, query):
        nb.log(f"Thinking about: {query}")
        return {"plan": "respond"}

    def act(self, plan):
        nb.log(f"Acting on: {plan}")
        return "result"

agent = Agent()
agent.think("hello")
agent.act({"plan": "respond"})
```
Methods appear as Agent.think and Agent.act in the DAG, grouped under Agent.
Lazy Materialization
Nodes only become visible in the DAG when they produce observable output (via nb.log(), nb.log_metric(), etc.). Functions that run silently are registered internally but don't clutter the visualization.
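The behavior can be pictured with a tiny sketch (hypothetical, not nebo's code): every decorated function is registered up front, but a node is only marked visible once it emits its first observable output.

```python
# Hypothetical sketch of lazy materialization: all nodes are registered,
# but only nodes that produce output become visible in the DAG view.
registered: dict[str, bool] = {}  # node name -> visible?
_current: list[str] = []

def fn(func):
    def wrapper(*args, **kwargs):
        registered.setdefault(func.__name__, False)
        _current.append(func.__name__)
        try:
            return func(*args, **kwargs)
        finally:
            _current.pop()
    return wrapper

def log(message):
    registered[_current[-1]] = True  # first output materializes the node

@fn
def noisy():
    log("hello")

@fn
def silent():
    pass

noisy()
silent()
visible = [name for name, shown in registered.items() if shown]
print(visible)  # ['noisy']
```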
depends_on -- Explicit dependency declaration
Some dependencies cannot be detected automatically (shared mutable state, class attributes, global variables). Use depends_on to declare these explicitly:
```python
@nb.fn()
def setup():
    """Initialize shared resources."""
    ...

@nb.fn(depends_on=[setup])
def process():
    """Uses resources initialized by setup."""
    ...
```
nb.log(message) -- Text logging
Log a message to the current node. Messages appear in the terminal dashboard and are queryable via MCP tools.
```python
@nb.fn()
def train(data):
    nb.log(f"Training on {len(data)} samples")
    for epoch in range(10):
        loss = do_train(data)
        nb.log(f"Epoch {epoch}: loss={loss:.4f}")
```
nb.log_metric(name, value, step=None) -- Scalar metrics
Log scalar metrics with automatic step counting.
```python
@nb.fn()
def train(model, data):
    for epoch in range(100):
        loss = train_one_epoch(model, data)
        nb.log_metric("loss", loss)
        nb.log_metric("lr", optimizer.param_groups[0]["lr"])
```
nb.log_cfg(cfg) -- Configuration logging
Log configuration for the current node.
```python
@nb.fn()
def train(lr=0.001, epochs=50):
    nb.log_cfg({"lr": lr, "epochs": epochs})
    ...
```
nb.track(iterable, name=None, total=None) -- Progress tracking
Wrap any iterable for tqdm-like progress tracking.
```python
@nb.fn()
def process(items):
    for item in nb.track(items, name="processing"):
        transform(item)
```
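A tqdm-style wrapper of this kind can be sketched in a few lines. This is illustrative only, not nebo's implementation: a generator that yields items unchanged while reporting how far along the iteration is.

```python
# Illustrative progress wrapper: yields items unchanged while reporting
# progress. `total` is taken from len() when the iterable supports it.
def track(iterable, name=None, total=None):
    if total is None:
        try:
            total = len(iterable)
        except TypeError:
            total = None
    label = name or "progress"
    for i, item in enumerate(iterable, start=1):
        if total:
            print(f"{label}: {i}/{total}")
        else:
            print(f"{label}: {i}")
        yield item

result = [x * 2 for x in track([1, 2, 3], name="doubling")]
print(result)  # [2, 4, 6]
```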
nb.log_image(image, name=None, step=None) -- Image logging
Log images (PIL, NumPy arrays, or PyTorch tensors) for visual inspection.
nb.log_audio(audio, sr=16000, name=None, step=None) -- Audio logging
Log audio data for playback and analysis.
nb.log_text(name, text) -- Rich text / Markdown logging
Log formatted text or Markdown content.
nb.md(description) -- Workflow description
Set a workflow-level description (Markdown supported). Visible in MCP tools and the dashboard.
```python
nb.md("A pipeline that loads images, runs inference, and exports predictions.")
```
nb.ui() -- Run-level UI defaults
Set default layout and display options for the web UI:
```python
nb.ui(layout="horizontal", view="dag", minimap=True, theme="dark")
```
nb.ask(question, options=None, timeout=None) -- Human-in-the-loop
Pause the pipeline and ask the user a question via MCP or the terminal.
```python
@nb.fn()
def review(predictions):
    answer = nb.ask(
        "Model accuracy is 73%. Continue training?",
        options=["yes", "no", "retrain with more data"],
    )
    if answer == "no":
        return predictions
    ...
```
CLI Reference
Start the daemon server
```bash
nb serve              # foreground
nb serve -d           # background (daemon mode)
nb serve --port 3000  # custom port
nb serve --no-store   # disable .nebo file storage
```
Run a pipeline
```bash
nb run my_pipeline.py
nb run my_pipeline.py --name "experiment-1"
```
Load a .nebo file
```bash
nb load .nebo/2026-04-06_143000_run-1.nebo
```
Check status, logs, errors
```bash
nb status
nb logs
nb logs --run experiment-1 --node train --limit 50
nb errors
nb errors --run experiment-1
```
Stop the daemon
```bash
nb stop
```
MCP integration
```bash
nb mcp   # print Claude Code MCP config
```
MCP Tools for AI Agents
Nebo exposes 15 MCP tools for querying and controlling pipelines from an AI agent (e.g., Claude). The daemon server must be running.
Observation Tools
| Tool | Description |
|---|---|
| `nebo_get_graph` | Full DAG structure: nodes, edges, execution counts |
| `nebo_get_node_status` | Detailed status for one node: logs, metrics, errors, params |
| `nebo_get_logs` | Recent log entries, filterable by node and run |
| `nebo_get_metrics` | Metric time series for a node |
| `nebo_get_errors` | All errors with full tracebacks and node context |
| `nebo_get_description` | Workflow description and all node docstrings |
Action Tools
| Tool | Description |
|---|---|
| `nebo_run_pipeline` | Start a pipeline script; returns a run ID |
| `nebo_stop_pipeline` | Stop a running pipeline by run ID |
| `nebo_restart_pipeline` | Stop and re-run a pipeline with the same args |
| `nebo_get_run_status` | Status of a specific run (running/completed/crashed) |
| `nebo_get_run_history` | List all runs with outcomes and timestamps |
| `nebo_get_source_code` | Read a pipeline source file |
| `nebo_write_source_code` | Write or patch a pipeline source file |
| `nebo_ask_user` | Send a question to the user via the terminal |
| `nebo_wait_for_event` | Block until a pipeline event occurs or a timeout elapses |
.nebo File Format
Runs are persisted as .nebo binary files using MessagePack serialization. Each file contains a header (magic, version, metadata) followed by append-only event entries. Use nb load to replay a file into the daemon.
Architecture
```
+----------------+      +------------------+      +------------------+
|  Your Python   |----->|    Nebo SDK      |----->|  Daemon Server   |
|  Pipeline      |      |  (@fn, log,      |      |  (FastAPI,       |
|                |      |   track, ...)    |      |   port 2048)     |
+----------------+      +--------+---------+      +--------+---------+
                                 |                         |
                         +-------v-------+  +--------------+---------------+
                         |   Terminal    |  |              |               |
                         |   Dashboard   |  |       +------v------+ +------v------+
                         |   (Rich)      |  |       |  MCP Tools  | |   Web UI    |
                         +---------------+  |       |  (Claude)   | |             |
                                            |       +-------------+ +-------------+
                                      +-----v-----+
                                      |    CLI    |
                                      |    nb     |
                                      +-----------+
```
Two execution modes:
- Local mode (default): in-process only. No daemon needed.
- Server mode: events stream to a persistent daemon via HTTP. Use `nb serve` to start the daemon, then `nb run` to execute pipelines.
API Reference
Module: nebo
| Function | Signature | Description |
|---|---|---|
| `fn` | `@fn()`, `@fn(depends_on=[...])`, `@fn(ui={...})` | Register a function/class as a DAG node |
| `log` | `log(message: str)` | Log a text message |
| `log_metric` | `log_metric(name, value, step=None)` | Log a scalar metric |
| `log_cfg` | `log_cfg(cfg: dict)` | Log node configuration |
| `log_image` | `log_image(image, name=None, step=None)` | Log an image |
| `log_audio` | `log_audio(audio, sr=16000, name=None, step=None)` | Log audio data |
| `log_text` | `log_text(name, text)` | Log rich text / Markdown |
| `track` | `track(iterable, name=None, total=None)` | Progress tracking |
| `md` | `md(description: str)` | Set workflow description |
| `ui` | `ui(layout, view, collapsed, minimap, theme)` | Set run-level UI defaults |
| `init` | `init(port, host, mode, backends, terminal, dag_strategy, flush_interval, store)` | Manual initialization |
| `ask` | `ask(question, options=None, timeout=None)` | Human-in-the-loop prompt |
| `get_state` | `get_state() -> SessionState` | Access the global state singleton |
Logging Backends
Implement the LoggingBackend protocol to send events to external systems:
```python
from nebo import LoggingBackend

class MyBackend:
    def on_log(self, node: str, message: str, timestamp: float) -> None: ...
    def on_metric(self, node: str, name: str, value: float, step: int) -> None: ...
    def on_image(self, node: str, name: str, image_bytes: bytes, step: int) -> None: ...
    def on_audio(self, node: str, name: str, audio_bytes: bytes, sr: int) -> None: ...
    def on_node_start(self, node: str, params: dict) -> None: ...
    def on_node_end(self, node: str, duration: float) -> None: ...
    def flush(self) -> None: ...
    def close(self) -> None: ...

nb.init(backends=[MyBackend()])
```
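For instance, a minimal in-memory backend that collects events into lists satisfies the protocol. This is a hypothetical example built only from the method signatures above; it needs no nebo import on its own and would be wired in via `nb.init(backends=[...])` as shown.

```python
# Hypothetical in-memory backend: records log and metric events for
# later inspection. Implements every method of the LoggingBackend
# protocol described above; unused hooks are no-ops.
class MemoryBackend:
    def __init__(self):
        self.logs = []
        self.metrics = []

    def on_log(self, node, message, timestamp):
        self.logs.append((node, message, timestamp))

    def on_metric(self, node, name, value, step):
        self.metrics.append((node, name, value, step))

    def on_image(self, node, name, image_bytes, step): ...
    def on_audio(self, node, name, audio_bytes, sr): ...
    def on_node_start(self, node, params): ...
    def on_node_end(self, node, duration): ...
    def flush(self): ...
    def close(self): ...

backend = MemoryBackend()
backend.on_log("train", "epoch 1 done", 0.0)
backend.on_metric("train", "loss", 0.42, 1)
print(len(backend.logs), len(backend.metrics))  # 1 1
```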