Drop-in observability for LangGraph and CrewAI — captures every run, node, tool call, token count, prompt, and response into MongoDB or PostgreSQL

stakeout-agent

Drop-in observability for LangGraph and CrewAI.

One callback. Every run, node, tool call, token count, prompt, and response — captured automatically into MongoDB or PostgreSQL. No changes to your agent code.

[Figure: dashboard timeline view]


Install and go

# LangGraph + MongoDB
pip install 'stakeout-agent[langgraph,mongodb]'

# LangGraph + PostgreSQL
pip install 'stakeout-agent[langgraph,postgres]'

# CrewAI + MongoDB
pip install 'stakeout-agent[crewai,mongodb]'

# CrewAI + PostgreSQL
pip install 'stakeout-agent[crewai,postgres]'

from stakeout_agent import LangGraphMonitorCallback

monitor = LangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = graph.invoke(inputs, config={"callbacks": [monitor]})

That's it. Every node execution, tool call, latency, token count, prompt, response, and error is now in your database.


How it works

graph LR
    A[Your LangGraph / CrewAI app] -->|callback| B[stakeout-agent]
    B --> C[(MongoDB)]
    B --> D[(PostgreSQL)]
    C --> E[Dashboard / your queries]
    D --> E

stakeout-agent hooks into your framework's event system. It records a run document for each invocation and an event document for every node start/end, tool call, tool result, and error — with latency, token usage, and the actual prompts and responses captured at every step.
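
As a rough illustration of the start/end pairing described above, here is a minimal sketch of a recorder that emits a node_start event, then a node_end event carrying latency_ms. ToyRecorder and its injected clock are hypothetical names made up for this example; they are not part of stakeout-agent and say nothing about how the library is implemented internally.

```python
class ToyRecorder:
    """Hypothetical stand-in that pairs node_start/node_end and measures latency."""

    def __init__(self, clock):
        self.clock = clock      # callable returning the current time in seconds
        self.events = []        # recorded event documents, in emission order
        self._started = {}      # node name -> start time in seconds

    def node_start(self, node):
        self._started[node] = self.clock()
        self.events.append({"event_type": "node_start", "node_name": node})

    def node_end(self, node):
        # Latency is the elapsed time since the matching node_start, in milliseconds.
        latency_ms = (self.clock() - self._started.pop(node)) * 1000
        self.events.append(
            {"event_type": "node_end", "node_name": node, "latency_ms": latency_ms}
        )


# A fake clock makes the example deterministic: start at 10.0 s, end at 10.25 s.
ticks = iter([10.0, 10.25])
recorder = ToyRecorder(clock=lambda: next(ticks))
recorder.node_start("agent")
recorder.node_end("agent")
```

In real use the clock would be something like time.perf_counter; the fake clock here only keeps the numbers predictable.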


Why stakeout-agent?

| Feature | stakeout-agent |
| --- | --- |
| Lines of integration code | 3 |
| Crashes your app on DB failure | Never: errors are logged, not raised |
| Node-level latency (P95) | Yes, tracked per node and per tool |
| Token usage | Yes, per node and rolled up to the run |
| Cost estimation | Yes, opt-in, configurable per model |
| Prompt & response capture | Yes, per node, opt-out, truncation supported |
| Frameworks | LangGraph + CrewAI |
| Backends | MongoDB + PostgreSQL |
| Dashboard included | Yes, a dedicated real-time observability UI |

Installation

Install only what you need — framework and backend are independent extras:

# LangGraph + MongoDB
pip install 'stakeout-agent[langgraph,mongodb]'

# LangGraph + PostgreSQL
pip install 'stakeout-agent[langgraph,postgres]'

# CrewAI + MongoDB
pip install 'stakeout-agent[crewai,mongodb]'

# CrewAI + PostgreSQL
pip install 'stakeout-agent[crewai,postgres]'

| Extra | Installs | Use when |
| --- | --- | --- |
| langgraph | langchain-core, langgraph | Using LangGraph |
| crewai | crewai | Using CrewAI |
| mongodb | pymongo | Storing to MongoDB |
| postgres | psycopg2-binary | Storing to PostgreSQL |

Requires Python 3.10+.


Quick start

LangGraph — Sync

from stakeout_agent import LangGraphMonitorCallback

monitor = LangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = graph.invoke(inputs, config={"callbacks": [monitor]})

LangGraph — Async

from stakeout_agent import AsyncLangGraphMonitorCallback

monitor = AsyncLangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = await graph.ainvoke(inputs, config={"callbacks": [monitor]})

CrewAI — Sync

from stakeout_agent import CrewAIMonitorCallback

monitor = CrewAIMonitorCallback(crew_id="my_crew", thread_id="thread_123")
crew.kickoff(inputs={...})

CrewAIMonitorCallback registers itself with CrewAI's event bus automatically — no extra wiring needed.

CrewAI — Async

from stakeout_agent import AsyncCrewAIMonitorCallback

monitor = AsyncCrewAIMonitorCallback(crew_id="my_crew", thread_id="thread_123")
await crew.akickoff(inputs={...})

One instance per invocation

Each callback instance stores per-run state (run ID, node timings, token accumulators) as instance variables. Do not share a single instance across concurrent invocations — a second call will overwrite the first run's state, causing events to be written under the wrong run ID and latencies to be miscalculated.

# Wrong — shared instance, concurrent calls corrupt each other
monitor = AsyncLangGraphMonitorCallback(graph_id="g", thread_id="t")
await asyncio.gather(
    graph.ainvoke(inputs_a, config={"callbacks": [monitor]}),
    graph.ainvoke(inputs_b, config={"callbacks": [monitor]}),
)

# Correct — separate instance per invocation
await asyncio.gather(
    graph.ainvoke(inputs_a, config={"callbacks": [AsyncLangGraphMonitorCallback(graph_id="g", thread_id="t")]}),
    graph.ainvoke(inputs_b, config={"callbacks": [AsyncLangGraphMonitorCallback(graph_id="g", thread_id="t")]}),
)
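
The failure mode can be reproduced without any framework. The sketch below uses a hypothetical ToyMonitor class (not the stakeout-agent callback) that keeps its run ID on the instance, as described above, and shows that two interleaved invocations sharing one instance end up tagging events with the wrong run ID, while separate instances do not.

```python
import asyncio
import uuid


class ToyMonitor:
    """Hypothetical stand-in for a monitor callback; NOT the stakeout-agent class."""

    def __init__(self):
        self.run_id = None  # per-run state stored on the instance

    async def on_run_start(self):
        self.run_id = uuid.uuid4().hex  # every new run overwrites the previous ID

    async def on_node_end(self, node):
        return self.run_id, node  # the event is tagged with whatever ID is current


async def invoke(monitor):
    """Simulate one invocation: start a run, yield control, then emit an event."""
    await monitor.on_run_start()
    own_run_id = monitor.run_id
    await asyncio.sleep(0)  # give the other invocation a chance to start
    event_run_id, _ = await monitor.on_node_end("agent")
    return own_run_id, event_run_id


async def main():
    shared = ToyMonitor()
    shared_results = await asyncio.gather(invoke(shared), invoke(shared))
    separate_results = await asyncio.gather(invoke(ToyMonitor()), invoke(ToyMonitor()))
    shared_ok = all(own == seen for own, seen in shared_results)
    separate_ok = all(own == seen for own, seen in separate_results)
    return shared_ok, separate_ok


shared_ok, separate_ok = asyncio.run(main())
print("shared instance consistent:", shared_ok)
print("separate instances consistent:", separate_ok)
```

With the shared instance, the first invocation's event is written under the second invocation's run ID, which is exactly the corruption the warning describes.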

Token usage and cost tracking

Token counts are captured automatically from every LLM call — no changes to your agent code required. Per-node input/output tokens are recorded on each node_end event, and totals are rolled up onto the run document at completion.

Token capture only (always on)

from stakeout_agent import LangGraphMonitorCallback

monitor = LangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = graph.invoke(inputs, config={"callbacks": [monitor]})

Token fields (input_tokens, output_tokens, model) appear on node_end events and total_input_tokens / total_output_tokens on the run document whenever the LLM response contains usage metadata.

Cost estimation (opt-in)

from stakeout_agent import LangGraphMonitorCallback
from stakeout_agent.pricing import ModelPricing, PricingMap

monitor = LangGraphMonitorCallback(
    graph_id="my_graph",
    thread_id="thread_123",
    pricing=PricingMap({
        "gpt-4o":      ModelPricing(input_cost_per_1k=0.005,   output_cost_per_1k=0.015),
        "gpt-4o-mini": ModelPricing(input_cost_per_1k=0.00015, output_cost_per_1k=0.0006),
    })
)
result = graph.invoke(inputs, config={"callbacks": [monitor]})

When pricing is provided, estimated_cost_usd is computed per LLM call and rolled up onto the run. Multi-model workflows are fully supported — each node resolves cost against the model it actually used. Models not present in the map are silently skipped; token counts are still recorded.
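
To make the arithmetic concrete, here is a minimal sketch of per-call cost estimation and the run-level roll-up. It assumes the obvious reading of the field names (tokens divided by 1,000, multiplied by the per-1k rate); the Rate dataclass and estimate_cost_usd function are hypothetical stand-ins written for this example, not the library's ModelPricing or its internal logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rate:
    """Hypothetical stand-in mirroring the two pricing fields shown above."""
    input_cost_per_1k: float
    output_cost_per_1k: float


RATES = {
    "gpt-4o": Rate(input_cost_per_1k=0.005, output_cost_per_1k=0.015),
    "gpt-4o-mini": Rate(input_cost_per_1k=0.00015, output_cost_per_1k=0.0006),
}


def estimate_cost_usd(model, input_tokens, output_tokens, rates=RATES):
    """Return the estimated cost of one LLM call, or None for an unpriced model."""
    rate = rates.get(model)
    if rate is None:
        return None  # model missing from the map: no cost, tokens still usable
    return (
        (input_tokens / 1000) * rate.input_cost_per_1k
        + (output_tokens / 1000) * rate.output_cost_per_1k
    )


# (model, input_tokens, output_tokens) for three calls in one multi-model run
calls = [("gpt-4o", 320, 85), ("gpt-4o-mini", 1000, 500), ("unknown-model", 10, 10)]
per_call = [estimate_cost_usd(*call) for call in calls]
run_total = sum(cost for cost in per_call if cost is not None)
```

The unpriced model contributes nothing to the total, which matches the "silently skipped" behaviour described above.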

Custom token extractor

The default extractor covers OpenAI (token_usage / model_name) and Anthropic (usage / model) response shapes. For providers with a different metadata structure, pass a token_extractor:

def my_extractor(metadata: dict) -> tuple[int | None, int | None, str | None]:
    usage = metadata.get("llm_output", {}).get("token_usage", {})
    return usage.get("input"), usage.get("output"), metadata.get("model_id")

monitor = LangGraphMonitorCallback(
    graph_id="my_graph",
    thread_id="thread_123",
    token_extractor=my_extractor,
)

The extractor receives response.llm_output and must return (input_tokens, output_tokens, model_name). Any field can be None.
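
For orientation, the sketch below shows what an extractor handling both of the response shapes named above might look like. It is not the library's default extractor. The outer keys (token_usage / model_name and usage / model) come from the text above; the inner key names (prompt_tokens, completion_tokens, input_tokens, output_tokens) are assumptions based on how those providers usually report usage.

```python
from typing import Optional, Tuple


def extract_tokens(llm_output: Optional[dict]) -> Tuple[Optional[int], Optional[int], Optional[str]]:
    """Return (input_tokens, output_tokens, model_name); any field may be None."""
    llm_output = llm_output or {}

    if "token_usage" in llm_output:  # OpenAI-style shape
        usage = llm_output["token_usage"] or {}
        return (
            usage.get("prompt_tokens"),       # assumed key name
            usage.get("completion_tokens"),   # assumed key name
            llm_output.get("model_name"),
        )

    if "usage" in llm_output:  # Anthropic-style shape
        usage = llm_output["usage"] or {}
        return (
            usage.get("input_tokens"),        # assumed key name
            usage.get("output_tokens"),       # assumed key name
            llm_output.get("model"),
        )

    return None, None, None  # no usage metadata available


openai_like = {"token_usage": {"prompt_tokens": 320, "completion_tokens": 85}, "model_name": "gpt-4o"}
anthropic_like = {"usage": {"input_tokens": 12, "output_tokens": 7}, "model": "claude"}
```

Returning None for missing fields rather than raising keeps the extractor safe to call on responses that carry no usage metadata.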


Prompt and response capture

The exact messages sent to the LLM and the response text are captured automatically on each node_end event. This is on by default and requires no configuration.

from stakeout_agent import LangGraphMonitorCallback

monitor = LangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = graph.invoke(inputs, config={"callbacks": [monitor]})

Each node_end event will include:

{
  "event_type": "node_end",
  "node_name": "agent",
  "llm_input": [
    { "role": "system", "content": "You are a helpful assistant." },
    { "role": "user",   "content": "Summarize the following document..." }
  ],
  "llm_output": "Here is a concise summary..."
}

llm_input and llm_output are absent when no LLM call occurred within the node (e.g. pure routing nodes).
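
Because these fields may be missing, code that reads events should not index them directly. A minimal sketch of a reader that tolerates their absence follows; render_transcript and the "assistant" label are choices made for this example only.

```python
def render_transcript(event: dict) -> list:
    """Format the captured prompt and response of one node_end event as text lines."""
    lines = []
    # llm_input is absent for nodes that made no LLM call, so default to empty.
    for message in event.get("llm_input") or []:
        lines.append(f"{message['role']}: {message['content']}")
    # llm_output is likewise optional.
    if event.get("llm_output") is not None:
        lines.append(f"assistant: {event['llm_output']}")
    return lines


llm_event = {
    "event_type": "node_end",
    "node_name": "agent",
    "llm_input": [{"role": "user", "content": "Summarize the following document..."}],
    "llm_output": "Here is a concise summary...",
}
routing_event = {"event_type": "node_end", "node_name": "router"}
```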

Opt out for sensitive workloads

monitor = LangGraphMonitorCallback(
    graph_id="my_graph",
    thread_id="thread_123",
    capture_payloads=False,
)

Recommended for regulated or privacy-sensitive environments (financial services, healthcare) where prompt content may include PII or confidential data.

Limit stored content size

monitor = LangGraphMonitorCallback(
    graph_id="my_graph",
    thread_id="thread_123",
    max_payload_chars=2000,
)

Each message's content and the response text are truncated to max_payload_chars characters before storage. Useful for long-context or multi-turn workflows to prevent unbounded document sizes.
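
The truncation rule can be sketched in a few lines. This is an illustration of the behaviour described above under the assumption that truncation is a plain character slice; truncate_payload is a hypothetical helper, not a function exported by stakeout-agent.

```python
from typing import Optional


def truncate_payload(messages, response, max_chars: Optional[int]):
    """Clip each message's content and the response text to max_chars characters."""
    clipped = []
    for message in messages:
        copy = dict(message)  # never mutate the caller's messages
        if max_chars is not None and isinstance(copy.get("content"), str):
            copy["content"] = copy["content"][:max_chars]
        clipped.append(copy)
    if max_chars is not None and isinstance(response, str):
        response = response[:max_chars]
    return clipped, response


messages = [{"role": "user", "content": "x" * 5000}]
clipped_messages, clipped_response = truncate_payload(messages, "y" * 3000, max_chars=2000)
```

Note that the limit applies per message, so a conversation with many messages can still produce a large document.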

Both options apply identically to AsyncLangGraphMonitorCallback, CrewAIMonitorCallback, and AsyncCrewAIMonitorCallback.


Dashboard

A dedicated dashboard repository is available at stakeout-dashboard — a standalone Streamlit app that connects to your MongoDB or PostgreSQL backend and visualises everything stakeout-agent captures.

The dashboard shows:

  • Run History — recent runs, status, duration, and a runs-over-time chart
  • Node Performance — average and P95 latency per node and tool, error counts
  • Run Inspector — full event timeline for any individual run
  • Thread Deep Dive — multi-turn conversation view across all runs in a thread

See the stakeout-dashboard README for setup and configuration instructions.


Try the examples

LangGraph

A self-contained example that requires no LLM API key — nodes are pure Python functions.

docker compose up -d mongo
cd stakeout-agent
uv run --extra langgraph --extra mongodb python examples/dummy_app.py

CrewAI

Requires a running MongoDB instance and an OpenAI API key (or configure a different provider via the llm parameter on each Agent).

Sync:

docker compose up -d mongo
cd stakeout-agent
OPENAI_API_KEY=sk-... uv run --extra crewai --extra mongodb python examples/dummy_crewai_app.py

Async:

docker compose up -d mongo
cd stakeout-agent
OPENAI_API_KEY=sk-... uv run --extra crewai --extra mongodb python examples/dummy_crewai_async_app.py

Each example runs a two-agent crew (Researcher + Writer) with a MultiplyTool, then prints the runs and events documents written to MongoDB.


Configuration

| Environment variable | Default | Description |
| --- | --- | --- |
| STAKEOUT_BACKEND | mongodb | Backend to use: mongodb or postgres |
| MONGO_URI | mongodb://localhost:27017 | MongoDB connection string |
| MONGO_DB | stakeout | MongoDB database name |
| POSTGRES_URI | postgresql://localhost/stakeout | PostgreSQL connection string (also reads DATABASE_URL) |
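
The sketch below shows one way these variables and defaults could be resolved. It is an illustration, not the library's get_backend() logic: resolve_settings is a hypothetical function, and the precedence of POSTGRES_URI over DATABASE_URL is an assumption, since the table only says that both are read.

```python
import os


def resolve_settings(env=None):
    """Resolve backend settings from environment variables, applying the defaults above."""
    env = os.environ if env is None else env
    backend = env.get("STAKEOUT_BACKEND", "mongodb")

    if backend == "mongodb":
        return {
            "backend": "mongodb",
            "uri": env.get("MONGO_URI", "mongodb://localhost:27017"),
            "database": env.get("MONGO_DB", "stakeout"),
        }

    if backend == "postgres":
        # Assumed precedence: POSTGRES_URI first, then DATABASE_URL, then the default.
        uri = (
            env.get("POSTGRES_URI")
            or env.get("DATABASE_URL")
            or "postgresql://localhost/stakeout"
        )
        return {"backend": "postgres", "uri": uri}

    raise ValueError(f"Unsupported STAKEOUT_BACKEND: {backend!r}")


default_settings = resolve_settings({})
postgres_settings = resolve_settings(
    {"STAKEOUT_BACKEND": "postgres", "DATABASE_URL": "postgresql://user@host/db"}
)
```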

PostgreSQL

export STAKEOUT_BACKEND=postgres
export POSTGRES_URI=postgresql://user:password@localhost/stakeout

Tables are created automatically on first connection — no migration needed. New columns (llm_input, llm_output, token and cost fields) are added to existing tables via ALTER TABLE … ADD COLUMN IF NOT EXISTS.

docker compose up -d postgres
# connection string: postgresql://stakeout:stakeout@localhost/stakeout

You can also inject a backend instance directly:

from stakeout_agent import LangGraphMonitorCallback, PostgresMonitorDB

monitor = LangGraphMonitorCallback(
    graph_id="my_graph",
    thread_id="thread_123",
    db=PostgresMonitorDB(),
)

What gets recorded

runs

One document per graph/crew invocation.

{
  "_id": "<run_id>",
  "graph_id": "my_graph",
  "thread_id": "thread_123",
  "status": "completed",
  "started_at": "2026-04-25T10:00:00Z",
  "ended_at": "2026-04-25T10:00:05Z",
  "error": null,
  "total_input_tokens": 1850,
  "total_output_tokens": 420,
  "estimated_cost_usd": 0.01553
}

status is one of running, completed, or failed. Token and cost fields are omitted when no LLM usage data is available; estimated_cost_usd is omitted when no pricing map is configured.
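
As a small worked example, run duration can be derived from started_at and ended_at. The sketch assumes the timestamps are ISO 8601 strings as displayed above; a backend may hand back native datetime objects instead, in which case the parsing step is unnecessary. run_duration_seconds is a hypothetical helper.

```python
from datetime import datetime


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def run_duration_seconds(run: dict):
    """Return the run's duration in seconds, or None while it is still running."""
    if not run.get("ended_at"):
        return None
    started = parse_timestamp(run["started_at"])
    ended = parse_timestamp(run["ended_at"])
    return (ended - started).total_seconds()


completed_run = {
    "status": "completed",
    "started_at": "2026-04-25T10:00:00Z",
    "ended_at": "2026-04-25T10:00:05Z",
}
running_run = {"status": "running", "started_at": "2026-04-25T10:00:00Z"}
```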

events

One document per node/task start/end, tool call, or error.

{
  "run_id": "<run_id>",
  "graph_id": "my_graph",
  "event_type": "node_end",
  "node_name": "agent",
  "timestamp": "2026-04-25T10:00:03Z",
  "latency_ms": 1240.5,
  "input_tokens": 320,
  "output_tokens": 85,
  "model": "gpt-4o",
  "llm_input": [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Summarize the following document..."}
  ],
  "llm_output": "Here is a concise summary...",
  "payload": {"outputs": "..."},
  "error": null
}

| event_type | When | latency_ms | Token fields | llm_input / llm_output |
| --- | --- | --- | --- | --- |
| node_start | A graph node or crew task begins | absent | absent | absent |
| node_end | A graph node or crew task completes | present | present when LLM was called | present when LLM was called and capture_payloads=True |
| tool_call | A tool is invoked | absent | absent | absent |
| tool_result | A tool returns a result | present | absent | absent |
| error | A node, task, or tool raises an exception | present | absent | absent |
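
These event documents are enough to compute per-node latency statistics yourself. The sketch below derives the average and a P95 from node_end and tool_result events using the nearest-rank method. Two assumptions to note: the dashboard's exact percentile method is not specified here and may differ, and tool_result events are assumed to carry the tool's name in node_name.

```python
import math
from collections import defaultdict


def percentile_nearest_rank(values, p):
    """Nearest-rank percentile: the smallest value with at least p% of samples at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def latency_stats(events):
    """Group completed-step latencies by node_name and summarise them."""
    samples = defaultdict(list)
    for event in events:
        if event["event_type"] in ("node_end", "tool_result") and event.get("latency_ms") is not None:
            samples[event["node_name"]].append(event["latency_ms"])
    return {
        name: {
            "avg_ms": sum(values) / len(values),
            "p95_ms": percentile_nearest_rank(values, 95),
        }
        for name, values in samples.items()
    }


events = [{"event_type": "node_start", "node_name": "agent"}]
events += [
    {"event_type": "node_end", "node_name": "agent", "latency_ms": ms}
    for ms in (100.0, 200.0, 300.0, 400.0, 1240.5)
]
events += [
    {"event_type": "tool_result", "node_name": "search", "latency_ms": ms}
    for ms in (50.0, 70.0)
]
stats = latency_stats(events)
```

With only a handful of samples the P95 is simply the slowest observation, which is why the tail matters more than the average for spotting outliers.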

Error handling

All database writes catch exceptions and log them — a monitoring failure will never crash your application. Enable DEBUG logging to see them:

import logging
logging.basicConfig()  # attach a handler so DEBUG records are actually displayed
logging.getLogger("stakeout_agent").setLevel(logging.DEBUG)
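
The "log, don't raise" pattern can be sketched as a decorator. This is a generic illustration of the technique, not the library's code; never_raise, the logger name, and the failing insert_event function are all hypothetical.

```python
import logging
from functools import wraps

logger = logging.getLogger("stakeout_agent.sketch")  # hypothetical logger name


def never_raise(fn):
    """Wrap a monitoring write so that any exception is logged instead of propagated."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # Record the failure with its traceback, then carry on.
            logger.exception("monitoring write failed in %s", fn.__name__)
            return None

    return wrapper


@never_raise
def insert_event(event):
    """Simulated database write that always fails."""
    raise ConnectionError("database unreachable")


# The caller is unaffected: the failure is logged and None is returned.
result = insert_event({"event_type": "node_start"})
```

The trade-off is that monitoring gaps become silent unless the log output is watched, which is why checking the logs matters when events appear to be missing.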

Threads and conversation history

What thread_id means

thread_id is a label you assign to group related invocations together — typically a user session or a multi-turn conversation. stakeout-agent stores it on every run but does not manage it:

thread_id          ← your conversation identifier (you supply this)
  └── run_id       ← one graph.invoke() / crew.kickoff() call (generated per execution)
        └── events ← node_start, node_end, tool_call, tool_result, error

Every time you call graph.invoke(...) with the same thread_id, a new run is created under that thread. The events for each run are stored in order of timestamp.

Viewing all steps in a conversation

To reconstruct the full execution history of a conversation, query runs by thread_id and then fetch events for each run in timestamp order.

MongoDB:

from stakeout_agent import MongoMonitorDB

db = MongoMonitorDB()

thread_id = "thread_123"

runs = list(db.runs.find({"thread_id": thread_id}).sort("started_at", 1))
for run in runs:
    print(f"\n--- Run {run['_id']} ({run['status']}) ---")
    events = list(db.events.find({"run_id": run["_id"]}).sort("timestamp", 1))
    for ev in events:
        print(f"  [{ev['timestamp']}] {ev['event_type']:12s}  node={ev['node_name']}")

PostgreSQL:

SELECT r.run_id, e.timestamp, e.event_type, e.node_name, e.latency_ms, e.error
FROM events e
JOIN runs r ON r.run_id = e.run_id
WHERE r.thread_id = 'thread_123'
ORDER BY e.timestamp ASC;

The stakeout-dashboard Thread Deep Dive view does exactly this — select any thread_id and see every run and every step in chronological order.
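
The thread, run, and event hierarchy can also be rebuilt in plain Python once the documents are loaded. The sketch below works on in-memory dictionaries shaped like the documents in this README; thread_timeline is a hypothetical helper, and the sample data is invented for the example.

```python
from collections import defaultdict


def thread_timeline(thread_id, runs, events):
    """Return [(run_id, [(event_type, node_name), ...]), ...] in chronological order."""
    events_by_run = defaultdict(list)
    for event in events:
        events_by_run[event["run_id"]].append(event)

    timeline = []
    thread_runs = [run for run in runs if run["thread_id"] == thread_id]
    # ISO 8601 strings in a uniform format sort chronologically as plain strings.
    for run in sorted(thread_runs, key=lambda r: r["started_at"]):
        steps = sorted(events_by_run[run["_id"]], key=lambda e: e["timestamp"])
        timeline.append(
            (run["_id"], [(e["event_type"], e.get("node_name")) for e in steps])
        )
    return timeline


runs = [
    {"_id": "run_2", "thread_id": "thread_123", "started_at": "2026-04-25T10:05:00Z"},
    {"_id": "run_1", "thread_id": "thread_123", "started_at": "2026-04-25T10:00:00Z"},
    {"_id": "run_9", "thread_id": "other", "started_at": "2026-04-25T09:00:00Z"},
]
events = [
    {"run_id": "run_1", "event_type": "node_end", "node_name": "agent", "timestamp": "2026-04-25T10:00:03Z"},
    {"run_id": "run_1", "event_type": "node_start", "node_name": "agent", "timestamp": "2026-04-25T10:00:01Z"},
    {"run_id": "run_2", "event_type": "node_start", "node_name": "agent", "timestamp": "2026-04-25T10:05:01Z"},
]
timeline = thread_timeline("thread_123", runs, events)
```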

stakeout-agent is not a LangGraph checkpointer

LangGraph has a built-in persistence layer that saves graph state at each step using a checkpointer (e.g. MemorySaver, SqliteSaver). This lets you:

  • Pause and resume execution mid-graph
  • Re-enter a graph from any previous checkpoint
  • Enable human-in-the-loop interrupts

stakeout-agent is an observability layer, not a checkpointer. It records what happened during a run (nodes executed, payloads, latency, tokens, prompts) but does not capture enough state to re-execute or resume a graph. It answers "what did this run do?" rather than "replay from step 3."

The two tools serve different purposes and do not conflict — you can use both simultaneously:

from langgraph.checkpoint.memory import MemorySaver
from stakeout_agent import LangGraphMonitorCallback

# LangGraph checkpointer handles state persistence and replay
graph = graph_builder.compile(checkpointer=MemorySaver())

# stakeout-agent handles observability
monitor = LangGraphMonitorCallback(graph_id="my_graph", thread_id="thread_123")
result = graph.invoke(inputs, config={"callbacks": [monitor], "configurable": {"thread_id": "thread_123"}})

Note that LangGraph's thread_id (passed in config["configurable"]) and stakeout-agent's thread_id are independent — both can be set to the same value for consistency, but they serve different systems.


Querying the database directly

MongoDB

from stakeout_agent import MongoMonitorDB

db = MongoMonitorDB()
runs = list(db.runs.find({"graph_id": "my_graph"}).sort("started_at", -1))
events = list(db.events.find({"run_id": "<run_id>"}).sort("timestamp", 1))

PostgreSQL

import psycopg2

conn = psycopg2.connect("postgresql://user:password@localhost/stakeout")
with conn.cursor() as cur:
    cur.execute("SELECT * FROM runs WHERE graph_id = %s ORDER BY started_at DESC", ("my_graph",))
    runs = cur.fetchall()

Extending stakeout-agent

New framework: create a file under callback_handler/ that inherits _MonitorBase and implements the target framework's callback protocol.

New database: create a class that inherits AbstractMonitorDB and implement create_run, complete_run, fail_run, and insert_event.

stakeout_agent/
├── backends/
│   ├── base.py        # AbstractMonitorDB — shared interface
│   ├── mongodb.py     # MongoMonitorDB
│   ├── postgres.py    # PostgresMonitorDB
│   └── __init__.py    # get_backend() factory
├── callback_handler/
│   ├── base.py        # _MonitorBase — framework-agnostic core logic
│   ├── langgraph.py   # LangGraphMonitorCallback, AsyncLangGraphMonitorCallback
│   ├── crewai.py      # CrewAIMonitorCallback, AsyncCrewAIMonitorCallback
│   └── __init__.py
├── pricing.py         # ModelPricing, PricingMap
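
As a starting point for a new backend, here is a minimal in-memory sketch of the four methods named above. The real AbstractMonitorDB signatures are not shown in this README, so the example defines its own stand-in base class (MonitorDBSketch), and every parameter list below is an assumption; check backends/base.py before adapting it.

```python
from abc import ABC, abstractmethod


class MonitorDBSketch(ABC):
    """Local stand-in for AbstractMonitorDB; the real method signatures may differ."""

    @abstractmethod
    def create_run(self, run_id, graph_id, thread_id): ...

    @abstractmethod
    def complete_run(self, run_id): ...

    @abstractmethod
    def fail_run(self, run_id, error): ...

    @abstractmethod
    def insert_event(self, event): ...


class InMemoryMonitorDB(MonitorDBSketch):
    """Keeps runs and events in Python containers; handy for tests."""

    def __init__(self):
        self.runs = {}
        self.events = []

    def create_run(self, run_id, graph_id, thread_id):
        self.runs[run_id] = {
            "_id": run_id,
            "graph_id": graph_id,
            "thread_id": thread_id,
            "status": "running",
            "error": None,
        }

    def complete_run(self, run_id):
        self.runs[run_id]["status"] = "completed"

    def fail_run(self, run_id, error):
        self.runs[run_id]["status"] = "failed"
        self.runs[run_id]["error"] = str(error)

    def insert_event(self, event):
        self.events.append(dict(event))  # store a copy of the event document


db = InMemoryMonitorDB()
db.create_run("run_1", graph_id="my_graph", thread_id="thread_123")
db.insert_event({"run_id": "run_1", "event_type": "node_start", "node_name": "agent"})
db.complete_run("run_1")
db.create_run("run_2", graph_id="my_graph", thread_id="thread_123")
db.fail_run("run_2", ValueError("boom"))
```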

Roadmap

Done:

  • Sync LangGraph callback support
  • Async LangGraph callback support
  • Sync CrewAI callback support
  • Async CrewAI callback support
  • MongoDB persistence
  • PostgreSQL persistence
  • Run and event collections
  • Token usage tracking (per node and per run)
  • Cost estimation with configurable pricing map
  • Prompt and response capture per node (capture_payloads, max_payload_chars)
  • Dedicated UI dashboard (Run History, Node Performance, Run Inspector, Thread Deep Dive)

Planned:

  • Additional agentic frameworks (PydanticAI, SemanticKernel, AutoGen, etc.)
  • Additional storage backends (SQLite, Redis, ...)

License

MIT
