A Parquet-based callback handler for logging LangChain LLM interactions
LangChain Parquet Logger
High-performance logging for LangChain - save all your LLM interactions to Parquet files for analysis.
Quick Start (2 minutes)
Install
pip install langchain-callback-parquet-logger
# With S3 support
pip install "langchain-callback-parquet-logger[s3]"
Basic Usage
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI
# Add logger to any LangChain LLM
logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])
response = llm.invoke("What is 2+2?")
# Your logs are automatically saved to ./logs/
Batch Processing
import pandas as pd
from langchain_callback_parquet_logger import batch_process
# Your data
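df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain quantum computing']
})
# Process it (logs automatically saved).
# Top-level await works in a notebook; in a script, wrap the call in an async function and run it with asyncio.run().
results = await batch_process(df)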
That's it! Your logs are in Parquet format, ready for analysis.
Core Features
1. Custom Tracking IDs
Track specific requests with custom IDs and descriptions:
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import ParquetLogger, with_tags
logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])
# Add custom ID with description to track this specific request
response = llm.invoke(
    "What is quantum computing?",
    config=with_tags(
        custom_id="user-123-session-456",
        custom_id_description="User session from mobile app"
    )
)
2. Batch Processing (Simple)
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import batch_process, with_tags, LLMConfig
# Prepare your data
df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain DNA'],
    'config': [
        with_tags(custom_id='q1', custom_id_description='Science FAQ'),
        with_tags(custom_id='q2', custom_id_description='Science FAQ')
    ]
})
# Process with automatic logging
results = await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7}
    )
)
3. Batch Processing (Full Configuration)
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import (
    batch_process,
    with_tags,
    LLMConfig,
    JobConfig,
    StorageConfig,
    ProcessingConfig,
    ColumnConfig,
    S3Config
)
# Prepare your data with custom column names
df = pd.DataFrame({
    'question': ['What is AI?', 'Explain DNA', 'What is quantum computing?'],
    'user_id': ['user1', 'user2', 'user3'],
    # Optional tools; tool1, tool2 and tool3 are placeholders for your own tool objects
    'tool_list': [[tool1, tool2], None, [tool3]]
})
# Add config for each row (required)
df['run_config'] = df['user_id'].apply(lambda x: with_tags(
    custom_id=x,
    tags=['production', 'v2']
))
# Process with ALL configuration options
results = await batch_process(
    df,
    # LLM configuration
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7},
        model_kwargs={'top_p': 0.9},  # Additional model parameters
        structured_output=None  # or Pydantic model for structured responses
    ),
    # Job metadata configuration (all fields except category are optional)
    job_config=JobConfig(
        category="research",
        subcategory="science",  # Optional, defaults to None
        description="Analyzing scientific questions",  # Optional
        version="2.0.0",  # Optional
        environment="production",  # Optional
        metadata={"team": "data-science", "priority": "high"}  # Optional
    ),
    # Storage configuration
    storage_config=StorageConfig(
        output_dir="./batch_logs",
        path_template="{job_category}/{date}/{job_subcategory}/v{job_version_safe}",  # Custom path structure with version
        s3_config=S3Config(
            bucket="my-llm-logs",
            prefix="langchain-logs/",
            on_failure="continue",  # or "error" to fail on S3 errors
            retry_attempts=3
        )
    ),
    # Processing configuration
    processing_config=ProcessingConfig(
        max_concurrency=100,  # Parallel requests
        buffer_size=1000,  # Logger buffer size
        show_progress=True,  # Progress bar with real-time updates
        return_exceptions=True,  # Don't fail on single errors
        return_results=True,  # Set False for huge datasets to save memory
        event_types=['llm_start', 'llm_end', 'llm_error'],  # Events to log
        partition_on="date"  # Partition strategy
    ),
    # Column name configuration (if not using defaults)
    column_config=ColumnConfig(
        prompt="question",  # Your prompt column name
        config="run_config",  # Your config column name
        tools="tool_list"  # Your tools column name (optional)
    )
)
# Results are returned AND saved to Parquet files
df['answer'] = results
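To make the `max_concurrency` and `return_exceptions` options more concrete, here is a minimal standard-library sketch of bounded concurrent execution. It is not the library's implementation: `bounded_gather` and `fake_llm_call` are hypothetical names used only for illustration, and the real `batch_process` may work differently.

```python
import asyncio


async def bounded_gather(coro_fns, max_concurrency=100, return_exceptions=True):
    """Run zero-argument coroutine functions with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(fn):
        # The semaphore caps how many calls run at the same time
        async with semaphore:
            return await fn()

    # gather keeps results in input order; with return_exceptions=True a failed
    # call yields its exception object instead of aborting the whole batch
    return await asyncio.gather(
        *(run_one(fn) for fn in coro_fns),
        return_exceptions=return_exceptions,
    )


async def fake_llm_call(prompt):
    # Stand-in for a real LLM request (hypothetical)
    await asyncio.sleep(0)
    if not prompt:
        raise ValueError("empty prompt")
    return f"answer to: {prompt}"


prompts = ["What is AI?", "", "Explain DNA"]
results = asyncio.run(
    bounded_gather([lambda p=p: fake_llm_call(p) for p in prompts], max_concurrency=2)
)
```

With `return_exceptions=True`, the second entry of `results` is the `ValueError` itself, so check each result's type before assigning the list to a DataFrame column.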
4. S3 Upload
For production and cloud environments:
from langchain_callback_parquet_logger import ParquetLogger, S3Config
logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-llm-logs",
        prefix="production/",
        on_failure="error"  # Fail fast in production
    )
)
5. Event Type Selection
Choose what events to log:
# Default: Only LLM events
logger = ParquetLogger("./logs")
# Log everything
logger = ParquetLogger(
    "./logs",
    event_types=['llm_start', 'llm_end', 'llm_error',
                 'chain_start', 'chain_end', 'chain_error',
                 'tool_start', 'tool_end', 'tool_error']
)
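Conceptually, event type selection is an allow-list applied before an event is recorded. The sketch below illustrates that idea with a hypothetical `EventFilter` class. It assumes only what is stated above (LLM events are the default) and does not reflect how `ParquetLogger` is implemented internally.

```python
DEFAULT_EVENT_TYPES = ("llm_start", "llm_end", "llm_error")


class EventFilter:
    """Hypothetical illustration: keep only events whose type is in the allow-list."""

    def __init__(self, event_types=DEFAULT_EVENT_TYPES):
        self.event_types = frozenset(event_types)
        self.records = []

    def handle(self, event_type, payload):
        # Drop events that were not selected; report whether the event was kept
        if event_type not in self.event_types:
            return False
        self.records.append({"event_type": event_type, "payload": payload})
        return True


llm_only = EventFilter()
kept_llm = llm_only.handle("llm_end", {"usage": {"total_tokens": 100}})
kept_tool = llm_only.handle("tool_start", {"name": "search"})
```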
Reading Your Logs
import pandas as pd
import json
# Read all logs
df = pd.read_parquet("./logs")
# Parse the payload
df['data'] = df['payload'].apply(json.loads)
# Analyze token usage (path follows the Payload Structure section: data -> usage -> total_tokens)
df['tokens'] = df['data'].apply(lambda x: x.get('data', {}).get('usage', {}).get('total_tokens'))
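The same kind of analysis can be done without pandas. The following sketch sums token usage per `custom_id` from raw payload strings, assuming the layout shown in the Payload Structure section (`execution.custom_id` and `data.usage.total_tokens`). The helper name `tokens_by_custom_id` and the sample payloads are made up for illustration.

```python
import json
from collections import defaultdict


def tokens_by_custom_id(payloads):
    """Sum total_tokens of llm_end events, grouped by custom_id."""
    totals = defaultdict(int)
    for raw in payloads:
        event = json.loads(raw)
        if event.get("event_type") != "llm_end":
            continue  # only completed LLM calls carry usage in this sketch
        custom_id = event.get("execution", {}).get("custom_id", "")
        usage = event.get("data", {}).get("usage") or {}
        totals[custom_id] += usage.get("total_tokens") or 0
    return dict(totals)


# Made-up payloads that mimic the documented structure
sample = [
    json.dumps({"event_type": "llm_start", "execution": {"custom_id": "user-123"},
                "data": {"prompts": ["..."]}}),
    json.dumps({"event_type": "llm_end", "execution": {"custom_id": "user-123"},
                "data": {"usage": {"total_tokens": 100}}}),
    json.dumps({"event_type": "llm_end", "execution": {"custom_id": "user-123"},
                "data": {"usage": {"total_tokens": 40}}}),
    json.dumps({"event_type": "llm_end", "execution": {"custom_id": "user-456"},
                "data": {}}),
]
totals = tokens_by_custom_id(sample)
```

Events without usage information count as zero rather than raising, which keeps the aggregation robust when a provider omits token counts.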
v2.0 Breaking Changes
If upgrading from v1.x:
Old (v1.x)
logger = ParquetLogger(
    log_dir="./logs",
    s3_bucket="my-bucket",
    s3_prefix="logs/",
    s3_on_failure="error"
)
New (v2.0)
from langchain_callback_parquet_logger import ParquetLogger, S3Config
logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-bucket",
        prefix="logs/",
        on_failure="error"
    )
)
batch_process changes:
- Now uses LLMConfig dataclass for LLM configuration
- Dataclass configs replace multiple parameters
- Column renamed from `logger_custom_id` to `custom_id`
- See batch processing examples above
Old batch_process (v1.x)
await batch_process(
    df,
    llm=llm_instance,  # or llm_class with llm_kwargs
    structured_output=MyModel
)
New batch_process (v2.0)
await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4'},
        model_kwargs={'top_p': 0.9},  # Additional API params
        structured_output=MyModel
    )
)
## Configuration Classes
### ParquetLogger
- `log_dir`: Where to save logs (default: "./llm_logs")
- `buffer_size`: Entries before auto-flush (default: 100)
- `s3_config`: Optional S3Config for uploads
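
To illustrate what "entries before auto-flush" means, here is a small sketch of a buffered sink that flushes when the buffer fills and again when its context manager exits. `BufferedSink` is a hypothetical stand-in that stores batches in memory. The real `ParquetLogger` writes Parquet files and may differ in detail.

```python
class BufferedSink:
    """Hypothetical illustration of buffer_size-style auto-flushing."""

    def __init__(self, buffer_size=100):
        self.buffer_size = buffer_size
        self.buffer = []
        self.flushed_batches = []  # stands in for files written to disk

    def log(self, entry):
        self.buffer.append(entry)
        # Flush automatically once the buffer reaches buffer_size entries
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed_batches.append(list(self.buffer))
            self.buffer.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the with-block flushes whatever is still buffered
        self.flush()
        return False


with BufferedSink(buffer_size=3) as sink:
    for i in range(7):
        sink.log({"n": i})
    batches_during_run = len(sink.flushed_batches)  # two full batches so far
```

A small `buffer_size` trades write efficiency for immediacy, which is why a value of 1 suits interactive notebook sessions.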
### LLMConfig
- `llm_class`: The LangChain LLM class to instantiate (e.g., ChatOpenAI)
- `llm_kwargs`: Arguments for the LLM constructor (model, temperature, etc.)
- `model_kwargs`: Additional API parameters (top_p, frequency_penalty, etc.)
- `structured_output`: Optional Pydantic model for structured responses
### JobConfig
- `category`: Job category (required, default: "batch_processing")
- `subcategory`: Job subcategory (optional, default: None)
- `version`: Version string (optional, default: None)
- `environment`: Environment name (optional, default: None)
- `description`: Job description (optional, default: None)
- `metadata`: Additional metadata dict (optional, default: None)
### StorageConfig
- `output_dir`: Local directory (default: "./batch_logs")
- `path_template`: Path template for organizing files (default: "{job_category}/{job_subcategory}/v{job_version_safe}")
- Available variables: `job_category`, `job_subcategory`, `job_version` (original), `job_version_safe` (dots replaced with underscores), `environment`, `date`
- Example paths: `ml_training/image_classification/v2_1_0/` or `research/nlp/vunversioned/` (when no version specified)
- `s3_config`: Optional S3Config for uploads
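
The template variables above behave like ordinary Python format fields. The sketch below shows one way such a template could be expanded. It is an illustration, not the library's code: `render_log_path` is a hypothetical helper, and the ISO date format and the empty-string fallback for `environment` are assumptions. The `unversioned` fallback and the dot-to-underscore replacement follow the descriptions above.

```python
from datetime import date


def render_log_path(template, category, subcategory, version=None,
                    environment=None, day=None):
    """Expand a path template using the variable names listed above."""
    # job_version_safe: dots replaced with underscores, "unversioned" if missing
    version_safe = version.replace(".", "_") if version else "unversioned"
    return template.format(
        job_category=category,
        job_subcategory=subcategory,
        job_version=version or "unversioned",
        job_version_safe=version_safe,
        environment=environment or "",          # assumption: empty when unset
        date=(day or date.today()).isoformat(),  # assumption: ISO 8601 date
    )


default_template = "{job_category}/{job_subcategory}/v{job_version_safe}"
versioned = render_log_path(default_template, "ml_training",
                            "image_classification", version="2.1.0")
unversioned = render_log_path(default_template, "research", "nlp")
dated = render_log_path(
    "{job_category}/{date}/{job_subcategory}/v{job_version_safe}",
    "research", "science", version="2.0.0", day=date(2025, 9, 18),
)
```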
### S3Config
- `bucket`: S3 bucket name
- `prefix`: S3 prefix/folder (default: "langchain-logs/")
- `on_failure`: "error" or "continue" (default: "error")
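
The difference between `on_failure="error"` and `on_failure="continue"` can be sketched as a retry loop with two possible outcomes once retries are exhausted. The code below is a hedged illustration: `upload_with_policy` and the two upload callables are hypothetical, and the library's actual retry behavior (for example, any backoff between attempts) is not documented here.

```python
def upload_with_policy(upload, on_failure="error", retry_attempts=3):
    """Call upload() up to retry_attempts times, then apply the failure policy."""
    if on_failure not in ("error", "continue"):
        raise ValueError("on_failure must be 'error' or 'continue'")
    last_error = None
    for _ in range(retry_attempts):
        try:
            upload()
            return True  # upload succeeded
        except Exception as exc:
            last_error = exc
    if on_failure == "error":
        # Fail fast: surface the problem to the caller
        raise RuntimeError("upload failed after retries") from last_error
    return False  # "continue": keep local logs and move on


calls = {"n": 0}


def flaky_upload():
    # Fails twice, then succeeds (simulated transient error)
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")


def broken_upload():
    raise ConnectionError("bucket unreachable")


flaky_ok = upload_with_policy(flaky_upload, on_failure="error", retry_attempts=3)
continued = upload_with_policy(broken_upload, on_failure="continue", retry_attempts=2)
```

With `"continue"`, local Parquet files remain the source of truth when S3 is unavailable. With `"error"`, a failed upload stops the job so the problem is noticed immediately.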
## Advanced Usage
### Low-Level Batch Processing
If you need direct control over logging:
```python
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import batch_run, ParquetLogger

# Set up your own logging
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(callbacks=[logger])
    # Use low-level batch_run (df is your prepared DataFrame)
    results = await batch_run(df, llm, max_concurrency=100)
```
Context Manager (Notebooks)
For Jupyter notebooks, use a context manager with `buffer_size=1` so each entry is written immediately:
with ParquetLogger('./logs', buffer_size=1) as logger:
    llm = ChatOpenAI(callbacks=[logger])
    response = llm.invoke("Hello!")
# Logs are guaranteed to be written
Log Schema
| Column | Type | Description |
|---|---|---|
| `timestamp` | timestamp | Event time (UTC) |
| `run_id` | string | Unique run ID |
| `parent_run_id` | string | Parent run ID for nested calls |
| `custom_id` | string | Your custom tracking ID |
| `event_type` | string | Event type (llm_start, llm_end, etc.) |
| `logger_metadata` | string | JSON metadata |
| `payload` | string | Full event data as JSON |
Payload Structure
All events use a consistent JSON structure in the payload column:
{
  "event_type": "llm_end",
  "timestamp": "2025-09-18T10:30:00Z",
  "execution": {
    "run_id": "uuid-here",
    "parent_run_id": "",
    "custom_id": "user-123"
  },
  "data": {
    "prompts": ["..."],
    "llm_type": "openai-chat",  // LangChain's native LLM type
    "response": {"content": "..."},
    "usage": {"total_tokens": 100}
  },
  "raw": {
    // Complete dump of all callback arguments
    // Includes all kwargs plus positional args (serialized when possible)
    "response": {"generations": [...], "llm_output": {...}},
    "run_id": "uuid-here",
    "parent_run_id": "",
    // ... all other arguments passed to the callback
  }
}
Installation Options
# Basic
pip install langchain-callback-parquet-logger
# With S3 support
pip install "langchain-callback-parquet-logger[s3]"
# With background retrieval support (OpenAI)
pip install "langchain-callback-parquet-logger[background]"
# Everything
pip install "langchain-callback-parquet-logger[s3,background]"
License
MIT
Contributing
Pull requests welcome! Keep it simple.