Observability logger for Blend360 internal agent workflows.

Blend Agents Observability

Enterprise-grade observability library for instrumenting multi-step AI agent systems. Capture, process, and visualize complex agent execution graphs with ease.

Note: This package is owned by Blend360 and is intended for internal use only.

Features

  • Manual Instrumentation: Explicit control over trace creation and node lifecycle for precise observability.
  • AWS Kinesis Integration: Stream observability events directly to AWS Kinesis Data Streams.
  • Type-Safe Events: Leverages Pydantic for robust event validation, ensuring data integrity.
  • Resilient by Design: Gracefully handles errors and fails silently, preventing observability from impacting your application's stability.
  • Detailed Agent Tracking: Capture fine-grained details of agent execution, including reasoning steps and tool usage.
  • Parallel Workflow Support: Built-in support for tracing parallel execution branches and sub-traces.
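The "fails silently" guarantee above is a common instrumentation pattern: every observability call is wrapped so an exception is logged but never propagated. The sketch below illustrates the pattern only; it is not the library's actual code, and `emit_event` is a hypothetical function.

```python
import functools
import logging

log = logging.getLogger("observability")

def fail_silently(func):
    """Swallow any exception raised by an observability call so that
    instrumentation can never crash the host application.
    (Pattern illustration only -- not the library's actual code.)"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            log.warning("observability call %s failed", func.__name__, exc_info=True)
            return None
    return wrapper

@fail_silently
def emit_event(payload):
    # Simulate a transport failure (e.g. Kinesis unreachable).
    raise ConnectionError("stream unreachable")

result = emit_event({"type": "node_created"})
print(result)  # → None; the application keeps running
```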

Installation

pip install blend-agents-observability

Quick Start

Here is an example of how to use the logger with Strands Agents:

from observability_logger import AgentLogger, generate_id
from strands import Agent

# Create the agent and prompt used below
agent = Agent(name="Assistant")
prompt = "Summarize the latest report."

# 1. Create a trace
trace_id = generate_id("trace_")
logger = AgentLogger(
    trace_id=trace_id,
    title="My Workflow",
    workflow_id="workflow-v1"
)

# 2. Create an agent node
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Assistant", "description": "Helpful agent"}
)

# 3. Execute agent with callback handler
# The agent_node acts as a callback handler to capture streaming events
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)

# 4. End the trace
logger.end(status="completed", final_output={"result": result.message})

Concepts

Traces

A trace represents a single workflow execution. It contains nodes (operations) and edges (connections between nodes).

logger = AgentLogger(
    trace_id=generate_id("trace_"),  # Unique identifier
    title="My Workflow",              # Human-readable title
    workflow_id="workflow-v1",        # Workflow version identifier
    parent_trace_id=None              # Optional parent for hierarchical traces
)

Nodes

Nodes represent individual operations in the workflow. There are four node types:

Agent Node

For AI agent execution with real-time step capture:

agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Assistant", "description": "Helpful agent"},
    metadata={"tools": ["search", "calculate"]}
)

# Set input before execution
agent_node.set_input(prompt)

# Execute with callback handler for real-time capture
result = agent(prompt, callback_handler=agent_node)

# Complete with result (extracts output and token usage automatically)
agent_node.complete(result=result)

Router Node

For routing/branching decisions (entry points):

router = logger.router(
    node_id=generate_id("node_"),
    config={"name": "Workflow Router", "description": "Entry point"},
    metadata={"input_length": len(text)}
)

# ... routing logic ...

router.complete(status="completed")

Parallel Node

For concurrent execution branches:

parallel = logger.parallel(
    node_id=generate_id("node_"),
    config={"name": "Process Items", "description": "Parallel processing"},
    metadata={"parallel_count": 3}
)

# Link to child trace
child_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    parent_trace_id=logger.trace_id
)
parallel.subTrace(child_logger)

# ... child execution ...

parallel.complete(status="completed")

Miscellaneous Node

For general operations (auto-completed on creation):

output = logger.miscellaneous(
    node_id=generate_id("node_"),
    config={"name": "Final Output", "description": "Format results"},
    content="Processed 100 items successfully",
    metadata={"item_count": 100}
)
# Node is already completed

Edges

Edges define the execution flow between nodes:

router = logger.router(node_id1, config1)
agent = logger.strands.agent(node_id2, config2)
output = logger.miscellaneous(node_id3, config3, content)

# Create edges
logger.edge(router, agent)
logger.edge(agent, output)

API Reference

AgentLogger

Main entry point for observability logging.

AgentLogger(
    trace_id: str,                    # Unique trace identifier
    workflow_id: Optional[str],       # Workflow identifier (defaults to trace_id)
    title: Optional[str],             # Human-readable title (defaults to trace_id)
    parent_trace_id: Optional[str],   # Parent trace for hierarchical traces
    auto_create: bool = True          # Auto-emit trace_updated on init
)

Methods:

| Method | Description |
| --- | --- |
| `miscellaneous(node_id, config, content, metadata)` | Create a miscellaneous node (auto-completed) |
| `parallel(node_id, config, content, metadata)` | Create a parallel node |
| `router(node_id, config, content, metadata)` | Create a router node |
| `edge(source_node, target_node)` | Create an edge between nodes |
| `end(status, final_output)` | End the trace |

Node Types

Common Methods

All nodes share these methods:

| Method | Description |
| --- | --- |
| `create()` | Create the node (called automatically) |
| `complete(status, payload, metadata)` | Complete the node |
| `status` | Current node status (property) |
| `is_created` | Whether the node has been created |
| `is_completed` | Whether the node has been completed |

AgentNode Methods

| Method | Description |
| --- | --- |
| `set_input(text)` | Set the agent input prompt |
| `set_output(text)` | Set the agent output manually |
| `set_error(error)` | Set error information |
| `set_token_usage(input, output, total)` | Set token metrics |
| `complete(result=result)` | Complete with the agent result |
| `__call__(**kwargs)` | Callback handler for streaming events |
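Because `AgentNode` implements `__call__`, an instance can be passed anywhere a callback function is expected, which is why `callback_handler=agent_node` works in the examples above. The class below is a minimal stand-in illustrating the pattern, not `AgentNode`'s implementation:

```python
class RecordingHandler:
    """Minimal stand-in for the callable-handler pattern: because the object
    implements __call__, a framework can invoke it like a function."""

    def __init__(self):
        self.events = []

    def __call__(self, **kwargs):
        # A streaming framework invokes the handler with keyword events.
        self.events.append(kwargs)

handler = RecordingHandler()
handler(event="tool_use", tool="search")
handler(event="text_delta", text="Hello")
print(len(handler.events))  # → 2
```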

ParallelNode Methods

| Method | Description |
| --- | --- |
| `subTrace(child_logger)` | Link a child trace to the parallel node |
| `complete(status, metadata, execution_time_ms)` | Complete with timing information |

Utilities

from observability_logger import generate_id, get_current_timestamp_ms

# Generate unique IDs with prefix
trace_id = generate_id("trace_")  # trace_a1b2c3d4-...
node_id = generate_id("node_")    # node_e5f6g7h8-...

# Get current timestamp in milliseconds
timestamp = get_current_timestamp_ms()  # 1700000000000
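The documented behavior of these helpers can be approximated with the standard library; the exact ID format is an assumption inferred from the sample output above:

```python
import time
import uuid

def generate_id(prefix: str) -> str:
    # Approximation of the documented helper: prefix + a UUID.
    return f"{prefix}{uuid.uuid4()}"

def get_current_timestamp_ms() -> int:
    # Current Unix time in milliseconds.
    return int(time.time() * 1000)

trace_id = generate_id("trace_")
print(trace_id.startswith("trace_"))  # → True
print(get_current_timestamp_ms() > 1_700_000_000_000)  # → True
```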

Usage Guides

Callback Mode (Real-time Capture)

Capture agent execution steps in real-time:

from observability_logger import AgentLogger, generate_id
from strands import Agent, tool

# Define tools
@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

# Create agent
agent = Agent(tools=[search], name="SearchAgent")

# Initialize trace and node
logger = AgentLogger(trace_id=generate_id("trace_"), title="Search Workflow")
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Search Agent"}
)

# Execute with callback
prompt = "Search for AI observability tools."
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)

logger.end(status="completed")

Multi-node Workflows

Create workflows with multiple nodes and edges:

logger = AgentLogger(trace_id=generate_id("trace_"), title="Analysis Workflow")

# Router node (entry point)
router = logger.router(
    node_id=generate_id("node_"),
    config={"name": "Router"}
)
router.complete(status="completed")

# Agent node
agent_node = logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Analyzer"}
)
# Assumes `agent` and `prompt` are defined as in the Quick Start
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)

# Output node (auto-completed)
output = logger.miscellaneous(
    node_id=generate_id("node_"),
    config={"name": "Output"},
    content=result.message
)

# Create edges
logger.edge(router, agent_node)
logger.edge(agent_node, output)

logger.end(status="completed")

Hierarchical Traces (Parallel Execution)

Create parent-child trace relationships:

# Parent trace
parent_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    title="Orchestrator"
)

# Parallel node in parent
parallel = parent_logger.parallel(
    node_id=generate_id("node_"),
    config={"name": "Parallel Processing"}
)

# Child trace
child_logger = AgentLogger(
    trace_id=generate_id("trace_"),
    title="Child Workflow",
    parent_trace_id=parent_logger.trace_id
)

# Link child to parallel node
parallel.subTrace(child_logger)

# Execute child workflow
child_agent = child_logger.strands.agent(
    node_id=generate_id("node_"),
    config={"name": "Child Agent"}
)
# ... child execution ...
child_agent.complete(result=result)
child_logger.end(status="completed")

# Complete parallel node
parallel.complete(status="completed")

parent_logger.end(status="completed")

Configuration

Environment Variables

| Variable | Required | Default | Description |
| --- | --- | --- | --- |
| `KINESIS_STREAM_NAME` | Yes | - | Kinesis stream name |
| `AWS_REGION` | No | `us-east-1` | AWS region |
| `OBSERVABILITY_LOG_LEVEL` | No | `WARNING` | Log level |
| `OBSERVABILITY_ENABLE_VALIDATION` | No | `true` | Enable event validation |
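A typical deployment environment might set these variables as follows; the stream name is a placeholder, and the optional values shown are the defaults:

```shell
# Required: the Kinesis Data Stream that receives observability events
export KINESIS_STREAM_NAME="agent-observability-events"

# Optional overrides (defaults shown)
export AWS_REGION="us-east-1"
export OBSERVABILITY_LOG_LEVEL="WARNING"
export OBSERVABILITY_ENABLE_VALIDATION="true"
```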

AWS Credentials

The library uses the standard boto3 credential chain:

  1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. Shared credentials file (~/.aws/credentials)
  3. IAM role (for EC2/Lambda)

Error Handling

logger = AgentLogger(trace_id=generate_id("trace_"), title="My Workflow")
agent_node = None  # defined up front so the except block can reference it safely

try:
    agent_node = logger.strands.agent(
        node_id=generate_id("node_"),
        config={"name": "Agent"}
    )
    agent_node.set_input(prompt)
    result = agent(prompt, callback_handler=agent_node)
    agent_node.complete(result=result)

    logger.end(status="completed", final_output={"result": result.message})

except Exception as e:
    # Complete with error
    if agent_node and not agent_node.is_completed:
        agent_node.set_error(e)
        agent_node.complete(status="failed")

    logger.end(status="failed", final_output={"error": str(e)})
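The try/except pattern above can be packaged as a small context manager. Both the helper and the stub node below are hypothetical, using only the node interface documented in this README (`is_completed`, `set_error`, `complete`):

```python
from contextlib import contextmanager

class StubNode:
    """Stand-in implementing the documented node interface, for illustration."""
    def __init__(self):
        self.is_completed = False
        self.error = None
        self.status = None

    def set_error(self, e):
        self.error = str(e)

    def complete(self, status="completed"):
        self.status = status
        self.is_completed = True

@contextmanager
def completing(node):
    """Hypothetical helper: complete `node` on success, mark it failed on error."""
    try:
        yield node
        if not node.is_completed:
            node.complete(status="completed")
    except Exception as e:
        if not node.is_completed:
            node.set_error(e)
            node.complete(status="failed")
        raise

node = StubNode()
try:
    with completing(node):
        raise RuntimeError("model timeout")
except RuntimeError:
    pass
print(node.status)  # → failed
```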

Node Lifecycle

| Node Type | Auto-Complete | Lifecycle |
| --- | --- | --- |
| agent | No | `create()` -> `set_input()` -> execute -> `complete()` |
| router | No | `create()` -> `complete()` |
| parallel | No | `create()` -> `subTrace()` -> `complete()` |
| miscellaneous | Yes | `create()` (auto-completes) |

Data Flow

Agent Code -> AgentLogger -> Kinesis -> Lambda -> DynamoDB + S3 -> Dashboard
  • DynamoDB: Stores trace/node/edge metadata
  • S3: Stores node payloads (input, output, steps)
