Skip to main content

Python OTEL wrapper by Rebrandly

Project description

Rebrandly OpenTelemetry SDK for Python

A comprehensive OpenTelemetry instrumentation SDK designed specifically for Rebrandly services, with built-in support for AWS Lambda functions and message processing.

Overview

The Rebrandly OpenTelemetry SDK provides a unified interface for distributed tracing, metrics collection, and structured logging across Python applications. It offers automatic instrumentation for AWS Lambda functions, simplified span management, and seamless integration with OTLP-compatible backends.

Installation

pip install rebrandly-otel

Dependencies

  • opentelemetry-api
  • opentelemetry-sdk
  • opentelemetry-exporter-otlp-proto-grpc
  • opentelemetry-semantic-conventions
  • psutil (for system metrics)

Configuration

The SDK is configured through environment variables:

Variable Description Default
OTEL_SERVICE_NAME Service identifier default-service-python
OTEL_SERVICE_VERSION Service version 1.0.0
OTEL_EXPORTER_OTLP_ENDPOINT OTLP collector endpoint None
OTEL_DEBUG Enable console debugging false
BATCH_EXPORT_TIME_MILLIS Batch export interval 100
ENV or ENVIRONMENT or NODE_ENV Deployment environment local

Core Components

RebrandlyOTEL Class

The main entry point for all telemetry operations. Implements a singleton pattern to ensure consistent instrumentation across your application.

Properties

  • tracer: Returns the RebrandlyTracer instance for distributed tracing
  • meter: Returns the RebrandlyMeter instance for metrics collection
  • logger: Returns the configured Python logger with OpenTelemetry integration

Initialization

The SDK auto-initializes as soon as you embed it.

Key Methods

span(name, attributes=None, kind=SpanKind.INTERNAL, message=None)

Context manager for creating traced spans with automatic error handling and status management.

lambda_handler(name=None, attributes=None, kind=SpanKind.CONSUMER, auto_flush=True, skip_aws_link=True)

Decorator for AWS Lambda functions with automatic instrumentation, metrics collection, and telemetry flushing.

aws_message_handler(name=None, attributes=None, kind=SpanKind.CONSUMER, auto_flush=True)

Decorator for processing individual AWS messages (SQS/SNS) with context propagation.

aws_message_span(name, message=None, attributes=None, kind=SpanKind.CONSUMER)

Context manager for creating spans from AWS messages with automatic context extraction.

force_flush(start_datetime=None, timeout_millis=1000)

Forces all pending telemetry data to be exported. Critical for serverless environments.

shutdown()

Gracefully shuts down all OpenTelemetry components.

Built-in Metrics

The SDK automatically registers and tracks the following metrics:

Standard Metrics

  • cpu_usage_percentage (Gauge): CPU utilization percentage
  • memory_usage_bytes (Gauge): Memory usage in bytes

Custom Metrics

You can create the custom metrics you need using the default open telemetry metrics

from src.rebrandly_otel import meter

sqs_counter = meter.meter.create_counter(
    name="sqs_sender_counter",
    description="Number of messages sent",
    unit="1"
)
sqs_counter.add(1)

Tracing Features

Automatic Context Propagation

The SDK automatically extracts and propagates trace context from:

  • AWS SQS message attributes
  • AWS SNS message attributes
  • HTTP headers
  • Custom carriers

Span Attributes

Lambda spans automatically include:

  • faas.trigger: Detected trigger type (sqs, sns, api_gateway, etc.)
  • faas.execution: AWS request ID
  • faas.id: Function ARN
  • cloud.provider: Always "aws" for Lambda
  • cloud.platform: Always "aws_lambda" for Lambda

Exception Handling

Spans automatically capture exceptions with:

  • Full exception details and stack traces
  • Automatic status code setting
  • Exception events in the span timeline

Logging Integration

The SDK integrates with Python's standard logging module:

from rebrandly_otel import logger

# Use as a standard Python logger
logger.info("Processing started", extra={"request_id": "123"})
logger.error("Processing failed", exc_info=True)

Features:

  • Automatic trace context injection
  • Structured logging support
  • Console and OTLP export
  • Log level configuration via environment

AWS Lambda Support

Trigger Detection

Automatically detects and labels Lambda triggers:

  • API Gateway (v1 and v2)
  • SQS
  • SNS
  • S3
  • Kinesis
  • DynamoDB
  • EventBridge
  • Batch

Automatic Metrics

For Lambda functions, the SDK automatically captures:

  • Memory usage
  • CPU utilization

Context Extraction

Automatically extracts trace context from:

  • SQS MessageAttributes
  • SNS MessageAttributes (including nested format)
  • Custom message attributes

Performance Considerations

Batch Processing

  • Configurable batch sizes and intervals
  • Automatic batching for traces, metrics, and logs
  • Optimized for high-throughput scenarios

Lambda Optimization

  • Automatic flushing before function freeze
  • Minimal cold start impact
  • Efficient memory usage
  • Configurable timeout handling

Export Formats

Supported Exporters

  • OTLP/gRPC: Primary export format for production
  • Console: Available for local development and debugging

Thread Safety

All components are thread-safe and can be used in multi-threaded applications:

  • Singleton pattern ensures single initialization
  • Thread-safe metric recording
  • Concurrent span creation support

Resource Attributes

Automatically includes:

  • Service name and version
  • Python runtime version
  • Deployment environment
  • Custom resource attributes via environment

Error Handling

  • Graceful degradation when OTLP endpoint unavailable
  • Non-blocking telemetry operations
  • Automatic retry with exponential backoff
  • Comprehensive error logging

Compatibility

  • Python 3.7+
  • AWS Lambda runtime support
  • Compatible with OpenTelemetry Collector
  • Works with any OTLP-compatible backend

Examples

Lambda - Send SNS / SQS message

import os
import json
import boto3
from rebrandly_otel import otel, lambda_handler, logger

sqs = boto3.client('sqs')
QUEUE_URL = os.environ.get('SQS_URL')

@lambda_handler("sqs_sender")
def handler(event, context):
    logger.info("Starting SQS message send")

    # Get trace context for propagation
    trace_attrs = otel.tracer.get_attributes_for_aws_from_context()

    # Send message with trace context
    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"data": "test message"}),
        MessageAttributes=trace_attrs
    )

    logger.info(f"Sent SQS message: {response['MessageId']}")

    return {
        'statusCode': 200,
        'body': json.dumps({'messageId': response['MessageId']})
    }

Lambda Receive SQS message

import json
from rebrandly_otel import lambda_handler, logger, aws_message_span

@lambda_handler(name="sqs_receiver")
def handler(event, context):
    for record in event['Records']:
        # Process each message with trace context
        process_message(record)

def process_message(record):
    with aws_message_span("process_message_sqs_receiver", message=record) as s:
        logger.info(f"Processing message: {record['messageId']}")

        # Parse message body
        body = json.loads(record['body'])
        logger.info(f"Message data: {body}")

Lambda Receive SNS message (record specific event)

import json
from rebrandly_otel import lambda_handler, logger, aws_message_span

@lambda_handler(name="sns_receiver")
def handler(event, context):
    for record in event['Records']:
        # Process each message with trace context
        process_message(record)

def process_message(record):
    message = json.loads(record['Sns']['Message'])
    if message['event'] == 'whitelisted-event':
        with aws_message_span("process_message_sns_receiver", message=record) as s:
            logger.info(f"Processing message: {record['messageId']}")
    
            # Parse message body
            body = json.loads(record['body'])
            logger.info(f"Message data: {body}")

Flask

from flask import Flask, jsonify
from src.rebrandly_otel import otel, logger, app_before_request, app_after_request, flask_error_handler
from datetime import datetime

app = Flask(__name__)

# Register the centralized OTEL handlers
app.before_request(app_before_request)
app.after_request(app_after_request)
app.register_error_handler(Exception, flask_error_handler)

@app.route('/health')
def health():
    logger.info("Health check requested")
    return jsonify({"status": "healthy"}), 200

@app.route('/process', methods=['POST', 'GET'])
def process():
    with otel.span("process_request"):
        logger.info("Processing POST request")

        # Simulate processing
        result = {"processed": True, "timestamp": datetime.now().isoformat()}

        logger.info(f"Returning result: {result}")
        return jsonify(result), 200

@app.route('/error')
def error():
    logger.error("Error endpoint called")
    raise Exception("Simulated error")

if __name__ == '__main__':
    app.run(debug=True)

FastAPI

# main_fastapi.py
from fastapi import FastAPI, HTTPException, Depends
from contextlib import asynccontextmanager
from src.rebrandly_otel import otel, logger, force_flush
from src.fastapi_support import setup_fastapi, get_current_span
from datetime import datetime
from typing import Optional
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("FastAPI application starting up")
    yield
    # Shutdown
    logger.info("FastAPI application shutting down")
    force_flush()

app = FastAPI(title="FastAPI OTEL Example", lifespan=lifespan)

# Setup FastAPI with OTEL
setup_fastapi(otel, app)

@app.get("/health")
async def health():
    """Health check endpoint."""
    logger.info("Health check requested")
    return {"status": "healthy"}

@app.post("/process")
@app.get("/process")
async def process(span = Depends(get_current_span)):
    """Process endpoint with custom span."""
    with otel.span("process_request"):
        logger.info("Processing request")

        # You can also use the injected span directly
        if span:
            span.add_event("custom_processing_event", {
                "timestamp": datetime.now().isoformat()
            })

        # Simulate some processing
        result = {
            "processed": True,
            "timestamp": datetime.now().isoformat()
        }

        logger.info(f"Returning result: {result}")
        return result

@app.get("/error")
async def error():
    """Endpoint that raises an error."""
    logger.error("Error endpoint called")
    raise HTTPException(status_code=400, detail="Simulated error")

@app.get("/exception")
async def exception():
    """Endpoint that raises an unhandled exception."""
    logger.error("Exception endpoint called")
    raise ValueError("Simulated unhandled exception")

@app.get("/items/{item_id}")
async def get_item(item_id: int, q: Optional[str] = None):
    """Example endpoint with path and query parameters."""
    with otel.span("fetch_item", attributes={"item_id": item_id, "query": q}):
        logger.info(f"Fetching item {item_id} with query: {q}")

        if item_id == 999:
            raise HTTPException(status_code=404, detail="Item not found")

        return {
            "item_id": item_id,
            "name": f"Item {item_id}",
            "query": q
        }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

PyMySQL Database Instrumentation

The SDK provides connection-level instrumentation for PyMySQL that automatically traces all queries without requiring you to instrument each query individually.

import pymysql
from rebrandly_otel import otel, logger, instrument_pymysql

# Initialize OTEL
otel.initialize()

# Create and instrument your connection
connection = pymysql.connect(
    host='localhost',
    user='your_user',
    password='your_password',
    database='your_database'
)

# Instrument the connection - all queries are now automatically traced
connection = instrument_pymysql(otel, connection, options={
    'slow_query_threshold_ms': 1000,  # Queries over 1s flagged as slow
    'capture_bindings': False  # Set True to capture query parameters
})

# Use normally - all queries automatically traced
with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM users WHERE id = %s", (123,))
    result = cursor.fetchone()
    logger.info(f"Found user: {result}")

connection.close()
otel.force_flush()

Features:

  • Automatic span creation for all queries
  • Query operation detection (SELECT, INSERT, UPDATE, etc.)
  • Slow query detection and flagging
  • Duration tracking
  • Error recording with exception details
  • Optional query parameter capture (disabled by default for security)

Environment configuration:

  • PYMYSQL_SLOW_QUERY_THRESHOLD_MS: Threshold for slow query detection (default: 1500ms)

More examples

You can find More examples here

Testing

Running Tests

The test suite uses pytest.

Run all tests:

pytest

Run specific test file:

pytest tests/test_flask_support.py -v
pytest tests/test_fastapi_support.py -v
pytest tests/test_usage.py -v
pytest tests/test_pymysql_instrumentation.py -v
pytest tests/test_metrics_and_logs.py -v
pytest tests/test_decorators.py -v

Run with coverage:

pytest --cov=src --cov-report=html

Test Coverage

The test suite includes:

  • Integration tests (test_usage.py): Core OTEL functionality, Lambda handlers, message processing
  • Flask integration tests (test_flask_support.py): Flask setup and hooks
  • FastAPI integration tests (test_fastapi_support.py): FastAPI setup and middleware
  • PyMySQL instrumentation tests (test_pymysql_instrumentation.py): Database connection instrumentation, query tracing, helper functions
  • Metrics and logs tests (test_metrics_and_logs.py): Custom metrics creation (counter, histogram, gauge), logging levels (info, warning, debug, error)
  • Decorators tests (test_decorators.py): Lambda handler decorator, AWS message handler decorator, traces decorator, aws_message_span context manager

License

Rebrandly Python SDK is released under the MIT License.

Build and Deploy

brew install pipx
pipx ensurepath
pipx install build
pipx install twine

build

twine upload dist/*

If build gives you an error, try:

pyproject-build

twine upload dist/*

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rebrandly_otel-0.2.9.tar.gz (36.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rebrandly_otel-0.2.9-py3-none-any.whl (28.5 kB view details)

Uploaded Python 3

File details

Details for the file rebrandly_otel-0.2.9.tar.gz.

File metadata

  • Download URL: rebrandly_otel-0.2.9.tar.gz
  • Upload date:
  • Size: 36.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for rebrandly_otel-0.2.9.tar.gz
Algorithm Hash digest
SHA256 1dec6ed4069203970600dd4ff9cf4c10f84a9592864116d5cbedacebb2152d70
MD5 2b04548596e4561a474be1b60456e774
BLAKE2b-256 a83dac4ab397ab360de4438308d0de5bbdac663d3a4cb916f4f757f79bf221c9

See more details on using hashes here.

File details

Details for the file rebrandly_otel-0.2.9-py3-none-any.whl.

File metadata

  • Download URL: rebrandly_otel-0.2.9-py3-none-any.whl
  • Upload date:
  • Size: 28.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for rebrandly_otel-0.2.9-py3-none-any.whl
Algorithm Hash digest
SHA256 108275d7eed7fb0078d20b986d1e3be6c7032154c1c98b58824f8e7f4860ba61
MD5 c77e86bdce202375fd1ba9815b3d06b8
BLAKE2b-256 5efd7d6a336f47faf01d8cb0a1ad015a8cd91b64f824a945df1034a973d78898

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page