
impulse_telemetry

Observability SDK for Impulse microservices. One init() call wires up distributed tracing, Prometheus metrics, structured logging, and FastAPI middleware.

Install

pip install impulse-telemetry

# With SQLAlchemy / Redis / Celery auto-instrumentation
pip install "impulse-telemetry[extras]"

# With ML data quality checks (pandas required)
pip install "impulse-telemetry[ml]"

Quickstart

from fastapi import FastAPI
from impulse_telemetry import init

app = FastAPI()
init(app, service="my-service", version="1.0.0", env="production")

Every HTTP request now emits RED metrics, a distributed trace, and a structured JSON log line — automatically.

init() parameters

| Parameter | Default | Description |
| --- | --- | --- |
| app | None | FastAPI app. Pass to enable HTTP middleware. |
| service | required | Service name; appears on all telemetry. |
| version | "0.0.0" | Semver string tagged on all telemetry. |
| env | $ENV | One of "production", "staging", "dev". |
| otlp_endpoint | $OTEL_EXPORTER_OTLP_ENDPOINT | OTLP Collector gRPC address. |
| prometheus_port | None | Expose /metrics on this port (for workers without HTTP). |
| log_level | "INFO" | One of "DEBUG", "INFO", "WARNING", "ERROR". |
| optional_metrics | None | List of metric bundle instances (InferenceMetrics, TrainingMetrics, or custom). |
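
Putting several of these together, a fuller call for an HTTP service might look like this (the version, environment, and endpoint values are illustrative):

from fastapi import FastAPI

from impulse_telemetry import init

app = FastAPI()

# otlp_endpoint falls back to $OTEL_EXPORTER_OTLP_ENDPOINT and env to
# $ENV when omitted; explicit values are shown here for illustration.
init(
    app,
    service="my-service",
    version="1.0.0",
    env="staging",
    otlp_endpoint="otel-collector:4317",
    log_level="DEBUG",
)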

Logging

from impulse_telemetry.logging import get_logger

log = get_logger(__name__)
log.info("prediction_served", model_id="rec-v3", latency_ms=43)
# {"service":"my-service","trace_id":"abc..","event":"prediction_served","model_id":"rec-v3",...}

trace_id and span_id are injected automatically from the active OTEL span.

Per-request context

Bind fields once per request — they appear on every subsequent log line for that request.

from impulse_telemetry.logging import bind_request_context, clear_request_context

token = bind_request_context(user_id="usr_123", request_id="req_456")
log.info("doing_work")   # user_id + request_id injected automatically
clear_request_context(token)
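
In a FastAPI app this pairs naturally with an HTTP middleware. A minimal sketch, assuming you want to bind fields beyond what the built-in middleware provides (the header name and UUID fallback are illustrative, not part of the SDK):

import uuid

from fastapi import Request

from impulse_telemetry.logging import bind_request_context, clear_request_context

@app.middleware("http")
async def request_context_middleware(request: Request, call_next):
    # Bind an illustrative request_id for the lifetime of this request.
    token = bind_request_context(
        request_id=request.headers.get("x-request-id") or str(uuid.uuid4()),
    )
    try:
        return await call_next(request)
    finally:
        clear_request_context(token)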

Metrics

Prometheus metrics are pre-registered and labeled with service and env.

Automatic (via middleware)

| Metric | Type | Description |
| --- | --- | --- |
| http_requests_total | Counter | Requests by route, method, status |
| http_request_errors_total | Counter | 4xx / 5xx responses |
| http_request_duration_seconds | Histogram | Request latency |
| http_active_requests | Gauge | In-flight requests |

Manual

from impulse_telemetry.metrics import get_metrics

m = get_metrics()
m.dependency_latency.labels(**m.labels(dependency="redis")).observe(elapsed)
m.dependency_errors.labels(**m.labels(dependency="redis")).inc()

Available instruments: rate_limiter_hits, rate_limiter_remaining, dependency_latency, dependency_errors, queue_depth, circuit_breaker.
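
The remaining instruments follow the same labels() pattern. A minimal sketch; the label names used here (queue, limiter) are assumptions, since only the instrument names are documented above:

from impulse_telemetry.metrics import get_metrics

m = get_metrics()

# Assumed label names; check your deployment's instrument definitions.
m.queue_depth.labels(**m.labels(queue="ingest")).set(42)
m.rate_limiter_hits.labels(**m.labels(limiter="api")).inc()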

Tracing

import requests

from impulse_telemetry.tracing import get_tracer, inject_headers

tracer = get_tracer(__name__)

with tracer.start_as_current_span("my_operation") as span:
    span.set_attribute("key", "value")
    headers = inject_headers({})   # propagate W3C traceparent to downstream
    requests.get("http://other-service/api", headers=headers)

Auto-instrumented libraries (when installed): requests, httpx, SQLAlchemy, Redis, Celery.

ML Monitoring

Pass metric bundles via optional_metrics at startup. Each bundle attaches to the Metrics singleton under its name attribute.
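
The bundle interface isn't spelled out here beyond the name attribute; a plausible custom bundle, under that assumption, could look like:

from prometheus_client import Counter

from impulse_telemetry import init

class CacheMetrics:
    # Hypothetical bundle: the only documented contract is that the
    # instance becomes reachable as get_metrics().<name> after init().
    name = "cache"

    def __init__(self):
        self.hits_total = Counter("cache_hits_total", "Cache hits", ["service", "env"])

init(app, service="rec-service", optional_metrics=[CacheMetrics()])
# Afterwards: get_metrics().cache.hits_total.labels(...).inc()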

Inference monitoring

from impulse_telemetry import init
from impulse_telemetry.ml import MLMonitor, InferenceMetrics

init(app, service="rec-service", optional_metrics=[InferenceMetrics()])

monitor = MLMonitor(model_id="rec-v3", user_id=user_id)
# Context manager
with monitor.inference(features=df) as span:
    result = model.predict(df)
    span.record_output(result)

# Decorator
@monitor.trace
def predict(features):
    return model.predict(features)

Both record inference latency and error counts, and run data quality checks (missing values) automatically.

Drift & performance metrics

# Call from batch evaluation jobs
monitor.record_drift("age", score=0.18, metric="psi")
monitor.record_performance(rmse=0.042, precision=0.87, recall=0.81)
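
In a batch evaluation job you would typically loop over per-feature scores. The psi_scores dict below is hypothetical, standing in for whatever your own drift computation produces:

# Hypothetical per-feature PSI scores from your evaluation job.
psi_scores = {"age": 0.18, "income": 0.05}

for feature, score in psi_scores.items():
    monitor.record_drift(feature, score=score, metric="psi")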

Inference metric instruments

Accessible via get_metrics().inference.

| Metric | Type | Description |
| --- | --- | --- |
| ml_inference_duration_seconds | Histogram | Per-model inference latency |
| ml_inference_requests_total | Counter | Inference count |
| ml_inference_errors_total | Counter | Inference errors |
| ml_missing_feature_rate | Gauge | Missing values per column |
| ml_schema_violations_total | Counter | Schema violation count |
| ml_feature_drift_score | Gauge | Feature drift score |
| ml_prediction_drift_score | Gauge | Prediction drift score |
| ml_rmse | Gauge | Rolling RMSE |
| ml_precision | Gauge | Rolling precision |
| ml_recall | Gauge | Rolling recall |

Training monitoring

from impulse_telemetry import init
from impulse_telemetry.metrics import get_metrics
from impulse_telemetry.ml import TrainingMetrics

init(app, service="training-svc", optional_metrics=[TrainingMetrics()])

m = get_metrics()
m.training.record_job("xgboost", "success", duration_seconds=42.1)
m.training.record_model("xgboost", accuracy=0.91, loss=0.12)
m.training.record_dataset("dataset-v3", row_count=100_000, size_bytes=52_428_800)

| Metric | Type | Description |
| --- | --- | --- |
| training_jobs_total | Counter | Completed training jobs |
| training_job_duration_seconds | Histogram | Job wall-clock time |
| training_jobs_active | Gauge | Currently running jobs |
| training_job_queue_size | Gauge | Jobs waiting in queue |
| model_training_accuracy | Histogram | Best validation accuracy |
| model_training_loss | Histogram | Final validation loss |
| dataset_rows_processed_total | Counter | Dataset rows ingested |
| dataset_size_bytes | Histogram | Dataset size |

Background workers

For services without an HTTP server, expose metrics on a dedicated port:

init(service="ingest-worker", prometheus_port=9090)
# Prometheus scrapes localhost:9090/metrics
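
For a Celery worker the same pattern applies; the app name and broker URL below are illustrative, and Celery auto-instrumentation activates when the extras are installed:

from celery import Celery

from impulse_telemetry import init

celery_app = Celery("ingest-worker", broker="redis://localhost:6379/0")

# No FastAPI app to pass; metrics are served on :9090/metrics instead.
init(service="ingest-worker", prometheus_port=9090)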

Environment variables

| Variable | Description |
| --- | --- |
| OTEL_EXPORTER_OTLP_ENDPOINT | OTLP Collector gRPC address |
| ENV | Deployment environment (production, staging, dev) |

See also

examples.py — minimal, runnable code for every feature.
