A Parquet-based callback handler for logging LangChain LLM interactions
LangChain Callback Parquet Logger
A callback handler for logging LangChain LLM interactions to Parquet files. It writes structured logs with buffering and automatic date-based partitioning, aimed at production use.
Features
- 📊 Parquet Format: Efficient columnar storage for analytics
- 🚀 Buffered Writing: Configurable buffer size for optimal performance
- 📅 Daily Partitioning: Automatic date-based file organization
- 🔄 Thread-Safe: Safe for concurrent LLM calls
- 📦 Flexible Schema: JSON payload for extensible logging
- 🔒 Automatic Cleanup: Ensures buffers flush on exit
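The buffering and thread-safety features can be illustrated with a small standard-library sketch. `BufferedWriter` and its `sink` callable are hypothetical names used only for illustration, not this package's API; a real handler would write each batch to a Parquet file instead of appending it to a list.

```python
import threading
from typing import Callable, Dict, List


class BufferedWriter:
    """Hypothetical sketch: collect entries and hand them to `sink` in batches."""

    def __init__(self, sink: Callable[[List[Dict]], None], buffer_size: int = 100) -> None:
        self.sink = sink
        self.buffer_size = buffer_size
        self._buffer: List[Dict] = []
        self._lock = threading.Lock()

    def append(self, entry: Dict) -> None:
        # The lock makes "append + check threshold + flush" atomic across threads.
        with self._lock:
            self._buffer.append(entry)
            if len(self._buffer) >= self.buffer_size:
                self._flush_locked()

    def flush(self) -> None:
        with self._lock:
            self._flush_locked()

    def _flush_locked(self) -> None:
        # Caller must already hold self._lock.
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self.sink(batch)


# Usage: seven concurrent appends with buffer_size=3
batches: List[List[Dict]] = []
writer = BufferedWriter(batches.append, buffer_size=3)
threads = [threading.Thread(target=writer.append, args=({"i": i},)) for i in range(7)]
for t in threads:
    t.start()
for t in threads:
    t.join()
writer.flush()  # final flush, as would happen on exit; writes the leftover entry
print([len(b) for b in batches])  # [3, 3, 1]
```

Calling the sink while holding the lock keeps each batch whole, at the cost of briefly blocking other threads during a write.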
Installation
```bash
pip install langchain-callback-parquet-logger
```
Quick Start
```python
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Just add the logger - works with ANY LangChain LLM
llm = ChatOpenAI(model="gpt-4")
llm.callbacks = [ParquetLogger("./logs")]

# Use the LLM normally
response = llm.invoke("What is 2+2?")
```
Configuration
Parameters
- `log_dir` (str, default: `"./llm_logs"`): Directory for log files
- `buffer_size` (int, default: `100`): Number of entries before auto-flush
- `provider` (str, default: `"openai"`): LLM provider name for tracking
Log Structure
Logs are saved as Parquet files with the following schema:
| Column | Type | Description |
|---|---|---|
| timestamp | timestamp[us, tz=UTC] | Event timestamp |
| run_id | string | Unique run identifier |
| event_type | string | Event type (llm_start, llm_end, llm_error) |
| provider | string | LLM provider name |
| payload | string | JSON-encoded event data |
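As a sketch of what one row looks like under this schema, the helper below builds a plain dictionary with the standard library. `make_log_row` is a hypothetical name, and the payload contents are an assumption: the actual keys depend on what the handler records for each event.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def make_log_row(event_type: str, provider: str, data: Dict[str, Any],
                 run_id: Optional[uuid.UUID] = None) -> Dict[str, Any]:
    """Hypothetical helper: build one row matching the documented columns."""
    rid = run_id if run_id is not None else uuid.uuid4()
    return {
        "timestamp": datetime.now(timezone.utc),   # timestamp[us, tz=UTC]
        "run_id": str(rid),                        # string (LangChain run IDs are UUIDs)
        "event_type": event_type,                  # llm_start / llm_end / llm_error
        "provider": provider,                      # string
        "payload": json.dumps(data, default=str),  # JSON-encoded event data
    }


row = make_log_row("llm_end", "openai", {"usage": {"total_tokens": 42}})
print(json.loads(row["payload"])["usage"]["total_tokens"])  # 42
```

Keeping event-specific data in a single JSON string column is what lets the schema stay fixed while the payload varies between events and providers.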
File Organization
```
llm_logs/
├── date=2024-01-15/
│   ├── logs_143022_123456.parquet
│   └── logs_150331_789012.parquet
└── date=2024-01-16/
    └── logs_090122_345678.parquet
```
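The `date=YYYY-MM-DD` directory names follow the Hive `key=value` convention, which is why the PyArrow example further down can pass `partitioning="hive"`. The sketch below maps a timestamp to a path in this layout. `partition_path` is hypothetical, and the `HHMMSS_microseconds` file stem is inferred from the example file names rather than documented.

```python
from datetime import datetime, timezone
from pathlib import PurePosixPath


def partition_path(log_dir: str, when: datetime) -> PurePosixPath:
    """Hypothetical: date partition directory plus a time-stamped file name.

    The stem format (HHMMSS_microseconds) is an assumption based on the
    example names such as logs_143022_123456.parquet.
    """
    return (PurePosixPath(log_dir)
            / f"date={when:%Y-%m-%d}"
            / f"logs_{when:%H%M%S}_{when:%f}.parquet")


p = partition_path("llm_logs", datetime(2024, 1, 15, 14, 30, 22, 123456, tzinfo=timezone.utc))
print(p)  # llm_logs/date=2024-01-15/logs_143022_123456.parquet
```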
Reading Logs
With Pandas
```python
import json

import pandas as pd

# Read all Parquet files in the log directory
df = pd.read_parquet("./logs")

# Parse JSON payloads
df["data"] = df["payload"].apply(json.loads)

# Analyze completions
completions = df[df["event_type"] == "llm_end"]
for _, row in completions.iterrows():
    data = row["data"]
    if "usage" in data:
        print(f"Run {row['run_id'][:8]}: {data['usage'].get('total_tokens', 0)} tokens")

# Error rate: failed calls as a share of started calls
# (dividing by all rows would also count the llm_start and llm_end events)
starts = (df["event_type"] == "llm_start").sum()
errors = (df["event_type"] == "llm_error").sum()
error_rate = errors / starts * 100 if starts else 0.0
print(f"Error rate: {error_rate:.2f}%")
```
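Because every event carries a `run_id`, start and end events can be paired to measure per-call latency. This standard-library sketch uses a hypothetical helper, `run_latencies`, and assumes each call logs one `llm_start` followed by one `llm_end` or `llm_error` with the same `run_id`. With a pandas DataFrame it can be called as `run_latencies(df.to_dict("records"))`.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Mapping


def run_latencies(rows: Iterable[Mapping]) -> Dict[str, float]:
    """Seconds from llm_start to the matching llm_end/llm_error, keyed by run_id."""
    starts: Dict[str, datetime] = {}
    latencies: Dict[str, float] = {}
    for row in sorted(rows, key=lambda r: r["timestamp"]):
        run_id = row["run_id"]
        if row["event_type"] == "llm_start":
            starts[run_id] = row["timestamp"]
        elif row["event_type"] in ("llm_end", "llm_error") and run_id in starts:
            latencies[run_id] = (row["timestamp"] - starts.pop(run_id)).total_seconds()
    return latencies


t0 = datetime(2024, 1, 15, 14, 30, tzinfo=timezone.utc)
rows = [
    {"timestamp": t0, "run_id": "a", "event_type": "llm_start"},
    {"timestamp": t0 + timedelta(seconds=1.5), "run_id": "a", "event_type": "llm_end"},
    {"timestamp": t0 + timedelta(seconds=0.2), "run_id": "b", "event_type": "llm_start"},
    {"timestamp": t0 + timedelta(seconds=3), "run_id": "b", "event_type": "llm_error"},
]
print(run_latencies(rows))  # {'a': 1.5, 'b': 2.8}
```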
With DuckDB
```python
import duckdb

# Connect to an in-memory DuckDB database and query the Parquet files directly
conn = duckdb.connect()

# Read all logs
df = conn.execute("""
    SELECT * FROM read_parquet('./logs/**/*.parquet')
    ORDER BY timestamp DESC
""").df()

# Count events per provider, event type, and day
summary = conn.execute("""
    SELECT
        provider,
        event_type,
        CAST(timestamp AS DATE) AS log_date,
        COUNT(*) AS count
    FROM read_parquet('./logs/**/*.parquet')
    GROUP BY provider, event_type, CAST(timestamp AS DATE)
    ORDER BY log_date DESC, provider
""").df()
print(summary)

# Extract specific fields from the JSON payload
detailed = conn.execute("""
    SELECT
        timestamp,
        run_id,
        json_extract_string(payload, '$.model') AS model,
        json_extract_string(payload, '$.usage.total_tokens') AS tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE event_type = 'llm_end'
""").df()
print(f"Total tokens used: {detailed['tokens'].astype(float).sum()}")
```
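The DuckDB query above reads `$.model`, while the PyArrow example in this README reads `model_name`, so the exact payload keys are worth checking against your own logs. A defensive aggregation in plain Python might look like this; `tokens_by_model` is hypothetical and simply tries both key names. It also accepts DuckDB results converted with `.df().to_dict("records")`.

```python
import json
from collections import Counter
from typing import Iterable, Mapping


def tokens_by_model(rows: Iterable[Mapping]) -> Counter:
    """Sum usage.total_tokens per model over llm_end events."""
    totals: Counter = Counter()
    for row in rows:
        if row["event_type"] != "llm_end":
            continue
        data = json.loads(row["payload"])
        # Key names are assumptions: this README uses both "model" and "model_name".
        model = data.get("model") or data.get("model_name") or "unknown"
        usage = data.get("usage") or {}
        totals[model] += int(usage.get("total_tokens") or 0)
    return totals


rows = [
    {"event_type": "llm_start", "payload": "{}"},
    {"event_type": "llm_end", "payload": json.dumps({"model": "gpt-4", "usage": {"total_tokens": 30}})},
    {"event_type": "llm_end", "payload": json.dumps({"model_name": "gpt-4", "usage": {"total_tokens": 12}})},
    {"event_type": "llm_end", "payload": json.dumps({})},
]
print(tokens_by_model(rows)["gpt-4"])  # 42
```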
With PyArrow
```python
import json

import pyarrow.dataset as ds

# Read using the dataset API for better performance with partitioned data
dataset = ds.dataset("./logs", format="parquet", partitioning="hive")

# Convert to a table, keeping only completion events
table = dataset.to_table(filter=(ds.field("event_type") == "llm_end"))
df = table.to_pandas()

# Parse and analyze
for _, row in df.iterrows():
    payload = json.loads(row["payload"])
    print(f"Model: {payload.get('model_name', 'unknown')}")
    print(f"Tokens: {payload.get('usage', {}).get('total_tokens', 0)}")
```
Context Manager Usage
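```python
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

with ParquetLogger(log_dir="./logs") as logger:
    llm = ChatOpenAI(callbacks=[logger])
    llm.invoke("Process this message")
# Buffer automatically flushed on exit
```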
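The flush-on-exit behavior can be sketched with the standard context-manager protocol plus an `atexit` hook as a fallback. `FlushOnExit` is a hypothetical class that shows the general pattern; it is not how `ParquetLogger` is necessarily implemented.

```python
import atexit
from typing import Callable, Dict, List


class FlushOnExit:
    """Hypothetical sketch: flush pending entries when a `with` block ends
    and, as a fallback, when the interpreter shuts down normally."""

    def __init__(self, sink: Callable[[List[Dict]], None]) -> None:
        self.sink = sink
        self.pending: List[Dict] = []
        atexit.register(self.flush)  # no-op at exit if already flushed

    def log(self, entry: Dict) -> None:
        self.pending.append(entry)

    def flush(self) -> None:
        if self.pending:
            batch, self.pending = self.pending, []
            self.sink(batch)

    def __enter__(self) -> "FlushOnExit":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.flush()
        return False  # let exceptions from the with-block propagate


batches: List[List[Dict]] = []
with FlushOnExit(batches.append) as logger:
    logger.log({"event_type": "llm_start"})
    logger.log({"event_type": "llm_end"})
print(batches)  # [[{'event_type': 'llm_start'}, {'event_type': 'llm_end'}]]
```

Returning `False` from `__exit__` means an error raised inside the block still surfaces, but only after the buffered entries have been written. Note that `atexit` hooks do not run if the process is killed abruptly (for example with `SIGKILL`).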
Advanced Usage
Custom Buffer Size for Batch Processing
```python
from langchain_callback_parquet_logger import ParquetLogger

# Large buffer for batch processing
logger = ParquetLogger(
    log_dir="./batch_logs",
    buffer_size=1000,  # Flush every 1000 entries
    provider="anthropic",
)
```
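To estimate how `buffer_size` affects the number of files a batch job produces, here is a back-of-the-envelope helper. `expected_flushes` is hypothetical and assumes each LLM call logs two events (`llm_start`, then `llm_end` or `llm_error`), that each flush writes one file, and that any remainder is flushed on exit.

```python
def expected_flushes(llm_calls: int, buffer_size: int, events_per_call: int = 2) -> int:
    """Approximate number of Parquet files written, under the assumptions above."""
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    events = llm_calls * events_per_call
    # Ceiling division: one file per full buffer plus one for a final partial buffer.
    return -(-events // buffer_size)


print(expected_flushes(5_000, buffer_size=1000))  # 10
print(expected_flushes(5_000, buffer_size=100))   # 100
```

A larger buffer yields fewer, larger files, which are cheaper to scan, but holds more entries in memory that would be lost if the process is killed before a flush.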
Multiple Providers
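```python
from langchain_anthropic import ChatAnthropic
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Track different providers separately (same directory, distinguished by the provider column)
openai_logger = ParquetLogger(log_dir="./logs", provider="openai")
anthropic_logger = ParquetLogger(log_dir="./logs", provider="anthropic")

openai_llm = ChatOpenAI(callbacks=[openai_logger])
anthropic_llm = ChatAnthropic(callbacks=[anthropic_logger])
```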
Examples
Check out the examples/ directory for complete working examples:
- `basic_usage.py`: Simple example showing fundamental logging capabilities
- `batch_processing.py`: Advanced example with async batch processing, web search, and structured outputs
Run examples:
```bash
# Basic usage
python examples/basic_usage.py

# Batch processing with web search
python examples/batch_processing.py
```
Performance Considerations
- Buffer Size: Larger buffers reduce I/O but use more memory
- Compression: Uses Snappy compression by default, balancing write speed and file size
- Partitioning: Daily partitions let query engines skip irrelevant dates and make old logs easy to delete by directory
- Thread Safety: Safe for concurrent use from multiple threads
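Because each day lives in its own directory, retention can be handled by deleting whole partitions. A minimal standard-library sketch follows; `drop_old_partitions` is hypothetical, and it assumes nothing is still writing to the partitions being removed.

```python
import shutil
import tempfile
from datetime import date, timedelta
from pathlib import Path
from typing import List


def drop_old_partitions(log_dir: str, keep_days: int, today: date) -> List[str]:
    """Delete date=YYYY-MM-DD directories older than keep_days; return removed names."""
    cutoff = today - timedelta(days=keep_days)
    removed: List[str] = []
    for part in sorted(Path(log_dir).glob("date=*")):
        if not part.is_dir():
            continue
        try:
            day = date.fromisoformat(part.name.split("=", 1)[1])
        except ValueError:
            continue  # skip directories that don't follow the naming scheme
        if day < cutoff:
            shutil.rmtree(part)
            removed.append(part.name)
    return removed


with tempfile.TemporaryDirectory() as tmp:
    for d in ("2024-01-01", "2024-01-10", "2024-01-15"):
        (Path(tmp) / f"date={d}").mkdir()
    print(drop_old_partitions(tmp, keep_days=7, today=date(2024, 1, 15)))  # ['date=2024-01-01']
```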
Development
Install from source
```bash
git clone https://github.com/turbo3136/langchain-callback-parquet-logger.git
cd langchain-callback-parquet-logger
pip install -e .
```
Running Tests
No tests are available yet. Contributions welcome!
License
MIT License - see LICENSE file for details
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Support
For issues and questions, please use the GitHub issues page.
Download files
File details
Details for the file langchain_callback_parquet_logger-0.1.0.tar.gz.
File metadata
- Download URL: langchain_callback_parquet_logger-0.1.0.tar.gz
- Upload date:
- Size: 7.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e2e3bc86ab7223aebc579f1dd41a95b3e696aae416a21d1ed4f0d95e80c9af69 |
| MD5 | 3bcc82f40f05e40894e21f99846ab0eb |
| BLAKE2b-256 | 9feb0e3c93762b5c196bd10127409ce04f9ae1d4a716713a0e97ccd98b1dbd48 |
File details
Details for the file langchain_callback_parquet_logger-0.1.0-py3-none-any.whl.
File metadata
- Download URL: langchain_callback_parquet_logger-0.1.0-py3-none-any.whl
- Upload date:
- Size: 7.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.0
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 7bc47c721a3f1bde14e83241fc5a0ce34238ff519966565b347d4b34b81680e9 |
| MD5 | 8163d8253a15cd6e490c11cd989967fc |
| BLAKE2b-256 | 9c27e8dd33c60e1a7e61899bd04bb15b568f323338b8977cfb841b51f02b0759 |