
A Python library that meters OpenAI usage to Revenium with optional LangChain integration.


🤖 Revenium Middleware for OpenAI


A middleware library for metering and monitoring OpenAI and Azure OpenAI API usage in Python applications. 🐍✨

✨ Features

  • 📊 Precise Usage Tracking: Monitor tokens, costs, and request counts across all OpenAI and Azure OpenAI endpoints
  • 🔌 Seamless Integration: Drop-in middleware that works with minimal code changes
  • 🌐 Multi-Provider Support: Works with both standard OpenAI and Azure OpenAI
  • ⚙️ Flexible Configuration: Customize metering behavior to suit your application's needs
  • 🎯 Accurate Pricing: Automatic model name resolution for precise cost calculation

📥 Installation

pip install revenium-middleware-openai

📥 Updating

pip install --upgrade revenium-middleware-openai

🔧 Usage

โ€ผ๏ธ Setting Environment Variables โ€ผ๏ธ

For Standard OpenAI

export OPENAI_API_KEY=your-openai-key
export REVENIUM_METERING_API_KEY=your-revenium-key

For Azure OpenAI

export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_API_KEY=your-azure-key
export AZURE_OPENAI_DEPLOYMENT=your-deployment-name
export AZURE_OPENAI_API_VERSION=2024-12-01-preview
export REVENIUM_METERING_API_KEY=your-revenium-key

🤖 Standard OpenAI Usage

That's it: import the middleware and your OpenAI calls are metered automatically:

import openai
import revenium_middleware_openai

response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the answer to life, the universe and everything?",
        },
    ],
    max_tokens=500,
)

print(response.choices[0].message.content)

🔷 Azure OpenAI Usage

The middleware automatically detects Azure OpenAI and works seamlessly:

import revenium_middleware_openai
from openai import AzureOpenAI
import os

client = AzureOpenAI(
    api_version=os.getenv('AZURE_OPENAI_API_VERSION'),
    azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'),
    api_key=os.getenv('AZURE_OPENAI_API_KEY'),
)

response = client.chat.completions.create(
    model="gpt-4o",  # Your Azure deployment name
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the answer to life, the universe and everything?"
        },
    ],
    max_tokens=500,
)

print(response.choices[0].message.content)

The middleware automatically intercepts both OpenAI and Azure OpenAI API calls and sends metering data to Revenium without requiring any changes to your existing code. Make sure to set the REVENIUM_METERING_API_KEY environment variable for authentication with the Revenium service.
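Because metering cannot authenticate without that key, it can be worth failing fast at startup. The helper below is purely illustrative and not part of the middleware:

```python
import os

def missing_env(*names):
    # Illustrative helper: return the required environment variables that are unset.
    return [n for n in names if not os.getenv(n)]

# Warn before the first metered call rather than discovering missing data later.
problems = missing_env("OPENAI_API_KEY", "REVENIUM_METERING_API_KEY")
if problems:
    print(f"warning: metering will not authenticate without {problems}")
```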

🔗 Embeddings Support

The middleware automatically meters embeddings for both OpenAI and Azure OpenAI:

Standard OpenAI Embeddings

import openai
import revenium_middleware_openai

response = openai.embeddings.create(
    model="text-embedding-3-small",
    input="The quick brown fox jumps over the lazy dog"
)

print(f"Generated embedding with {len(response.data[0].embedding)} dimensions")

Azure OpenAI Embeddings

import revenium_middleware_openai
from openai import AzureOpenAI
import os

client = AzureOpenAI(
    api_version=os.getenv('AZURE_OPENAI_API_VERSION'),
    azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'),
    api_key=os.getenv('AZURE_OPENAI_API_KEY'),
)

response = client.embeddings.create(
    model="text-embedding-3-large",  # Your Azure deployment name
    input="The quick brown fox jumps over the lazy dog"
)

print(f"Generated embedding with {len(response.data[0].embedding)} dimensions")

📈 Enhanced Tracking with Metadata

For more granular usage tracking and detailed reporting, add the usage_metadata parameter to both embeddings and chat completions:

import openai
import revenium_middleware_openai

response = openai.chat.completions.create(
    model="gpt-4o",  # You can change this to other models like "gpt-3.5-turbo"
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the meaning of life, the universe and everything?",
        },
    ],
    max_tokens=500,
    usage_metadata={
         "trace_id": "conv-28a7e9d4",
         "task_type": "summarize-customer-issue",
         "subscriber": {
             "id": "subscriberid-1234567890",
             "email": "user@example.com",
             "credential": {
                 "name": "engineering-api-key",
                 "value": "actual-api-key-value"
             }
         },
         "organization_id": "acme-corp",
         "subscription_id": "startup-plan-Q1",
         "product_id": "saas-app-gold-tier",
         "agent": "support-agent",
    },
)
print(response.choices[0].message.content)

๐Ÿท๏ธ Metadata Fields

The usage_metadata parameter supports the following fields:

  • trace_id: Unique identifier for a conversation or session. Use case: group multi-turn conversations into a single event for performance & cost tracking.
  • task_type: Classification of the AI operation by type of work. Use case: track cost & performance by purpose (e.g., classification, summarization).
  • subscriber: Nested object containing subscriber information. Use case: track cost & performance by individual users (recommended structure).
  • organization_id: Customer or department ID from non-Revenium systems. Use case: track cost & performance by customer or business unit.
  • subscription_id: Reference to a billing plan in non-Revenium systems. Use case: track cost & performance by a specific subscription.
  • product_id: Your product or feature making the AI call. Use case: track cost & performance across different products.
  • agent: Identifier for the specific AI agent. Use case: track cost & performance by AI agent.
  • response_quality_score: The quality of the AI response (0..1). Use case: track AI response quality.
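A misspelled key is easy to miss, so a small validation step before sending can help. This guard is a hypothetical convenience, not part of the middleware; the field set comes from the list above:

```python
# Documented usage_metadata fields (from the list above).
ALLOWED_FIELDS = {
    "trace_id", "task_type", "subscriber", "organization_id",
    "subscription_id", "product_id", "agent", "response_quality_score",
}

def check_usage_metadata(metadata):
    # Hypothetical guard: reject keys that are not documented metadata fields.
    unknown = set(metadata) - ALLOWED_FIELDS
    if unknown:
        raise ValueError(f"unknown usage_metadata fields: {sorted(unknown)}")
    return metadata
```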
👤 Subscriber Object Structure

The subscriber field supports a nested structure for better organization:

usage_metadata = {
    "subscriber": {
        "id": "user-12345",
        "email": "user@example.com", 
        "credential": {
            "name": "api-key-alias",
            "value": "actual-api-key-value"
        }
    },
    # ... other metadata fields
}

Subscriber fields:

  • id: Unique identifier for the subscriber
  • email: Email address of the subscriber
  • credential: Nested object with API key information
    • name: Alias or name for the credential
    • value: The actual credential value

All metadata fields are optional. Adding them enables more detailed reporting and analytics in Revenium.

🔗 LangChain Integration

The middleware provides seamless integration with LangChain, supporting both synchronous and asynchronous operations with automatic usage tracking.

📦 Installation with LangChain Support

pip install revenium-middleware-openai[langchain]

🔄 Basic LangChain Usage

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap

# Wrap your LangChain LLM with Revenium tracking
llm = wrap(ChatOpenAI(model="gpt-4o-mini"))

# Use normally - usage is automatically tracked
response = llm.invoke("What is the meaning of life?")
print(response.content)

⚡ Async LangChain Support

The middleware automatically detects async contexts and uses appropriate handlers:

import asyncio
from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap

async def async_example():
    # Automatic async detection
    llm = wrap(ChatOpenAI(model="gpt-4o-mini"))

    # Use async methods - usage is automatically tracked
    response = await llm.ainvoke("What is the meaning of life?")
    print(response.content)

# Run the async example
asyncio.run(async_example())

🔄 Streaming Support

Both sync and async streaming are fully supported:

import asyncio

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap, wrap_with_streaming

# Sync streaming
llm = wrap_with_streaming(
    ChatOpenAI(model="gpt-4o-mini", streaming=True),
    enable_streaming_debug=True
)

for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="")

# Async streaming
async def async_streaming():
    llm = wrap(ChatOpenAI(model="gpt-4o-mini", streaming=True))

    async for chunk in llm.astream("Tell me a story"):
        print(chunk.content, end="")

asyncio.run(async_streaming())

🔤 Embeddings with LangChain

from langchain_openai import OpenAIEmbeddings
from revenium_middleware_openai.langchain import wrap

# Wrap embeddings model
embeddings = wrap(OpenAIEmbeddings(model="text-embedding-3-small"))

# Generate embeddings - usage is automatically tracked
vectors = embeddings.embed_documents([
    "The quick brown fox",
    "jumps over the lazy dog"
])

print(f"Generated {len(vectors)} embeddings")

🔧 Advanced LangChain Configuration

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap, attach_to

# Method 1: wrap() - Returns a new instance (recommended)
llm = wrap(
    ChatOpenAI(model="gpt-4o-mini"),
    usage_metadata={
        "trace_id": "langchain-session-123",
        "task_type": "question-answering",
        "agent": "langchain-assistant"
    },
    enable_debug_logging=True
)

# Method 2: attach_to() - Modifies existing instance in-place
llm = ChatOpenAI(model="gpt-4o-mini")
attach_to(llm, usage_metadata={"session_id": "abc123"})

# Both methods support async auto-detection
response = llm.invoke("Hello LangChain!")

📊 LangChain Monitoring & Statistics

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import (
    wrap, get_streaming_stats, cleanup_streaming_sessions
)

# Create wrapped LLM
llm = wrap(ChatOpenAI(model="gpt-4o-mini", streaming=True))

# Get real-time statistics
stats = get_streaming_stats(llm)
print(f"Active streaming sessions: {stats['streaming_sessions']}")
print(f"Memory usage: {stats['memory_usage_kb']} KB")

# Manual cleanup if needed (automatic cleanup is built-in)
cleanup_results = cleanup_streaming_sessions(llm)
print(f"Cleaned up {cleanup_results['sessions_cleaned']} sessions")

🎯 LangChain Features

  • ✅ Zero-Touch Integration: Works with existing LangChain code
  • ✅ Automatic Async Detection: Seamlessly handles sync and async operations
  • ✅ Streaming Support: Full support for streaming responses with single usage events
  • ✅ Memory Efficient: Automatic cleanup and configurable limits
  • ✅ Thread Safe: Concurrent operation support with proper resource management
  • ✅ Error Resilient: Graceful degradation without breaking LangChain execution

๐Ÿ” Provider Detection & Features

Automatic Provider Detection

The middleware automatically detects whether you're using standard OpenAI or Azure OpenAI:

  • OpenAI: Detected via OpenAI() client or openai.api_base containing standard OpenAI endpoints
  • Azure OpenAI: Detected via AzureOpenAI() client or URLs containing "azure"
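The URL-based half of that heuristic can be sketched roughly as follows; this is a simplification for illustration, not the middleware's actual detection code:

```python
def is_azure_endpoint(base_url):
    # Simplified heuristic from the description above:
    # Azure OpenAI endpoints contain "azure" in the URL.
    return "azure" in base_url.lower()
```

For example, `is_azure_endpoint("https://my-resource.openai.azure.com/")` is true, while the standard `https://api.openai.com/v1` endpoint is not.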

Model Name Resolution (Azure)

For Azure OpenAI, the middleware automatically resolves deployment names to standard model names for accurate pricing:

Azure Deployment → Standard Model Name
"gpt-4o-2024-11-20" → "gpt-4o"
"gpt-35-turbo-dev" → "gpt-3.5-turbo"
"text-embedding-3-large" → "text-embedding-3-large"

Provider-Specific Analytics

Revenium dashboard shows provider-specific data:

  • Standard OpenAI: provider: "OPENAI"
  • Azure OpenAI: provider: "Azure" with model_source: "OPENAI"

Supported Operations

Both providers support:

  • ✅ Chat completions (streaming and non-streaming)
  • ✅ Embeddings
  • ✅ All metadata fields
  • ✅ Token counting and cost calculation
  • ✅ Error handling and logging

🔄 Compatibility

  • ๐Ÿ Python 3.8+
  • ๐Ÿค– OpenAI Python SDK 1.0.0+ (includes AzureOpenAI client)
  • ๐ŸŒ Works with all OpenAI models and endpoints
  • ๐Ÿ”ท Works with all Azure OpenAI deployments and endpoints
  • โšก Automatic provider detection (no code changes required)

๐Ÿ” Logging

This module uses Python's standard logging system. You can control the log level by setting the REVENIUM_LOG_LEVEL environment variable:

# Enable debug logging
export REVENIUM_LOG_LEVEL=DEBUG

# Or when running your script
REVENIUM_LOG_LEVEL=DEBUG python your_script.py

Available log levels:

  • DEBUG: Detailed debugging information
  • INFO: General information (default)
  • WARNING: Warning messages only
  • ERROR: Error messages only
  • CRITICAL: Critical error messages only
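The level can also be set from inside Python, on the assumption that the variable is read when the middleware is imported, so it must be set before the import runs:

```python
import os

# Assumption: REVENIUM_LOG_LEVEL is read when the middleware is imported,
# so set it before `import revenium_middleware_openai` executes.
os.environ["REVENIUM_LOG_LEVEL"] = "DEBUG"
```

The values are standard Python logging level names, so they also work with `logging.getLevelName`.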

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

  • 💖 Built with ❤️ by the Revenium team
