Skip to main content

Observability graph server and API for LLM agent policy enforcement

Project description

Observability Server

The observability server maintains a dependency graph of all agent messages in Neo4j, enabling graph-based policy evaluation, auditing, and analysis.

Design

The primary function of the server is event recording: it accepts message and OpenTelemetry events from instrumented applications and stores them with their dependencies in Neo4j.

In addition, it provides APIs for graph queries, forward and backward slices in particular.

The server also includes an updates server that streams graph changes in real-time to downstream consumers (e.g., the policy engine) via a gRPC API.

┌─────────────────┐     gRPC      ┌─────────────────────┐
│  Instrumented   │──────────────▶│  Observability      │
│  Application    │               │  Server (10085)     │
└─────────────────┘               └──────────┬──────────┘
                                             │
                                             ▼
                                  ┌─────────────────────┐
                                  │      Neo4j          │
                                  │  (Message Graph)    │
                                  └──────────┬──────────┘
                                             │ APOC Triggers
                                             ▼
                                  ┌─────────────────────┐     gRPC
                                  │   Updates Server    │────────────▶ Policy Engine
                                  │      (10086)        │
                                  └─────────────────────┘

Running

Generally start via docker compose. To run manually:

# Set required environment variables
NEO4J_URI=bolt://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your-password

# Start the observability server
uv run python -m observability.server

Python API

import observability

# Configure client
observability.configure(
    server_url="localhost:10085",
    auth_hook=my_auth_hook,
)

# Record an event
observability.record_event(
    id="msg-123",
    content="Hello, world!",
    role="user",
    agent="MyAgent",
    parent_ids=["msg-122"],
)

# Register additional dependencies
observability.register_dependencies(
    id="msg-124",
    parent_ids=["msg-120", "msg-121"],
)

# Query the graph
backward = observability.backward_slice("msg-124")  # Returns NetworkX DiGraph
forward = observability.forward_slice("msg-100")

Updates Server

The updates server streams graph changes to consumers via SSE. Changes are captured by Neo4j APOC triggers and pushed to connected clients.

Endpoints

Endpoint Method Description
/state GET Current graph state (nodes, edges, sequence)
/updates/{from_position} GET SSE stream of graph changes
/ack/{client_id}/{position} POST Acknowledge receipt of updates
/notify POST Push notification from APOC triggers
/health GET Health check

Event Types

event: node_created
data: {"id": "msg-123", "content": "...", "role": "user", ...}

event: edge_created
data: {"source": "msg-123", "destination": "msg-122"}

event: node_deleted
data: {"id": "msg-123"}

event: edge_deleted
data: {"source": "msg-123", "destination": "msg-122"}

event: full_state
data: {"nodes": [...], "edges": [...], "sequence": 42}

event: heartbeat
data: {"sequence": 42}

Neo4j Schema

The graph contains two node types with DEPENDS_ON relationships:

// Message node - represents agent/user/system messages
(:Message {
    id: "msg-123",
    content: "Hello, world!",
    role: "user",
    agent: "MyAgent",
    timestamp: datetime(),
    tools: [...],
})

// Computation node - represents OTel spans (LLM calls, tool executions, etc.)
(:Computation {
    id: "span-456",
    trace_id: "abc123",
    span_id: "def456",
    name: "llm_response",
    start_time_ns: ...,
    end_time_ns: ...,
    attributes: {...},
})

// Message dependencies
(:Message)-[:DEPENDS_ON]->(:Message)

// Computation relationships
(:Computation)-[:CHILD_OF]->(:Computation)   // Span hierarchy (child -> parent)
(:Computation)-[:PRODUCES]->(:Message)       // Span outputs a message
(:Computation)-[:CONSUMES]->(:Message)       // Span uses a message as input

gRPC API

Defined in proto/observability.proto. These require

Recording messages and dependencies

These require observability-writer privileges. To record a message event in the graph:

rpc RegisterEvents (Events) returns (IDs)

message Events {
    repeated Event events = 1;
}

message Event {
    optional string text = 1;
    optional string agent = 2;
    optional Role role = 3;
    optional string id = 4;
    repeated Tool tools = 5;
    // If this message is a tool result, the tool it was derived from
    optional Tool derived_from = 6;
}

message IDs {
    repeated string ids = 1;
}

To register additional dependencies between existing messages:

rpc RegisterDependencies (Dependencies) returns (Response)

message Dependencies {
    repeated Edge edges = 1;
}

message Edge {
    string source = 1;
    string destination = 2;
    optional uint32 message_index = 3;
    optional bool proximal = 4;
}

BackwardSlice / ForwardSlice

Query the dependency graph for message provenance or impact.

rpc BackwardSlice (SliceRequest) returns (Graph) {}
rpc ForwardSlice (SliceRequest) returns (Graph) {}

message SliceRequest {
    optional string event_id = 1;   // Starting message ID
    optional uint32 max_depth = 2;  // Maximum traversal depth
}

message Graph {
    repeated Event nodes = 1;
    repeated Edge edges = 2;
}

Code Structure

File Description
observability/server.py gRPC server implementation
observability/updates_server.py SSE updates server
observability/api.py Python client API
observability/graph.py Neo4j graph operations
observability/triggers.py APOC trigger management
observability/utils.py Type conversions

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

sasy_observability-0.0.1.tar.gz (27.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

sasy_observability-0.0.1-py3-none-any.whl (29.7 kB view details)

Uploaded Python 3

File details

Details for the file sasy_observability-0.0.1.tar.gz.

File metadata

  • Download URL: sasy_observability-0.0.1.tar.gz
  • Upload date:
  • Size: 27.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.8

File hashes

Hashes for sasy_observability-0.0.1.tar.gz
Algorithm Hash digest
SHA256 1994c5b9ab78bcee17e61bf691f6e27290bf790ff10de39b828e695e69d74558
MD5 265492f79edc5efbf2f8abee696fbe96
BLAKE2b-256 df4113d44a84aeba0d4f7366b5581a11312d6641717bd6123355ac4c1ce0a3ed

See more details on using hashes here.

File details

Details for the file sasy_observability-0.0.1-py3-none-any.whl.

File metadata

File hashes

Hashes for sasy_observability-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 feb84cda8d29ec7b8492226a689699970143d93ee7b2e6c82b2508dbcd8b8740
MD5 57b06f2f48f2cff8335455389d5b957c
BLAKE2b-256 d4bac36502519de9035f8e6e7bc8e8ceec2831a497ab89edf351621a8a4d602f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page