Skip to main content

Lightweight async monitoring for LLM applications - capacity-based tracking with pluggable storage

Project description

llamonitor-async ๐Ÿฆ™๐Ÿ“Š

Python 3.10+ License PyPI Downloads Downloads/Month Downloads/Week

Lightweight async monitoring for LLM applications - capacity-based tracking with pluggable storage.

A modern alternative to Langfuse/LangSmith focusing on text/image capacity measurement (not tokens), async-first architecture, and maximum extensibility.

Documentation

๐Ÿ“š Complete Documentation | ๐Ÿš€ Quick Start Guide | ๐Ÿงช Testing Guide | ๐Ÿ“Š Download Tracking

Publishing Guides

Design Philosophy: "Leave Space for Air Conditioning"

Every component has clear extension points for future enhancements. Whether you need custom metric collectors, new storage backends, or specialized aggregation strategies, the architecture supports growth without breaking existing code.

Features

  • Async-First: Non-blocking metric collection with buffered batch writes
  • Hierarchical Tracking: Automatic parent-child relationships across nested operations
  • Flexible Metrics: Measure text (characters, words, bytes) and images (count, pixels, file size)
  • Built-in Cost Tracking: Automatic cost calculation for 18+ major LLM models โœจ NEW!
  • Prometheus Exporter: Real-time metrics export for monitoring and alerting โœจ NEW!
  • Pluggable Storage: Local Parquet, PostgreSQL, MySQL (easily add more)
  • Simple API: Single decorator for most use cases
  • Production-Ready: Error handling, retries, graceful shutdown
  • Extensible: Custom collectors, backends, and aggregation strategies

Quick Start

Installation

# Basic installation
pip install llamonitor-async

# With storage backends
pip install llamonitor-async[parquet]    # For local Parquet files
pip install llamonitor-async[postgres]   # For PostgreSQL
pip install llamonitor-async[mysql]      # For MySQL
pip install llamonitor-async[prometheus] # For Prometheus metrics
pip install llamonitor-async[api]        # For REST API server
pip install llamonitor-async[all]        # Everything

Basic Usage

import asyncio
from llamonitor import monitor_llm, initialize_monitoring, MonitorConfig

@monitor_llm(
    operation_name="generate_text",
    measure_text=True,  # Collect all text metrics
    custom_attributes={"model": "gpt-4"}
)
async def my_llm_function(prompt: str):
    # Your LLM call here
    return {"text": "Generated response..."}

async def main():
    # Initialize monitoring
    await initialize_monitoring(MonitorConfig.for_local_dev())

    # Use your decorated functions
    result = await my_llm_function("Hello!")

    # Events are automatically tracked and written asynchronously

if __name__ == "__main__":
    asyncio.run(main())

Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Your Application                         โ”‚
โ”‚  @monitor_llm decorated functions/methods                   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                    โ”‚ (async, non-blocking)
                    โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚              Instrumentation Layer                          โ”‚
โ”‚  โ€ข MetricCollectors (text, image, cost, custom)             โ”‚
โ”‚  โ€ข Context Management (session/trace/span)                  โ”‚
โ”‚  โ€ข Decorator Logic                                          โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                    โ”‚
                    โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚               Transport Layer                               โ”‚
โ”‚  โ€ข Async Queue (buffering)                                  โ”‚
โ”‚  โ€ข Background Worker (batching)                             โ”‚
โ”‚  โ€ข Retry Logic                                              โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                    โ”‚
                    โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
                    โ–ผ                     โ–ผ                   โ–ผ
         โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
         โ”‚ Storage Backend โ”‚   โ”‚ Metrics Exporter โ”‚  โ”‚   Future     โ”‚
         โ”‚  โ€ข Parquet      โ”‚   โ”‚  โ€ข Prometheus    โ”‚  โ”‚ Integrations โ”‚
         โ”‚  โ€ข PostgreSQL   โ”‚   โ”‚  โ€ข Datadog (TBD) โ”‚  โ”‚              โ”‚
         โ”‚  โ€ข MySQL        โ”‚   โ”‚  โ€ข Custom        โ”‚  โ”‚              โ”‚
         โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Configuration

Environment Variables

LLMOPS_BACKEND=postgres
LLMOPS_CONNECTION_STRING=postgresql://user:pass@localhost/monitoring
LLMOPS_BATCH_SIZE=100
LLMOPS_FLUSH_INTERVAL_SECONDS=5.0

Programmatic Configuration

from llmops_monitoring import MonitorConfig
from llmops_monitoring.schema.config import StorageConfig

# Local development
config = MonitorConfig.for_local_dev()

# Production
config = MonitorConfig.for_production(
    "postgresql://user:pass@host:5432/monitoring"
)

# Custom
config = MonitorConfig(
    storage=StorageConfig(
        backend="parquet",
        output_dir="./my_data",
        batch_size=500,
        flush_interval_seconds=10.0
    ),
    max_queue_size=50000
)

await initialize_monitoring(config)

Examples

Hierarchical Tracking (Agentic Workflows)

from llmops_monitoring.instrumentation.context import monitoring_session, monitoring_trace

@monitor_llm("orchestrator", operation_type="agent_workflow")
async def run_workflow(query: str):
    # All nested calls automatically tracked
    intent = await classify_intent(query)      # Child span
    knowledge = await search_kb(intent)        # Child span
    response = await generate_response(knowledge)  # Child span
    return response

@monitor_llm("classify_intent")
async def classify_intent(query: str):
    # Automatically linked to parent
    return await llm.classify(query)

# Use with session context
with monitoring_session("user-123"):
    with monitoring_trace("conversation-1"):
        result = await run_workflow("What is the weather?")

Built-in Cost Tracking โœจ NEW!

Automatically track costs for major LLM providers:

@monitor_llm(
    operation_name="my_llm_call",
    measure_text=True,
    collectors=["cost"],  # Enable cost tracking
    custom_attributes={
        "model": "gpt-4o-mini"  # Pricing lookup
    }
)
async def my_llm_call(prompt: str):
    # Your LLM API call here
    return {"text": "response..."}

# Query costs later
import pandas as pd
df = pd.read_parquet("./dev_monitoring_data/**/*.parquet")
df['cost'] = df['custom_attributes'].apply(lambda x: x.get('estimated_cost_usd'))
print(f"Total cost: ${df['cost'].sum():.6f}")

Supported Models (18 total):

  • OpenAI: gpt-4, gpt-4-turbo, gpt-4o, gpt-4o-mini, gpt-3.5-turbo
  • Anthropic: claude-3-opus, claude-3-sonnet, claude-3-5-sonnet, claude-3-haiku
  • Google: gemini-1.5-pro, gemini-1.5-flash, gemini-1.0-pro
  • Meta: llama-3-8b, llama-3-70b
  • Mistral: mixtral-8x7b, mistral-small, mistral-medium, mistral-large

Prometheus Metrics Export โœจ NEW!

Expose metrics to Prometheus for monitoring and alerting:

from llmops_monitoring import initialize_monitoring, MonitorConfig
from llmops_monitoring.schema.config import PrometheusConfig

# Configure with Prometheus exporter
config = MonitorConfig.for_local_dev()
config.extensions["prometheus"] = PrometheusConfig(
    enabled=True,
    port=8000,
    host="0.0.0.0"
).model_dump()

await initialize_monitoring(config)

# Metrics available at http://localhost:8000/metrics

Available Metrics:

  • llm_operations_total (Counter): Total operations by operation_name, model, type
  • llm_errors_total (Counter): Total errors by operation_name, error_type
  • llm_operation_duration_seconds (Histogram): Operation latency distribution
  • llm_text_characters_total (Counter): Total characters processed
  • llm_cost_usd (Histogram): Cost per operation distribution
  • llm_queue_size (Gauge): Current queue size
  • llm_buffer_size (Gauge): Current buffer size

Prometheus Scrape Config:

scrape_configs:
  - job_name: 'llm-monitoring'
    static_configs:
      - targets: ['localhost:8000']

Custom Metrics

For completely custom collectors:

from llmops_monitoring.instrumentation.base import MetricCollector, CollectorRegistry

class MyCustomCollector(MetricCollector):
    def collect(self, result, args, kwargs, context):
        # Your custom logic
        return {"custom_attributes": {"my_metric": 123}}

    @property
    def metric_type(self) -> str:
        return "custom"

CollectorRegistry.register("my_custom", MyCustomCollector)

@monitor_llm(collectors=["my_custom"])
async def my_function():
    ...

Visualization with Grafana

Start the monitoring stack:

docker-compose up -d

Access Grafana at http://localhost:3000 (admin/admin)

The dashboard includes:

  • Total events and volume metrics
  • Time-series charts by operation
  • Session analysis
  • Error tracking
  • Hierarchical trace viewer

Storage Backends

Parquet (Local Development)

config = MonitorConfig(
    storage=StorageConfig(
        backend="parquet",
        output_dir="./monitoring_data",
        partition_by="date"  # or "session_id"
    )
)

Files are written as ./monitoring_data/YYYY-MM-DD/events_*.parquet

PostgreSQL (Production)

config = MonitorConfig(
    storage=StorageConfig(
        backend="postgres",
        connection_string="postgresql://user:pass@host:5432/db",
        table_name="metric_events",
        pool_size=20
    )
)

Tables are created automatically with proper indexes.

MySQL (Production)

config = MonitorConfig(
    storage=StorageConfig(
        backend="mysql",
        connection_string="mysql://user:pass@host:3306/monitoring",
        table_name="metric_events",
        pool_size=20
    )
)

Tables are created automatically with InnoDB engine and proper indexes.

Extension Points

1. Custom Metric Collectors

Implement MetricCollector to add new metric types:

class MyCollector(MetricCollector):
    def collect(self, result, args, kwargs, context):
        # Extract metrics
        return {"custom_attributes": {...}}

    @property
    def metric_type(self) -> str:
        return "my_metric"

2. Custom Storage Backends

Implement StorageBackend for new storage systems:

class RedisBackend(StorageBackend):
    async def initialize(self): ...
    async def write_event(self, event): ...
    async def write_batch(self, events): ...
    async def close(self): ...

3. Custom Transport Mechanisms

Replace the async queue with Kafka, Redis, etc. by modifying MonitoringWriter.

Performance

  • Overhead: < 1% for typical workloads
  • Async writes: No blocking of application code
  • Batching: Configurable batch sizes for efficiency
  • Buffering: Handles bursts without data loss
  • Graceful shutdown: Flushes all pending events

Development

# Clone repository
git clone https://github.com/yourusername/llmops-monitoring
cd llmops-monitoring

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run examples
python llmops_monitoring/examples/01_simple_example.py
python llmops_monitoring/examples/02_agentic_workflow.py
python llmops_monitoring/examples/03_custom_collector.py
python llmops_monitoring/examples/04_mysql_backend.py
python llmops_monitoring/examples/05_cost_calculation.py
python llmops_monitoring/examples/06_prometheus_exporter.py
python llmops_monitoring/examples/07_aggregation_api.py
python llmops_monitoring/examples/08_websocket_streaming.py

# Start monitoring stack
docker-compose up -d

REST API for Querying Data โœจ NEW!

Query and aggregate stored monitoring data via REST API:

from llmops_monitoring import MonitorConfig
from llmops_monitoring.api import run_api_server

# Start API server
config = MonitorConfig.for_local_dev()
run_api_server(config, port=8080)

# API available at http://localhost:8080
# Interactive docs at http://localhost:8080/docs

Available Endpoints:

  • GET /api/health - Health check
  • GET /api/v1/events - Query events with filters
  • GET /api/v1/sessions - List sessions
  • GET /api/v1/sessions/{session_id} - Session details
  • GET /api/v1/sessions/{session_id}/traces - Get traces
  • GET /api/v1/metrics/summary - Summary statistics
  • GET /api/v1/metrics/operations - Metrics by operation
  • GET /api/v1/metrics/models - Metrics by model
  • GET /api/v1/metrics/costs - Cost analytics

Query Examples:

# Get summary statistics
curl http://localhost:8080/api/v1/metrics/summary

# List recent sessions
curl http://localhost:8080/api/v1/sessions?limit=10

# Get metrics by operation
curl http://localhost:8080/api/v1/metrics/operations

# Get cost analytics grouped by model
curl 'http://localhost:8080/api/v1/metrics/costs?group_by=model'

Real-time WebSocket Streaming โœจ NEW!

Stream monitoring events in real-time via WebSockets:

from llmops_monitoring import MonitorConfig, initialize_monitoring
from llmops_monitoring.schema.config import WebSocketConfig

# Enable WebSocket streaming
config = MonitorConfig.for_local_dev()
config.extensions["websocket"] = WebSocketConfig(
    enabled=True
).model_dump()

await initialize_monitoring(config)

WebSocket Endpoints:

  • WS /api/v1/stream - All events in real-time
  • WS /api/v1/stream/sessions/{session_id} - Session-specific events
  • WS /api/v1/stream/operations/{operation_name} - Operation-specific events

Python Client Example:

import asyncio
import websockets
import json

async def listen_to_events():
    uri = 'ws://localhost:8080/api/v1/stream'
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            event = json.loads(message)
            print(f"Received event: {event['data']['operation_name']}")

asyncio.run(listen_to_events())

JavaScript Client Example:

const ws = new WebSocket('ws://localhost:8080/api/v1/stream');
ws.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log('Received event:', data);
};

Roadmap

  • MySQL backend implementation โœ… (v0.1.1)
  • Built-in cost calculation with pricing data โœ… (v0.1.1)
  • Prometheus exporter โœ… (v0.2.0)
  • Aggregation server with REST API โœ… (v0.2.0)
  • Real-time streaming with WebSockets โœ… (v0.2.0)
  • ClickHouse backend for analytics
  • GraphQL backend support
  • ML-based anomaly detection
  • Datadog integration

Contributing

Contributions are welcome! Areas of focus:

  1. Storage Backends: MySQL, ClickHouse, MongoDB, S3, etc.
  2. Collectors: Cost tracking, latency patterns, cache hit rates
  3. Visualization: New Grafana dashboards, custom analytics
  4. Documentation: Tutorials, use cases, best practices

See CONTRIBUTING.md for guidelines.

License

Apache License 2.0 - see LICENSE for details.

Acknowledgments

This project synthesizes ideas from:

  • OpenTelemetry distributed tracing standards
  • Langfuse and LangSmith observability platforms
  • Academic research on LLM agent monitoring (AgentOps, LumiMAS)
  • Production lessons from the LLM community

Citation

If you use this in research, please cite:

@software{llamonitor_async,
  title = {llamonitor-async: Lightweight Async Monitoring for LLM Applications},
  author = {Guy Bass},
  year = {2025},
  url = {https://github.com/guybass/LLMOps_monitoring_async-}
}

Built with the principle of "leaving space for air conditioning" - designed for the features you'll need tomorrow.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

llamonitor_async-0.2.0.tar.gz (83.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

llamonitor_async-0.2.0-py3-none-any.whl (86.8 kB view details)

Uploaded Python 3

File details

Details for the file llamonitor_async-0.2.0.tar.gz.

File metadata

  • Download URL: llamonitor_async-0.2.0.tar.gz
  • Upload date:
  • Size: 83.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.9

File hashes

Hashes for llamonitor_async-0.2.0.tar.gz
Algorithm Hash digest
SHA256 9e224848be8fe59e4c162f65d55f8f95e341ed3d442a83b09c3cfe6186e9113c
MD5 45be4991a4c2e4bf5ffe872557cba0f3
BLAKE2b-256 c9bacf8a3c61f4bfd573c79a1323f300b6c8c7e6de0343f20472c5bfd8148357

See more details on using hashes here.

File details

Details for the file llamonitor_async-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for llamonitor_async-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 3eb08244c229d4f25a3b5db8ebc6963dc9eec19d75dd850860febb631aeed5ed
MD5 dccf3b6e43096368311f5a2687f11534
BLAKE2b-256 23902da40142238f27db1da2321931330773384f5ff7dd892171bdcc5de80d44

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page