Skip to main content

Internal standardized metrics library

Project description

📊 fast-telemetry

Production-ready Prometheus integration for FastAPI, FastStream, and Python Workers.

fast-telemetry — это "клей", который объединяет метрики вашего микросервиса в единый стандарт. Библиотека навязывает правильные практики (единые метки env, version, service) и предоставляет удобные абстракции для HTTP-сервисов, обработчиков очередей и периодических задач.

🌟 Особенности

  • Единый стандарт: Автоматически добавляет метки service, env, version ко всем метрикам.
  • FastAPI: Автоконфигурация через Instrumentator, исключение служебных ручек (/metrics и других переданных).
  • FastStream: Поддержка RabbitMQ, Kafka, Redis, NATS "из коробки" (автоопределение драйвера).
  • Grpc: Поддержка sync/async версий.
  • Workers & Cron:
    • Поддержка Long-running процессов (daemon thread server).
    • Поддержка Batch-jobs (Pushgateway) через универсальный контекстный менеджер (sync/async).
  • Декораторы: Удобные @measure_task и @track_exception, которые сами понимают, синхронная функция или асинхронная.
  • Безопасность типов: Полная типизация (mypy strict).

📦 Установка

pip install fast-telemetry

🚀 Quick Start

1. Core Concepts

В основе всего лежит класс PrometheusMetrics (или его наследники). Вы инициализируете его один раз при старте приложения.

from fast_telemetry import PrometheusMetrics

metrics = PrometheusMetrics(
    service_name="payment_service",
    version="1.0.4",
    env="production"
)

# Использование в коде
metrics.inc_error("validation_error")

with metrics.timer("db_query", long_task=False):
    # ... logic ...
    pass

2. FastAPI Integration

Автоматически добавляет эндпоинт /metrics и собирает RED-метрики (Requests, Errors, Duration).

from fastapi import FastAPI
from fast_telemetry import PrometheusMetrics, setup_fastapi_metrics

app = FastAPI()
metrics = PrometheusMetrics(service_name="api_core", env="dev", version="1.2.3")

# Настройка (исключает /docs, /openapi.json + readiness)
setup_fastapi_metrics(app, metrics, excluded_routes=["/readiness"])


@app.get("/")
@metrics.measure_task("root_handler", long_task=False)
async def root():
    return {"status": "ok"}

3. FastStream Integration

Поддерживает автоматическое определение брокера (Rabbit/Kafka/Redis/NATS) и инъекцию middleware.

from faststream.asgi import AsgiFastStream, make_ping_asgi
from faststream.rabbit import RabbitBroker
from fast_telemetry import PrometheusMetrics, setup_faststream_metrics

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = AsgiFastStream(
    broker,
    asgi_routes=[
        ("/health", make_ping_asgi(broker))
    ],
    lifespan=None,
    asyncapi_path="/docs",
)

metrics = PrometheusMetrics(service_name="worker_core", env="dev")
setup_faststream_metrics(app, metrics)

4. Workers & Cron Jobs

Для скриптов без веб-сервера. Поддерживает два режима:

  1. Daemon Server (для бесконечных циклов).
  2. Push Gateway (для скриптов, запускаемых по расписанию).
import asyncio
from fast_telemetry import WorkerMetrics

metrics = WorkerMetrics(service_name="nightly_job", env="prod")


# --- Вариант А: Long-running worker ---
def run_worker():
    # Запускает HTTP сервер в фоновом потоке
    metrics.start_server(port=8000)
    while True:
        process_data()


# --- Вариант Б: Batch Job (Cron) ---
# Универсальный трекер (работает и с with, и с async with)
async def run_batch_job():
    # Автоматически отправит метрики в Pushgateway при выходе из блока
    # Группирует по instance_id, чтобы воркеры не перезатирали друг друга
    async with metrics.track_job(gateway_url="http://pushgateway:9091", timeout=5.0):
        await do_heavy_calculation()
        metrics.inc_error("calc_error")


if __name__ == "__main__":
    asyncio.run(run_batch_job())

Grpc

Sync version

Server

metrics = PrometheusMetrics(service_name="grpc-server-foo", version="19.0.2", env="prod")
start_http_server(8000, registry=metrics.get_registry())
print("Metrics server started on port 8000")

interceptor = create_server_metrics_interceptor(metrics)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), interceptors=[interceptor])

echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)

print(f"Starting gRPC server on {listen_addr}")
server.start()
server.wait_for_termination()

Client

metrics = PrometheusMetrics(service_name="grpc-client-foo", version="43.1.9", env="prod")
start_http_server(8001, registry=metrics.get_registry())
print("Client metrics server started on port 8001")
interceptor = create_client_metrics_interceptor(metrics)
address = os.getenv("GRPC_SERVER", "localhost:50051")
with grpc.insecure_channel(address) as base_channel:
    channel = grpc.intercept_channel(base_channel, interceptor)
    stub = echo_pb2_grpc.EchoServiceStub(channel)
Async version

Server

metrics = PrometheusMetrics(service_name="async-server-grpc-foo", version="11.0.2", env="dev")
start_http_server(8000, registry=metrics.get_registry())
print("Metrics server started on port 8000")
interceptor = create_server_metrics_interceptor(metrics)
server = grpc.aio.server(interceptors=[interceptor])
echo_pb2_grpc.add_EchoServiceServicer_to_server(EchoService(), server)
listen_addr = "[::]:50051"
server.add_insecure_port(listen_addr)

print(f"Starting gRPC server on {listen_addr}")
await server.start()
await server.wait_for_termination()

Client

metrics = PrometheusMetrics(service_name="async-client-grpc-foo", version="91.1.2", env="dev")
start_http_server(8001, registry=metrics.get_registry())
print("Client metrics server started on port 8001")
interceptors = create_client_metrics_interceptor(metrics)
address = os.getenv("GRPC_ASYNC_SERVER", "localhost:50051")
async with grpc.aio.insecure_channel(address, interceptors=(*interceptors,)) as channel:
    stub = echo_pb2_grpc.EchoServiceStub(channel)

🛠 Декораторы

Библиотека предоставляет "умные" декораторы, которые работают с sync и async функциями прозрачно.

@track_exception() Считает количество исключений. Использует имя класса исключения как метку.

@metrics.track_exception
async def dangerous_operation():
    raise ValueError("Oops")
    # Увеличит счетчик fasttelemetry_business_errors_total{error_type="ValueError", ...}

@measure_task("task_name") Замеряет время выполнения функции (Histogram).

@metrics.measure_task("image_processing")
def process_image(img):
    time.sleep(1)

📊 Grafana & Prometheus

Библиотека экспортирует следующие базовые метрики:

  1. fasttelemetry_app_info (Gauge) — всегда 1. Содержит метки version, env, service. Используйте для аннотаций деплоев на графиках.
  2. fasttelemetry_business_errors_total (Counter) — счетчик ошибок бизнес-логики.
  3. fasttelemetry_task_processing_seconds (Histogram) — время выполнения внутренних задач.
  4. http_requests_...fastapi.json (если используется FastAPI).
  5. faststream_...faststream.json (если используется FastStream).

📈 Base Metrics & Grafana Dashboard

Библиотека предоставляет "Ready-to-use" набор метрик

Metric Schema

Metric Name Type Labels Description
fasttelemetry_app_info Gauge env, service, version Информация о запущенном инстансе. Всегда 1.
fasttelemetry_business_errors_total Counter error_type Количество ошибок бизнес-логики (пойманных через track_exception или вручную).
fasttelemetry_task_processing_seconds Histogram task_type Время выполнения задач (обернутых в @measure_task или timer).
fasttelemetry_long_task_processing_seconds Histogram task_type Время выполнения долгих задач (обернутых в @measure_task или timer).

⚙️ Configuration

Вы можете передать параметры явно в __init__ или использовать переменные окружения:

Environment Variable Description Default
APP_ENV Окружение (prod/dev/stage) dev
APP_VERSION Версия приложения unknown

🖥️ Grafana Dashboard JSON

Дашборды можно импортировать из папки

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fast_telemetry-1.4.0.tar.gz (127.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fast_telemetry-1.4.0-py3-none-any.whl (22.2 kB view details)

Uploaded Python 3

File details

Details for the file fast_telemetry-1.4.0.tar.gz.

File metadata

  • Download URL: fast_telemetry-1.4.0.tar.gz
  • Upload date:
  • Size: 127.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for fast_telemetry-1.4.0.tar.gz
Algorithm Hash digest
SHA256 e9928cc269eede56cf590d468659e0941ca02112c4ed0e0e517d1148cac949a5
MD5 c01b28b557b660394464ea34e3053560
BLAKE2b-256 1591e821f16c1dd382b15549de1806e71a539fd607ad88da4201dd9f4cadd86a

See more details on using hashes here.

File details

Details for the file fast_telemetry-1.4.0-py3-none-any.whl.

File metadata

  • Download URL: fast_telemetry-1.4.0-py3-none-any.whl
  • Upload date:
  • Size: 22.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.15

File hashes

Hashes for fast_telemetry-1.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 988eb1646e113158a431ece0be456831845dbfe5005bce68f8bddb7203c1f6fe
MD5 32f147d3ab8bf62214243f73ad36285b
BLAKE2b-256 c57813ec0e864e83a1cc0e004e71d3279f8ad0da4423f3e67e6b2bc8fbd8ec6f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page