Skip to main content

Observability SDK for Impulse microservices

Project description

impulse_telemetry

Observability SDK for Impulse microservices. One init() call wires up distributed tracing, Prometheus metrics, structured logging, and FastAPI middleware.

Install

pip install impulse-telemetry

# With SQLAlchemy / Redis / Celery auto-instrumentation
pip install "impulse-telemetry[extras]"

# With ML data quality checks (pandas required)
pip install "impulse-telemetry[ml]"

Quickstart

from fastapi import FastAPI
from impulse_telemetry import init

app = FastAPI()
init(app, service="my-service", version="1.0.0", env="production")

Every HTTP request now emits RED metrics, a distributed trace, and a structured JSON log line — automatically.

init() parameters

Parameter Default Description
app None FastAPI app. Pass to enable HTTP middleware.
service required Service name — appears on all telemetry.
version "0.0.0" Semver string tagged on all telemetry.
env $ENV "production" | "staging" | "dev"
otlp_endpoint $OTEL_EXPORTER_OTLP_ENDPOINT OTLP Collector gRPC address.
prometheus_port None Expose /metrics on this port (for workers without HTTP).
log_level "INFO" "DEBUG" | "INFO" | "WARNING" | "ERROR"
optional_metrics None List of metric bundle instances (InferenceMetrics, TrainingMetrics, or custom).

Logging

from impulse_telemetry.logging import get_logger

log = get_logger(__name__)
log.info("prediction_served", model_id="rec-v3", latency_ms=43)
# {"service":"my-service","trace_id":"abc..","event":"prediction_served","model_id":"rec-v3",...}

trace_id and span_id are injected automatically from the active OTEL span.

Per-request context

Bind fields once per request — they appear on every subsequent log line for that request.

from impulse_telemetry.logging import bind_request_context, clear_request_context

token = bind_request_context(user_id="usr_123", request_id="req_456")
log.info("doing_work")   # user_id + request_id injected automatically
clear_request_context(token)

Metrics

Prometheus metrics are pre-registered and labeled with service and env.

Automatic (via middleware)

Metric Type Description
http_requests_total Counter Requests by route, method, status
http_request_errors_total Counter 4xx / 5xx responses
http_request_duration_seconds Histogram Request latency
http_active_requests Gauge In-flight requests

Manual

from impulse_telemetry.metrics import get_metrics

m = get_metrics()
m.dependency_latency.labels(**m.labels(dependency="redis")).observe(elapsed)
m.dependency_errors.labels(**m.labels(dependency="redis")).inc()

Available instruments: rate_limiter_hits, rate_limiter_remaining, dependency_latency, dependency_errors, queue_depth, circuit_breaker.

Tracing

from impulse_telemetry.tracing import get_tracer, inject_headers

tracer = get_tracer(__name__)

with tracer.start_as_current_span("my_operation") as span:
    span.set_attribute("key", "value")
    headers = inject_headers({})   # propagate W3C traceparent to downstream
    requests.get("http://other-service/api", headers=headers)

Auto-instrumented libraries (when installed): requests, httpx, SQLAlchemy, Redis, Celery.

ML Monitoring

Pass metric bundles via optional_metrics at startup. Each bundle attaches to the Metrics singleton under its name attribute.

Inference monitoring

from impulse_telemetry import init
from impulse_telemetry.ml import MLMonitor, InferenceMetrics

init(app, service="rec-service", optional_metrics=[InferenceMetrics()])

monitor = MLMonitor(model_id="rec-v3", user_id=user_id)
# Context manager
with monitor.inference(features=df) as span:
    result = model.predict(df)
    span.record_output(result)

# Decorator
@monitor.trace
def predict(features):
    return model.predict(features)

Both record inference latency, error count, and run data quality checks (missing values) automatically.

Drift & performance metrics

# Call from batch evaluation jobs
monitor.record_drift("age", score=0.18, metric="psi")
monitor.record_performance(rmse=0.042, precision=0.87, recall=0.81)

Inference metric instruments

Accessible via get_metrics().inference.

Metric Type Description
ml_inference_duration_seconds Histogram Per-model inference latency
ml_inference_requests_total Counter Inference count
ml_inference_errors_total Counter Inference errors
ml_missing_feature_rate Gauge Missing values per column
ml_schema_violations_total Counter Schema violation count
ml_feature_drift_score Gauge Feature drift score
ml_prediction_drift_score Gauge Prediction drift score
ml_rmse Gauge Rolling RMSE
ml_precision Gauge Rolling precision
ml_recall Gauge Rolling recall

Training monitoring

from impulse_telemetry.ml import TrainingMetrics

init(app, service="training-svc", optional_metrics=[TrainingMetrics()])

m = get_metrics()
m.training.record_job("xgboost", "success", duration_seconds=42.1)
m.training.record_model("xgboost", accuracy=0.91, loss=0.12)
m.training.record_dataset("dataset-v3", row_count=100_000, size_bytes=52_428_800)
Metric Type Description
training_jobs_total Counter Completed training jobs
training_job_duration_seconds Histogram Job wall-clock time
training_jobs_active Gauge Currently running jobs
training_job_queue_size Gauge Jobs waiting in queue
model_training_accuracy Histogram Best validation accuracy
model_training_loss Histogram Final validation loss
dataset_rows_processed_total Counter Dataset rows ingested
dataset_size_bytes Histogram Dataset size

Background workers

For services without an HTTP server, expose metrics on a dedicated port:

init(service="ingest-worker", prometheus_port=9090)
# Prometheus scrapes localhost:9090/metrics

Environment variables

Variable Description
OTEL_EXPORTER_OTLP_ENDPOINT OTLP Collector gRPC address
ENV Deployment environment (production, staging, dev)

See also

examples.py — minimal, runnable code for every feature.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

impulse_telemetry-0.2.3.tar.gz (20.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

impulse_telemetry-0.2.3-py3-none-any.whl (23.4 kB view details)

Uploaded Python 3

File details

Details for the file impulse_telemetry-0.2.3.tar.gz.

File metadata

  • Download URL: impulse_telemetry-0.2.3.tar.gz
  • Upload date:
  • Size: 20.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.11

File hashes

Hashes for impulse_telemetry-0.2.3.tar.gz
Algorithm Hash digest
SHA256 5b514cd2a2f533697b6d83dec5e5d21ff8c91c56ee5a9117fb6428c4e1b5e32e
MD5 f640c47b5138546a615032d2f35c45e0
BLAKE2b-256 9145316750c1eefc56c75bc9b688517997cf6bce6140c80980130223e39b3b84

See more details on using hashes here.

File details

Details for the file impulse_telemetry-0.2.3-py3-none-any.whl.

File metadata

File hashes

Hashes for impulse_telemetry-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 31e1f15e24c7a207b59457c43bc09c2fa8a6154b96f3904fb2b2e97a62664447
MD5 e7dfd0af8ffddb6a615dc33259156bad
BLAKE2b-256 cee5294a5138584f97d8a2dcab33a6f31a5a99e26d1eea9999e45d5027701c67

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page