A Parquet-based callback handler for logging LangChain LLM interactions
LangChain Callback Parquet Logger
A high-performance callback handler for logging LangChain LLM interactions to Parquet files. This package provides efficient, structured logging with automatic partitioning and buffering for production use.
Features
- 📊 Parquet Format: Efficient columnar storage for analytics
- 🚀 Buffered Writing: Configurable buffer size for optimal performance
- 📅 Daily Partitioning: Automatic date-based file organization
- 🔄 Thread-Safe: Safe for concurrent LLM calls
- 📦 Flexible Schema: JSON payload for extensible logging
- 🔒 Automatic Cleanup: Ensures buffers flush on exit
- 🏷️ Custom Tracking: Add custom IDs and metadata to your logs
Installation
pip install langchain-callback-parquet-logger
Quick Start
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI
# Just add the logger - works with ANY LangChain LLM
llm = ChatOpenAI(model="gpt-4")
llm.callbacks = [ParquetLogger("./logs")]
# Use the LLM normally
response = llm.invoke("What is 2+2?")
Usage in Notebooks (Jupyter, Hex, Colab)
⚠️ Important: In notebook environments the kernel keeps running between cells, so with the default buffer size of 100 nothing is written to disk until 100 log entries have accumulated. For immediate writes, use one of these approaches:
Option 1: Context Manager (Recommended)
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI
# Using context manager ensures logs are written when the block exits
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4")
    llm.callbacks = [logger]
    response = llm.invoke("What is 2+2?")
# Logs are automatically flushed here
Option 2: Small Buffer Size
# Set buffer_size=1 to write after every LLM call
logger = ParquetLogger('./logs', buffer_size=1)
llm = ChatOpenAI(model="gpt-4", callbacks=[logger])
response = llm.invoke("What is 2+2?")
Option 3: Manual Flush
logger = ParquetLogger('./logs')
llm = ChatOpenAI(model="gpt-4", callbacks=[logger])
response = llm.invoke("What is 2+2?")
logger.flush() # Manually write logs to disk
Configuration
Parameters
- log_dir (str, default: "./llm_logs"): Directory for log files
- buffer_size (int, default: 100): Number of entries before auto-flush
- provider (str, default: "openai"): LLM provider name for tracking
- metadata (dict, optional): Logger-level metadata included in all log entries
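To make the buffer_size behaviour concrete, the sketch below models a buffer that flushes once it holds buffer_size entries. It is an illustration only: `SketchBuffer` and its `add` and `flush` methods are hypothetical names, not the package's implementation, and each "write" goes to an in-memory list rather than to a Parquet file.

```python
import threading


class SketchBuffer:
    """Hypothetical stand-in for a buffered logger (not the package's code)."""

    def __init__(self, buffer_size=100):
        self.buffer_size = buffer_size
        self.entries = []   # pending entries, not yet "written"
        self.batches = []   # each flush appends one batch here
        self._lock = threading.Lock()

    def add(self, entry):
        # Append under a lock so concurrent callers cannot interleave a flush.
        with self._lock:
            self.entries.append(entry)
            if len(self.entries) >= self.buffer_size:
                self._flush_locked()

    def flush(self):
        with self._lock:
            self._flush_locked()

    def _flush_locked(self):
        # Caller must hold the lock; empty flushes are skipped.
        if self.entries:
            self.batches.append(list(self.entries))
            self.entries.clear()


buf = SketchBuffer(buffer_size=3)
for i in range(7):
    buf.add({"event": i})
pending_before_flush = len(buf.entries)  # entries still waiting in memory
buf.flush()                              # a manual flush writes the remainder
```

With a buffer of 3 and 7 entries, two batches are written automatically and the last entry stays in memory until `flush()` is called, which is the situation the notebook options above are meant to avoid.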
Log Structure
Logs are saved as Parquet files with the following schema:
| Column | Type | Description |
|---|---|---|
| timestamp | timestamp[us, tz=UTC] | Event timestamp |
| run_id | string | Unique run identifier |
| logger_custom_id | string | Optional custom ID for request tracking |
| event_type | string | Event type (llm_start, llm_end, llm_error) |
| provider | string | LLM provider name |
| logger_metadata | string | JSON-encoded logger-level metadata |
| payload | string | JSON-encoded event data |
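Because logger_metadata and payload are stored as JSON strings, they need decoding before use, and run_id ties the start and end events of one call together. The sketch below works on two hand-written rows shaped like the schema; every value is made up, the `prompts` payload key is a hypothetical placeholder, and `run_durations` is an illustrative helper rather than part of the package.

```python
import json
from datetime import datetime, timezone

# Hand-written rows shaped like the schema above; all values are invented.
rows = [
    {
        "timestamp": datetime(2024, 1, 15, 14, 30, 22, tzinfo=timezone.utc),
        "run_id": "run-a",
        "logger_custom_id": "",
        "event_type": "llm_start",
        "provider": "openai",
        "logger_metadata": json.dumps({"environment": "production"}),
        "payload": json.dumps({"prompts": ["What is 2+2?"]}),  # hypothetical key
    },
    {
        "timestamp": datetime(2024, 1, 15, 14, 30, 24, tzinfo=timezone.utc),
        "run_id": "run-a",
        "logger_custom_id": "",
        "event_type": "llm_end",
        "provider": "openai",
        "logger_metadata": json.dumps({"environment": "production"}),
        "payload": json.dumps({"usage": {"total_tokens": 12}}),
    },
]


def run_durations(rows):
    """Return seconds between llm_start and llm_end for each run_id."""
    starts, durations = {}, {}
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        if row["event_type"] == "llm_start":
            starts[row["run_id"]] = row["timestamp"]
        elif row["event_type"] == "llm_end" and row["run_id"] in starts:
            delta = row["timestamp"] - starts[row["run_id"]]
            durations[row["run_id"]] = delta.total_seconds()
    return durations


# Decode the JSON columns before reading nested fields.
end_payload = json.loads(rows[1]["payload"])
total_tokens = end_payload.get("usage", {}).get("total_tokens", 0)
environment = json.loads(rows[0]["logger_metadata"])["environment"]
durations = run_durations(rows)
```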
File Organization
llm_logs/
├── date=2024-01-15/
│   ├── logs_143022_123456.parquet
│   └── logs_150331_789012.parquet
└── date=2024-01-16/
    └── logs_090122_345678.parquet
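The date=YYYY-MM-DD directory names follow the Hive partitioning convention, so the partition date can be recovered from a file path without opening the file. A minimal sketch, assuming paths shaped like the tree above; `partition_date` is a hypothetical helper, not part of the package.

```python
from datetime import date
from pathlib import PurePosixPath


def partition_date(path):
    """Return the date encoded in a `date=YYYY-MM-DD` path component, or None."""
    for part in PurePosixPath(path).parts:
        if part.startswith("date="):
            return date.fromisoformat(part[len("date="):])
    return None


example = "llm_logs/date=2024-01-15/logs_143022_123456.parquet"
parsed = partition_date(example)
```

Tools that understand Hive partitioning, such as the PyArrow dataset API, can expose the same value as a `date` column and skip directories that a filter rules out.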
Reading Logs
With Pandas
import pandas as pd
import json
# Read all parquet files in the log directory
df = pd.read_parquet("./logs")
# Parse JSON payloads
df['data'] = df['payload'].apply(json.loads)
# Analyze completions
completions = df[df['event_type'] == 'llm_end']
for _, row in completions.iterrows():
    data = row['data']
    if 'usage' in data:
        print(f"Run {row['run_id'][:8]}: {data['usage'].get('total_tokens', 0)} tokens")
# Share of all logged events (start, end, and error) that are errors
error_rate = len(df[df['event_type'] == 'llm_error']) / len(df) * 100
print(f"Error rate: {error_rate:.2f}%")
With DuckDB
import duckdb
import json
# Connect to DuckDB and query parquet files directly
conn = duckdb.connect()
# Read all logs
df = conn.execute("""
    SELECT * FROM read_parquet('./logs/**/*.parquet')
    ORDER BY timestamp DESC
""").df()
# Analyze by provider and event type
summary = conn.execute("""
    SELECT
        provider,
        event_type,
        COUNT(*) as count,
        DATE(timestamp) as date
    FROM read_parquet('./logs/**/*.parquet')
    GROUP BY provider, event_type, DATE(timestamp)
    ORDER BY date DESC, provider
""").df()
print(summary)
# Query using the new custom ID field
custom_requests = conn.execute("""
    SELECT
        logger_custom_id,
        event_type,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE logger_custom_id != ''
    ORDER BY timestamp
""").df()
print(f"Found {len(custom_requests)} requests with custom IDs")
# Extract specific fields from JSON payload
detailed = conn.execute("""
    SELECT
        timestamp,
        run_id,
        json_extract_string(payload, '$.model') as model,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE event_type = 'llm_end'
""").df()
print(f"Total tokens used: {detailed['tokens'].astype(float).sum()}")
With PyArrow
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import json
# Read using dataset API for better performance with partitioned data
dataset = ds.dataset("./logs", format="parquet", partitioning="hive")
# Convert to table with filters
table = dataset.to_table(filter=(ds.field("event_type") == "llm_end"))
df = table.to_pandas()
# Parse and analyze
for _, row in df.iterrows():
    payload = json.loads(row['payload'])
    print(f"Model: {payload.get('model_name', 'unknown')}")
    print(f"Tokens: {payload.get('usage', {}).get('total_tokens', 0)}")
Metadata and Custom IDs
Logger-Level Metadata
Add metadata that's included with every log entry:
logger = ParquetLogger(
    log_dir="./logs",
    metadata={
        "environment": "production",
        "service": "api-gateway",
        "version": "2.1.0"
    }
)
Request-Level Custom IDs
Track specific requests with custom IDs:
# Pass the ID through the run config's metadata so callbacks receive it
response = llm.invoke(
    "What is quantum computing?",
    config={"metadata": {"logger_custom_id": "user-123-session-456-req-789"}}
)
Checking Version
import langchain_callback_parquet_logger
print(langchain_callback_parquet_logger.__version__)
Context Manager Usage
with ParquetLogger(log_dir="./logs") as logger:
    llm = ChatOpenAI(callbacks=[logger])
    llm.invoke("Process this message")
# Buffer automatically flushed on exit
Advanced Usage
Custom Buffer Size for Batch Processing
# Large buffer for batch processing
logger = ParquetLogger(
    log_dir="./batch_logs",
    buffer_size=1000,  # Flush every 1000 entries
    provider="anthropic"
)
Multiple Providers
from langchain_anthropic import ChatAnthropic
from langchain_openai import ChatOpenAI
# Track different providers separately
openai_logger = ParquetLogger(log_dir="./logs", provider="openai")
anthropic_logger = ParquetLogger(log_dir="./logs", provider="anthropic")
openai_llm = ChatOpenAI(callbacks=[openai_logger])
anthropic_llm = ChatAnthropic(callbacks=[anthropic_logger])
Examples
Check out the examples/ directory for complete working examples:
- basic_usage.py: Simple example showing fundamental logging capabilities
- batch_processing.py: Advanced example with async batch processing, web search, and structured outputs
Run examples:
# Basic usage
python examples/basic_usage.py
# Batch processing with web search
python examples/batch_processing.py
Performance Considerations
- Buffer Size: Larger buffers reduce I/O but use more memory
- Compression: Uses Snappy compression by default for balance of speed/size
- Partitioning: Daily partitions enable efficient querying and cleanup
- Thread Safety: Safe for concurrent use without performance penalty
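Daily partitions make retention simple, since removing a day of logs means deleting one directory. The sketch below shows one way this could be done. `remove_old_partitions`, `keep_days`, and `today` are hypothetical names chosen for illustration, the package itself is not known to ship such a helper, and the demonstration runs against a temporary directory so no real logs are touched.

```python
import shutil
import tempfile
from datetime import date, timedelta
from pathlib import Path


def remove_old_partitions(log_dir, keep_days, today):
    """Delete `date=YYYY-MM-DD` directories older than `keep_days` days."""
    cutoff = today - timedelta(days=keep_days)
    removed = []
    for child in sorted(Path(log_dir).iterdir()):
        if not (child.is_dir() and child.name.startswith("date=")):
            continue
        try:
            partition_day = date.fromisoformat(child.name[len("date="):])
        except ValueError:
            continue  # leave directories with unexpected names alone
        if partition_day < cutoff:
            shutil.rmtree(child)
            removed.append(child.name)
    return removed


# Demonstrate on a throwaway directory with three empty partitions.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("date=2024-01-01", "date=2024-01-15", "date=2024-01-16"):
        (Path(tmp) / name).mkdir()
    removed = remove_old_partitions(tmp, keep_days=7, today=date(2024, 1, 16))
    remaining = sorted(p.name for p in Path(tmp).iterdir())
```

Flushing any active logger before running a cleanup like this avoids deleting a directory that a buffered write is about to target.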
Development
Install from source
git clone https://github.com/turbo3136/langchain-callback-parquet-logger.git
cd langchain-callback-parquet-logger
pip install -e .
Running Tests
# No tests available yet
# Contributions welcome!
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions, please use the GitHub issues page.