# impulse_telemetry

Observability SDK for Impulse microservices. One `init()` call wires up distributed tracing, Prometheus metrics, structured logging, and FastAPI middleware.
## Install

```bash
pip install impulse-telemetry

# With SQLAlchemy / Redis / Celery auto-instrumentation
pip install "impulse-telemetry[extras]"

# With ML data quality checks (pandas required)
pip install "impulse-telemetry[ml]"
```
## Quickstart

```python
from fastapi import FastAPI
from impulse_telemetry import init

app = FastAPI()
init(app, service="my-service", version="1.0.0", env="production")
```

Every HTTP request now automatically emits RED metrics, a distributed trace, and a structured JSON log line.
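For a fully runnable starting point, add any route and serve the app as usual. The `/ping` route and the uvicorn invocation below are illustrative, not part of the SDK:

```python
# Save as main.py and run with: uvicorn main:app
from fastapi import FastAPI
from impulse_telemetry import init

app = FastAPI()
init(app, service="my-service", version="1.0.0", env="production")

@app.get("/ping")
def ping():
    # Handling this request produces metrics, a trace, and a log line.
    return {"ok": True}
```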
## init() parameters

| Parameter | Default | Description |
|---|---|---|
| `app` | `None` | FastAPI app. Pass to enable HTTP middleware. |
| `service` | required | Service name; appears on all telemetry. |
| `version` | `"0.0.0"` | Semver string tagged on all telemetry. |
| `env` | `$ENV` | `"production"` \| `"staging"` \| `"dev"` |
| `otlp_endpoint` | `$OTEL_EXPORTER_OTLP_ENDPOINT` | OTLP Collector gRPC address. |
| `prometheus_port` | `None` | Expose `/metrics` on this port (for workers without HTTP). |
| `log_level` | `"INFO"` | `"DEBUG"` \| `"INFO"` \| `"WARNING"` \| `"ERROR"` |
| `optional_metrics` | `None` | List of metric bundle instances (`InferenceMetrics`, `TrainingMetrics`, or custom). |
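Putting several of these together, a more explicit call might look like the following (all values are illustrative; `collector:4317` is simply the conventional OTLP gRPC port):

```python
from impulse_telemetry import init
from impulse_telemetry.ml import InferenceMetrics

init(
    app,                                # FastAPI app; omit for background workers
    service="rec-service",              # required; stamped on all telemetry
    version="1.4.2",
    env="staging",                      # falls back to $ENV when omitted
    otlp_endpoint="collector:4317",     # falls back to $OTEL_EXPORTER_OTLP_ENDPOINT
    log_level="DEBUG",
    optional_metrics=[InferenceMetrics()],
)
```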
## Logging

```python
from impulse_telemetry.logging import get_logger

log = get_logger(__name__)
log.info("prediction_served", model_id="rec-v3", latency_ms=43)
# {"service":"my-service","trace_id":"abc..","event":"prediction_served","model_id":"rec-v3",...}
```

`trace_id` and `span_id` are injected automatically from the active OTEL span.
### Per-request context

Bind fields once per request and they appear on every subsequent log line for that request (see the middleware sketch after this block for one way to wire this up).

```python
from impulse_telemetry.logging import bind_request_context, clear_request_context

token = bind_request_context(user_id="usr_123", request_id="req_456")
log.info("doing_work")  # user_id + request_id injected automatically
clear_request_context(token)
```
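One way to apply this automatically is a small FastAPI middleware. This is a sketch, not part of the SDK (the `x-request-id` header is an assumption about your ingress):

```python
from fastapi import FastAPI, Request
from impulse_telemetry.logging import bind_request_context, clear_request_context

app = FastAPI()

@app.middleware("http")
async def request_context(request: Request, call_next):
    # Bind per-request fields before handling, and clear them afterwards
    # so context never leaks across requests on a reused worker.
    token = bind_request_context(
        request_id=request.headers.get("x-request-id", "unknown"),
    )
    try:
        return await call_next(request)
    finally:
        clear_request_context(token)
```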
## Metrics

Prometheus metrics are pre-registered and labeled with `service` and `env`.

### Automatic (via middleware)

| Metric | Type | Description |
|---|---|---|
| `http_requests_total` | Counter | Requests by route, method, status |
| `http_request_errors_total` | Counter | 4xx / 5xx responses |
| `http_request_duration_seconds` | Histogram | Request latency |
| `http_active_requests` | Gauge | In-flight requests |
### Manual

```python
from impulse_telemetry.metrics import get_metrics

m = get_metrics()
m.dependency_latency.labels(**m.labels(dependency="redis")).observe(elapsed)
m.dependency_errors.labels(**m.labels(dependency="redis")).inc()
```

Available instruments: `rate_limiter_hits`, `rate_limiter_remaining`, `dependency_latency`, `dependency_errors`, `queue_depth`, `circuit_breaker`.
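The other instruments follow the same pattern. For example, a queue consumer might report its backlog like this (a sketch, assuming `queue_depth` is a Gauge and that `m.labels(...)` accepts extra label kwargs as shown above; the `queue` label name is hypothetical):

```python
# backlog_size: current number of pending items, measured by your consumer.
m.queue_depth.labels(**m.labels(queue="ingest")).set(backlog_size)
```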
## Tracing

```python
import requests

from impulse_telemetry.tracing import get_tracer, inject_headers

tracer = get_tracer(__name__)

with tracer.start_as_current_span("my_operation") as span:
    span.set_attribute("key", "value")
    headers = inject_headers({})  # propagate W3C traceparent to downstream
    requests.get("http://other-service/api", headers=headers)
```

Auto-instrumented libraries (when installed): `requests`, `httpx`, SQLAlchemy, Redis, Celery.
## ML Monitoring

Pass metric bundles via `optional_metrics` at startup. Each bundle attaches to the `Metrics` singleton under its `name` attribute.

### Inference monitoring

```python
from impulse_telemetry import init
from impulse_telemetry.ml import MLMonitor, InferenceMetrics

init(app, service="rec-service", optional_metrics=[InferenceMetrics()])

monitor = MLMonitor(model_id="rec-v3", user_id=user_id)

# Context manager
with monitor.inference(features=df) as span:
    result = model.predict(df)
    span.record_output(result)

# Decorator
@monitor.trace
def predict(features):
    return model.predict(features)
```

Both record inference latency and error count, and run data quality checks (missing values) automatically.
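In a FastAPI handler this composes naturally with the rest of the SDK. A sketch, where `to_dataframe` and `model` stand in for your own feature builder and model:

```python
@app.post("/predict")
def predict_endpoint(payload: dict):
    df = to_dataframe(payload)  # hypothetical feature builder
    with monitor.inference(features=df) as span:
        result = model.predict(df)
        span.record_output(result)
    # .tolist() assumes a numpy-style prediction output.
    return {"prediction": result.tolist()}
```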
### Drift & performance metrics

```python
# Call from batch evaluation jobs
monitor.record_drift("age", score=0.18, metric="psi")
monitor.record_performance(rmse=0.042, precision=0.87, recall=0.81)
```
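A batch job might compute the PSI score itself before reporting it. This sketch uses the standard PSI formula; numpy, `train_df`, and `live_df` are assumptions here, not part of the SDK:

```python
import numpy as np

def psi(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between two 1-D samples."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_pct = np.histogram(current, bins=edges)[0] / len(current)
    # Floor at a small epsilon to avoid log(0) / division by zero.
    ref_pct = np.clip(ref_pct, 1e-6, None)
    cur_pct = np.clip(cur_pct, 1e-6, None)
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

score = psi(train_df["age"].to_numpy(), live_df["age"].to_numpy())
monitor.record_drift("age", score=score, metric="psi")
```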
### Inference metric instruments

Accessible via `get_metrics().inference`.

| Metric | Type | Description |
|---|---|---|
| `ml_inference_duration_seconds` | Histogram | Per-model inference latency |
| `ml_inference_requests_total` | Counter | Inference count |
| `ml_inference_errors_total` | Counter | Inference errors |
| `ml_missing_feature_rate` | Gauge | Missing values per column |
| `ml_schema_violations_total` | Counter | Schema violation count |
| `ml_feature_drift_score` | Gauge | Feature drift score |
| `ml_prediction_drift_score` | Gauge | Prediction drift score |
| `ml_rmse` | Gauge | Rolling RMSE |
| `ml_precision` | Gauge | Rolling precision |
| `ml_recall` | Gauge | Rolling recall |
### Training monitoring

```python
from impulse_telemetry import init
from impulse_telemetry.metrics import get_metrics
from impulse_telemetry.ml import TrainingMetrics

init(app, service="training-svc", optional_metrics=[TrainingMetrics()])

m = get_metrics()
m.training.record_job("xgboost", "success", duration_seconds=42.1)
m.training.record_model("xgboost", accuracy=0.91, loss=0.12)
m.training.record_dataset("dataset-v3", row_count=100_000, size_bytes=52_428_800)
```
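A training entry point might wrap the whole run so duration and status are recorded even on failure. A sketch, where `train_xgboost` and `dataset` are stand-ins and the `"failure"` status string is an assumption (only `"success"` appears above):

```python
import time

start = time.monotonic()
status = "success"
try:
    model = train_xgboost(dataset)  # hypothetical training entry point
except Exception:
    status = "failure"  # assumed status value
    raise
finally:
    m.training.record_job(
        "xgboost", status, duration_seconds=time.monotonic() - start
    )
```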
| Metric | Type | Description |
|---|---|---|
| `training_jobs_total` | Counter | Completed training jobs |
| `training_job_duration_seconds` | Histogram | Job wall-clock time |
| `training_jobs_active` | Gauge | Currently running jobs |
| `training_job_queue_size` | Gauge | Jobs waiting in queue |
| `model_training_accuracy` | Histogram | Best validation accuracy |
| `model_training_loss` | Histogram | Final validation loss |
| `dataset_rows_processed_total` | Counter | Dataset rows ingested |
| `dataset_size_bytes` | Histogram | Dataset size |
## Background workers

For services without an HTTP server, expose metrics on a dedicated port:

```python
init(service="ingest-worker", prometheus_port=9090)
# Prometheus scrapes localhost:9090/metrics
```
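A complete worker might combine this with the logging and metrics helpers above; the loop body and its field values are illustrative:

```python
import time

from impulse_telemetry import init
from impulse_telemetry.logging import get_logger
from impulse_telemetry.metrics import get_metrics

init(service="ingest-worker", env="production", prometheus_port=9090)
log = get_logger(__name__)
m = get_metrics()

while True:
    # ... consume and process a batch here ...
    log.info("batch_ingested", rows=500)
    time.sleep(5)
```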
## Environment variables

| Variable | Description |
|---|---|
| `OTEL_EXPORTER_OTLP_ENDPOINT` | OTLP Collector gRPC address |
| `ENV` | Deployment environment (`production`, `staging`, `dev`) |
## See also

`examples.py` — minimal, runnable code for every feature.