Observability logger for Blend360 internal agent workflows.
Blend Agents Observability
Enterprise-grade observability library for instrumenting multi-step AI agent systems. Capture, process, and visualize complex agent execution graphs with ease.
Note: This package is owned by Blend360 and is intended for internal usage only.
Features
- Manual Instrumentation: Explicit control over trace creation and node lifecycle for precise observability.
- AWS Kinesis Integration: Stream observability events directly to AWS Kinesis Data Streams.
- Type-Safe Events: Leverages Pydantic for robust event validation, ensuring data integrity.
- Resilient by Design: Gracefully handles errors and fails silently, preventing observability from impacting your application's stability.
- Detailed Agent Tracking: Capture fine-grained details of agent execution, including reasoning steps and tool usage.
- Parallel Workflow Support: Built-in support for tracing parallel execution branches and sub-traces.
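The "Resilient by Design" behavior can be pictured with a small sketch. This is not the library's code: `fail_silently` and `emit_event` are hypothetical names, and the sketch only assumes that observability calls swallow their own exceptions and log them instead of raising.

```python
import functools
import logging

log = logging.getLogger("observability_sketch")


def fail_silently(func):
    """Run func; on any exception, log a warning and return None instead of raising."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            log.warning("observability call %s failed", func.__name__, exc_info=True)
            return None
    return wrapper


@fail_silently
def emit_event(event: dict) -> bool:
    # Hypothetical emitter: rejects events that carry no trace_id.
    if "trace_id" not in event:
        raise ValueError("missing trace_id")
    return True


ok_result = emit_event({"trace_id": "trace_123"})
bad_result = emit_event({})  # logged, not raised
```

With a wrapper like this, a broken event costs one log line rather than an exception in the agent workflow.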
Installation
pip install blend-agents-observability
Quick Start
Here is an example of how to use the logger with Strands Agents:
from observability_logger import AgentLogger, generate_id
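from strands import Agent
# Create the agent and the prompt to run (example values)
agent = Agent(name="Assistant")
prompt = "Summarize the latest sales report."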
# 1. Create a trace
trace_id = generate_id("trace_")
logger = AgentLogger(
trace_id=trace_id,
title="My Workflow",
workflow_id="workflow-v1"
)
# 2. Create an agent node
agent_node = logger.strands.agent(
node_id=generate_id("node_"),
config={"name": "Assistant", "description": "Helpful agent"}
)
# 3. Execute agent with callback handler
# The agent_node acts as a callback handler to capture streaming events
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)
# 4. End the trace
logger.end(status="completed", final_output={"result": result.message})
Concepts
Traces
A trace represents a single workflow execution. It contains nodes (operations) and edges (connections between nodes).
logger = AgentLogger(
trace_id=generate_id("trace_"), # Unique identifier
title="My Workflow", # Human-readable title
workflow_id="workflow-v1", # Workflow version identifier
parent_trace_id=None # Optional parent for hierarchical traces
)
Nodes
Nodes represent individual operations in the workflow. There are four node types:
Agent Node
For AI agent execution with real-time step capture:
agent_node = logger.strands.agent(
node_id=generate_id("node_"),
config={"name": "Assistant", "description": "Helpful agent"},
metadata={"tools": ["search", "calculate"]}
)
# Set input before execution
agent_node.set_input(prompt)
# Execute with callback handler for real-time capture
result = agent(prompt, callback_handler=agent_node)
# Complete with result (extracts output and token usage automatically)
agent_node.complete(result=result)
Router Node
For routing/branching decisions (entry points):
router = logger.router(
node_id=generate_id("node_"),
config={"name": "Workflow Router", "description": "Entry point"},
metadata={"input_length": len(text)}
)
# ... routing logic ...
router.complete(status="completed")
Parallel Node
For concurrent execution branches:
parallel = logger.parallel(
node_id=generate_id("node_"),
config={"name": "Process Items", "description": "Parallel processing"},
metadata={"parallel_count": 3}
)
# Link to child trace
child_logger = AgentLogger(
trace_id=generate_id("trace_"),
parent_trace_id=logger.trace_id
)
parallel.subTrace(child_logger)
# ... child execution ...
parallel.complete(status="completed")
Miscellaneous Node
For general operations (auto-completed on creation):
output = logger.miscellaneous(
node_id=generate_id("node_"),
config={"name": "Final Output", "description": "Format results"},
content="Processed 100 items successfully",
metadata={"item_count": 100}
)
# Node is already completed
Edges
Edges define the execution flow between nodes:
router = logger.router(node_id1, config1)
agent = logger.strands.agent(node_id2, config2)
output = logger.miscellaneous(node_id3, config3, content)
# Create edges
logger.edge(router, agent)
logger.edge(agent, output)
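Because edges are directed (source to target), the set of edges in a trace forms a graph from which an execution order can be derived. The sketch below is illustrative only: `execution_order` is a hypothetical helper, node IDs are plain strings rather than node objects, and it uses the standard library's `graphlib` (Python 3.9+), not anything from this package.

```python
from graphlib import TopologicalSorter


def execution_order(edges):
    """Return node IDs in an order consistent with (source, target) edges."""
    sorter = TopologicalSorter()
    for source, target in edges:
        # The target runs after the source, so the source is its predecessor.
        sorter.add(target, source)
    return list(sorter.static_order())


edges = [("router", "agent"), ("agent", "output")]
order = execution_order(edges)
```

A cycle among the edges would make `static_order()` raise `graphlib.CycleError`, which is one way to spot a mis-wired workflow.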
API Reference
AgentLogger
Main entry point for observability logging.
AgentLogger(
trace_id: str, # Unique trace identifier
workflow_id: Optional[str], # Workflow identifier (defaults to trace_id)
title: Optional[str], # Human-readable title (defaults to trace_id)
parent_trace_id: Optional[str], # Parent trace for hierarchical traces
auto_create: bool = True # Auto-emit trace_updated on init
)
Methods:
| Method | Description |
|---|---|
| miscellaneous(node_id, config, content, metadata) | Create miscellaneous node (auto-completed) |
| parallel(node_id, config, content, metadata) | Create parallel node |
| router(node_id, config, content, metadata) | Create router node |
| edge(source_node, target_node) | Create edge between nodes |
| end(status, final_output) | End the trace |
Node Types
Common Methods
All nodes share these methods:
| Method | Description |
|---|---|
| create() | Create node (called automatically) |
| complete(status, payload, metadata) | Complete the node |
| status | Current node status property |
| is_created | Whether node was created |
| is_completed | Whether node was completed |
AgentNode Methods
| Method | Description |
|---|---|
| set_input(text) | Set agent input prompt |
| set_output(text) | Set agent output manually |
| set_error(error) | Set error information |
| set_token_usage(input, output, total) | Set token metrics |
| complete(result=result) | Complete with agent result |
| __call__(**kwargs) | Callback handler for streaming |
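The `__call__(**kwargs)` entry means the node itself is a callable that receives streaming events as keyword arguments. A minimal stand-in might look like the sketch below. `StepCollector` is hypothetical, and the event keys it reads (`data` for text chunks, `current_tool_use` for tool calls) are assumptions about what the agent framework sends, not a documented contract of this library.

```python
class StepCollector:
    """Callable that records streaming events passed as keyword arguments."""

    def __init__(self):
        self.text_chunks = []
        self.tool_names = []

    def __call__(self, **kwargs):
        # Assumed key: "data" carries a chunk of generated text.
        if "data" in kwargs:
            self.text_chunks.append(kwargs["data"])
        # Assumed key: "current_tool_use" is a dict describing a tool call.
        tool_use = kwargs.get("current_tool_use") or {}
        name = tool_use.get("name")
        if name and name not in self.tool_names:
            self.tool_names.append(name)

    @property
    def output(self) -> str:
        """Concatenate the text chunks seen so far."""
        return "".join(self.text_chunks)


collector = StepCollector()
collector(data="Hello, ")
collector(current_tool_use={"name": "search", "input": {"query": "x"}})
collector(data="world")
```

Unknown keys are ignored, so a handler written this way keeps working if the framework adds new event types.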
ParallelNode Methods
| Method | Description |
|---|---|
| subTrace(child_logger) | Link child trace to parallel node |
| complete(status, metadata, execution_time_ms) | Complete with timing |
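One way to obtain a value for `execution_time_ms` is to time the concurrent branches yourself. The sketch below uses only the standard library. `run_branches` is a hypothetical helper, and it assumes the parameter expects whole milliseconds, which the table above does not state.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_branches(branches):
    """Run zero-argument callables concurrently; return (results, elapsed_ms)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max(1, len(branches))) as pool:
        # pool.map preserves the input order of the branches.
        results = list(pool.map(lambda branch: branch(), branches))
    elapsed_ms = int((time.perf_counter() - start) * 1000)
    return results, elapsed_ms


results, elapsed_ms = run_branches([lambda: "a", lambda: "b", lambda: "c"])
```

The measured value could then be passed along, for example `parallel.complete(status="completed", execution_time_ms=elapsed_ms)`.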
Utilities
from observability_logger import generate_id, get_current_timestamp_ms
# Generate unique IDs with prefix
trace_id = generate_id("trace_") # trace_a1b2c3d4-...
node_id = generate_id("node_") # node_e5f6g7h8-...
# Get current timestamp in milliseconds
timestamp = get_current_timestamp_ms() # 1700000000000
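For readers curious what such helpers involve, here is a rough standard-library equivalent. The `_sketch` functions are hypothetical. The UUID-style suffix is inferred from the example output above, and the library's actual implementation may differ.

```python
import time
import uuid


def generate_id_sketch(prefix: str = "") -> str:
    """Return the prefix followed by a random UUID4 string (assumed ID format)."""
    return f"{prefix}{uuid.uuid4()}"


def get_current_timestamp_ms_sketch() -> int:
    """Return the current Unix time in whole milliseconds."""
    return time.time_ns() // 1_000_000


sample_id = generate_id_sketch("trace_")
sample_ts = get_current_timestamp_ms_sketch()
```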
Usage Guides
Callback Mode (Real-time Capture)
Capture agent execution steps in real-time:
from observability_logger import AgentLogger, generate_id
from strands import Agent, tool
# Define tools
@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"
# Create agent
agent = Agent(tools=[search], name="SearchAgent")
# Initialize trace and node
logger = AgentLogger(trace_id=generate_id("trace_"), title="Search Workflow")
agent_node = logger.strands.agent(
node_id=generate_id("node_"),
config={"name": "Search Agent"}
)
# Execute with callback
prompt = "Find recent articles on agent observability."  # example prompt
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)
logger.end(status="completed")
Multi-node Workflows
Create workflows with multiple nodes and edges:
logger = AgentLogger(trace_id=generate_id("trace_"), title="Analysis Workflow")
# Router node (entry point)
router = logger.router(
node_id=generate_id("node_"),
config={"name": "Router"}
)
router.complete(status="completed")
# Agent node
agent_node = logger.strands.agent(
node_id=generate_id("node_"),
config={"name": "Analyzer"}
)
agent_node.set_input(prompt)
result = agent(prompt, callback_handler=agent_node)
agent_node.complete(result=result)
# Output node (auto-completed)
output = logger.miscellaneous(
node_id=generate_id("node_"),
config={"name": "Output"},
content=result.message
)
# Create edges
logger.edge(router, agent_node)
logger.edge(agent_node, output)
logger.end(status="completed")
Hierarchical Traces (Parallel Execution)
Create parent-child trace relationships:
# Parent trace
parent_logger = AgentLogger(
trace_id=generate_id("trace_"),
title="Orchestrator"
)
# Parallel node in parent
parallel = parent_logger.parallel(
node_id=generate_id("node_"),
config={"name": "Parallel Processing"}
)
# Child trace
child_logger = AgentLogger(
trace_id=generate_id("trace_"),
title="Child Workflow",
parent_trace_id=parent_logger.trace_id
)
# Link child to parallel node
parallel.subTrace(child_logger)
# Execute child workflow
child_agent = child_logger.strands.agent(
node_id=generate_id("node_"),
config={"name": "Child Agent"}
)
# ... child execution ...
child_agent.complete(result=result)
child_logger.end(status="completed")
# Complete parallel node
parallel.complete(status="completed")
parent_logger.end(status="completed")
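The parent-child relationship is carried by `parent_trace_id`, so a consumer of the trace data can rebuild the hierarchy from flat records. The sketch below assumes each trace is available as a dict with `trace_id` and `parent_trace_id` keys. That record shape and the `build_trace_tree` helper are illustrative, not part of the library.

```python
from collections import defaultdict


def build_trace_tree(traces):
    """Map each parent trace ID to the list of its child trace IDs."""
    children = defaultdict(list)
    for trace in traces:
        parent = trace.get("parent_trace_id")
        if parent is not None:
            children[parent].append(trace["trace_id"])
    return dict(children)


traces = [
    {"trace_id": "trace_parent", "parent_trace_id": None},
    {"trace_id": "trace_child_1", "parent_trace_id": "trace_parent"},
    {"trace_id": "trace_child_2", "parent_trace_id": "trace_parent"},
]
tree = build_trace_tree(traces)
```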
Configuration
Environment Variables
| Variable | Required | Default | Description |
|---|---|---|---|
| KINESIS_STREAM_NAME | Yes | - | Kinesis stream name |
| AWS_REGION | No | us-east-1 | AWS region |
| OBSERVABILITY_LOG_LEVEL | No | WARNING | Log level |
| OBSERVABILITY_ENABLE_VALIDATION | No | true | Enable validation |
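The table can be read as a small settings object with defaults. The sketch below shows one way to resolve these variables. `ObservabilitySettings` and `load_settings` are hypothetical names, and details such as raising on a missing stream name or treating only the string "true" as enabled are assumptions. The library may handle these cases differently.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ObservabilitySettings:
    stream_name: str
    region: str
    log_level: str
    enable_validation: bool


def load_settings(env=None) -> ObservabilitySettings:
    """Resolve settings from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env
    stream_name = env.get("KINESIS_STREAM_NAME")
    if not stream_name:
        # Assumption: a missing required variable is treated as an error here.
        raise KeyError("KINESIS_STREAM_NAME is required")
    validation = env.get("OBSERVABILITY_ENABLE_VALIDATION", "true")
    return ObservabilitySettings(
        stream_name=stream_name,
        region=env.get("AWS_REGION", "us-east-1"),
        log_level=env.get("OBSERVABILITY_LOG_LEVEL", "WARNING").upper(),
        enable_validation=validation.strip().lower() == "true",
    )


settings = load_settings({"KINESIS_STREAM_NAME": "agent-events"})
```

Passing a plain dict instead of `os.environ` keeps the function easy to test without touching the real environment.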
AWS Credentials
The library uses the standard boto3 credential chain:
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- Shared credentials file (~/.aws/credentials)
- IAM role (for EC2/Lambda)
Error Handling
logger = AgentLogger(trace_id=generate_id("trace_"), title="My Workflow")
agent_node = None  # stays None if the failure happens before the node is created
try:
    agent_node = logger.strands.agent(
        node_id=generate_id("node_"),
        config={"name": "Agent"}
    )
    agent_node.set_input(prompt)
    result = agent(prompt, callback_handler=agent_node)
    agent_node.complete(result=result)
    logger.end(status="completed", final_output={"result": result.message})
except Exception as e:
    # Complete the node with the error, then end the trace as failed
    if agent_node is not None and not agent_node.is_completed:
        agent_node.set_error(e)
        agent_node.complete(status="failed")
    logger.end(status="failed", final_output={"error": str(e)})
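The same pattern can be packaged as a context manager so that every trace is ended exactly once, whether the body succeeds or raises. This is a sketch under assumptions: `traced` is a hypothetical helper, and `FakeLogger` is a stand-in that mimics only the `end(status, final_output)` call described in the API reference, so the example runs without AWS access.

```python
from contextlib import contextmanager


class FakeLogger:
    """Stand-in that records end() calls; not the real AgentLogger."""

    def __init__(self):
        self.ended = []

    def end(self, status, final_output=None):
        self.ended.append((status, final_output))


@contextmanager
def traced(logger):
    """End the trace as 'completed' on success or 'failed' on exception."""
    try:
        yield logger
    except Exception as exc:
        logger.end(status="failed", final_output={"error": str(exc)})
        raise  # let the caller decide how to handle the failure
    else:
        logger.end(status="completed")


ok_logger = FakeLogger()
with traced(ok_logger):
    pass  # workflow body would go here

bad_logger = FakeLogger()
try:
    with traced(bad_logger):
        raise RuntimeError("boom")
except RuntimeError:
    pass
```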
Node Lifecycle
| Node Type | Auto-Complete | Lifecycle |
|---|---|---|
| agent | No | create() -> set_input() -> execute -> complete() |
| router | No | create() -> complete() |
| parallel | No | create() -> subTrace() -> complete() |
| miscellaneous | Yes | create() (auto-completes) |
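The lifecycle table amounts to a small state machine. The toy class below models it for illustration. `NodeStateSketch` is not the library's node class, and the "pending" and "running" status strings are invented for the sketch. Only "completed" and "failed" appear in this README.

```python
class NodeStateSketch:
    """Toy model of the create -> complete lifecycle."""

    def __init__(self, auto_complete: bool = False):
        self.status = "pending"       # hypothetical initial status
        self.is_created = False
        self.is_completed = False
        self.create()                 # creation happens automatically
        if auto_complete:             # e.g. miscellaneous nodes
            self.complete()

    def create(self) -> None:
        if not self.is_created:
            self.is_created = True
            self.status = "running"   # hypothetical in-progress status

    def complete(self, status: str = "completed") -> bool:
        """Mark the node finished once; return False on repeat calls."""
        if not self.is_created or self.is_completed:
            return False
        self.is_completed = True
        self.status = status
        return True


agent_state = NodeStateSketch()
misc_state = NodeStateSketch(auto_complete=True)
```

Guarding `complete()` against repeat calls mirrors the `is_completed` check used in the error-handling example.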
Data Flow
Agent Code -> AgentLogger -> Kinesis -> Lambda -> DynamoDB + S3 -> Dashboard
- DynamoDB: Stores trace/node/edge metadata
- S3: Stores node payloads (input, output, steps)
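The first hop of this pipeline, turning an event into a Kinesis record, can be sketched without sending anything. `StreamName`, `Data`, and `PartitionKey` are the standard parameters of the Kinesis `put_record` call. The event fields and the choice of `trace_id` as partition key are assumptions made for illustration, not the library's actual schema.

```python
import json


def build_kinesis_record(stream_name: str, event: dict) -> dict:
    """Build keyword arguments suitable for a Kinesis put_record call."""
    return {
        "StreamName": stream_name,
        "Data": json.dumps(event, separators=(",", ":")).encode("utf-8"),
        # Assumption: partitioning by trace keeps one trace's events on one shard.
        "PartitionKey": event["trace_id"],
    }


record = build_kinesis_record(
    "agent-events",
    {"event_type": "trace_updated", "trace_id": "trace_123", "title": "My Workflow"},
)
```

With boto3 installed and credentials configured, the record could be sent with `boto3.client("kinesis").put_record(**record)`.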