
A Parquet-based callback handler for logging LangChain LLM interactions


LangChain Callback Parquet Logger

A callback handler that logs LangChain LLM interactions to Parquet files. It provides structured logging with buffered writes and automatic daily partitioning for production use.

Features

  • 📊 Parquet Format: Efficient columnar storage for analytics
  • 🚀 Buffered Writing: Configurable buffer size for optimal performance
  • 📅 Daily Partitioning: Automatic date-based file organization
  • 🔄 Thread-Safe: Safe for concurrent LLM calls
  • 📦 Flexible Schema: JSON payload for extensible logging
  • 🔒 Automatic Cleanup: Ensures buffers flush on exit
  • 🏷️ Custom Tracking: Add custom IDs and metadata to your logs

Installation

pip install langchain-callback-parquet-logger

Quick Start

from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Just add the logger - works with ANY LangChain LLM
llm = ChatOpenAI(model="gpt-4")
llm.callbacks = [ParquetLogger("./logs")]

# Use the LLM normally
response = llm.invoke("What is 2+2?")

Usage in Notebooks (Jupyter, Hex, Colab)

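⚠️ Important: With the default buffer_size of 100, logs are written to disk only after 100 log entries have accumulated (each LLM call typically logs a start event plus an end or error event). Because a notebook kernel keeps running, the automatic flush on exit does not help during a session. To write logs immediately, use one of these approaches: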

Option 1: Context Manager (Recommended)

from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Using context manager ensures logs are written when the block exits
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4")
    llm.callbacks = [logger]
    response = llm.invoke("What is 2+2?")
# Logs are automatically flushed here

Option 2: Small Buffer Size

# buffer_size=1 writes each log entry to disk as soon as it is recorded
logger = ParquetLogger('./logs', buffer_size=1)
llm = ChatOpenAI(model="gpt-4", callbacks=[logger])
response = llm.invoke("What is 2+2?")

Option 3: Manual Flush

logger = ParquetLogger('./logs')
llm = ChatOpenAI(model="gpt-4", callbacks=[logger])
response = llm.invoke("What is 2+2?")
logger.flush()  # Manually write logs to disk
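The three options differ only in when buffered entries are written out. The sketch below models that with a hypothetical `ToyBufferedLogger` (not the package's class) that "writes" into an in-memory list instead of a Parquet file; it assumes each successful LLM call records two entries, `llm_start` and `llm_end`.

```python
from typing import Any


class ToyBufferedLogger:
    """Hypothetical stand-in mimicking buffer_size, flush() and the context manager.

    Each "write" appends the current batch to self.written instead of creating a file.
    """

    def __init__(self, buffer_size: int = 100) -> None:
        self.buffer_size = buffer_size
        self.buffer: list[dict[str, Any]] = []
        self.written: list[list[dict[str, Any]]] = []  # one inner list per simulated file

    def log(self, entry: dict[str, Any]) -> None:
        self.buffer.append(entry)
        # Auto-flush once buffer_size entries have accumulated
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        # Move any pending entries into a new simulated file
        if self.buffer:
            self.written.append(self.buffer)
            self.buffer = []

    def __enter__(self) -> "ToyBufferedLogger":
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Leaving the with-block flushes whatever is still buffered
        self.flush()


def simulate_call(logger: ToyBufferedLogger, run_id: str) -> None:
    # Assumption: one successful call logs a start entry and an end entry
    logger.log({"run_id": run_id, "event_type": "llm_start"})
    logger.log({"run_id": run_id, "event_type": "llm_end"})


default = ToyBufferedLogger()  # buffer_size=100
simulate_call(default, "run-1")
print(len(default.written))  # 0: nothing written yet

with ToyBufferedLogger() as scoped:
    simulate_call(scoped, "run-2")
print(len(scoped.written))  # 1: flushed when the block exited
```

Under the same assumption, `buffer_size=1` would produce two writes per call, one for each entry.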

Configuration

Parameters

  • log_dir (str, default: "./llm_logs"): Directory for log files
  • buffer_size (int, default: 100): Number of entries before auto-flush
  • provider (str, default: "openai"): LLM provider name for tracking
  • metadata (dict, optional): Logger-level metadata included in all log entries

Log Structure

Logs are saved as Parquet files with the following schema:

Column            Type                    Description
timestamp         timestamp[us, tz=UTC]   Event timestamp
run_id            string                  Unique run identifier
logger_custom_id  string                  Optional custom ID for request tracking
event_type        string                  Event type (llm_start, llm_end, llm_error)
provider          string                  LLM provider name
logger_metadata   string                  JSON-encoded logger-level metadata
payload           string                  JSON-encoded event data
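Because `logger_metadata` and `payload` are JSON-encoded strings, every row stays flat while still carrying nested data. A minimal sketch of assembling one row with that shape, using a hypothetical `make_log_entry` helper; the payload key `usage.total_tokens` is an assumption borrowed from the reading examples below, and the empty-string default for `logger_custom_id` is likewise assumed.

```python
import json
from datetime import datetime, timezone
from typing import Any, Optional


def make_log_entry(
    run_id: str,
    event_type: str,
    payload: dict[str, Any],
    provider: str = "openai",
    logger_metadata: Optional[dict[str, Any]] = None,
    logger_custom_id: str = "",  # assumption: empty string when no custom ID is set
) -> dict[str, Any]:
    """Hypothetical helper: build one row shaped like the schema above."""
    return {
        "timestamp": datetime.now(timezone.utc),  # timezone-aware UTC timestamp
        "run_id": run_id,
        "logger_custom_id": logger_custom_id,
        "event_type": event_type,
        "provider": provider,
        # Nested data is stored as JSON text so both columns stay plain strings
        "logger_metadata": json.dumps(logger_metadata or {}),
        "payload": json.dumps(payload),
    }


entry = make_log_entry(
    run_id="3f2b9c1e",
    event_type="llm_end",
    payload={"usage": {"total_tokens": 42}},  # assumed payload shape
    logger_metadata={"environment": "production"},
)
print(json.loads(entry["payload"])["usage"]["total_tokens"])  # 42
```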

File Organization

llm_logs/
├── date=2024-01-15/
│   ├── logs_143022_123456.parquet
│   └── logs_150331_789012.parquet
└── date=2024-01-16/
    └── logs_090122_345678.parquet
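The `date=YYYY-MM-DD` directory names follow the Hive partitioning convention, which is why readers such as PyArrow (with `partitioning="hive"`) can expose `date` as a column. The sketch below rebuilds a path from a timestamp; `partition_path` is hypothetical, the `HHMMSS_microseconds` file-name pattern is inferred from the example names above, and partitioning by the UTC date is an assumption.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def partition_path(log_dir: str, ts: datetime) -> PurePosixPath:
    """Hypothetical: build date=YYYY-MM-DD/logs_HHMMSS_ffffff.parquet for a timestamp."""
    ts = ts.astimezone(timezone.utc)  # assumption: partitions use the UTC date
    partition = f"date={ts:%Y-%m-%d}"
    filename = f"logs_{ts:%H%M%S}_{ts:%f}.parquet"  # %f is zero-padded microseconds
    return PurePosixPath(log_dir) / partition / filename


print(partition_path("llm_logs", datetime(2024, 1, 15, 14, 30, 22, 123456, tzinfo=timezone.utc)))
# llm_logs/date=2024-01-15/logs_143022_123456.parquet
```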

Reading Logs

With Pandas

import pandas as pd
import json

# Read all parquet files in the log directory
df = pd.read_parquet("./logs")

# Parse JSON payloads
df['data'] = df['payload'].apply(json.loads)

# Analyze completions
completions = df[df['event_type'] == 'llm_end']
for _, row in completions.iterrows():
    data = row['data']
    if 'usage' in data:
        print(f"Run {row['run_id'][:8]}: {data['usage'].get('total_tokens', 0)} tokens")

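# Get the error rate per LLM call (each call logs one llm_start event)
num_calls = (df['event_type'] == 'llm_start').sum()
num_errors = (df['event_type'] == 'llm_error').sum()
error_rate = num_errors / num_calls * 100 if num_calls else 0.0
print(f"Error rate: {error_rate:.2f}%")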
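Each LLM call usually produces more than one row (assumed here: a start event plus an end or error event), so per-call metrics are easiest to compute after grouping rows by `run_id`. A dependency-free sketch over plain row dicts, for example rows from `df.to_dict("records")`; `summarize_runs` is hypothetical and the `usage.total_tokens` payload key is an assumption.

```python
import json
from collections import defaultdict
from typing import Any


def summarize_runs(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical: count calls, errors and tokens by grouping rows on run_id."""
    events: dict[str, set[str]] = defaultdict(set)
    total_tokens = 0
    for row in rows:
        events[row["run_id"]].add(row["event_type"])
        if row["event_type"] == "llm_end":
            payload = json.loads(row["payload"])
            # Assumed payload shape: {"usage": {"total_tokens": ...}}
            total_tokens += (payload.get("usage") or {}).get("total_tokens") or 0
    calls = len(events)
    errors = sum(1 for kinds in events.values() if "llm_error" in kinds)
    return {
        "calls": calls,
        "errors": errors,
        "error_rate_pct": 100 * errors / calls if calls else 0.0,
        "total_tokens": total_tokens,
    }


rows = [
    {"run_id": "a", "event_type": "llm_start", "payload": "{}"},
    {"run_id": "a", "event_type": "llm_end", "payload": json.dumps({"usage": {"total_tokens": 30}})},
    {"run_id": "b", "event_type": "llm_start", "payload": "{}"},
    {"run_id": "b", "event_type": "llm_error", "payload": json.dumps({"error": "timeout"})},
]
print(summarize_runs(rows))
# {'calls': 2, 'errors': 1, 'error_rate_pct': 50.0, 'total_tokens': 30}
```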

With DuckDB

import duckdb
import json

# Connect to DuckDB and query parquet files directly
conn = duckdb.connect()

# Read all logs
df = conn.execute("""
    SELECT * FROM read_parquet('./logs/**/*.parquet')
    ORDER BY timestamp DESC
""").df()

# Analyze by provider and event type
summary = conn.execute("""
    SELECT 
        provider,
        event_type,
        COUNT(*) as count,
        DATE(timestamp) as date
    FROM read_parquet('./logs/**/*.parquet')
    GROUP BY provider, event_type, DATE(timestamp)
    ORDER BY date DESC, provider
""").df()

print(summary)

# Query requests tagged with a custom ID
custom_requests = conn.execute("""
    SELECT 
        logger_custom_id,
        event_type,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE logger_custom_id != ''
    ORDER BY timestamp
""").df()

print(f"Found {len(custom_requests)} requests with custom IDs")

# Extract specific fields from JSON payload
detailed = conn.execute("""
    SELECT 
        timestamp,
        run_id,
        json_extract_string(payload, '$.model') as model,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE event_type = 'llm_end'
""").df()

print(f"Total tokens used: {detailed['tokens'].astype(float).sum()}")
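`json_extract_string(payload, '$.usage.total_tokens')` walks a JSON path and returns the value as text, or NULL when the path is missing. The hypothetical helper below roughly mimics that in plain Python for simple dotted paths, which can help check what a path resolves to before writing SQL; it handles object keys only, not array indexes or wildcards, and is not DuckDB's implementation.

```python
import json
from typing import Any, Optional


def extract_string(payload: str, path: str) -> Optional[str]:
    """Hypothetical: resolve a '$.a.b' path in a JSON string, returning text or None."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    node: Any = json.loads(payload)
    keys = [key for key in path[1:].split(".") if key]  # '$.usage.x' -> ['usage', 'x']
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return None  # missing path, like SQL NULL
        node = node[key]
    if node is None:
        return None
    # Strings come back unquoted; numbers and objects are re-serialized as JSON text
    return node if isinstance(node, str) else json.dumps(node)


payload = json.dumps({"model": "gpt-4", "usage": {"total_tokens": 42}})
print(extract_string(payload, "$.usage.total_tokens"))  # 42
print(extract_string(payload, "$.model"))  # gpt-4
print(extract_string(payload, "$.usage.missing"))  # None
```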

With PyArrow

import pyarrow.dataset as ds
import json

# Read using dataset API for better performance with partitioned data
dataset = ds.dataset("./logs", format="parquet", partitioning="hive")

# Convert to table with filters
table = dataset.to_table(filter=(ds.field("event_type") == "llm_end"))
df = table.to_pandas()

# Parse and analyze
for _, row in df.iterrows():
    payload = json.loads(row['payload'])
    print(f"Model: {payload.get('model_name', 'unknown')}")
    print(f"Tokens: {payload.get('usage', {}).get('total_tokens', 0)}")

Metadata and Custom IDs

Logger-Level Metadata

Add metadata that's included with every log entry:

logger = ParquetLogger(
    log_dir="./logs",
    metadata={
        "environment": "production",
        "service": "api-gateway",
        "version": "2.1.0"
    }
)
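Since logger-level metadata is stored in every row as a JSON string in `logger_metadata`, it can be decoded to filter logs by environment, service, or version. A stdlib-only sketch with a hypothetical `rows_matching` helper operating on row dicts; treating an empty string as "no metadata" is an assumption.

```python
import json
from typing import Any


def rows_matching(rows: list[dict[str, Any]], **expected: Any) -> list[dict[str, Any]]:
    """Hypothetical: keep rows whose decoded logger_metadata has all expected key/value pairs."""
    matched = []
    for row in rows:
        # Assumption: missing or empty metadata decodes to an empty dict
        metadata = json.loads(row.get("logger_metadata") or "{}")
        if all(metadata.get(key) == value for key, value in expected.items()):
            matched.append(row)
    return matched


rows = [
    {"run_id": "a", "logger_metadata": json.dumps({"environment": "production", "service": "api-gateway"})},
    {"run_id": "b", "logger_metadata": json.dumps({"environment": "staging"})},
    {"run_id": "c", "logger_metadata": ""},
]
print([r["run_id"] for r in rows_matching(rows, environment="production")])  # ['a']
```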

Request-Level Custom IDs

Track specific requests by passing logger_custom_id in the run's metadata (via the config argument):

response = llm.invoke(
    "What is quantum computing?",
    config={"metadata": {"logger_custom_id": "user-123-session-456-req-789"}}
)
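LangChain hands run metadata to callback handlers (for example as the `metadata` keyword argument of `on_llm_start`). A sketch of how a handler could pull the custom ID out of that dict; `extract_custom_id` is hypothetical, and the empty-string fallback is an assumption consistent with the `logger_custom_id != ''` filter in the DuckDB example.

```python
from typing import Any, Mapping, Optional


def extract_custom_id(metadata: Optional[Mapping[str, Any]]) -> str:
    """Hypothetical: read 'logger_custom_id' from run metadata, defaulting to ''."""
    if not metadata:
        return ""
    value = metadata.get("logger_custom_id")
    # Assumption: non-string IDs are stored as their string form
    return "" if value is None else str(value)


print(extract_custom_id({"logger_custom_id": "user-123-session-456-req-789"}))
print(repr(extract_custom_id(None)))  # ''
```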

Checking Version

import langchain_callback_parquet_logger
print(langchain_callback_parquet_logger.__version__)

Context Manager Usage

with ParquetLogger(log_dir="./logs") as logger:
    llm = ChatOpenAI(callbacks=[logger])
    llm.invoke("Process this message")
# Buffer automatically flushed on exit

Advanced Usage

Custom Buffer Size for Batch Processing

# Large buffer for batch processing
logger = ParquetLogger(
    log_dir="./batch_logs",
    buffer_size=1000,  # Flush every 1000 entries
    provider="anthropic"
)
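The buffer size roughly determines how many Parquet files a batch job produces. A back-of-the-envelope helper, assuming each successful call logs two entries (`llm_start` and `llm_end`) and that a final flush writes any remainder; `expected_files` is hypothetical and ignores anything else that might trigger a write.

```python
import math


def expected_files(num_calls: int, buffer_size: int, entries_per_call: int = 2) -> int:
    """Hypothetical estimate: one file per full buffer plus one for a partial final flush."""
    total_entries = num_calls * entries_per_call
    return math.ceil(total_entries / buffer_size)


print(expected_files(10_000, buffer_size=1000))  # 20
print(expected_files(10_000, buffer_size=100))  # 200
```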

Multiple Providers

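from langchain_anthropic import ChatAnthropic
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Track different providers separately
openai_logger = ParquetLogger(log_dir="./logs", provider="openai")
anthropic_logger = ParquetLogger(log_dir="./logs", provider="anthropic")

openai_llm = ChatOpenAI(callbacks=[openai_logger])
# Use any Anthropic chat model name here
anthropic_llm = ChatAnthropic(model="claude-3-5-sonnet-latest", callbacks=[anthropic_logger])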

Examples

Check out the examples/ directory for complete working examples:

  • basic_usage.py - Simple example showing fundamental logging capabilities
  • batch_processing.py - Advanced example with async batch processing, web search, and structured outputs

Run examples:

# Basic usage
python examples/basic_usage.py

# Batch processing with web search
python examples/batch_processing.py

Performance Considerations

  • Buffer Size: Larger buffers reduce I/O but use more memory
  • Compression: Uses Snappy compression by default for balance of speed/size
  • Partitioning: Daily partitions enable efficient querying and cleanup
  • Thread Safety: A single logger instance can be shared across concurrent LLM calls
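Thread safety for a buffered logger generally comes down to guarding the shared buffer so that appends and flushes never interleave. A minimal sketch of that pattern with `threading.Lock`; `LockedBuffer` is hypothetical and is not the package's internal implementation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any


class LockedBuffer:
    """Hypothetical buffer: appends and flushes happen under one lock."""

    def __init__(self, buffer_size: int) -> None:
        self.buffer_size = buffer_size
        self._buffer: list[Any] = []
        self._lock = threading.Lock()
        self.flushed: list[list[Any]] = []  # stands in for written Parquet files

    def add(self, entry: Any) -> None:
        with self._lock:
            self._buffer.append(entry)
            if len(self._buffer) >= self.buffer_size:
                self._flush_locked()

    def flush(self) -> None:
        with self._lock:
            self._flush_locked()

    def _flush_locked(self) -> None:
        # Caller must already hold self._lock
        if self._buffer:
            self.flushed.append(self._buffer)
            self._buffer = []


buf = LockedBuffer(buffer_size=10)
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(buf.add, range(1000)))  # 1000 concurrent appends
buf.flush()
print(sum(len(batch) for batch in buf.flushed))  # 1000
```

Holding the lock during the simulated write keeps the sketch simple; a real writer might instead swap the buffer out under the lock and perform the slow file I/O outside it.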

Development

Install from source

git clone https://github.com/turbo3136/langchain-callback-parquet-logger.git
cd langchain-callback-parquet-logger
pip install -e .

Running Tests

# No tests available yet
# Contributions welcome!

License

MIT License - see LICENSE file for details

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues and questions, please use the GitHub issues page.



Download files


Source Distribution

langchain_callback_parquet_logger-0.1.4.tar.gz (8.9 kB)

Built Distribution

langchain_callback_parquet_logger-0.1.4-py3-none-any.whl

File details

Details for the file langchain_callback_parquet_logger-0.1.4.tar.gz.


File hashes

Hashes for langchain_callback_parquet_logger-0.1.4.tar.gz
Algorithm    Hash digest
SHA256       62076107ab030a3dfa7906431d2bf37f2c5337a982b7edccad8e14894afe31fe
MD5          bb9a004817a31a61c22e0599eebdfb3b
BLAKE2b-256  7f66069cc86c814a18f1a199fbe25ac5c4fbbceece9f5230dade858155edc764


File details

Details for the file langchain_callback_parquet_logger-0.1.4-py3-none-any.whl.


File hashes

Hashes for langchain_callback_parquet_logger-0.1.4-py3-none-any.whl
Algorithm    Hash digest
SHA256       d875653d3f371994904584d0eb6778da039a4586c380b4d2eae6fb1ff4911c9a
MD5          4b5650dc6447e4db8ea22bacb9d03c0b
BLAKE2b-256  295e710292b18d160702b52c63c4b3c543761d780efbddda0e09afc186204275

