No project description provided
Project description
Celery Health Checker (celery_hchecker)
Набор утилит для проверки работоспособности Celery: брокера, backend-а результатов и воркеров. Простая, расширяемая и тестируемая библиотека с поддержкой кеширования результатов (память / Redis) и фонового мониторинга.
Особенности
- Проверяет три аспекта Celery:
broker— соединение с брокером (kombu.Connection);backend— запись/чтение результата в/result backend (store_result / get_task_meta);workers— состояние зарегистрированных воркеров (inspect.ping(), inspect.active()).
- Встроенная поддержка кеша:
MemoryCache(TTLCache);RedisCache(совместим с redis-py).
- Фоновый мониторинг (daemon thread) с настраиваемым интервалом.
- Мокабельная архитектура — легко юнит-тестировать.
Установка
# в виртуальном окружении
pip install celery # если ещё не установлен
pip install cachetools # MemoryCache использует cachetools.TTLCache
pip install redis # Если вы собираетесь использовать redis для кеширования результата
# Установите ваш пакет локально (разработка)
pip install -e .
Dev-зависимости (рекомендуется для тестирования):
pip install pytest fakeredis testcontainers pytest-mock
Быстрый старт
1. Создайте Celery app (пример proj/celery_app.py):
from celery import Celery
app = Celery("proj", broker="redis://127.0.0.1:6379/0", backend="redis://127.0.0.1:6379/1")
@app.task
def add(x, y):
return x + y
2. Подключите CeleryHealthChecker:
from celery_hchecker.healthcheker import CeleryHealthChecker
from celery_hchecker.cache.memory import MemoryCache
ch = CeleryHealthChecker(app=app, cache=MemoryCache(ttl=10), interval=10)
print(ch.force_check()) # выполнило все проверки
print(ch.is_healthy) # быстрое чтение из кеша (если есть)
print(ch.get_stats()) # статистика
3. Запустить фоновой мониторинг:
ch.start()
# ... в конце:
ch.stop()
API / основные компоненты
ICache (interface)
Методы:
get(key: str) -> Optional[Tuple[bool, float, Optional[str]]]set(key: str, value: bool, latency: float, error: Optional[str]) -> None
MemoryCache
Параметры конструктора:
maxsize: int = 1000, ttl: int = 60
RedisCache
Параметры конструктора:
client — клиент совместимый с redis-py (StrictRedis/Redis)ttl: int = 60, prefix: str = "celery_health_"
Checkers
BrokerChecker(broker_url: str, timeout: int=3, retries: int=3, delay: float=0.5)BackendChecker(backend: BaseBackend, cleanup: bool = True, timeout: float=5.0, wait: float=1.0)WorkersChecker(app: Celery, timeout: float=5.0, retries: int=3, delay: float=0.5)
Все чекеры реализуют:
name() -> strcheck() -> Tuple[bool, float, Optional[str]] — (ok, latency_seconds, error_or_None)
CeleryHealthChecker
Конструктор:
CeleryHealthChecker(
*,
app,
cache: Optional[ICache] = None,
interval: int = 30,
enable_cleanup: bool = True,
backend_poll_wait: float = 1.0,
)
Методы:
start(), stop(join_timeout: float = None)force_check() -> bool— запускает все проверки синхронно, кэширует per-checker и aggregate healthget_stats() -> dictis_healthy -> bool(свойство) — пытается прочитать агрегатный health из кэша, иначе вызывает force_check()get_instance()— возвращает singleton-экземпляр (в текущей реализации класс устроен как singleton через new)
Примеры использования
Пример: встроенный мониторинг (daemon thread)
from time import sleep
from celery_hchecker.cache.memory import MemoryCache
from celery_hchecker.healthcheker import CeleryHealthChecker
checker = CeleryHealthChecker(app=app, cache=MemoryCache(ttl=10), interval=10)
checker.start()
try:
while True:
print("Healthy:", checker.is_healthy)
print("Stats:", checker.get_stats())
sleep(10)
except KeyboardInterrupt:
checker.stop()
Пример: HTTP endpoint (FastAPI)
from fastapi import FastAPI
from celery_hchecker.cache.redis import RedisCache
import redis
rclient = redis.Redis(host="127.0.0.1", port=6379, db=2)
cache = RedisCache(client=rclient, ttl=10)
checker = CeleryHealthChecker(app=app, cache=cache, interval=15)
api = FastAPI()
@api.get("/health")
def health():
return {
"healthy": checker.is_healthy,
"stats": checker.get_stats()
}
Кэширование
- Пер-чекер ключи:
health:{checker_name}(напримерhealth:broker,health:backend,health:workers). Значение:(ok: bool, latency: float, error: Optional[str]). - Агрегированный ключ:
health— хранит итоговую булевую метрику (и latency aggregate в payload).
Логирование
Компонент использует стандартный logging с логгером CeleryHealthChecker. Уровень логирования можно настроить:
import logging
logging.getLogger('CeleryHealthChecker').setLevel(logging.DEBUG)
Ограничения
- Поддерживаются только брокеры/бэкенды, совместимые с Kombu
- Для проверки воркеров требуется доступ к Celery Control API
Лицензия
MIT License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file celery_hchecker-0.0.3.tar.gz.
File metadata
- Download URL: celery_hchecker-0.0.3.tar.gz
- Upload date:
- Size: 10.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.0 CPython/3.13.0 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
69ac521bcaad40fcc4c36d8ee5b45cda0354be7dc53d319596efb577d6e39001
|
|
| MD5 |
08645e7a460ee51b6e117b7a849fa435
|
|
| BLAKE2b-256 |
54cc87f8297877f271a78495213e51443b7b27203b1dd67dae9ec484ddcacdf9
|
File details
Details for the file celery_hchecker-0.0.3-py3-none-any.whl.
File metadata
- Download URL: celery_hchecker-0.0.3-py3-none-any.whl
- Upload date:
- Size: 12.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.2.0 CPython/3.13.0 Windows/11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a0af78c45a554f62dfdaa62120d157074714c9e21929905c3e7ae72f3aa00120
|
|
| MD5 |
308e8931721704ca27549d65fb15ecb7
|
|
| BLAKE2b-256 |
380800ca3fddebecd362df2ea9864da4582a9724f4c3d92e69879c28f584d1fc
|