A Parquet-based callback handler for logging LangChain LLM interactions

LangChain Parquet Logger

High-performance logging for LangChain - save all your LLM interactions to Parquet files for analysis.

Quick Start (2 minutes)

Install

pip install langchain-callback-parquet-logger

# With S3 support
pip install "langchain-callback-parquet-logger[s3]"

Basic Usage

from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Add logger to any LangChain LLM
logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])

response = llm.invoke("What is 2+2?")
# Your logs are automatically saved to ./logs/

Batch Processing

import pandas as pd
from langchain_callback_parquet_logger import batch_process

# Your data
df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain quantum computing']
})

# Process it (logs automatically saved).
# Top-level `await` works in a notebook; in a script, call this from an async function.
results = await batch_process(df)

That's it! Your logs are in Parquet format, ready for analysis.

Core Features

1. Custom Tracking IDs

Track specific requests with custom IDs and descriptions:

from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import ParquetLogger, with_tags

logger = ParquetLogger("./logs")
llm = ChatOpenAI(callbacks=[logger])

# Add custom ID with description to track this specific request
response = llm.invoke(
    "What is quantum computing?",
    config=with_tags(
        custom_id="user-123-session-456",
        custom_id_description="User session from mobile app"
    )
)

2. Batch Processing (Simple)

import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import batch_process, with_tags, LLMConfig

# Prepare your data
df = pd.DataFrame({
    'prompt': ['What is AI?', 'Explain DNA'],
    'config': [
        with_tags(custom_id='q1', custom_id_description='Science FAQ'),
        with_tags(custom_id='q2', custom_id_description='Science FAQ')
    ]
})

# Process with automatic logging
results = await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7}
    )
)

3. Batch Processing (Full Configuration)

import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import (
    batch_process,
    with_tags,
    LLMConfig,
    JobConfig,
    StorageConfig,
    ProcessingConfig,
    ColumnConfig,
    S3Config
)

# Prepare your data with custom column names
df = pd.DataFrame({
    'question': ['What is AI?', 'Explain DNA', 'What is quantum computing?'],
    'user_id': ['user1', 'user2', 'user3'],
    'tool_list': [[tool1, tool2], None, [tool3]]  # Optional tools; tool1..tool3 are placeholders for your own LangChain tools
})

# Add config for each row (required)
df['run_config'] = df['user_id'].apply(lambda x: with_tags(
    custom_id=x,
    tags=['production', 'v2']
))

# Process with ALL configuration options
results = await batch_process(
    df,
    # LLM configuration
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4', 'temperature': 0.7},
        model_kwargs={'top_p': 0.9},  # Additional model parameters
        structured_output=None  # or Pydantic model for structured responses
    ),

    # Job metadata configuration (all fields except category are optional)
    job_config=JobConfig(
        category="research",
        subcategory="science",  # Optional, defaults to None
        description="Analyzing scientific questions",  # Optional
        version="2.0.0",  # Optional
        environment="production",  # Optional
        metadata={"team": "data-science", "priority": "high"}  # Optional
    ),

    # Storage configuration
    storage_config=StorageConfig(
        output_dir="./batch_logs",
        path_template="{job_category}/{date}/{job_subcategory}/v{job_version_safe}",  # Custom path structure with version
        s3_config=S3Config(
            bucket="my-llm-logs",
            prefix="langchain-logs/",
            on_failure="continue",  # or "error" to fail on S3 errors
            retry_attempts=3
        )
    ),

    # Processing configuration
    processing_config=ProcessingConfig(
        max_concurrency=100,  # Parallel requests
        buffer_size=1000,  # Logger buffer size
        show_progress=True,  # Progress bar with real-time updates
        return_exceptions=True,  # Don't fail on single errors
        return_results=True,  # Set False for huge datasets to save memory
        event_types=['llm_start', 'llm_end', 'llm_error'],  # Events to log
        partition_on="date"  # Partition strategy
    ),

    # Column name configuration (if not using defaults)
    column_config=ColumnConfig(
        prompt="question",  # Your prompt column name
        config="run_config",  # Your config column name
        tools="tool_list"  # Your tools column name (optional)
    )
)

# Results are returned AND saved to Parquet files
df['answer'] = results
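
The `max_concurrency` and `return_exceptions` options above correspond to a common asyncio pattern. The sketch below illustrates that pattern with a semaphore and `asyncio.gather`. It is not the library's implementation; `run_bounded` and `fake_llm_call` are hypothetical names used only for illustration.

```python
import asyncio


async def fake_llm_call(prompt: str) -> str:
    """Hypothetical stand-in for an LLM request."""
    await asyncio.sleep(0)
    if not prompt:
        raise ValueError("empty prompt")
    return prompt.upper()


async def run_bounded(prompts, max_concurrency=2, return_exceptions=True):
    """Run one task per prompt with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def worker(prompt):
        async with semaphore:
            return await fake_llm_call(prompt)

    # gather preserves input order; with return_exceptions=True a failed
    # call yields its exception object instead of aborting the batch.
    return await asyncio.gather(
        *(worker(p) for p in prompts),
        return_exceptions=return_exceptions,
    )


results = asyncio.run(run_bounded(["what is ai?", "", "explain dna"]))
```

The same `asyncio.run(...)` wrapper is one way to call `await batch_process(...)` from a plain script, where top-level `await` is not available.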

4. S3 Upload

For production and cloud environments:

from langchain_callback_parquet_logger import ParquetLogger, S3Config

logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-llm-logs",
        prefix="production/",
        on_failure="error"  # Fail fast in production
    )
)

5. Event Type Selection

Choose what events to log:

# Default: Only LLM events
logger = ParquetLogger("./logs")

# Log everything
logger = ParquetLogger(
    "./logs",
    event_types=['llm_start', 'llm_end', 'llm_error',
                 'chain_start', 'chain_end', 'chain_error',
                 'tool_start', 'tool_end', 'tool_error']
)
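
The nine event names above follow a `<kind>_<phase>` pattern, so the list can be built rather than typed out. This is a small convenience sketch; `ALL_EVENT_TYPES` and `LLM_ONLY` are hypothetical names, and it assumes only the nine event names shown above.

```python
from itertools import product

# Component kinds and lifecycle phases taken from the list above.
KINDS = ("llm", "chain", "tool")
PHASES = ("start", "end", "error")

# Every kind/phase combination, in the same order as the list above.
ALL_EVENT_TYPES = [f"{kind}_{phase}" for kind, phase in product(KINDS, PHASES)]

# Only the LLM events.
LLM_ONLY = [name for name in ALL_EVENT_TYPES if name.startswith("llm_")]
```

Either list could then be passed as `event_types=...` when constructing the logger.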

Reading Your Logs

import pandas as pd
import json

# Read all logs
df = pd.read_parquet("./logs")

# Parse the payload
df['data'] = df['payload'].apply(json.loads)

# Analyze token usage
# Analyze token usage (path follows the Payload Structure section: data -> usage -> total_tokens)
df['tokens'] = df['data'].apply(lambda x: x.get('data', {}).get('usage', {}).get('total_tokens'))
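
Some rows may carry no usage data, and the exact nesting inside the payload may differ between versions. The helper below is a defensive sketch that parses one payload string and looks for `total_tokens` under either `data.usage` or `data.outputs.usage`. The function name `total_tokens` is hypothetical and not part of the library.

```python
import json
from typing import Optional


def total_tokens(payload_json: str) -> Optional[int]:
    """Return total_tokens from a payload JSON string, or None if absent."""
    try:
        payload = json.loads(payload_json)
    except (TypeError, ValueError):
        return None  # not a string, or not valid JSON
    if not isinstance(payload, dict):
        return None
    data = payload.get("data") or {}
    # Check both nestings: data.usage and data.outputs.usage.
    candidates = (data.get("usage"), (data.get("outputs") or {}).get("usage"))
    for usage in candidates:
        if isinstance(usage, dict) and "total_tokens" in usage:
            return usage["total_tokens"]
    return None
```

With the DataFrame from above, this could be applied as `df['tokens'] = df['payload'].apply(total_tokens)`.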

v2.0 Breaking Changes

If upgrading from v1.x:

Old (v1.x)

logger = ParquetLogger(
    log_dir="./logs",
    s3_bucket="my-bucket",
    s3_prefix="logs/",
    s3_on_failure="error"
)

New (v2.0)

from langchain_callback_parquet_logger import ParquetLogger, S3Config

logger = ParquetLogger(
    log_dir="./logs",
    s3_config=S3Config(
        bucket="my-bucket",
        prefix="logs/",
        on_failure="error"
    )
)
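
If many call sites still pass the v1.x keyword arguments, the renaming shown above can be applied mechanically. The helper below is a sketch, not something shipped with the library; `migrate_logger_kwargs` is a hypothetical name, and it only knows the three `s3_*` arguments from the example above.

```python
def migrate_logger_kwargs(old_kwargs: dict) -> dict:
    """Map v1.x ParquetLogger keyword arguments to the v2.0 layout.

    S3 settings are grouped under "s3_config" as a plain dict, which can
    then be turned into an object with S3Config(**new_kwargs["s3_config"]).
    """
    renames = {
        "s3_bucket": "bucket",
        "s3_prefix": "prefix",
        "s3_on_failure": "on_failure",
    }
    # Keep every non-S3 argument unchanged.
    new_kwargs = {k: v for k, v in old_kwargs.items() if k not in renames}
    # Collect the S3 arguments under their new names.
    s3_fields = {new: old_kwargs[old] for old, new in renames.items() if old in old_kwargs}
    if s3_fields:
        new_kwargs["s3_config"] = s3_fields
    return new_kwargs


migrated = migrate_logger_kwargs({
    "log_dir": "./logs",
    "s3_bucket": "my-bucket",
    "s3_prefix": "logs/",
    "s3_on_failure": "error",
})
```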

batch_process changes:

  • Now uses LLMConfig dataclass for LLM configuration
  • Dataclass configs replace multiple parameters
  • Column renamed from logger_custom_id to custom_id
  • See batch processing examples above

Old batch_process (v1.x)

await batch_process(
    df,
    llm=llm_instance,  # or llm_class with llm_kwargs
    structured_output=MyModel
)

New batch_process (v2.0)

await batch_process(
    df,
    llm_config=LLMConfig(
        llm_class=ChatOpenAI,
        llm_kwargs={'model': 'gpt-4'},
        model_kwargs={'top_p': 0.9},  # Additional API params
        structured_output=MyModel
    )
)

## Configuration Classes

### ParquetLogger
- `log_dir`: Where to save logs (default: "./llm_logs")
- `buffer_size`: Entries before auto-flush (default: 100)
- `s3_config`: Optional S3Config for uploads

### LLMConfig
- `llm_class`: The LangChain LLM class to instantiate (e.g., ChatOpenAI)
- `llm_kwargs`: Arguments for the LLM constructor (model, temperature, etc.)
- `model_kwargs`: Additional API parameters (top_p, frequency_penalty, etc.)
- `structured_output`: Optional Pydantic model for structured responses

### JobConfig
- `category`: Job category (default: "batch_processing")
- `subcategory`: Job subcategory (optional, default: None)
- `version`: Version string (optional, default: None)
- `environment`: Environment name (optional, default: None)
- `description`: Job description (optional, default: None)
- `metadata`: Additional metadata dict (optional, default: None)

### StorageConfig
- `output_dir`: Local directory (default: "./batch_logs")
- `path_template`: Path template for organizing files (default: "{job_category}/{job_subcategory}/v{job_version_safe}")
  - Available variables: `job_category`, `job_subcategory`, `job_version` (original), `job_version_safe` (dots replaced with underscores), `environment`, `date`
  - Example paths: `ml_training/image_classification/v2_1_0/` or `research/nlp/vunversioned/` (when no version specified)
- `s3_config`: Optional S3Config for uploads
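
To illustrate how a path template expands, here is a minimal sketch based on the variables listed above. It assumes plain `str.format` substitution, and `render_path` is a hypothetical helper, not the library's API. How the library fills in a missing subcategory, environment, or date is not covered here; the sketch simply uses empty strings for the last two.

```python
from typing import Optional


def render_path(
    template: str,
    job_category: str,
    job_subcategory: str,
    job_version: Optional[str] = None,
    environment: str = "",
    date: str = "",
) -> str:
    """Expand a path template using the documented variable names."""
    # Dots are replaced with underscores; a missing version becomes "unversioned".
    job_version_safe = job_version.replace(".", "_") if job_version else "unversioned"
    return template.format(
        job_category=job_category,
        job_subcategory=job_subcategory,
        job_version=job_version or "",
        job_version_safe=job_version_safe,
        environment=environment,
        date=date,
    )


DEFAULT_TEMPLATE = "{job_category}/{job_subcategory}/v{job_version_safe}"
versioned = render_path(DEFAULT_TEMPLATE, "ml_training", "image_classification", "2.1.0")
unversioned = render_path(DEFAULT_TEMPLATE, "research", "nlp")
```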

### S3Config
- `bucket`: S3 bucket name
- `prefix`: S3 prefix/folder (default: "langchain-logs/")
- `on_failure`: "error" or "continue" (default: "error")
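
The difference between `on_failure="error"` and `on_failure="continue"` can be pictured with a small retry loop. This is a sketch of the general idea, not the library's upload code: `upload_with_policy` is a hypothetical function, and it assumes `retry_attempts` (used in the full-configuration example) means the total number of attempts.

```python
import time


def upload_with_policy(upload, retry_attempts=3, on_failure="error", delay=0.0):
    """Call upload() up to retry_attempts times, then apply on_failure.

    Returns True on success, or False if every attempt failed and
    on_failure is "continue". Raises RuntimeError if on_failure is "error".
    """
    if on_failure not in ("error", "continue"):
        raise ValueError("on_failure must be 'error' or 'continue'")
    last_error = None
    for attempt in range(retry_attempts):
        try:
            upload()
            return True
        except Exception as exc:  # any upload failure triggers a retry
            last_error = exc
            if delay and attempt < retry_attempts - 1:
                time.sleep(delay)
    if on_failure == "error":
        raise RuntimeError("upload failed after all attempts") from last_error
    return False  # "continue": report the failure but keep going
```

With `"continue"`, local Parquet files would still be the source of truth when an upload fails, which is why the stricter `"error"` setting is the one suggested for production in the S3 example.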

## Advanced Usage

### Low-Level Batch Processing

If you need direct control over logging:

```python
from langchain_openai import ChatOpenAI
from langchain_callback_parquet_logger import batch_run, ParquetLogger

# Setup your own logging
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(callbacks=[logger])

    # Use low-level batch_run (df is your prompt DataFrame)
    results = await batch_run(df, llm, max_concurrency=100)
```

Context Manager (Notebooks)

For Jupyter notebooks, use the context manager with `buffer_size=1` so that each entry is written immediately:

with ParquetLogger('./logs', buffer_size=1) as logger:
    llm = ChatOpenAI(callbacks=[logger])
    response = llm.invoke("Hello!")
# Logs are guaranteed to be written
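
Why the buffer size and the context manager matter can be shown with a toy buffer. The class below is a hypothetical illustration of buffered writes in general, not the library's code: entries stay in memory until `buffer_size` is reached, and leaving the `with` block flushes whatever is left.

```python
class BufferedSink:
    """Toy buffer: collects entries and flushes every buffer_size entries."""

    def __init__(self, buffer_size=100):
        self.buffer_size = buffer_size
        self.pending = []   # entries held in memory
        self.written = []   # stands in for rows written to disk

    def log(self, entry):
        self.pending.append(entry)
        if len(self.pending) >= self.buffer_size:
            self.flush()

    def flush(self):
        self.written.extend(self.pending)
        self.pending.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.flush()   # nothing is left behind when the block ends
        return False   # do not swallow exceptions


with BufferedSink(buffer_size=100) as sink:
    sink.log({"event_type": "llm_start"})
    pending_inside = len(sink.pending)   # entry is still buffered here
written_after = len(sink.written)        # flushed when the block exited
```

With `buffer_size=1`, every call to `log` flushes at once, which is the behavior a notebook user typically wants.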

Log Schema

| Column | Type | Description |
|---|---|---|
| timestamp | timestamp | Event time (UTC) |
| run_id | string | Unique run ID |
| parent_run_id | string | Parent run ID for nested calls |
| custom_id | string | Your custom tracking ID |
| event_type | string | Event type (llm_start, llm_end, etc.) |
| logger_metadata | string | JSON metadata |
| payload | string | Full event data as JSON |
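
Because each row carries a `run_id`, an `event_type`, and a `timestamp`, start and end events can be paired to estimate latency per call. The sketch below works on plain dicts that use the column names from the schema; `latencies_by_run` is a hypothetical helper, and the sample rows are made up for illustration.

```python
from datetime import datetime, timezone


def latencies_by_run(rows):
    """Return {run_id: seconds between llm_start and llm_end}."""
    starts, ends = {}, {}
    for row in rows:
        if row["event_type"] == "llm_start":
            starts[row["run_id"]] = row["timestamp"]
        elif row["event_type"] == "llm_end":
            ends[row["run_id"]] = row["timestamp"]
    # Only runs that have both a start and an end are reported.
    return {
        run_id: (ends[run_id] - starts[run_id]).total_seconds()
        for run_id in starts
        if run_id in ends
    }


# Made-up sample rows using the schema's column names.
rows = [
    {"timestamp": datetime(2025, 9, 18, 10, 30, 0, tzinfo=timezone.utc),
     "run_id": "a", "event_type": "llm_start"},
    {"timestamp": datetime(2025, 9, 18, 10, 30, 2, tzinfo=timezone.utc),
     "run_id": "a", "event_type": "llm_end"},
    {"timestamp": datetime(2025, 9, 18, 10, 31, 0, tzinfo=timezone.utc),
     "run_id": "b", "event_type": "llm_start"},  # no matching end event
]
latencies = latencies_by_run(rows)
```

For logs loaded with pandas, the rows could be obtained with `df.to_dict("records")`.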

Payload Structure

All events use a consistent JSON structure in the payload column:

{
    "event_type": "llm_end",
    "timestamp": "2025-09-18T10:30:00Z",
    "execution": {
        "run_id": "uuid-here",
        "parent_run_id": "",
        "custom_id": "user-123"
    },
    "data": {
        "prompts": ["..."],
        "llm_type": "openai-chat",  // LangChain's native LLM type
        "response": {"content": "..."},
        "usage": {"total_tokens": 100}
    },
    "raw": {
        // Complete dump of all callback arguments
        // Includes all kwargs plus positional args (serialized when possible)
        "response": {"generations": [...], "llm_output": {...}},
        "run_id": "uuid-here",
        "parent_run_id": "",
        // ... all other arguments passed to the callback
    }
}

Installation Options

# Basic
pip install langchain-callback-parquet-logger

# With S3 support
pip install "langchain-callback-parquet-logger[s3]"

# With background retrieval support (OpenAI)
pip install "langchain-callback-parquet-logger[background]"

# Everything
pip install "langchain-callback-parquet-logger[s3,background]"

License

MIT

Contributing

Pull requests welcome! Keep it simple.

Support

GitHub Issues
