Skip to main content

Python OTEL wrapper by Rebrandly

Project description

Rebrandly OpenTelemetry SDK for Python

A comprehensive OpenTelemetry instrumentation SDK designed specifically for Rebrandly services, with built-in support for AWS Lambda functions and message processing.

Overview

The Rebrandly OpenTelemetry SDK provides a unified interface for distributed tracing, metrics collection, and structured logging across Python applications. It offers automatic instrumentation for AWS Lambda functions, simplified span management, and seamless integration with OTLP-compatible backends.

Installation

pip install rebrandly-otel

Dependencies

  • opentelemetry-api
  • opentelemetry-sdk
  • opentelemetry-exporter-otlp-proto-grpc
  • opentelemetry-semantic-conventions
  • psutil (for system metrics)

Configuration

The SDK is configured through environment variables:

Variable Description Default
OTEL_SERVICE_NAME Service identifier default-service-python
OTEL_SERVICE_VERSION Service version 1.0.0
OTEL_EXPORTER_OTLP_ENDPOINT OTLP collector endpoint None
OTEL_DEBUG Enable console debugging false
BATCH_EXPORT_TIME_MILLIS Batch export interval 100
ENV or ENVIRONMENT or NODE_ENV Deployment environment local

Core Components

RebrandlyOTEL Class

The main entry point for all telemetry operations. Implements a singleton pattern to ensure consistent instrumentation across your application.

Properties

  • tracer: Returns the RebrandlyTracer instance for distributed tracing
  • meter: Returns the RebrandlyMeter instance for metrics collection
  • logger: Returns the configured Python logger with OpenTelemetry integration

Initialization

The SDK auto-initializes as soon as you embed it.

Key Methods

span(name, attributes=None, kind=SpanKind.INTERNAL, message=None)

Context manager for creating traced spans with automatic error handling and status management.

lambda_handler(name=None, attributes=None, kind=SpanKind.CONSUMER, auto_flush=True, skip_aws_link=True)

Decorator for AWS Lambda functions with automatic instrumentation, metrics collection, and telemetry flushing.

aws_message_handler(name=None, attributes=None, kind=SpanKind.CONSUMER, auto_flush=True)

Decorator for processing individual AWS messages (SQS/SNS) with context propagation.

aws_message_span(name, message=None, attributes=None, kind=SpanKind.CONSUMER)

Context manager for creating spans from AWS messages with automatic context extraction.

force_flush(start_datetime=None, timeout_millis=1000)

Forces all pending telemetry data to be exported. Critical for serverless environments.

shutdown()

Gracefully shuts down all OpenTelemetry components.

Built-in Metrics

The SDK automatically registers and tracks the following metrics:

Standard Metrics

  • cpu_usage_percentage (Gauge): CPU utilization percentage
  • memory_usage_bytes (Gauge): Memory usage in bytes

Custom Metrics

You can create the custom metrics you need using the default open telemetry metrics

from src.rebrandly_otel import meter

sqs_counter = meter.meter.create_counter(
    name="sqs_sender_counter",
    description="Number of messages sent",
    unit="1"
)
sqs_counter.add(1)

Tracing Features

Automatic Context Propagation

The SDK automatically extracts and propagates trace context from:

  • AWS SQS message attributes
  • AWS SNS message attributes
  • HTTP headers
  • Custom carriers

Span Attributes

Lambda spans automatically include:

  • faas.trigger: Detected trigger type (sqs, sns, api_gateway, etc.)
  • faas.execution: AWS request ID
  • faas.id: Function ARN
  • cloud.provider: Always "aws" for Lambda
  • cloud.platform: Always "aws_lambda" for Lambda

Exception Handling

Spans automatically capture exceptions with:

  • Full exception details and stack traces
  • Automatic status code setting
  • Exception events in the span timeline

Logging Integration

The SDK integrates with Python's standard logging module:

from rebrandly_otel import logger

# Use as a standard Python logger
logger.info("Processing started", extra={"request_id": "123"})
logger.error("Processing failed", exc_info=True)

Features:

  • Automatic trace context injection
  • Structured logging support
  • Console and OTLP export
  • Log level configuration via environment

AWS Lambda Support

Trigger Detection

Automatically detects and labels Lambda triggers:

  • API Gateway (v1 and v2)
  • SQS
  • SNS
  • S3
  • Kinesis
  • DynamoDB
  • EventBridge
  • Batch

Automatic Metrics

For Lambda functions, the SDK automatically captures:

  • Memory usage
  • CPU utilization

Context Extraction

Automatically extracts trace context from:

  • SQS MessageAttributes
  • SNS MessageAttributes (including nested format)
  • Custom message attributes

Performance Considerations

Batch Processing

  • Configurable batch sizes and intervals
  • Automatic batching for traces, metrics, and logs
  • Optimized for high-throughput scenarios

Lambda Optimization

  • Automatic flushing before function freeze
  • Minimal cold start impact
  • Efficient memory usage
  • Configurable timeout handling

Export Formats

Supported Exporters

  • OTLP/gRPC: Primary export format for production
  • Console: Available for local development and debugging

Thread Safety

All components are thread-safe and can be used in multi-threaded applications:

  • Singleton pattern ensures single initialization
  • Thread-safe metric recording
  • Concurrent span creation support

Resource Attributes

Automatically includes:

  • Service name and version
  • Python runtime version
  • Deployment environment
  • Custom resource attributes via environment

Error Handling

  • Graceful degradation when OTLP endpoint unavailable
  • Non-blocking telemetry operations
  • Automatic retry with exponential backoff
  • Comprehensive error logging

Compatibility

  • Python 3.7+
  • AWS Lambda runtime support
  • Compatible with OpenTelemetry Collector
  • Works with any OTLP-compatible backend

Examples

Lambda - Send SNS / SQS message

import os
import json
import boto3
from rebrandly_otel import otel, lambda_handler, logger

sqs = boto3.client('sqs')
QUEUE_URL = os.environ.get('SQS_URL')

@lambda_handler("sqs_sender")
def handler(event, context):
    logger.info("Starting SQS message send")

    # Get trace context for propagation
    trace_attrs = otel.tracer.get_attributes_for_aws_from_context()

    # Send message with trace context
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"data": "test message"}),
        MessageAttributes=trace_attrs
    )

    logger.info(f"Sent SQS message: {response['MessageId']}")

    return {
        'statusCode': 200,
        'body': json.dumps({'messageId': response['MessageId']})
    }

Lambda Receive SQS message

import json
from rebrandly_otel import lambda_handler, logger, aws_message_span

@lambda_handler(name="sqs_receiver")
def handler(event, context):
    for record in event['Records']:
        # Process each message with trace context
        process_message(record)

def process_message(record):
    with aws_message_span("process_message_sqs_receiver", message=record) as s:
        logger.info(f"Processing message: {record['messageId']}")

        # Parse message body
        body = json.loads(record['body'])
        logger.info(f"Message data: {body}")

Lambda Receive SNS message (record specific event)

import json
from rebrandly_otel import lambda_handler, logger, aws_message_span

@lambda_handler(name="sns_receiver")
def handler(event, context):
    for record in event['Records']:
        # Process each message with trace context
        process_message(record)

def process_message(record):
    message = json.loads(record['Sns']['Message'])
    if message['event'] == 'whitelisted-event':
        with aws_message_span("process_message_sns_receiver", message=record) as s:
            logger.info(f"Processing message: {record['messageId']}")
    
            # Parse message body
            body = json.loads(record['body'])
            logger.info(f"Message data: {body}")

Flask

from flask import Flask, jsonify
from src.rebrandly_otel import otel, logger, app_before_request, app_after_request, flask_error_handler
from datetime import datetime

app = Flask(__name__)

# Register the centralized OTEL handlers
app.before_request(app_before_request)
app.after_request(app_after_request)
app.register_error_handler(Exception, flask_error_handler)

@app.route('/health')
def health():
    logger.info("Health check requested")
    return jsonify({"status": "healthy"}), 200

@app.route('/process', methods=['POST', 'GET'])
def process():
    with otel.span("process_request"):
        logger.info("Processing POST request")

        # Simulate processing
        result = {"processed": True, "timestamp": datetime.now().isoformat()}

        logger.info(f"Returning result: {result}")
        return jsonify(result), 200

@app.route('/error')
def error():
    logger.error("Error endpoint called")
    raise Exception("Simulated error")

if __name__ == '__main__':
    app.run(debug=True)

FastAPI

# main_fastapi.py
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from src.rebrandly_otel import otel, logger, force_flush
from src.fastapi_support import setup_fastapi, get_current_span
from datetime import datetime
from typing import Optional
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("FastAPI application starting up")
    yield
    # Shutdown
    logger.info("FastAPI application shutting down")
    force_flush()

app = FastAPI(title="FastAPI OTEL Example", lifespan=lifespan)

# Setup FastAPI with OTEL
setup_fastapi(otel, app)

@app.get("/health")
async def health():
    """Health check endpoint."""
    logger.info("Health check requested")
    return {"status": "healthy"}

@app.post("/process")
@app.get("/process")
async def process(span = Depends(get_current_span)):
    """Process endpoint with custom span."""
    with otel.span("process_request"):
        logger.info("Processing request")

        # You can also use the injected span directly
        if span:
            span.add_event("custom_processing_event", {
                "timestamp": datetime.now().isoformat()
            })

        # Simulate some processing
        result = {
            "processed": True,
            "timestamp": datetime.now().isoformat()
        }

        logger.info(f"Returning result: {result}")
        return result

@app.get("/error")
async def error():
    """Endpoint that raises an error."""
    logger.error("Error endpoint called")
    raise HTTPException(status_code=400, detail="Simulated error")

@app.get("/exception")
async def exception():
    """Endpoint that raises an unhandled exception."""
    logger.error("Exception endpoint called")
    raise ValueError("Simulated unhandled exception")

@app.get("/items/{item_id}")
async def get_item(item_id: int, q: Optional[str] = None):
    """Example endpoint with path and query parameters."""
    with otel.span("fetch_item", attributes={"item_id": item_id, "query": q}):
        logger.info(f"Fetching item {item_id} with query: {q}")

        if item_id == 999:
            raise HTTPException(status_code=404, detail="Item not found")

        return {
            "item_id": item_id,
            "name": f"Item {item_id}",
            "query": q
        }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

PyMySQL Database Instrumentation

The SDK provides connection-level instrumentation for PyMySQL that automatically traces all queries without requiring you to instrument each query individually.

import pymysql
from rebrandly_otel import otel, logger, instrument_pymysql

# Initialize OTEL
otel.initialize()

# Create and instrument your connection
connection = pymysql.connect(
    host='localhost',
    user='your_user',
    password='your_password',
    database='your_database'
)

# Instrument the connection - all queries are now automatically traced
connection = instrument_pymysql(otel, connection, options={
    'slow_query_threshold_ms': 1000,  # Queries over 1s flagged as slow
    'capture_bindings': False  # Set True to capture query parameters
})

# Use normally - all queries automatically traced
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
    result = cursor.fetchone()
    logger.info(f"Found user: {result}")

connection.close()
otel.force_flush()

Features:

  • Automatic span creation for all queries
  • Query operation detection (SELECT, INSERT, UPDATE, etc.)
  • Slow query detection and flagging
  • Duration tracking
  • Error recording with exception details
  • Optional query parameter capture (disabled by default for security)

Environment configuration:

  • PYMYSQL_SLOW_QUERY_THRESHOLD_MS: Threshold for slow query detection (default: 1500ms)

More examples

You can find More examples here

License

Rebrandly Python SDK is released under the MIT License.

Build and Deploy

brew install pipx
pipx ensurepath
pipx install build
pipx install twine

build

twine upload dist/*

If build gives you an error, try:

pyproject-build

twine upload dist/*

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rebrandly_otel-0.1.25.tar.gz (23.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rebrandly_otel-0.1.25-py3-none-any.whl (25.7 kB view details)

Uploaded Python 3

File details

Details for the file rebrandly_otel-0.1.25.tar.gz.

File metadata

  • Download URL: rebrandly_otel-0.1.25.tar.gz
  • Upload date:
  • Size: 23.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for rebrandly_otel-0.1.25.tar.gz
Algorithm Hash digest
SHA256 3d9f691a92481321845683e80dfd0b99fa961fc17439222b8806a0f5c58fde04
MD5 5dc17971e818582201ca935818b778eb
BLAKE2b-256 be22c64a54670a946119b31471192f091c9fb45bc09d95bdf03cbe948c4e56c4

See more details on using hashes here.

File details

Details for the file rebrandly_otel-0.1.25-py3-none-any.whl.

File metadata

File hashes

Hashes for rebrandly_otel-0.1.25-py3-none-any.whl
Algorithm Hash digest
SHA256 68ce3da5357eed6802e82cc9e35415adf0ea9d5777647310947ee27aa205fb4a
MD5 2a894921b025774dedaa69c1abbb030e
BLAKE2b-256 67ebae6058675ac689b49bdade67620a69aa413293413252ff8457c5fe9dbb13

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page