
Common infrastructure components for Robin microservices


Common Infrastructure Library


A comprehensive Python library providing battle-tested infrastructure components for building resilient, observable microservices. Designed for teams building distributed systems with Python.

Overview

This library extracts common patterns and utilities used in production microservices, enabling:

  • Consistent infrastructure patterns across services
  • Single source of truth for resilience, observability, and messaging
  • Rapid onboarding of new services
  • Independent versioning and upgrade paths

📖 Read the User Guide for comprehensive documentation and examples.

Components

🛡️ Resilience: Circuit Breaker

Prevent cascading failures with automatic circuit breaking. Implements the three-state model (CLOSED, HALF_OPEN, OPEN) with configurable failure thresholds and recovery timeouts. Thread-safe with full async/sync support.

from robin_commons.resilience.breaker import CircuitBreaker, CircuitBreakerConfig

config = CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout_seconds=60,
    success_threshold=2
)
breaker = CircuitBreaker(config)

# Async context manager usage
async with breaker:
    await external_service.call()

# Or synchronous usage
with breaker:
    external_service.call()

Features:

  • Three-state model: CLOSED (normal) → OPEN (failing fast) → HALF_OPEN (recovery testing)
  • Thread-safe operations with RLock
  • Both async and sync context manager support
  • Detailed logging for state transitions and failures
  • Observability properties: is_open, state, failure_count, last_failure_time
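The state transitions listed above can be sketched with the standard library alone. This is a minimal illustration of the three-state model, not the robin_commons implementation: the class name SketchBreaker, its methods, and the injectable clock are assumptions made for the example, and it omits the locking and context-manager support the library describes.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchBreaker:
    """Illustrative three-state breaker (hypothetical, not thread-safe)."""

    def __init__(self, failure_threshold=5, recovery_timeout_seconds=60.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self.success_threshold = success_threshold
        self._clock = clock  # injectable so the timeout can be tested without sleeping
        self.state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def allow_request(self):
        """Return True if a call may proceed; move OPEN -> HALF_OPEN after the timeout."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at < self.recovery_timeout_seconds:
                return False  # still failing fast
            self.state = State.HALF_OPEN
            self._successes = 0
        return True

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = State.CLOSED
                self._failures = 0
        else:
            self._failures = 0  # a success while CLOSED resets the failure streak

    def record_failure(self):
        if self.state is State.HALF_OPEN:
            self._trip()  # any failure while probing reopens the circuit
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self.state = State.OPEN
        self._opened_at = self._clock()
```

A caller would check allow_request() before each call and report the outcome with record_success() or record_failure(); the library's context managers presumably wrap the same bookkeeping.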

📝 Logging: Structured JSON Logging

Production-ready logging with JSON output and context propagation. Built on Loguru with environment-aware formatting for Grafana Loki/Alloy collection.

from robin_commons.log import logger, configure_logging

configure_logging()
logger.info("Application started", service="my-service", version="1.0.0")

# Context variables are automatically propagated
from robin_commons.telemetry import set_correlation_id
set_correlation_id("correlation-123")

Features:

  • JSON structured logging for container environments
  • Automatic trace context integration
  • Environment-aware output (JSON for production, human-readable for development)
  • Thread-safe with enqueue=True
  • Full backtrace and diagnostic information
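To make the idea of JSON output with propagated context concrete, here is a small sketch using only the standard library's logging and contextvars modules. It is not how robin_commons is built (the library uses Loguru); the names correlation_id, JsonFormatter, and build_logger are hypothetical and exist only for this example.

```python
import contextvars
import json
import logging

# Hypothetical context variable; robin_commons manages its own correlation state.
correlation_id = contextvars.ContextVar("correlation_id", default=None)


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Read at format time, so the value follows the current context.
            "correlation_id": correlation_id.get(),
        }
        # Fields passed via extra={"fields": {...}} are merged into the output.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, sort_keys=True)


def build_logger(name="sketch"):
    """Return a logger that writes one JSON object per line to stderr."""
    log = logging.getLogger(name)
    log.setLevel(logging.INFO)
    log.propagate = False
    if not log.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        log.addHandler(handler)
    return log
```

With this sketch, build_logger().info("Application started", extra={"fields": {"service": "my-service"}}) emits one JSON line that includes whatever correlation ID is set in the current context.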

📊 Telemetry: Observability Suite

Complete observability stack with distributed tracing, metrics collection, and request correlation using OpenTelemetry.

Setup & Configuration:

from robin_commons.telemetry import setup_observability, get_observability_config

# Configure once at startup
setup_observability()

# Access config for service metadata
config = get_observability_config()

Distributed Tracing:

from robin_commons.telemetry import span, async_span, add_span_event

# Synchronous span
@span("operation_name")
def process_data():
    add_span_event("processing_started")
    return data

# Asynchronous span
@async_span("async_operation")
async def fetch_data():
    add_span_event("fetch_completed")
    return data

Metrics Collection:

from robin_commons.telemetry import (
    record_http_request,
    record_database_query,
    record_cache_operation,
    timed_operation
)

# Record HTTP requests
record_http_request(method="GET", status=200, duration=0.234)

# Record database queries
record_database_query(query="SELECT *", duration=0.015)

# Record cache operations
record_cache_operation(operation="get", hit=True, duration=0.001)

# Time operations
with timed_operation("expensive_operation"):
    result = perform_work()
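As a rough picture of what a timing context manager does, the sketch below measures a block with time.perf_counter and stores the duration in a plain dictionary. It is an assumption-laden stand-in: the names timed and durations are hypothetical, and the real timed_operation presumably records to OpenTelemetry metrics instead of an in-memory sink.

```python
import time
from contextlib import contextmanager

# Hypothetical in-memory sink; a real setup would record to a metrics backend.
durations = {}


@contextmanager
def timed(name, sink=durations):
    """Measure the wall-clock duration of the enclosed block, in seconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the block raises, so failed operations are timed too.
        sink.setdefault(name, []).append(time.perf_counter() - start)
```

Recording in the finally clause is a deliberate choice: slow failures are often the most interesting data points.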

Request Correlation:

from robin_commons.telemetry import (
    get_correlation_id,
    set_correlation_id,
    get_request_id,
    set_request_id,
    log_correlation_context
)

# Set/get correlation IDs
set_correlation_id("corr-123")
correlation_id = get_correlation_id()

# Log correlation context
log_correlation_context()

FastAPI Integration:

from fastapi import FastAPI
from robin_commons.telemetry import TraceMiddleware, setup_observability

app = FastAPI()
app.add_middleware(TraceMiddleware)

@app.on_event("startup")
async def startup():
    setup_observability()

Auto-instrumentation:

from robin_commons.telemetry import get_instrumentation_manager

manager = get_instrumentation_manager()
manager.setup_all_instrumentation(app)

# Automatically instruments:
# - FastAPI applications
# - SQLAlchemy database operations
# - Redis cache operations
# - HTTPX/Requests HTTP clients
# - NATS messaging
# - gRPC services

Status: Planned Components

💾 Cache: Redis Client (Coming soon)

  • Redis client with automatic cluster detection
  • Connection pooling and resilience features

📨 Messaging: NATS Client (Coming soon)

  • Production-grade NATS client with JetStream support
  • Durable pub/sub messaging with typed event publishing

Installation

From PyPI (when available)

pip install robin-commons

From Source

git clone https://github.com/neeve-ai/robin-commons.git
cd robin-commons
pip install -e .

Optional Dependencies

Install additional instrumentation for specific frameworks:

# FastAPI and SQLAlchemy observability
# (quoting keeps shells such as zsh from interpreting the brackets)
pip install "robin-commons[fastapi,sqlalchemy]"

# All optional dependencies
pip install "robin-commons[all]"

Quick Start

1. Set Up Logging

from robin_commons.log import configure_logging, logger

configure_logging()
logger.info("Application initialized")

2. Initialize Observability

from robin_commons.telemetry import setup_observability

setup_observability()

3. Add Circuit Breaker

from robin_commons.resilience.breaker import CircuitBreaker, CircuitBreakerConfig

breaker_config = CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout_seconds=30,
    success_threshold=2
)
breaker = CircuitBreaker(breaker_config)

async with breaker:
    result = await call_external_service()

4. Add FastAPI with Observability

from fastapi import FastAPI
from robin_commons.telemetry import TraceMiddleware, setup_observability
from robin_commons.log import configure_logging

# Configure at startup
configure_logging()
setup_observability()

app = FastAPI()
app.add_middleware(TraceMiddleware)

# Auto-instrument FastAPI and dependencies
from robin_commons.telemetry import get_instrumentation_manager
get_instrumentation_manager().setup_all_instrumentation(app)

Configuration

All components support configuration via environment variables:

# Logging
ENVIRONMENT=production

# Observability
OTEL_SERVICE_NAME=my-service
OTEL_SERVICE_VERSION=1.0.0
OTEL_ENVIRONMENT=production
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
OTEL_TRACES_SAMPLER_ARG=0.1

# For Grafana Cloud (optional)
GRAFANA_CLOUD_OTLP_ENDPOINT=https://otlp-gateway-prod-us-west-0.grafana.net/otlp
OTEL_EXPORTER_OTLP_HEADERS=authorization=Bearer%20<token>

# For local Alloy (cost optimization)
ENABLE_ALLOY=true
ALLOY_HOST=localhost
ALLOY_PORT=4317

# Instrumentation flags
OTEL_FASTAPI_INSTRUMENTATION_ENABLED=true
OTEL_SQLALCHEMY_INSTRUMENTATION_ENABLED=true
OTEL_REDIS_INSTRUMENTATION_ENABLED=true
OTEL_HTTPX_INSTRUMENTATION_ENABLED=true
OTEL_NATS_INSTRUMENTATION_ENABLED=true
OTEL_GRPC_INSTRUMENTATION_ENABLED=true
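One way to picture how such variables become typed settings is the sketch below, which reads a few of them into a frozen dataclass. It is illustrative only: the library lists pydantic for configuration validation, and the class SketchSettings, its field names, and the fallback values (other than the 0.1 sampling default mentioned in Troubleshooting) are assumptions.

```python
import os
from dataclasses import dataclass

_TRUE = {"1", "true", "yes", "on"}


def _flag(env, key, default=True):
    """Interpret common truthy spellings; a missing key falls back to the default."""
    raw = env.get(key)
    return default if raw is None else raw.strip().lower() in _TRUE


@dataclass(frozen=True)
class SketchSettings:
    """Hypothetical settings holder; field names are illustrative."""
    service_name: str
    otlp_endpoint: str
    sample_ratio: float
    fastapi_enabled: bool

    @classmethod
    def from_env(cls, env=None):
        env = os.environ if env is None else env
        ratio = float(env.get("OTEL_TRACES_SAMPLER_ARG", "0.1"))
        if not 0.0 <= ratio <= 1.0:
            raise ValueError("OTEL_TRACES_SAMPLER_ARG must be between 0 and 1")
        return cls(
            service_name=env.get("OTEL_SERVICE_NAME", "unknown-service"),
            otlp_endpoint=env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
            sample_ratio=ratio,
            fastapi_enabled=_flag(env, "OTEL_FASTAPI_INSTRUMENTATION_ENABLED"),
        )
```

Passing a plain dict instead of os.environ keeps the parsing easy to test, and failing fast on an out-of-range ratio surfaces typos at startup.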

Using Dotenv

Create a .env file:

ENVIRONMENT=development
OTEL_SERVICE_NAME=my-service
OTEL_SERVICE_VERSION=0.1.0
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317

Load in your application:

from dotenv import load_dotenv
load_dotenv()

from robin_commons.log import configure_logging
from robin_commons.telemetry import setup_observability

configure_logging()
setup_observability()

Architecture

robin_commons/
├── resilience/
│   ├── __init__.py
│   └── breaker.py              # Circuit breaker implementation
│       ├── CircuitBreaker       # Main class with async/sync support
│       ├── CircuitBreakerState  # Enum: CLOSED, OPEN, HALF_OPEN
│       ├── CircuitBreakerConfig # Configuration dataclass
│       └── CircuitBreakerError  # Exception for open circuit
│
├── log/
│   ├── __init__.py
│   └── config.py               # Logging configuration
│       ├── configure_logging()  # Setup JSON logging for Loki
│       └── logger               # Loguru logger instance
│
└── telemetry/
    ├── __init__.py             # Public exports
    ├── config.py               # Observability configuration
    │   ├── ServiceConfig        # Service metadata
    │   ├── OtlpExporterConfig   # OTLP endpoint config
    │   └── ObservabilityConfig  # Main config class
    ├── correlation.py          # Request correlation context
    │   ├── set_correlation_id()  # Set correlation context
    │   ├── get_correlation_id()  # Get correlation context
    │   └── Utilities for trace/span/user IDs
    ├── tracing.py              # Distributed tracing
    │   ├── setup_observability()        # Bootstrap tracing
    │   ├── @span decorator              # Sync spans
    │   ├── @async_span decorator        # Async spans
    │   ├── add_span_event()             # Add span events
    │   └── CircuitBreakerSpanExporter   # OTLP with circuit breaker
    ├── metrics.py              # Metrics collection
    │   ├── BaseMetricsCollector # Metrics manager
    │   ├── record_http_request()   # HTTP metrics
    │   ├── record_database_query() # DB metrics
    │   ├── record_cache_operation()# Cache metrics
    │   ├── record_error()         # Error tracking
    │   └── @timed_operation decorator
    ├── middleware.py           # FastAPI HTTP middleware
    │   └── TraceMiddleware      # W3C trace propagation
    └── instrumentation.py      # Auto-instrumentation manager
        └── InstrumentationManager
            ├── setup_all_instrumentation()
            ├── setup_fastapi_instrumentation()
            ├── setup_sqlalchemy_instrumentation()
            ├── setup_redis_instrumentation()
            ├── setup_httpx_instrumentation()
            ├── setup_nats_instrumentation()
            └── setup_grpc_instrumentation()
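The tree notes that TraceMiddleware handles W3C trace propagation. For readers unfamiliar with that format, the following sketch parses a traceparent header of the four-field form version-traceid-parentid-flags described by the W3C Trace Context recommendation. It is not the middleware's code; parse_traceparent and TraceParent are hypothetical names, and the sketch accepts only the four-field layout.

```python
import re
from typing import NamedTuple, Optional

# Lowercase hex fields: 2-char version, 32-char trace ID, 16-char parent ID, 2-char flags.
_TRACEPARENT = re.compile(
    r"(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<parent_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})"
)


class TraceParent(NamedTuple):
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if it is malformed or invalid."""
    m = _TRACEPARENT.fullmatch(header.strip())
    if m is None or m["version"] == "ff":  # version ff is reserved as invalid
        return None
    # All-zero trace or parent IDs are invalid.
    if set(m["trace_id"]) == {"0"} or set(m["parent_id"]) == {"0"}:
        return None
    sampled = bool(int(m["flags"], 16) & 0x01)  # lowest bit is the sampled flag
    return TraceParent(m["trace_id"], m["parent_id"], sampled)
```

Returning None for bad input mirrors the usual practice of starting a fresh trace when an incoming header cannot be trusted.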

Dependencies

Core:

  • pydantic>=2.12.5 - Configuration validation
  • loguru>=0.7.2 - Structured logging
  • httpx>=0.28.1 - Async HTTP client

Observability:

  • opentelemetry-api>=1.37.0 - OpenTelemetry API
  • opentelemetry-sdk>=1.37.0 - OpenTelemetry SDK
  • opentelemetry-exporter-otlp>=1.37.0 - OTLP exporter

Auto-instrumentation:

  • opentelemetry-instrumentation-fastapi>=0.58b0
  • opentelemetry-instrumentation-sqlalchemy>=0.58b0
  • opentelemetry-instrumentation-redis>=0.58b0
  • opentelemetry-instrumentation-httpx>=0.58b0
  • opentelemetry-instrumentation-grpc>=0.58b0

Framework Integrations:

  • fastapi>=0.110.0 - Web framework
  • sqlalchemy>=2.0.28 - ORM
  • redis>=5.0.1 - Cache client

Versioning

This library follows Semantic Versioning:

  • MAJOR: Breaking API changes
  • MINOR: New features (backward-compatible)
  • PATCH: Bug fixes

See CHANGELOG.md for detailed version history.

Testing

Run tests locally:

pytest tests/ -v

# With coverage
pytest tests/ --cov=robin_commons --cov-report=html

Run integration tests (requires Docker):

docker-compose -f docker-compose.test.yml up -d
pytest tests/integration/ -v

Troubleshooting

Circuit Breaker Always Open

Problem: Your circuit breaker stays in the OPEN state and doesn't recover.

Solutions:

  1. Check your failure_threshold - it might be too low. Try increasing it.
  2. Verify the external service is actually recovering and returning successful responses.
  3. Check recovery_timeout_seconds - ensure it's giving enough time for recovery (default: 60s).
  4. In HALF_OPEN state, you need success_threshold consecutive successes to close (default: 2).

Debug:

breaker = CircuitBreaker(config)
print(f"State: {breaker.state}")
print(f"Failure count: {breaker.failure_count}")
print(f"Next attempt: {breaker.get_next_attempt_time()}")

Missing Traces in OTLP Collector

Problem: Spans are not appearing in Grafana, Jaeger, or your OTLP collector.

Solutions:

  1. Verify OTEL_EXPORTER_OTLP_ENDPOINT is correct and reachable:
    curl -i http://localhost:4317/healthz
    
  2. Check if the OTLP collector is running and accessible from your application.
  3. Verify network connectivity - check firewall rules, DNS resolution.
  4. Enable debug logging:
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
  5. Check sample rate - default is 0.1 (10% of traces). Set OTEL_TRACES_SAMPLER_ARG=1.0 for 100%.
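Port 4317 conventionally carries OTLP over gRPC, so a plain HTTP probe may not return a meaningful response even when the collector is healthy. As a complementary check, the sketch below only tests whether a TCP connection to the endpoint's host and port can be opened. The helper name endpoint_reachable is hypothetical, and success means the port accepts connections, not that the collector is processing spans.

```python
import socket
from urllib.parse import urlparse


def endpoint_reachable(endpoint: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the endpoint's host and port succeeds."""
    parsed = urlparse(endpoint)
    host = parsed.hostname
    if host is None:
        return False  # not a URL with a host component
    # Fall back to the scheme's usual port when none is given.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False
```

For example, endpoint_reachable("http://localhost:4317") separates "nothing is listening" from problems further up the stack such as sampling or authentication.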

Logging Not Appearing

Problem: Your logs aren't being captured, or they are formatted incorrectly.

Solutions:

  1. Call configure_logging() early in your application startup (before creating loggers).
  2. For development, set ENVIRONMENT=development to get colored console output.
  3. For production, use ENVIRONMENT=production for JSON output suitable for Loki.
  4. Verify log level - default is INFO. Set DEBUG to see more details.
  5. Check if logs are being enqueued properly (they are async by default).

Debug:

from robin_commons.log import logger, configure_logging
configure_logging()
logger.debug("Debug message")
logger.info("Info message")
logger.error("Error message")

Instrumentation Not Working

Problem: FastAPI, SQLAlchemy, Redis, or other libraries aren't being instrumented.

Solutions:

  1. Call instrumentation setup after creating app instances:
    app = FastAPI()
    from robin_commons.telemetry import get_instrumentation_manager
    manager = get_instrumentation_manager()
    manager.setup_all_instrumentation(app)
    
  2. Verify instrumentation is enabled in config:
    OTEL_FASTAPI_INSTRUMENTATION_ENABLED=true
    OTEL_SQLALCHEMY_INSTRUMENTATION_ENABLED=true
    
  3. Check that required packages are installed (instrumentation packages are optional).
  4. For FastAPI, add middleware before instrumentation:
    from robin_commons.telemetry import TraceMiddleware
    app.add_middleware(TraceMiddleware)
    get_instrumentation_manager().setup_all_instrumentation(app)
    

Context Variables Not Propagating

Problem: Correlation IDs or trace context are not appearing in logs.

Solutions:

  1. Set correlation context early in request processing:
    from robin_commons.telemetry import set_correlation_id
    set_correlation_id(request.headers.get("X-Correlation-ID"))
    
  2. Ensure TraceMiddleware is added to FastAPI:
    app.add_middleware(TraceMiddleware)
    
  3. Context variables are async-aware - ensure you're using async functions.
  4. For manual context setup, use async tasks carefully:
    # In async context - this works
    set_correlation_id("id-123")
    
    # In thread pool - create new context
    import asyncio
    asyncio.run(async_operation())
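For work handed to a thread pool, a common standard-library approach is to copy the caller's context and run the function inside that copy, since worker threads typically do not see values set in the submitting context. The sketch below shows the pattern; correlation_id here is a hypothetical stand-in for the library's own state, and run_in_pool is an illustrative helper, not a robin_commons function.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical variable standing in for the library's correlation state.
correlation_id = contextvars.ContextVar("correlation_id", default=None)


def read_id():
    """Return whatever correlation ID is visible in the current context."""
    return correlation_id.get()


def run_in_pool(pool, fn, *args):
    """Submit fn so that it runs inside a copy of the caller's context."""
    ctx = contextvars.copy_context()  # snapshot taken in the submitting thread
    return pool.submit(ctx.run, fn, *args)
```

After correlation_id.set("id-123"), calling run_in_pool(pool, read_id).result() returns "id-123". In async code, asyncio.to_thread performs the same context copy automatically.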
    

OTLP Connection Errors

Problem: Getting connection refused or timeout errors when exporting spans.

Solutions:

  1. Local development: Ensure OTLP collector is running:
    docker run -p 4317:4317 otel/opentelemetry-collector
    
  2. Using Grafana Cloud: Verify endpoint and headers:
    GRAFANA_CLOUD_OTLP_ENDPOINT=https://otlp-gateway-prod-us-west-0.grafana.net/otlp
    OTEL_EXPORTER_OTLP_HEADERS=authorization=Bearer%20<YOUR_TOKEN>
    
  3. Using Alloy: Enable with:
    ENABLE_ALLOY=true
    
  4. Network issues: Check:
    • Firewall rules allowing egress to OTLP endpoint
    • DNS resolution for the endpoint
    • TLS certificate validity (for https endpoints)
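Header strings are easy to get subtly wrong. The OpenTelemetry specification describes OTEL_EXPORTER_OTLP_HEADERS as comma-separated key=value pairs whose values may be URL-encoded, so a small parser can help sanity-check a value before deploying it. The function below is a hypothetical sketch for inspection only; whether robin_commons parses the variable the same way is an assumption.

```python
from urllib.parse import unquote


def parse_otlp_headers(raw: str) -> dict:
    """Split 'k1=v1,k2=v2' into a dict, URL-decoding keys and values."""
    headers = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue  # tolerate trailing commas and empty segments
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"malformed header entry: {pair!r}")
        headers[unquote(key.strip()).lower()] = unquote(value.strip())
    return headers
```

Printing parse_otlp_headers(os.environ["OTEL_EXPORTER_OTLP_HEADERS"]) during debugging shows whether the scheme and token ended up separated by a space, as the Authorization header expects.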

High Memory Usage

Problem: Application memory grows over time due to telemetry.

Solutions:

  1. Adjust batch processor settings - configure maximum queue size:
    # Default batch size is 512 spans
    # Adjust if needed based on traffic
    
  2. Reduce sample rate if 100% sampling is enabled:
    OTEL_TRACES_SAMPLER_ARG=0.1  # Sample 10% instead of 100%
    
  3. For high-traffic services, consider:
    • Using Grafana Cloud or managed observability
    • Sampling at the application level
    • Disabling specific instrumentations if not needed
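To see why lowering OTEL_TRACES_SAMPLER_ARG reduces telemetry volume roughly in proportion, consider ratio-based sampling on the trace ID: a trace is kept when the low 64 bits of its ID fall below ratio × 2^64. The sketch below follows that idea, which is how trace-ID-ratio samplers are commonly described; it is an illustration under that assumption, not the sampler robin_commons configures, and both function names are hypothetical.

```python
import random


def should_sample(trace_id: int, ratio: float) -> bool:
    """Keep a trace when the low 64 bits of its ID fall below ratio * 2**64."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0 and 1")
    bound = round(ratio * (1 << 64))
    return (trace_id & ((1 << 64) - 1)) < bound


def kept_fraction(ratio: float, n: int = 20000, seed: int = 0) -> float:
    """Estimate the share of random 128-bit trace IDs kept at the given ratio."""
    rng = random.Random(seed)
    kept = sum(should_sample(rng.getrandbits(128), ratio) for _ in range(n))
    return kept / n
```

Because the decision depends only on the trace ID, every service that sees the same trace with the same ratio makes the same choice, which keeps sampled traces complete.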

Configuration Not Being Applied

Problem: Environment variables or configuration changes don't take effect.

Solutions:

  1. Ensure variables are set before importing robin_commons:
    import os
    os.environ["OTEL_SERVICE_NAME"] = "my-service"
    
    from robin_commons.telemetry import setup_observability
    setup_observability()
    
  2. Use .env files with python-dotenv:
    from dotenv import load_dotenv
    load_dotenv()
    
  3. Verify variables are actually set:
    import os
    print(os.getenv("OTEL_SERVICE_NAME"))
    
  4. Some configuration is cached - restart the application after changing env vars.
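The caching behavior described above can be reproduced in a few lines. The sketch assumes a configuration getter memoized with functools.lru_cache, which is one common way values end up frozen after the first read; whether robin_commons caches this way is an assumption, and get_service_name is a hypothetical function.

```python
import os
from functools import lru_cache


@lru_cache(maxsize=1)
def get_service_name() -> str:
    """Read the variable once; later calls return the cached value."""
    return os.environ.get("OTEL_SERVICE_NAME", "unknown-service")
```

Once get_service_name() has been called, changing os.environ["OTEL_SERVICE_NAME"] has no effect until the cache is cleared or the process restarts, which is why setting variables before the first import or setup call matters.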

See docs/troubleshooting.md for more detailed guidance and common issues.

Contributing

Contributions are welcome! Please follow these guidelines:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Add tests for new functionality
  4. Ensure all tests pass (pytest)
  5. Commit with clear messages (git commit -m 'Add amazing feature')
  6. Push to your branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

Code Style

  • Use Black for formatting
  • Follow PEP 8 conventions
  • Add type hints to all functions
  • Maintain > 90% test coverage

License

This project is licensed under the Apache License, Version 2.0.

You are free to use, modify, and distribute this library in accordance with the terms of the license. A copy of the license is available in the LICENSE file.

Scope Clarification

This repository contains open-source shared libraries used within the Robin ecosystem, such as common utilities, logging infrastructure, and foundational components.

It does not include:

  • The Robin core engine
  • Agent orchestration logic
  • Proprietary AI models or workflows
  • Commercial SaaS infrastructure

Those components are part of Neeve's proprietary systems and are distributed separately under commercial terms.

Contributions

By contributing to this repository, you agree that your contributions will be licensed under the Apache License, Version 2.0.
