Aether Observer

Observability, drift detection, and guardrails for agentic workflows — backed by a temporal knowledge graph.

Installation

pip install aether-observer

With the API server (FastAPI + Uvicorn):

pip install "aether-observer[server]"

With LangGraph helpers:

pip install "aether-observer[langgraph]"

With OpenAI Agents helpers:

pip install "aether-observer[openai-agents]"

Quickstart

import aether

aether.init(
    workflow_id="claims-triage",
    tenant_id="acme",
    observations_before_guardrails=3,
    max_observations=50,
    sliding_window_days=20,
)
aether.log_step(name="plan", kind="llm", model="gpt-4")
aether.log_step(name="policy_lookup", kind="tool", tool_name="policy_lookup")
result = aether.guardrail(open_browser=False)

print(result.risk_score, result.severity.value, len(result.signals))

No server required — guardrail() / finish() spin up the observability service in-process via Graphiti/Kuzu. See examples/quickstart.py for a fuller example with realistic agent steps.

Hosted Monitoring

The SDK can also mirror finished observations into a hosted monitoring stack.

Recommended shape:

  • SDK writes finished observations to https://your-vercel-app.vercel.app/api/ingest
  • Vercel writes those records into Supabase
  • the Vercel frontend reads from Supabase and shows experiments/runs

The intended setup is env-based, so application code does not need to carry Vercel URLs or credentials:

export AETHER_MONITORING_ENABLED=true
export AETHER_MONITORING_DASHBOARD_URL="https://your-vercel-app.vercel.app"
export AETHER_MONITORING_USERNAME="demo"
export AETHER_MONITORING_PASSWORD="replace-me"

Then your SDK code stays simple:

import aether

aether.init(
    workflow_id="claims-triage",
    tenant_id="acme"
)
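As a rough illustration of the env-based setup, the variables above could be resolved into a small config object like this. This is a sketch, not the SDK's actual loader: `MonitoringConfig` and `load_monitoring_config` are hypothetical names, and only the variable names and the `/api/ingest` path come from the text above.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MonitoringConfig:
    """Hypothetical container for the hosted-monitoring settings."""

    ingest_url: str
    username: str
    password: str


def load_monitoring_config(env: Mapping[str, str]) -> Optional[MonitoringConfig]:
    """Return a config when monitoring is enabled, otherwise None."""
    if env.get("AETHER_MONITORING_ENABLED", "").strip().lower() != "true":
        return None
    base = env["AETHER_MONITORING_DASHBOARD_URL"].rstrip("/")
    return MonitoringConfig(
        ingest_url=f"{base}/api/ingest",  # ingest path as described above
        username=env.get("AETHER_MONITORING_USERNAME", ""),
        password=env.get("AETHER_MONITORING_PASSWORD", ""),
    )


example_env = {
    "AETHER_MONITORING_ENABLED": "true",
    "AETHER_MONITORING_DASHBOARD_URL": "https://your-vercel-app.vercel.app",
    "AETHER_MONITORING_USERNAME": "demo",
    "AETHER_MONITORING_PASSWORD": "replace-me",
}
config = load_monitoring_config(example_env)
print(config.ingest_url)
```

In real use you would pass `os.environ` instead of a literal dict.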

Context manager

with aether.run(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")
    aether.log_step(name="execute", kind="tool", tool_name="claims_db")

Async

async with aether.arun(workflow_id="claims-triage", tenant_id="acme"):
    aether.log_step(name="plan", kind="llm", model="gpt-4")

LangGraph

Users can keep normal LangGraph code and call Aether directly inside nodes:

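from typing import TypedDict

import aether
from langgraph.graph import END, START, StateGraph

class State(TypedDict, total=False):
    question: str
    plan: str
    answer: str

async def planner(state):
    aether.log(name="planner", kind="llm", input_payload=state)
    return {"plan": "route to support"}

async def responder(state):
    aether.log(name="responder", kind="llm", input_payload=state)
    return {"answer": "done"}

# Wire the two nodes into a linear graph: START -> planner -> responder -> END.
builder = StateGraph(State)
builder.add_node("planner", planner)
builder.add_node("responder", responder)
builder.add_edge(START, "planner")
builder.add_edge("planner", "responder")
builder.add_edge("responder", END)
graph = builder.compile()

# The awaits below must run inside an async function (or a notebook cell).
aether.init(workflow_id="support-agent", tenant_id="acme")
result = await graph.ainvoke({"question": "help"})
await aether.aguardrail(open_browser=False)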

There is also an optional wrapper if you want one-call execution:

from aether.langgraph import ainvoke

result = await ainvoke(
    graph,
    {"question": "help"},
    workflow_id="support-agent",
    tenant_id="acme",
)

See examples/langgraph_quickstart.py.

OpenAI Agents SDK

The OpenAI Agents SDK already exposes run lifecycle hooks through Runner.run(...), Runner.run_sync(...), and Runner.run_streamed(...). Aether plugs into that surface directly.

You can keep native OpenAI Agents code and add Aether hooks:

import aether
from agents import Agent, Runner
from aether.openai_agents import AetherOpenAIAgentsHooks

agent = Agent(name="Assistant", instructions="Be concise.")

aether.init(workflow_id="support-agent", tenant_id="acme")
result = await Runner.run(
    agent,
    "How do I reset my password?",
    hooks=AetherOpenAIAgentsHooks(),
)
await aether.aguardrail(open_browser=False)

Or use the one-call wrapper:

from agents import Agent
from aether.openai_agents import run

agent = Agent(name="Assistant", instructions="Be concise.")
result = await run(
    agent,
    "How do I reset my password?",
    workflow_id="support-agent",
    tenant_id="acme",
)

See examples/openai_agents_quickstart.py.

Dashboard

If the server extra is installed:

aether-observer serve --reload
# open http://127.0.0.1:8081/

Publishing To PyPI

The repo includes two GitHub Actions workflows, Cut Release and Publish To PyPI.

The intended flow is:

  1. run Cut Release
  2. it reuses the current version if that version has not been tagged yet; otherwise it bumps the patch, minor, or major component
  3. it updates pyproject.toml, creates a vX.Y.Z tag, and opens a GitHub release
  4. the GitHub release automatically triggers Publish To PyPI

This avoids re-uploading the same filename to PyPI, which is what caused the earlier 400 File already exists failure.
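The version-selection rule in step 2 can be sketched as follows. This is an illustration of the described behavior, not the workflow's actual script; `next_release_version` is a hypothetical helper and the plain X.Y.Z version format is an assumption.

```python
from typing import Set


def next_release_version(current: str, existing_tags: Set[str], bump: str = "patch") -> str:
    """Reuse `current` if its vX.Y.Z tag does not exist yet, otherwise bump it."""
    if f"v{current}" not in existing_tags:
        return current  # never released, so the filename is still free on PyPI
    major, minor, patch = (int(part) for part in current.split("."))
    if bump == "major":
        return f"{major + 1}.0.0"
    if bump == "minor":
        return f"{major}.{minor + 1}.0"
    if bump == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump kind: {bump!r}")


print(next_release_version("0.1.1", {"v0.1.0"}))
print(next_release_version("0.1.1", {"v0.1.0", "v0.1.1"}))
```

Because an already-tagged version is always bumped, the same artifact filename is never built twice.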

Publish To PyPI uses PyPI Trusted Publishing through GitHub OIDC, which is the recommended path for publishing from Actions because it avoids long-lived PyPI API tokens.

What it does:

  1. builds the wheel and sdist
  2. validates them with twine check
  3. passes the built artifacts between jobs
  4. verifies the release tag matches the package version
  5. publishes to PyPI from a protected pypi environment

Before it can publish successfully, configure the project on PyPI to trust this workflow:

  1. create the aether-observer project on PyPI if it does not exist yet
  2. in PyPI, add a Trusted Publisher for this GitHub repository
  3. set the workflow filename to .github/workflows/publish-pypi.yml
  4. set the GitHub environment to pypi
  5. in GitHub, create the pypi environment and add any required approvals

After that, you should normally release from the Cut Release workflow rather than manually dispatching Publish To PyPI.


Aether Observer is a Graphiti-backed observability and governance layer for external agentic workflows. It answers four enterprise questions:

  1. What did the agent do?
  2. How did its behavior drift over time?
  3. Why did that drift happen?
  4. Did the run violate client-specific guardrails?

The system is local-first today:

  • Graphiti for temporal graph memory and provenance
  • Kuzu as the embedded graph backend for development
  • MLX through an OpenAI-compatible server for local model testing
  • sentence-transformers for local embeddings

The architecture is modular so the same product can later run with:

  • managed LLMs
  • Neo4j or another production graph backend
  • custom workflow adapters for LangGraph, internal orchestrators, OTEL, queue workers, or agent platforms

What The Product Does

Aether Observer sits beside an external workflow and normalizes its execution into a stable schema. It then:

  1. stores the run as a Graphiti episode
  2. builds graph facts for runs, steps, decisions, actions, tools, prompts, models, functions, and datasets
  3. compares the current run against recent workflow history
  4. detects drift
  5. evaluates client-specific dynamic guardrails
  6. stores drift alerts and guardrail incidents back into the graph
  7. exposes everything through an API, CLI, and dashboard

This is not just telemetry storage. It is an observability agent that turns workflow traces into:

  • temporal memory
  • drift signals
  • graph-backed explanations
  • policy enforcement inputs

Core Capabilities

1. Agentic Drift Detection

The system detects behavioral drift in terms that matter for agent systems:

  • decision_outcome_drift
  • action_route_drift
  • function_call_drift
  • function_arguments_drift
  • tool_sequence_drift
  • prompt_drift
  • model_drift

This means it can tell when an agent starts:

  • choosing different routes
  • taking different actions
  • calling different functions
  • changing function argument shapes
  • swapping prompts or models
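To make one of these signals concrete, here is a minimal sketch of how a tool_sequence_drift score might be computed. It uses `difflib.SequenceMatcher` as a stand-in similarity measure; the actual detector and its scoring are not described in this document, so treat the function name and the 0 to 1 scale as assumptions.

```python
from difflib import SequenceMatcher
from typing import List, Sequence


def sequence_drift_score(current: Sequence[str], baselines: List[Sequence[str]]) -> float:
    """Return 0.0 when `current` matches a baseline exactly, approaching 1.0 as it diverges."""
    if not baselines:
        return 0.0  # nothing to compare against yet
    best_similarity = max(
        SequenceMatcher(None, list(current), list(baseline)).ratio()
        for baseline in baselines
    )
    return round(1.0 - best_similarity, 3)


baseline_runs = [["policy_lookup", "claims_db"], ["policy_lookup", "claims_db"]]
print(sequence_drift_score(["policy_lookup", "claims_db"], baseline_runs))
print(sequence_drift_score(["policy_lookup", "web_search"], baseline_runs))
```

The same comparison applies to decision, action, or function sequences by swapping in the relevant names.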

2. Data Drift Detection

The system also tracks:

  • input_schema_drift
  • output_schema_drift
  • dataset_version_drift
  • payload_volume_drift
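As an illustration of input_schema_drift, a payload can be reduced to a key-to-type map and diffed against a baseline. This is a sketch under the assumption that schema means top-level keys and value types; `payload_schema` and `schema_diff` are hypothetical helpers.

```python
from typing import Any, Dict, List


def payload_schema(payload: Dict[str, Any]) -> Dict[str, str]:
    """Map each top-level key to the name of its value type."""
    return {key: type(value).__name__ for key, value in payload.items()}


def schema_diff(baseline: Dict[str, str], current: Dict[str, str]) -> Dict[str, List[str]]:
    """List keys that were added, removed, or changed type relative to the baseline."""
    shared = baseline.keys() & current.keys()
    return {
        "added": sorted(current.keys() - baseline.keys()),
        "removed": sorted(baseline.keys() - current.keys()),
        "retyped": sorted(key for key in shared if baseline[key] != current[key]),
    }


baseline = payload_schema({"claim_type": "auto", "region": "ca", "priority": "normal"})
current = payload_schema({"claim_type": "auto", "region": "ca", "priority": 2, "channel": "web"})
print(schema_diff(baseline, current))
```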

3. Operational Regression Detection

The system detects:

  • step_count_regression
  • latency_regression
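A latency_regression check can be as simple as comparing the current run against the median of recent history. The threshold factor and minimum history size below are illustrative assumptions, not the project's actual defaults.

```python
from statistics import median
from typing import Sequence


def latency_regression(
    current_ms: float,
    history_ms: Sequence[float],
    factor: float = 1.5,
    min_history: int = 3,
) -> bool:
    """Flag a run whose latency exceeds `factor` times the historical median."""
    if len(history_ms) < min_history:
        return False  # too little history to call anything a regression
    return current_ms > factor * median(history_ms)


history = [1900.0, 2000.0, 2100.0]
print(latency_regression(3200.0, history))
print(latency_regression(2100.0, history))
```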

4. Graph Explainability

For every important signal, Aether Observer can return:

  • current behavior state
  • baseline behavior state
  • supporting Graphiti facts

Examples:

  • current decision sequence vs baseline decision sequence
  • current function route vs approved function route
  • graph facts showing the exact step that made a different decision

5. Dynamic Client Guardrails

The system supports workflow-specific guardrail policies that can be:

  • stored per tenant and workflow
  • provided inline on a single run
  • mixed with dynamic baseline expectations from recent stable history

Guardrails support three modes:

  • observe
  • warn
  • block

Supported policy areas:

  • allowed decisions
  • allowed actions
  • blocked actions
  • approval-required actions
  • allowed functions
  • blocked functions
  • function argument contracts
  • allowed models
  • allowed prompts
  • allowed tools
  • dataset version policies
  • max step count
  • max latency

Dynamic baseline options let a client say:

  • "use my pinned policy"
  • "also treat recent stable behavior as policy"

That makes the product useful for real client environments where some workflows are tightly pinned and others are baseline-driven.
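One way to picture the mix of pinned and baseline-driven policy: the effective allow-list is the pinned list plus whatever recent stable runs did often enough. This is a sketch only; `effective_allowed_actions` and the `min_runs` threshold are hypothetical and do not describe the engine's real merging rule.

```python
from collections import Counter
from typing import Iterable, List, Set


def effective_allowed_actions(
    pinned: Iterable[str],
    stable_history: List[List[str]],
    use_history: bool,
    min_runs: int = 2,
) -> Set[str]:
    """Combine a pinned allow-list with actions seen in at least `min_runs` stable runs."""
    allowed = set(pinned)
    if use_history:
        # Count each action once per run so a single noisy run cannot qualify it.
        seen = Counter(action for run in stable_history for action in set(run))
        allowed |= {action for action, count in seen.items() if count >= min_runs}
    return allowed


history = [["policy_lookup", "claims_db"], ["policy_lookup", "fetch_history"], ["fetch_history"]]
print(sorted(effective_allowed_actions(["policy_lookup"], history, use_history=True)))
```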

How Graphiti Is Used

Graphiti is not used here as a generic chat memory layer. It is used as the temporal provenance graph under the observability system.

The system stores:

  • workflow runs
  • workflow steps
  • decisions
  • selected actions
  • candidate actions
  • function calls
  • function argument shapes
  • prompts
  • models
  • tools
  • datasets
  • drift alerts
  • guardrail policies
  • guardrail incidents

This gives you:

  • historical comparison
  • tenant/workflow partitioning
  • graph-backed explainability
  • auditable policy history

Architecture

Runtime Flow

  1. External workflow emits a run payload.
  2. An adapter normalizes it into WorkflowRun.
  3. The run is persisted into Graphiti (backed by Kuzu in local development).
  4. Historical runs for the same workflow are loaded.
  5. Drift features are extracted.
  6. Drift detectors score the run against history.
  7. Guardrail policies are loaded and evaluated.
  8. Explanations are assembled from structured state plus graph facts.
  9. Drift alerts and guardrail incidents are stored.
  10. API and dashboard expose the result.

Important Design Choice

For local development with smaller MLX models, the system uses deterministic Graphiti node and edge writes for workflow telemetry. That is intentional.

Reason:

  • small local models are unreliable for Graphiti's more complex extraction schemas
  • workflow traces are already structured
  • deterministic graph writes are more appropriate for enterprise observability data

So Graphiti is still under the hood, but persistence is explicit and stable.
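The idea of deterministic writes can be sketched without any graph library: a structured step maps to explicit nodes and edges with stable IDs, so no LLM extraction is involved and re-ingesting the same run yields the same graph. The labels, edge names, and `uuid5` ID scheme below are assumptions for illustration; they are not Graphiti's API or this project's actual schema.

```python
import uuid
from typing import Any, Dict, List, Tuple

# Hypothetical namespace used only to derive stable IDs in this sketch.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.invalid/aether-sketch")


def node_id(kind: str, *parts: str) -> str:
    """Derive the same ID every time from the same identifying fields."""
    return str(uuid.uuid5(NAMESPACE, "/".join((kind, *parts))))


def step_to_graph(
    tenant_id: str, workflow_id: str, run_id: str, step: Dict[str, Any]
) -> Tuple[List[Dict[str, str]], List[Tuple[str, str, str]]]:
    """Turn one structured step into explicit nodes and (source, relation, target) edges."""
    run_node = node_id("run", tenant_id, workflow_id, run_id)
    step_node = node_id("step", tenant_id, workflow_id, run_id, step["step_id"])
    nodes = [
        {"id": run_node, "label": "WorkflowRun"},
        {"id": step_node, "label": "WorkflowStep", "name": step["name"]},
    ]
    edges = [(run_node, "HAS_STEP", step_node)]
    if step.get("model"):
        model_node = node_id("model", step["model"])
        nodes.append({"id": model_node, "label": "Model", "name": step["model"]})
        edges.append((step_node, "USED_MODEL", model_node))
    return nodes, edges


step = {"step_id": "s1", "name": "plan", "model": "mlx-community/Qwen2.5-1.5B-Instruct-4bit"}
nodes, edges = step_to_graph("acme", "claims-triage", "run-001", step)
print(len(nodes), [relation for _, relation, _ in edges])
```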

Normalized Workflow Schema

The product expects a provider-agnostic workflow run schema. A simplified run looks like this:

{
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "run_id": "run-001",
  "agent_name": "triage-orchestrator",
  "environment": "dev",
  "status": "success",
  "started_at": "2026-04-12T07:58:00Z",
  "finished_at": "2026-04-12T07:58:02Z",
  "input_payload": {
    "claim_type": "auto",
    "region": "ca",
    "priority": "normal"
  },
  "output_payload": {
    "decision": "review",
    "confidence": 0.74
  },
  "datasets": [
    {
      "name": "claims_policy",
      "version": "2026-01",
      "schema_hash": "claims-policy-2026-01"
    }
  ],
  "steps": [
    {
      "step_id": "s1",
      "name": "plan",
      "kind": "llm",
      "status": "success",
      "model": "mlx-community/Qwen2.5-1.5B-Instruct-4bit",
      "prompt_version": "planner-v1",
      "decision_name": "routing_policy",
      "decision_value": "policy_lookup",
      "selected_action": "policy_lookup",
      "candidate_actions": ["policy_lookup", "web_search", "manual_review_router"],
      "function_calls": [
        {
          "function_name": "route_claim",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "priority": "normal",
            "policy_version": "2026-01"
          }
        }
      ]
    },
    {
      "step_id": "s2",
      "name": "policy_lookup",
      "kind": "tool",
      "status": "success",
      "tool_name": "policy_lookup",
      "selected_action": "policy_lookup",
      "function_calls": [
        {
          "function_name": "lookup_policy_rules",
          "arguments": {
            "claim_type": "auto",
            "region": "ca",
            "policy_version": "2026-01"
          }
        }
      ]
    }
  ]
}
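For orientation, the payload above can be loaded into a small typed object and queried for the features the detectors care about. `RunSketch` is a hypothetical stand-in written for this example; it is not the package's WorkflowRun model and covers only a few fields.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, normalizing a trailing Z for older Python versions."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


@dataclass
class RunSketch:
    tenant_id: str
    workflow_id: str
    run_id: str
    started_at: datetime
    finished_at: datetime
    steps: List[Dict[str, Any]] = field(default_factory=list)

    @property
    def latency_ms(self) -> float:
        return (self.finished_at - self.started_at).total_seconds() * 1000.0

    @property
    def function_names(self) -> List[str]:
        """Function names in call order across all steps."""
        return [
            call["function_name"]
            for step in self.steps
            for call in step.get("function_calls", [])
        ]


def parse_run(raw: str) -> RunSketch:
    data = json.loads(raw)
    return RunSketch(
        tenant_id=data["tenant_id"],
        workflow_id=data["workflow_id"],
        run_id=data["run_id"],
        started_at=parse_timestamp(data["started_at"]),
        finished_at=parse_timestamp(data["finished_at"]),
        steps=data.get("steps", []),
    )


raw_run = json.dumps({
    "tenant_id": "acme", "workflow_id": "claims-triage", "run_id": "run-001",
    "started_at": "2026-04-12T07:58:00Z", "finished_at": "2026-04-12T07:58:02Z",
    "steps": [
        {"step_id": "s1", "name": "plan", "function_calls": [{"function_name": "route_claim"}]},
        {"step_id": "s2", "name": "policy_lookup",
         "function_calls": [{"function_name": "lookup_policy_rules"}]},
    ],
})
run = parse_run(raw_run)
print(run.latency_ms, run.function_names)
```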

Dynamic Guardrail Policy Schema

Example client policy:

{
  "policy_id": "claims-triage-dynamic-guardrails",
  "tenant_id": "acme",
  "workflow_id": "claims-triage",
  "mode": "warn",
  "allowed_models": ["mlx-community/Qwen2.5-1.5B-Instruct-4bit"],
  "allowed_prompts": ["planner-v1"],
  "allowed_tools": ["policy_lookup", "claims_db"],
  "allowed_actions": ["policy_lookup", "claims_db"],
  "require_approval_actions": ["manual_review_router"],
  "allowed_functions": [
    "route_claim",
    "lookup_policy_rules",
    "fetch_claim_history"
  ],
  "function_contracts": {
    "route_claim": {
      "required_keys": ["claim_type", "region", "priority", "policy_version"],
      "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
      "allow_additional_keys": false
    }
  },
  "dataset_version_policies": {
    "claims_policy": ["2026-01"]
  },
  "max_step_count": 4,
  "max_latency_ms": 1800,
  "dynamic_baseline": {
    "decisions_from_history": true,
    "actions_from_history": true,
    "functions_from_history": true
  }
}

This policy means:

  • models are pinned
  • prompts are pinned
  • tools and actions are pinned
  • manual_review_router is not outright blocked, but it requires approval
  • route_claim must follow a strict argument contract
  • the dataset version is pinned
  • dynamic baseline also contributes expected decisions/actions/functions
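The strict argument contract for route_claim can be read as the check sketched below. The interpretation is an assumption: required_keys must all be present, and when allow_additional_keys is false, any key outside allowed_keys is rejected. `check_function_contract` is a hypothetical helper, not the guardrail engine's real function.

```python
from typing import Any, Dict, List


def check_function_contract(call: Dict[str, Any], contract: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the call satisfies the contract."""
    keys = set(call.get("arguments", {}))
    problems: List[str] = []
    missing = sorted(set(contract.get("required_keys", [])) - keys)
    if missing:
        problems.append(f"missing keys: {missing}")
    if not contract.get("allow_additional_keys", True):
        extra = sorted(keys - set(contract.get("allowed_keys", [])))
        if extra:
            problems.append(f"unexpected keys: {extra}")
    return problems


contract = {
    "required_keys": ["claim_type", "region", "priority", "policy_version"],
    "allowed_keys": ["claim_type", "region", "priority", "policy_version"],
    "allow_additional_keys": False,
}
good_call = {
    "function_name": "route_claim",
    "arguments": {"claim_type": "auto", "region": "ca", "priority": "normal", "policy_version": "2026-01"},
}
bad_call = {
    "function_name": "route_claim",
    "arguments": {"claim_type": "auto", "region": "ca", "priority": "normal", "override": True},
}
print(check_function_contract(good_call, contract))
print(check_function_contract(bad_call, contract))
```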

Example Output

A drifted and policy-violating run returns:

  • drift signals
  • graph_explanations
  • guardrail_result
  • guardrail violations
  • guardrail graph explanations

Typical guardrail verdict output:

{
  "policy_id": "claims-triage-dynamic-guardrails",
  "mode": "warn",
  "verdict": "warn",
  "violations": [
    { "kind": "decision_guardrail_violation" },
    { "kind": "action_guardrail_violation" },
    { "kind": "approval_required_action" },
    { "kind": "function_guardrail_violation" },
    { "kind": "function_contract_violation" }
  ]
}
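A consumer of this output might summarize the violations and decide how to react. The mapping below is an assumption made for illustration: a clean run yields "pass" (a value not confirmed by this document), and otherwise the verdict mirrors the policy mode, as in the warn example above.

```python
from collections import Counter
from typing import Any, Dict, List

VALID_MODES = {"observe", "warn", "block"}


def verdict_for(mode: str, violations: List[Dict[str, Any]]) -> str:
    """Assumed rule: no violations means "pass"; otherwise the verdict follows the mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown guardrail mode: {mode!r}")
    return mode if violations else "pass"


def violation_counts(violations: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count violations per kind for a compact summary."""
    return dict(Counter(violation["kind"] for violation in violations))


violations = [
    {"kind": "decision_guardrail_violation"},
    {"kind": "function_contract_violation"},
    {"kind": "function_contract_violation"},
]
print(verdict_for("warn", violations), violation_counts(violations))
```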

API

Health

  • GET /health

Observe Runs

  • POST /observe

Query params:

  • adapter
  • dry_run
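A client call to this endpoint could be assembled with the standard library as sketched here. Several details are assumptions: that the request body is the normalized run JSON, that dry_run is passed as "true"/"false", and that "generic" is a valid adapter name (it is a placeholder). The base URL matches the local address used elsewhere in this document.

```python
import json
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from urllib.request import Request


def build_observe_request(
    base_url: str,
    run: Dict[str, Any],
    adapter: Optional[str] = None,
    dry_run: bool = False,
) -> Request:
    """Build (but do not send) a POST /observe request with the documented query params."""
    params = {"dry_run": "true" if dry_run else "false"}
    if adapter is not None:
        params["adapter"] = adapter
    url = f"{base_url.rstrip('/')}/observe?{urlencode(params)}"
    return Request(
        url,
        data=json.dumps(run).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_observe_request(
    "http://127.0.0.1:8081",
    {"tenant_id": "acme", "workflow_id": "claims-triage", "run_id": "run-001"},
    adapter="generic",  # placeholder adapter name
    dry_run=True,
)
print(request.get_method(), request.full_url)
```

Sending it is then a matter of passing the request to `urllib.request.urlopen` while the API server is running.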

Workflow Views

  • GET /workflows/{tenant_id}/{workflow_id}/history
  • GET /workflows/{tenant_id}/{workflow_id}/alerts
  • GET /facts

Guardrails

  • PUT /workflows/{tenant_id}/{workflow_id}/guardrails/policy
  • GET /workflows/{tenant_id}/{workflow_id}/guardrails/policy
  • GET /workflows/{tenant_id}/{workflow_id}/guardrails/incidents

Demo

  • POST /demo/seed

The demo seeds:

  • stable baseline runs
  • a guardrail policy
  • a drifted run that violates both behavior baselines and client policy

CLI

The package exposes:

aether-observer serve --reload
aether-observer observe path/to/run.json
aether-observer history acme claims-triage --limit 20

Dashboard

The FastAPI app serves a frontend at:

http://127.0.0.1:8081/

The dashboard shows:

  • health
  • workflow history
  • drift alert feed
  • latest observation result
  • graph explanations
  • fact search
  • inline payload editor
  • inline guardrail policy example

Quickstart

1. Start MLX

Example:

uv run python -m mlx_lm.server \
  --model mlx-community/Qwen2.5-1.5B-Instruct-4bit \
  --host 127.0.0.1 \
  --port 8000

2. Install Dependencies

uv sync
cp .env.example .env

3. Start The API

uv run uvicorn aether_observer.api:app --host 127.0.0.1 --port 8081

Or via CLI:

uv run aether-observer serve --reload

4. Open The Dashboard

http://127.0.0.1:8081/

5. Seed A Demo Workflow

Use the dashboard or:

curl -X POST http://127.0.0.1:8081/demo/seed \
  -H 'Content-Type: application/json' \
  -d '{
    "tenant_id": "acme",
    "workflow_id": "claims-triage",
    "environment": "dev"
  }'

Project Layout

  • src/aether_observer/api.py: FastAPI API and dashboard routes
  • src/aether_observer/service.py: orchestration layer for observation, drift, and guardrails
  • src/aether_observer/drift/: feature extraction and drift detectors
  • src/aether_observer/guardrails/: dynamic guardrail engine
  • src/aether_observer/graph_store.py: Graphiti persistence for runs, alerts, policies, and incidents
  • src/aether_observer/runtime.py: Graphiti runtime wiring
  • src/aether_observer/adapters/: external workflow adapter boundary
  • src/aether_observer/static/: dashboard frontend
  • src/aether_observer/demo.py: demo data builders and scenario seeding
  • examples/simulate_runs.py: simple local smoke example
  • tests/: drift and guardrail tests

Current Local Validation

The current local build has been exercised with:

  • compile checks
  • unit tests
  • live API checks
  • live dashboard checks
  • live guardrail policy storage and retrieval
  • live guardrail incident storage
  • live drift and guardrail explainability output

Enterprise Deployment Direction

This repo is intentionally shaped for enterprise rollout. The immediate next steps for production are:

  1. replace local Kuzu with a production graph backend
  2. add authenticated workflow adapters for real agent platforms
  3. wire block mode into real execution control instead of post-run verdict only
  4. add approval and waiver records for human review flows
  5. add auth, rate limiting, webhooks, and tenant-facing policy management

Why This Matters

Most agent observability tools stop at traces and token counts. This project is aimed at a more serious target:

  • enterprise agent governance
  • explainable behavioral drift
  • policy-aware execution monitoring
  • reusable client-specific guardrails

That is the actual product direction of this repository.
