Skip to main content

A Parquet-based callback handler for logging LangChain LLM interactions

Project description

LangChain Parquet Logger

High-performance logging for LangChain - save all your LLM interactions to Parquet files for analysis.

Quick Start (2 minutes)

Install

pip install langchain-callback-parquet-logger

# With S3 support
pip install "langchain-callback-parquet-logger[s3]"

Basic Usage

from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Add logger to any LangChain LLM
logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])

response = llm.invoke("What is 2+2?")
# Your logs are automatically saved to ./logs/

Batch Processing

import pandas as pd
from langchain_callback_parquet_logger import batch_process

# Your data
df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain quantum computing']
})

# Process it (logs automatically saved)
results = await batch_process(df)

That's it! Your logs are in Parquet format, ready for analysis.

Core Features

1. Custom Tracking IDs

Track specific requests with custom IDs and descriptions:

from langchain_callback_parquet_logger import ParquetLogger, with_tags

logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])

# Add custom ID with description to track this specific request
response = llm.invoke(
    "What is quantum computing?",
    config=with_tags(
        custom_id="user-123-session-456",
        custom_id_description="User session from mobile app"
    )
)

2. Batch Processing (Simple)

import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import batch_process, with_tags, LLMConfig

# Prepare your data
df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain DNA'],
    'config': [
        with_tags(custom_id='q1', custom_id_description='Science FAQ'),
        with_tags(custom_id='q2', custom_id_description='Science FAQ')
    ]
})

# Process with automatic logging
results = await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7}
    )
)

3. Batch Processing (Full Configuration)

import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import (
    batch_process,
    with_tags,
    LLMConfig,
    JobConfig,
    StorageConfig,
    ProcessingConfig,
    ColumnConfig,
    S3Config
)

# Prepare your data with custom column names
df = pd.DataFrame({
    'question': ['What is AI?', 'Explain DNA', 'What is quantum computing?'],
    'user_id': ['user1', 'user2', 'user3'],
    'tool_list': [[tool1, tool2], None, [tool3]]  # Optional tools
})

# Add config for each row (required)
df['run_config'] = df['user_id'].apply(lambda x: with_tags(
    custom_id=x,
    tags=['production', 'v2']
))

# Process with ALL configuration options
results = await batch_process(
    df,
    # LLM configuration
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7},
        model_kwargs={'top_p': 0.9},  # Additional model parameters
        structured_output=None  # or Pydantic model for structured responses
    ),

    # Job metadata configuration (all fields except category are optional)
    job_config=JobConfig(
        category="research",
        subcategory="science",  # Optional, defaults to None
        description="Analyzing scientific questions",  # Optional
        version="2.0.0",  # Optional
        environment="production",  # Optional
        metadata={"team": "data-science", "priority": "high"}  # Optional
    ),

    # Storage configuration
    storage_config=StorageConfig(
        output_dir="./batch_logs",
        path_template="{job_category}/{date}/{job_subcategory}/v{job_version_safe}",  # Custom path structure with version
        s3_config=S3Config(
            bucket="my-llm-logs",
            prefix="langchain-logs/",
            on_failure="continue",  # or "error" to fail on S3 errors
            retry_attempts=3
        )
    ),

    # Processing configuration
    processing_config=ProcessingConfig(
        max_concurrency=100,  # Parallel requests
        buffer_size=1000,  # Logger buffer size
        show_progress=True,  # Progress bar with real-time updates
        return_exceptions=True,  # Don't fail on single errors
        return_results=True,  # Set False for huge datasets to save memory
        event_types=['llm_start', 'llm_end', 'llm_error'],  # Events to log
        partition_on="date"  # Partition strategy
    ),

    # Column name configuration (if not using defaults)
    column_config=ColumnConfig(
        prompt="question",  # Your prompt column name
        config="run_config",  # Your config column name
        tools="tool_list"  # Your tools column name (optional)
    )
)

# Results are returned AND saved to Parquet files
df['answer'] = results

4. S3 Upload

For production and cloud environments:

from langchain_callback_parquet_logger import ParquetLogger, S3Config

logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-llm-logs",
        prefix="production/",
        on_failure="error"  # Fail fast in production
    )
)

5. Event Type Selection

Choose what events to log:

# Default: Only LLM events
logger = ParquetLogger("./logs")

# Log everything
logger = ParquetLogger(
    "./logs",
    event_types=['llm_start', 'llm_end', 'llm_error',
                 'chain_start', 'chain_end', 'chain_error',
                 'tool_start', 'tool_end', 'tool_error']
)

Reading Your Logs

import pandas as pd
import json

# Read all logs
df = pd.read_parquet("./logs")

# Parse the payload
df['data'] = df['payload'].apply(json.loads)

# Analyze token usage
df['tokens'] = df['data'].apply(lambda x: x.get('data', {}).get('outputs', {}).get('usage', {}).get('total_tokens'))

v2.0 Breaking Changes

If upgrading from v1.x:

Old (v1.x)

logger = ParquetLogger(
    log_dir="./logs",
    s3_bucket="my-bucket",
    s3_prefix="logs/",
    s3_on_failure="error"
)

New (v2.0)

from langchain_callback_parquet_logger import ParquetLogger, S3Config

logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-bucket",
        prefix="logs/",
        on_failure="error"
    )
)

batch_process changes:

  • Now uses LLMConfig dataclass for LLM configuration
  • Dataclass configs replace multiple parameters
  • Column renamed from logger_custom_id to custom_id
  • See batch processing examples above

Old batch_process (v1.x)

await batch_process(
    df,
    llm=llm_instance,  # or llm_class with llm_kwargs
    structured_output=MyModel
)

New batch_process (v2.0)

await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4'},
        model_kwargs={'top_p': 0.9},  # Additional API params
        structured_output=MyModel
    )
)

## Configuration Classes

### ParquetLogger
- `log_dir`: Where to save logs (default: "./llm_logs")
- `buffer_size`: Entries before auto-flush (default: 100)
- `s3_config`: Optional S3Config for uploads

### LLMConfig
- `llm_class`: The LangChain LLM class to instantiate (e.g., ChatOpenAI)
- `llm_kwargs`: Arguments for the LLM constructor (model, temperature, etc.)
- `model_kwargs`: Additional API parameters (top_p, frequency_penalty, etc.)
- `structured_output`: Optional Pydantic model for structured responses

### JobConfig
- `category`: Job category (required, default: "batch_processing")
- `subcategory`: Job subcategory (optional, default: None)
- `version`: Version string (optional, default: None)
- `environment`: Environment name (optional, default: None)
- `description`: Job description (optional, default: None)
- `metadata`: Additional metadata dict (optional, default: None)

### StorageConfig
- `output_dir`: Local directory (default: "./batch_logs")
- `path_template`: Path template for organizing files (default: "{job_category}/{job_subcategory}/v{job_version_safe}")
  - Available variables: `job_category`, `job_subcategory`, `job_version` (original), `job_version_safe` (dots replaced with underscores), `environment`, `date`
  - Example paths: `ml_training/image_classification/v2_1_0/` or `research/nlp/vunversioned/` (when no version specified)
- `s3_config`: Optional S3Config for uploads

### S3Config
- `bucket`: S3 bucket name
- `prefix`: S3 prefix/folder (default: "langchain-logs/")
- `on_failure`: "error" or "continue" (default: "error")

## Advanced Usage

### Low-Level Batch Processing

If you need direct control over logging:

```python
from langchain_callback_parquet_logger import batch_run, ParquetLogger

# Setup your own logging
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(callbacks=[logger])

    # Use low-level batch_run
    results = await batch_run(df, llm, max_concurrency=100)

Context Manager (Notebooks)

For Jupyter notebooks, use context manager for immediate writes:

with ParquetLogger('./logs', buffer_size=1) as logger:
    llm = ChatOpenAI(callbacks=[logger])
    response = llm.invoke("Hello!")
# Logs are guaranteed to be written

Log Schema

Column Type Description
timestamp timestamp Event time (UTC)
run_id string Unique run ID
parent_run_id string Parent run ID for nested calls
custom_id string Your custom tracking ID
event_type string Event type (llm_start, llm_end, etc.)
logger_metadata string JSON metadata
payload string Full event data as JSON

Payload Structure

All events use a consistent JSON structure in the payload column:

{
    "event_type": "llm_end",
    "timestamp": "2025-09-18T10:30:00Z",
    "execution": {
        "run_id": "uuid-here",
        "parent_run_id": "",
        "custom_id": "user-123"
    },
    "data": {
        "prompts": ["..."],
        "llm_type": "openai-chat",  // LangChain's native LLM type
        "response": {"content": "..."},
        "usage": {"total_tokens": 100}
    },
    "raw": {
        // Complete dump of all callback arguments
        // Includes all kwargs plus positional args (serialized when possible)
        "response": {"generations": [...], "llm_output": {...}},
        "run_id": "uuid-here",
        "parent_run_id": "",
        // ... all other arguments passed to the callback
    }
}

Installation Options

# Basic
pip install langchain-callback-parquet-logger

# With S3 support
pip install "langchain-callback-parquet-logger[s3]"

# With background retrieval support (OpenAI)
pip install "langchain-callback-parquet-logger[background]"

# Everything
pip install "langchain-callback-parquet-logger[s3,background]"

License

MIT

Contributing

Pull requests welcome! Keep it simple.

Support

GitHub Issues

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

langchain_callback_parquet_logger-3.1.1.tar.gz (44.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file langchain_callback_parquet_logger-3.1.1.tar.gz.

File metadata

File hashes

Hashes for langchain_callback_parquet_logger-3.1.1.tar.gz
Algorithm Hash digest
SHA256 5204971dd707de6ad16d1799e9a39e78de4fb420c0dcda42cc67324c3aed29af
MD5 5cd2cb8913fc7d7ef1aad56018cb5938
BLAKE2b-256 de145a31f1e678ba87e6ce3504158a512d816a045e7278e59c4011e6e748010f

See more details on using hashes here.

File details

Details for the file langchain_callback_parquet_logger-3.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for langchain_callback_parquet_logger-3.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 24a2818abbd67d51199387a93f27fd4599419213391e07dc6b605866142ad034
MD5 f73638e205e694552e6cb6b3bd91b51d
BLAKE2b-256 af782027d5b7953f8c1abef95ed148179e95a4fcaec6b22de26d8aabc9138aa7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page