Common infrastructure components for Robin microservices
Common Infrastructure Library
A comprehensive Python library providing battle-tested infrastructure components for building resilient, observable microservices. Designed for teams building distributed systems with Python.
Overview
This library extracts common patterns and utilities used in production microservices, enabling:
- Consistent infrastructure patterns across services
- Single source of truth for resilience, observability, and messaging
- Rapid onboarding of new services
- Independent versioning and upgrade paths
Read the User Guide for comprehensive documentation and examples.
Components
Resilience: Circuit Breaker
Prevent cascading failures with automatic circuit breaking. Implements the three-state model (CLOSED, HALF_OPEN, OPEN) with configurable failure thresholds and recovery timeouts. Thread-safe with full async/sync support.
from robin_commons.resilience.breaker import CircuitBreaker, CircuitBreakerConfig

config = CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout_seconds=60,
    success_threshold=2
)
breaker = CircuitBreaker(config)

# Async context manager usage
async with breaker:
    await external_service.call()

# Or synchronous usage
with breaker:
    external_service.call()
Features:
- Three-state model: CLOSED (normal) → OPEN (failing fast) → HALF_OPEN (recovery testing)
- Thread-safe operations with RLock
- Both async and sync context manager support
- Detailed logging for state transitions and failures
- Observability properties: is_open, state, failure_count, last_failure_time
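To make the three-state model concrete, here is a minimal, stdlib-only sketch of a synchronous breaker. It is not the robin_commons implementation: the class names SketchBreaker, State, and CircuitOpenError and the injectable clock argument are hypothetical, and only the three thresholds mirror the configuration fields shown above.

```python
import threading
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class SketchBreaker:
    """Illustrative breaker; names and behaviour are assumptions, not the library API."""

    def __init__(self, failure_threshold=5, recovery_timeout_seconds=60.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout_seconds = recovery_timeout_seconds
        self.success_threshold = success_threshold
        self._clock = clock
        self._lock = threading.RLock()
        self._state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def state(self):
        with self._lock:
            # An OPEN circuit moves to HALF_OPEN once the recovery timeout elapses.
            elapsed = self._clock() - self._opened_at
            if self._state is State.OPEN and elapsed >= self.recovery_timeout_seconds:
                self._state = State.HALF_OPEN
                self._successes = 0
            return self._state

    def __enter__(self):
        if self.state is State.OPEN:
            raise CircuitOpenError("circuit is open; failing fast")
        return self

    def __exit__(self, exc_type, exc, tb):
        with self._lock:
            if exc_type is None:
                self._on_success()
            else:
                self._on_failure()
        return False  # never swallow the caller's exception

    def _on_success(self):
        if self._state is State.HALF_OPEN:
            # Close only after enough consecutive successful probes.
            self._successes += 1
            if self._successes >= self.success_threshold:
                self._state = State.CLOSED
                self._failures = 0
        else:
            self._failures = 0

    def _on_failure(self):
        self._failures += 1
        # Any failure while probing, or too many while closed, opens the circuit.
        if self._state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self._state = State.OPEN
            self._opened_at = self._clock()
```

Passing the clock in as an argument keeps the recovery timeout testable without sleeping; a real implementation may handle timing differently.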
Logging: Structured JSON Logging
Production-ready logging with JSON output and context propagation. Built on Loguru with environment-aware formatting for Grafana Loki/Alloy collection.
from robin_commons.log import logger, configure_logging
configure_logging()
logger.info("Application started", service="my-service", version="1.0.0")
# Context variables are automatically propagated
from robin_commons.telemetry import set_correlation_id
set_correlation_id("correlation-123")
Features:
- JSON structured logging for container environments
- Automatic trace context integration
- Environment-aware output (JSON for production, human-readable for development)
- Thread-safe with enqueue=True
- Full backtrace and diagnostic information
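The idea behind JSON output with context propagation can be sketched with the standard library alone. This is only an illustration of the pattern: the library itself is built on Loguru, and the names JsonFormatter, build_logger, and the "fields" key used with extra are hypothetical.

```python
import contextvars
import io
import json
import logging

# Hypothetical context variable standing in for the library's correlation ID.
correlation_id = contextvars.ContextVar("correlation_id", default=None)


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        }
        # Merge structured fields passed via extra={"fields": {...}}.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


def build_logger(stream, name="sketch"):
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    log = logging.getLogger(name)
    log.handlers = [handler]
    log.setLevel(logging.INFO)
    log.propagate = False
    return log


stream = io.StringIO()
log = build_logger(stream)
correlation_id.set("correlation-123")
log.info("Application started", extra={"fields": {"service": "my-service"}})
record = json.loads(stream.getvalue())
```

Each log line is a single JSON object, which is the shape that line-oriented collectors such as Loki can parse without multi-line handling.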
Telemetry: Observability Suite
Complete observability stack with distributed tracing, metrics collection, and request correlation using OpenTelemetry.
Setup & Configuration:
from robin_commons.telemetry import setup_observability, get_observability_config
# Configure once at startup
setup_observability()
# Access config for service metadata
config = get_observability_config()
Distributed Tracing:
from robin_commons.telemetry import span, async_span, add_span_event

# Synchronous span
@span("operation_name")
def process_data():
    add_span_event("processing_started")
    return data

# Asynchronous span
@async_span("async_operation")
async def fetch_data():
    add_span_event("fetch_completed")
    return data
Metrics Collection:
from robin_commons.telemetry import (
    record_http_request,
    record_database_query,
    record_cache_operation,
    timed_operation
)

# Record HTTP requests
record_http_request(method="GET", status=200, duration=0.234)

# Record database queries
record_database_query(query="SELECT *", duration=0.015)

# Record cache operations
record_cache_operation(operation="get", hit=True, duration=0.001)

# Time operations
with timed_operation("expensive_operation"):
    result = perform_work()
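A timing helper of this kind can be approximated in a few lines. The sketch below is not the library's timed_operation: the timed function and the in-memory durations store are hypothetical and only show how a context manager can record a duration even when the wrapped block raises.

```python
import time
from collections import defaultdict
from contextlib import contextmanager

# Hypothetical in-memory store; a real collector would export to a metrics backend.
durations = defaultdict(list)


@contextmanager
def timed(name, clock=time.perf_counter):
    """Record how long the wrapped block took, in seconds."""
    start = clock()
    try:
        yield
    finally:
        # The finally clause runs on success and on exceptions alike.
        durations[name].append(clock() - start)


with timed("expensive_operation"):
    total = sum(range(1000))
```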
Request Correlation:
from robin_commons.telemetry import (
get_correlation_id,
set_correlation_id,
get_request_id,
set_request_id,
log_correlation_context
)
# Set/get correlation IDs
set_correlation_id("corr-123")
correlation_id = get_correlation_id()
# Log correlation context
log_correlation_context()
FastAPI Integration:
from fastapi import FastAPI
from robin_commons.telemetry import TraceMiddleware, setup_observability

app = FastAPI()
app.add_middleware(TraceMiddleware)

@app.on_event("startup")
async def startup():
    setup_observability()
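The Architecture section describes TraceMiddleware as handling W3C trace propagation. As a rough illustration of what such middleware has to read from an incoming request, here is a sketch that parses a version-00 traceparent header. It is not the middleware's code; parse_traceparent and TraceParent are hypothetical names, and the tracestate header is ignored.

```python
import re
from typing import NamedTuple, Optional

# version-traceid-parentid-flags, all lowercase hex (W3C Trace Context, version 00 layout).
_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    sampled: bool


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if it is malformed."""
    match = _TRACEPARENT.match(header.strip())
    if match is None:
        return None
    version, trace_id, parent_id, flags = match.groups()
    if version == "ff":
        return None  # version ff is forbidden by the specification
    if trace_id == "0" * 32 or parent_id == "0" * 16:
        return None  # all-zero IDs are invalid
    # The least significant flag bit marks the trace as sampled.
    return TraceParent(version, trace_id, parent_id, bool(int(flags, 16) & 0x01))


parsed = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
```

Returning None for malformed input lets the caller fall back to starting a new trace instead of failing the request.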
Auto-instrumentation:
from robin_commons.telemetry import get_instrumentation_manager
manager = get_instrumentation_manager()
manager.setup_all_instrumentation(app)
# Automatically instruments:
# - FastAPI applications
# - SQLAlchemy database operations
# - Redis cache operations
# - HTTPX/Requests HTTP clients
# - NATS messaging
# - gRPC services
Status: Planned Components
Cache: Redis Client (Coming soon)
- Redis client with automatic cluster detection
- Connection pooling and resilience features
Messaging: NATS Client (Coming soon)
- Production-grade NATS client with JetStream support
- Durable pub/sub messaging with typed event publishing
Installation
From PyPI (when available)
pip install robin-commons
From Source
git clone https://github.com/neeve-ai/robin-commons.git
cd robin-commons
pip install -e .
Optional Dependencies
Install additional instrumentation for specific frameworks:
# FastAPI and SQLAlchemy observability
pip install robin-commons[fastapi,sqlalchemy]
# All optional dependencies
pip install robin-commons[all]
Quick Start
1. Set Up Logging
from robin_commons.log import configure_logging, logger
configure_logging()
logger.info("Application initialized")
2. Initialize Observability
from robin_commons.telemetry import setup_observability
setup_observability()
3. Add Circuit Breaker
from robin_commons.resilience.breaker import CircuitBreaker, CircuitBreakerConfig

breaker_config = CircuitBreakerConfig(
    failure_threshold=5,
    recovery_timeout_seconds=30,
    success_threshold=2
)
breaker = CircuitBreaker(breaker_config)

async with breaker:
    result = await call_external_service()
4. Add FastAPI with Observability
from fastapi import FastAPI
from robin_commons.telemetry import TraceMiddleware, setup_observability
from robin_commons.log import configure_logging
# Configure at startup
configure_logging()
setup_observability()
app = FastAPI()
app.add_middleware(TraceMiddleware)
# Auto-instrument FastAPI and dependencies
from robin_commons.telemetry import get_instrumentation_manager
get_instrumentation_manager().setup_all_instrumentation(app)
Configuration
All components support configuration via environment variables:
# Logging
ENVIRONMENT=production
# Observability
OTEL_SERVICE_NAME=my-service
OTEL_SERVICE_VERSION=1.0.0
OTEL_ENVIRONMENT=production
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_EXPORTER_OTLP_PROTOCOL=grpc
OTEL_TRACES_SAMPLER_ARG=0.1
# For Grafana Cloud (optional)
GRAFANA_CLOUD_OTLP_ENDPOINT=https://otlp-gateway-prod-us-west-0.grafana.net/otlp
OTEL_EXPORTER_OTLP_HEADERS=authorization=Bearer<token>
# For local Alloy (cost optimization)
ENABLE_ALLOY=true
ALLOY_HOST=localhost
ALLOY_PORT=4317
# Instrumentation flags
OTEL_FASTAPI_INSTRUMENTATION_ENABLED=true
OTEL_SQLALCHEMY_INSTRUMENTATION_ENABLED=true
OTEL_REDIS_INSTRUMENTATION_ENABLED=true
OTEL_HTTPX_INSTRUMENTATION_ENABLED=true
OTEL_NATS_INSTRUMENTATION_ENABLED=true
OTEL_GRPC_INSTRUMENTATION_ENABLED=true
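As an illustration of how variables like these can be turned into typed settings, here is a small stdlib-only sketch. It is not the library's ObservabilityConfig (which is built on pydantic): SketchSettings, its field names, and every default except the 0.1 sample ratio mentioned under Troubleshooting are assumptions made for the example.

```python
import os
from dataclasses import dataclass
from typing import Mapping


def _as_bool(value: str) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class SketchSettings:
    """Hypothetical settings object; field names and defaults are illustrative."""

    service_name: str
    endpoint: str
    sample_ratio: float
    fastapi_enabled: bool

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "SketchSettings":
        ratio = float(env.get("OTEL_TRACES_SAMPLER_ARG", "0.1"))
        if not 0.0 <= ratio <= 1.0:
            raise ValueError("OTEL_TRACES_SAMPLER_ARG must be between 0 and 1")
        return cls(
            service_name=env.get("OTEL_SERVICE_NAME", "unknown-service"),
            endpoint=env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
            sample_ratio=ratio,
            fastapi_enabled=_as_bool(env.get("OTEL_FASTAPI_INSTRUMENTATION_ENABLED", "true")),
        )


settings = SketchSettings.from_env({"OTEL_SERVICE_NAME": "my-service"})
```

Accepting a mapping instead of reading os.environ directly makes the parsing easy to test with a plain dict.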
Using Dotenv
Create a .env file:
ENVIRONMENT=development
OTEL_SERVICE_NAME=my-service
OTEL_SERVICE_VERSION=0.1.0
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
Load in your application:
from dotenv import load_dotenv
load_dotenv()
from robin_commons.log import configure_logging
from robin_commons.telemetry import setup_observability
configure_logging()
setup_observability()
Architecture
robin_commons/
├── resilience/
│   ├── __init__.py
│   └── breaker.py                      # Circuit breaker implementation
│       ├── CircuitBreaker              # Main class with async/sync support
│       ├── CircuitBreakerState         # Enum: CLOSED, OPEN, HALF_OPEN
│       ├── CircuitBreakerConfig        # Configuration dataclass
│       └── CircuitBreakerError         # Exception for open circuit
│
├── log/
│   ├── __init__.py
│   └── config.py                       # Logging configuration
│       ├── configure_logging()         # Setup JSON logging for Loki
│       └── logger                      # Loguru logger instance
│
└── telemetry/
    ├── __init__.py                     # Public exports
    ├── config.py                       # Observability configuration
    │   ├── ServiceConfig               # Service metadata
    │   ├── OtlpExporterConfig          # OTLP endpoint config
    │   └── ObservabilityConfig         # Main config class
    ├── correlation.py                  # Request correlation context
    │   ├── set_correlation_id()        # Set correlation context
    │   ├── get_correlation_id()        # Get correlation context
    │   └── Utilities for trace/span/user IDs
    ├── tracing.py                      # Distributed tracing
    │   ├── setup_observability()       # Bootstrap tracing
    │   ├── @span decorator             # Sync spans
    │   ├── @async_span decorator       # Async spans
    │   ├── add_span_event()            # Add span events
    │   └── CircuitBreakerSpanExporter  # OTLP with circuit breaker
    ├── metrics.py                      # Metrics collection
    │   ├── BaseMetricsCollector        # Metrics manager
    │   ├── record_http_request()       # HTTP metrics
    │   ├── record_database_query()     # DB metrics
    │   ├── record_cache_operation()    # Cache metrics
    │   ├── record_error()              # Error tracking
    │   └── @timed_operation decorator
    ├── middleware.py                   # FastAPI HTTP middleware
    │   └── TraceMiddleware             # W3C trace propagation
    └── instrumentation.py              # Auto-instrumentation manager
        └── InstrumentationManager
            ├── setup_all_instrumentation()
            ├── setup_fastapi_instrumentation()
            ├── setup_sqlalchemy_instrumentation()
            ├── setup_redis_instrumentation()
            ├── setup_httpx_instrumentation()
            ├── setup_nats_instrumentation()
            └── setup_grpc_instrumentation()
Dependencies
Core:
- pydantic>=2.12.5 - Configuration validation
- loguru>=0.7.2 - Structured logging
- httpx>=0.28.1 - Async HTTP client
Observability:
- opentelemetry-api>=1.37.0 - OpenTelemetry API
- opentelemetry-sdk>=1.37.0 - OpenTelemetry SDK
- opentelemetry-exporter-otlp>=1.37.0 - OTLP exporter
Auto-instrumentation:
- opentelemetry-instrumentation-fastapi>=0.58b0
- opentelemetry-instrumentation-sqlalchemy>=0.58b0
- opentelemetry-instrumentation-redis>=0.58b0
- opentelemetry-instrumentation-httpx>=0.58b0
- opentelemetry-instrumentation-grpc>=0.58b0
Framework Integrations:
- fastapi>=0.110.0 - Web framework
- sqlalchemy>=2.0.28 - ORM
- redis>=5.0.1 - Cache client
Versioning
This library follows Semantic Versioning:
- MAJOR: Breaking API changes
- MINOR: New features (backward-compatible)
- PATCH: Bug fixes
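For upgrade tooling, the three levels above can be checked mechanically. The helper below is a hypothetical sketch, not part of the library; it handles plain MAJOR.MINOR.PATCH strings only and ignores pre-release or build suffixes. Note that Semantic Versioning makes weaker compatibility promises for 0.x releases, so a minor bump before 1.0 may still deserve a changelog check.

```python
def classify_bump(old: str, new: str) -> str:
    """Return the first SemVer component that differs: 'major', 'minor', 'patch', or 'none'."""
    old_parts = [int(part) for part in old.split(".")]
    new_parts = [int(part) for part in new.split(".")]
    if len(old_parts) != 3 or len(new_parts) != 3:
        raise ValueError("expected MAJOR.MINOR.PATCH")
    for label, before, after in zip(("major", "minor", "patch"), old_parts, new_parts):
        if before != after:
            return label
    return "none"


kind = classify_bump("0.2.3", "0.2.4")
```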
See CHANGELOG.md for detailed version history.
Testing
Run tests locally:
pytest tests/ -v
# With coverage
pytest tests/ --cov=robin_commons --cov-report=html
Run integration tests (requires Docker):
docker-compose -f docker-compose.test.yml up
pytest tests/integration/ -v
Troubleshooting
Circuit Breaker Always Open
Problem: Your circuit breaker stays in the OPEN state and doesn't recover.
Solutions:
- Check your failure_threshold - it might be too low. Try increasing it.
- Verify the external service is actually recovering and returning successful responses.
- Check recovery_timeout_seconds - ensure it's giving enough time for recovery (default: 60s).
- In HALF_OPEN state, you need success_threshold consecutive successes to close (default: 2).
Debug:
breaker = CircuitBreaker(config)
print(f"State: {breaker.state}")
print(f"Failure count: {breaker.failure_count}")
print(f"Next attempt: {breaker.get_next_attempt_time()}")
Missing Traces in OTLP Collector
Problem: Spans are not appearing in Grafana, Jaeger, or your OTLP collector.
Solutions:
- Verify OTEL_EXPORTER_OTLP_ENDPOINT is correct and reachable:
  curl -i http://localhost:4317/healthz
- Check if the OTLP collector is running and accessible from your application.
- Verify network connectivity - check firewall rules, DNS resolution.
- Enable debug logging:
  import logging
  logging.basicConfig(level=logging.DEBUG)
- Check sample rate - default is 0.1 (10% of traces). Set OTEL_TRACES_SAMPLER_ARG=1.0 for 100%.
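To see why a 0.1 sample rate makes most traces disappear, it helps to look at how ratio-based sampling typically works: the decision is derived from the trace ID, so every service handling the same trace reaches the same verdict. The function below is a sketch of that idea with a hypothetical name, not the sampler this library configures.

```python
import random


def should_sample(trace_id: int, ratio: float) -> bool:
    """Keep a trace when the low 64 bits of its ID fall below ratio * 2**64."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0 and 1")
    bound = round(ratio * (1 << 64))
    return (trace_id & ((1 << 64) - 1)) < bound


# Simulate 20,000 random 128-bit trace IDs at the documented default ratio.
rng = random.Random(42)
trace_ids = [rng.getrandbits(128) for _ in range(20_000)]
kept_fraction = sum(should_sample(t, 0.1) for t in trace_ids) / len(trace_ids)
```

With a ratio of 0.1 roughly nine out of ten traces are dropped, so a quiet test environment can look as if tracing is broken when it is only sampling.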
Logging Not Appearing
Problem: Your logs aren't being captured, or they are formatted incorrectly.
Solutions:
- Call configure_logging() early in your application startup (before creating loggers).
- For development, set ENVIRONMENT=development to get colored console output.
- For production, use ENVIRONMENT=production for JSON output suitable for Loki.
- Verify log level - default is INFO. Set DEBUG to see more details.
- Check if logs are being enqueued properly (they are async by default).
Debug:
from robin_commons.log import logger, configure_logging
configure_logging()
logger.debug("Debug message")
logger.info("Info message")
logger.error("Error message")
Instrumentation Not Working
Problem: FastAPI, SQLAlchemy, Redis, or other libraries aren't being instrumented.
Solutions:
- Call instrumentation setup after creating app instances:
  app = FastAPI()
  from robin_commons.telemetry import get_instrumentation_manager
  manager = get_instrumentation_manager()
  manager.setup_all_instrumentation(app)
- Verify instrumentation is enabled in config:
  OTEL_FASTAPI_INSTRUMENTATION_ENABLED=true
  OTEL_SQLALCHEMY_INSTRUMENTATION_ENABLED=true
- Check that required packages are installed (instrumentation packages are optional).
- For FastAPI, add middleware before instrumentation:
  from robin_commons.telemetry import TraceMiddleware
  app.add_middleware(TraceMiddleware)
  get_instrumentation_manager().setup_all_instrumentation(app)
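One quick way to check whether the optional instrumentation packages are installed is to look for their modules without importing them. The helper below is a hypothetical sketch; the module names in the comment are the ones the upstream OpenTelemetry instrumentation packages normally provide, and whether this library checks for them the same way is an assumption.

```python
import importlib.util


def missing_modules(module_names):
    """Return the subset of module_names that cannot be found in this environment."""
    missing = []
    for name in module_names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # find_spec raises if a parent package of a dotted name is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


# Example module names one might check (assumed, based on the upstream packages):
#   opentelemetry.instrumentation.fastapi
#   opentelemetry.instrumentation.sqlalchemy
report = missing_modules(["json", "surely_not_installed_module_xyz"])
```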
Context Variables Not Propagating
Problem: Correlation IDs or trace context not appearing in logs.
Solutions:
- Set correlation context early in request processing:
  from robin_commons.telemetry import set_correlation_id
  set_correlation_id(request.headers.get("X-Correlation-ID"))
- Ensure TraceMiddleware is added to FastAPI:
  app.add_middleware(TraceMiddleware)
- Context variables are async-aware - ensure you're using async functions.
- For manual context setup, use async tasks carefully:
  # In async context - this works
  set_correlation_id("id-123")
  # In thread pool - create new context
  import asyncio
  asyncio.run(async_operation())
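When work is handed to a thread pool, another option is to carry the caller's context across explicitly. The sketch below assumes the correlation ID lives in a contextvars.ContextVar (the README does not show the implementation); the variable and function names are hypothetical.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

correlation_id = contextvars.ContextVar("correlation_id", default=None)


def read_id():
    return correlation_id.get()


correlation_id.set("id-123")

with ThreadPoolExecutor(max_workers=1) as pool:
    # Snapshot the caller's context and run the function inside that snapshot.
    ctx = contextvars.copy_context()
    with_copy = pool.submit(ctx.run, read_id).result()
```

Inside async code, asyncio.to_thread performs this context copy for you, so it is often the simplest way to keep correlation IDs intact when calling blocking functions.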
OTLP Connection Errors
Problem: Getting connection refused or timeout errors when exporting spans.
Solutions:
- Local development: Ensure OTLP collector is running:
  docker run -p 4317:4317 ghcr.io/open-telemetry/opentelemetry-collector
- Using Grafana Cloud: Verify endpoint and headers:
  GRAFANA_CLOUD_OTLP_ENDPOINT=https://otlp-gateway-prod-us-west-0.grafana.net/otlp
  OTEL_EXPORTER_OTLP_HEADERS=authorization=Bearer<YOUR_TOKEN>
- Using Alloy: Enable with:
  ENABLE_ALLOY=true
- Network issues: Check:
  - Firewall rules allowing egress to OTLP endpoint
  - DNS resolution for the endpoint
  - TLS certificate validity (for https endpoints)
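A plain TCP connection attempt from the application host is often enough to tell a network problem from a configuration problem. The following stdlib sketch uses hypothetical helper names and checks reachability only; it does not speak OTLP or validate TLS certificates.

```python
import socket
from urllib.parse import urlparse


def endpoint_address(endpoint: str):
    """Split an endpoint URL into (host, port), defaulting the port by scheme."""
    parsed = urlparse(endpoint)
    if parsed.hostname is None:
        raise ValueError(f"cannot find a host in {endpoint!r}")
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    return parsed.hostname, port


def can_connect(endpoint: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the endpoint succeeds within the timeout."""
    try:
        with socket.create_connection(endpoint_address(endpoint), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


local = endpoint_address("http://localhost:4317")
```

For example, can_connect("http://localhost:4317") returning False while the collector container is running usually points at port mapping or firewall rules.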
High Memory Usage
Problem: Application memory grows over time due to telemetry.
Solutions:
- Adjust batch processor settings - configure maximum queue size:
  # Default batch size is 512 spans
  # Adjust if needed based on traffic
- Reduce sample rate if 100% sampling is enabled:
  OTEL_TRACES_SAMPLER_ARG=0.1  # Sample 10% instead of 100%
- For high-traffic services, consider:
  - Using Grafana Cloud or managed observability
  - Sampling at the application level
  - Disabling specific instrumentations if not needed
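The reason a maximum queue size bounds memory is that spans arriving while the queue is full are dropped instead of accumulated. The class below is a simplified, hypothetical model of that behaviour, not the batch processor this library uses; the 512 batch size follows the text above, while the 2048 queue size is an assumption chosen for the example.

```python
from collections import deque


class BoundedBuffer:
    """Toy span buffer: bounded queue in, fixed-size batches out."""

    def __init__(self, max_queue_size=2048, max_batch_size=512):
        self.max_queue_size = max_queue_size
        self.max_batch_size = max_batch_size
        self.dropped = 0
        self._items = deque()

    def add(self, item) -> bool:
        # Dropping on overflow keeps memory bounded when the exporter is slow.
        if len(self._items) >= self.max_queue_size:
            self.dropped += 1
            return False
        self._items.append(item)
        return True

    def drain_batch(self):
        batch = []
        while self._items and len(batch) < self.max_batch_size:
            batch.append(self._items.popleft())
        return batch


buffer = BoundedBuffer(max_queue_size=3, max_batch_size=2)
accepted = [buffer.add(i) for i in range(5)]
```

A steadily rising drop counter is a hint that the exporter cannot keep up, which is a better signal to act on than memory growth alone.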
Configuration Not Being Applied
Problem: Environment variables or configuration changes don't take effect.
Solutions:
- Ensure variables are set before importing robin_commons:
  import os
  os.environ["OTEL_SERVICE_NAME"] = "my-service"
  from robin_commons.telemetry import setup_observability
  setup_observability()
- Use .env files with python-dotenv:
  from dotenv import load_dotenv
  load_dotenv()
- Verify variables are actually set:
  import os
  print(os.getenv("OTEL_SERVICE_NAME"))
- Some configuration is cached - restart the application after changing env vars.
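The caching point is easy to reproduce in isolation. How robin_commons caches its configuration is not shown in this README, so the sketch below uses functools.lru_cache purely as a stand-in, with a hypothetical function name.

```python
import functools
import os


@functools.lru_cache(maxsize=1)
def cached_service_name() -> str:
    # The environment is read once; later calls return the remembered value.
    return os.environ.get("OTEL_SERVICE_NAME", "unknown-service")


os.environ["OTEL_SERVICE_NAME"] = "first"
before = cached_service_name()

os.environ["OTEL_SERVICE_NAME"] = "second"
stale = cached_service_name()  # still the old value

cached_service_name.cache_clear()
fresh = cached_service_name()  # re-reads the environment
```

Once a value has been read and remembered, changing the environment has no effect until the cache is cleared or the process restarts, which is why a restart is the reliable fix.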
See docs/troubleshooting.md for more detailed guidance and common issues.
Contributing
Contributions are welcome! Please follow these guidelines:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Add tests for new functionality
- Ensure all tests pass (pytest)
- Commit with clear messages (git commit -m 'Add amazing feature')
- Push to your branch (git push origin feature/amazing-feature)
- Open a Pull Request
Code Style
- Use Black for formatting
- Follow PEP 8 conventions
- Add type hints to all functions
- Maintain > 90% test coverage
License
This project is licensed under the Apache License, Version 2.0.
You are free to use, modify, and distribute this library in accordance with the terms of the license. A copy of the license is available in the LICENSE file.
Scope Clarification
This repository contains open-source shared libraries used within the Robin ecosystem, such as common utilities, logging infrastructure, and foundational components.
It does not include:
- The Robin core engine
- Agent orchestration logic
- Proprietary AI models or workflows
- Commercial SaaS infrastructure
Those components are part of Neeve's proprietary systems and are distributed separately under commercial terms.
Contributions
By contributing to this repository, you agree that your contributions will be licensed under the Apache License, Version 2.0.
Support
- Documentation
- Issue Tracker
- Discussions