A Python library that meters OpenAI usage to Revenium with optional LangChain integration.

🤖 Revenium Middleware for OpenAI

A middleware library for metering and monitoring OpenAI and Azure OpenAI API usage in Python applications. 🐍✨

✨ Features

  • 📊 Precise Usage Tracking: Monitor tokens, costs, and request counts across all OpenAI and Azure OpenAI endpoints
  • 🔌 Seamless Integration: Drop-in middleware that works with minimal code changes
  • 🌐 Multi-Provider Support: Works with both standard OpenAI and Azure OpenAI
  • ⚙️ Flexible Configuration: Customize metering behavior to suit your application's needs
  • 🎯 Accurate Pricing: Automatic model name resolution for precise cost calculation

📥 Installation

pip install revenium-middleware-openai

📥 Updating

pip install --upgrade revenium-middleware-openai

🔧 Usage

โ€ผ๏ธ Setting Environment Variables โ€ผ๏ธ

For Standard OpenAI

export OPENAI_API_KEY=your-openai-key
export REVENIUM_METERING_API_KEY=your-revenium-key

For Azure OpenAI

export AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/"
export AZURE_OPENAI_API_KEY=your-azure-key
export AZURE_OPENAI_DEPLOYMENT=your-deployment-name
export AZURE_OPENAI_API_VERSION=2024-12-01-preview
export REVENIUM_METERING_API_KEY=your-revenium-key
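A missing variable usually surfaces later as an authentication error, so it can help to check the environment up front. The snippet below is a plain-Python sketch using the variable names from the export commands above; it is not part of the middleware:

```python
import os

# Variable names taken from the Azure export commands above.
REQUIRED_VARS = [
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_DEPLOYMENT",
    "AZURE_OPENAI_API_VERSION",
    "REVENIUM_METERING_API_KEY",
]

def missing_vars(env=os.environ):
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

for name in missing_vars():
    print(f"warning: {name} is not set")
```

For standard OpenAI, swap the list for OPENAI_API_KEY and REVENIUM_METERING_API_KEY.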

🤖 Standard OpenAI Usage

That's it: with the environment variables set, importing the middleware meters your OpenAI calls automatically:

import openai
import revenium_middleware_openai

response = openai.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the answer to life, the universe and everything?",
        },
    ],
    max_tokens=500,
)

print(response.choices[0].message.content)

🔷 Azure OpenAI Usage

The middleware automatically detects Azure OpenAI and works seamlessly:

import revenium_middleware_openai
from openai import AzureOpenAI
import os

client = AzureOpenAI(
    api_version=os.getenv('AZURE_OPENAI_API_VERSION'),
    azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'),
    api_key=os.getenv('AZURE_OPENAI_API_KEY'),
)

response = client.chat.completions.create(
    model="gpt-4o",  # Your Azure deployment name
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the answer to life, the universe and everything?"
        },
    ],
    max_tokens=500,
)

print(response.choices[0].message.content)

The middleware automatically intercepts both OpenAI and Azure OpenAI API calls and sends metering data to Revenium without requiring any changes to your existing code. Make sure to set the REVENIUM_METERING_API_KEY environment variable for authentication with the Revenium service.

🔗 Embeddings Support

The middleware automatically meters embeddings for both OpenAI and Azure OpenAI:

Standard OpenAI Embeddings

import openai
import revenium_middleware_openai

response = openai.embeddings.create(
    model="text-embedding-3-small",
    input="The quick brown fox jumps over the lazy dog"
)

print(f"Generated embedding with {len(response.data[0].embedding)} dimensions")

Azure OpenAI Embeddings

import revenium_middleware_openai
from openai import AzureOpenAI
import os

client = AzureOpenAI(
    api_version=os.getenv('AZURE_OPENAI_API_VERSION'),
    azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'),
    api_key=os.getenv('AZURE_OPENAI_API_KEY'),
)

response = client.embeddings.create(
    model="text-embedding-3-large",  # Your Azure deployment name
    input="The quick brown fox jumps over the lazy dog"
)

print(f"Generated embedding with {len(response.data[0].embedding)} dimensions")

📈 Enhanced Tracking with Metadata

For more granular usage tracking and detailed reporting, add the usage_metadata parameter to both embeddings and chat completions:

import openai
import revenium_middleware_openai

response = openai.chat.completions.create(
    model="gpt-4o",  # You can change this to other models like "gpt-3.5-turbo"
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {
            "role": "user",
            "content": "What is the meaning of life, the universe and everything?",
        },
    ],
    max_tokens=500,
    usage_metadata={
         "trace_id": "conv-28a7e9d4",
         "task_type": "summarize-customer-issue",
         "subscriber": {
             "id": "subscriberid-1234567890",
             "email": "user@example.com",
             "credential": {
                 "name": "engineering-api-key",
                 "value": "actual-api-key-value"
             }
         },
         "organization_id": "acme-corp",
         "subscription_id": "startup-plan-Q1",
         "product_id": "saas-app-gold-tier",
         "agent": "support-agent",
    },
)
print(response.choices[0].message.content)

๐Ÿท๏ธ Metadata Fields

The usage_metadata parameter supports the following fields:

  • trace_id: Unique identifier for a conversation or session. Groups multi-turn conversations into a single event for performance and cost tracking.
  • task_type: Classification of the AI operation by type of work. Tracks cost and performance by purpose (e.g., classification, summarization).
  • subscriber: Nested object containing subscriber information (recommended structure). Tracks cost and performance by individual user.
  • organization_id: Customer or department ID from non-Revenium systems. Tracks cost and performance by customer or business unit.
  • subscription_id: Reference to a billing plan in non-Revenium systems. Tracks cost and performance by a specific subscription.
  • product_id: Your product or feature making the AI call. Tracks cost and performance across different products.
  • agent: Identifier for the specific AI agent. Tracks cost and performance by AI agent.
  • response_quality_score: The quality of the AI response (0..1). Tracks AI response quality.

👤 Subscriber Object Structure

The subscriber field supports a nested structure for better organization:

usage_metadata = {
    "subscriber": {
        "id": "user-12345",
        "email": "user@example.com", 
        "credential": {
            "name": "api-key-alias",
            "value": "actual-api-key-value"
        }
    },
    # ... other metadata fields
}

Subscriber fields:

  • id: Unique identifier for the subscriber
  • email: Email address of the subscriber
  • credential: Nested object with API key information
    • name: Alias or name for the credential
    • value: The actual credential value

All metadata fields are optional. Adding them enables more detailed reporting and analytics in Revenium.
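If many call sites attach similar metadata, a small helper keeps the nested structure consistent. build_usage_metadata below is a hypothetical convenience function for illustration, not part of the middleware API:

```python
# Hypothetical helper (not part of the middleware API) that assembles a
# usage_metadata dict with the nested subscriber structure shown above.
def build_usage_metadata(trace_id, subscriber_id, email,
                         credential_name, credential_value, **extra):
    metadata = {
        "trace_id": trace_id,
        "subscriber": {
            "id": subscriber_id,
            "email": email,
            "credential": {"name": credential_name, "value": credential_value},
        },
    }
    metadata.update(extra)  # e.g. organization_id, task_type, agent
    return metadata

meta = build_usage_metadata(
    "conv-28a7e9d4", "user-12345", "user@example.com",
    "api-key-alias", "actual-api-key-value",
    organization_id="acme-corp", task_type="summarize-customer-issue",
)
```

The resulting dict can be passed directly as the usage_metadata keyword argument to chat completions or embeddings calls.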

🔗 LangChain Integration

The middleware provides seamless integration with LangChain, supporting both synchronous and asynchronous operations with automatic usage tracking.

📦 Installation with LangChain Support

pip install revenium-middleware-openai[langchain]

🔄 Basic LangChain Usage

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap

# Wrap your LangChain LLM with Revenium tracking
llm = wrap(ChatOpenAI(model="gpt-4o-mini"))

# Use normally - usage is automatically tracked
response = llm.invoke("What is the meaning of life?")
print(response.content)

⚡ Async LangChain Support

The middleware automatically detects async contexts and uses appropriate handlers:

import asyncio
from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap

async def async_example():
    # Automatic async detection
    llm = wrap(ChatOpenAI(model="gpt-4o-mini"))

    # Use async methods - usage is automatically tracked
    response = await llm.ainvoke("What is the meaning of life?")
    print(response.content)

# Run the async example
asyncio.run(async_example())

🔄 Streaming Support

Both sync and async streaming are fully supported:

import asyncio
from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap, wrap_with_streaming

# Sync streaming
llm = wrap_with_streaming(
    ChatOpenAI(model="gpt-4o-mini", streaming=True),
    enable_streaming_debug=True
)

for chunk in llm.stream("Tell me a story"):
    print(chunk.content, end="")

# Async streaming
async def async_streaming():
    llm = wrap(ChatOpenAI(model="gpt-4o-mini", streaming=True))

    async for chunk in llm.astream("Tell me a story"):
        print(chunk.content, end="")

asyncio.run(async_streaming())

🔤 Embeddings with LangChain

from langchain_openai import OpenAIEmbeddings
from revenium_middleware_openai.langchain import wrap

# Wrap embeddings model
embeddings = wrap(OpenAIEmbeddings(model="text-embedding-3-small"))

# Generate embeddings - usage is automatically tracked
vectors = embeddings.embed_documents([
    "The quick brown fox",
    "jumps over the lazy dog"
])

print(f"Generated {len(vectors)} embeddings")

🔧 Advanced LangChain Configuration

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import wrap, attach_to

# Method 1: wrap() - Returns a new instance (recommended)
llm = wrap(
    ChatOpenAI(model="gpt-4o-mini"),
    usage_metadata={
        "trace_id": "langchain-session-123",
        "task_type": "question-answering",
        "agent": "langchain-assistant"
    },
    enable_debug_logging=True
)

# Method 2: attach_to() - Modifies existing instance in-place
llm = ChatOpenAI(model="gpt-4o-mini")
attach_to(llm, usage_metadata={"session_id": "abc123"})

# Both methods support async auto-detection
response = llm.invoke("Hello LangChain!")

📊 LangChain Monitoring & Statistics

from langchain_openai import ChatOpenAI
from revenium_middleware_openai.langchain import (
    wrap, get_streaming_stats, cleanup_streaming_sessions
)

# Create wrapped LLM
llm = wrap(ChatOpenAI(model="gpt-4o-mini", streaming=True))

# Get real-time statistics
stats = get_streaming_stats(llm)
print(f"Active streaming sessions: {stats['streaming_sessions']}")
print(f"Memory usage: {stats['memory_usage_kb']} KB")

# Manual cleanup if needed (automatic cleanup is built-in)
cleanup_results = cleanup_streaming_sessions(llm)
print(f"Cleaned up {cleanup_results['sessions_cleaned']} sessions")

🎯 LangChain Features

  • ✅ Zero-Touch Integration: Works with existing LangChain code
  • ✅ Automatic Async Detection: Seamlessly handles sync and async operations
  • ✅ Streaming Support: Full support for streaming responses with single usage events
  • ✅ Memory Efficient: Automatic cleanup and configurable limits
  • ✅ Thread Safe: Concurrent operation support with proper resource management
  • ✅ Error Resilient: Graceful degradation without breaking LangChain execution

๐Ÿ” Provider Detection & Features

Automatic Provider Detection

The middleware automatically detects whether you're using standard OpenAI or Azure OpenAI:

  • OpenAI: Detected via OpenAI() client or openai.api_base containing standard OpenAI endpoints
  • Azure OpenAI: Detected via AzureOpenAI() client or URLs containing "azure"

Model Name Resolution (Azure)

For Azure OpenAI, the middleware automatically resolves deployment names to standard model names for accurate pricing:

Azure Deployment → Standard Model Name
"gpt-4o-2024-11-20" → "gpt-4o"
"gpt-35-turbo-dev" → "gpt-3.5-turbo"
"text-embedding-3-large" → "text-embedding-3-large"

Provider-Specific Analytics

Revenium dashboard shows provider-specific data:

  • Standard OpenAI: provider: "OPENAI"
  • Azure OpenAI: provider: "Azure" with model_source: "OPENAI"

Supported Operations

Both providers support:

  • ✅ Chat completions (streaming and non-streaming)
  • ✅ Embeddings
  • ✅ All metadata fields
  • ✅ Token counting and cost calculation
  • ✅ Error handling and logging

🔄 Compatibility

  • ๐Ÿ Python 3.8+
  • ๐Ÿค– OpenAI Python SDK 1.0.0+ (includes AzureOpenAI client)
  • ๐ŸŒ Works with all OpenAI models and endpoints
  • ๐Ÿ”ท Works with all Azure OpenAI deployments and endpoints
  • โšก Automatic provider detection (no code changes required)

๐Ÿ” Logging

This module uses Python's standard logging system. You can control the log level by setting the REVENIUM_LOG_LEVEL environment variable:

# Enable debug logging
export REVENIUM_LOG_LEVEL=DEBUG

# Or when running your script
REVENIUM_LOG_LEVEL=DEBUG python your_script.py

Available log levels:

  • DEBUG: Detailed debugging information
  • INFO: General information (default)
  • WARNING: Warning messages only
  • ERROR: Error messages only
  • CRITICAL: Critical error messages only
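Because the module uses Python's standard logging system, you can also set the level from code instead of the environment. The logger name "revenium_middleware" below is an assumption; check the package source for the name the module actually uses:

```python
import logging

# Basic root-logger setup for your application.
logging.basicConfig(level=logging.INFO)

# "revenium_middleware" is an assumed logger name; verify it against
# the package source before relying on it.
logging.getLogger("revenium_middleware").setLevel(logging.DEBUG)
```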

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ™ Acknowledgments

  • 💖 Built with ❤️ by the Revenium team
