
Celery integration for HawkAPI — async tasks, beat scheduler, context propagation, healthchecks, eager-mode fixtures


hawkapi-celery

Celery integration for HawkAPI. Async tasks, beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.

Install

pip install hawkapi-celery
pip install 'hawkapi-celery[redis]'         # adds redis client

Quickstart

from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)


celery_app: Celery  # populated below


def make_app() -> HawkAPI:
    global celery_app  # rebinds the module-level name declared above
    app = HawkAPI()
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )


    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic


    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app
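Celery workers invoke tasks synchronously, so an async def task such as send_email has to be driven by an event loop somewhere. The Tasks section notes that async tasks run on a private event loop. The following is a minimal sketch of that general idea, not the hawkapi-celery implementation: `run_on_private_loop` is a hypothetical helper, and `asyncio.sleep(0)` stands in for real send logic.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def run_on_private_loop(
    fn: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Any]:
    """Wrap an async function so synchronous code can call it directly."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.run creates a fresh event loop, runs the coroutine to
        # completion, and closes the loop again before returning.
        return asyncio.run(fn(*args, **kwargs))

    return wrapper


@run_on_private_loop
async def send_email(to: str, subject: str, body: str) -> str:
    await asyncio.sleep(0)  # stands in for real awaitable I/O
    return f"sent {subject!r} to {to}"


# Called like a plain function, as a synchronous worker would call it.
print(send_email("x@y.z", "Welcome", "Hello!"))
```

Note that `asyncio.run` refuses to start inside a thread that already has a running loop, which is one reason a wrapper like this belongs on the worker side rather than inside an HTTP handler.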

Tasks

from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:        # async def — runs on a private event loop
    ...
    return x * 2


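# do_thing, TransientError, and compute_backoff are application-defined
# placeholders; substitute your own work function, exception, and delay policy.
@task(celery_app, bind=True)
def slow_work(self, payload):          # sync task; bound `self` enables retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))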
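The slow_work example calls compute_backoff, which the package does not appear to provide. One possible definition is capped exponential backoff, sketched below. The `base`, `cap`, and `jitter` parameters and their defaults are assumptions chosen for illustration.

```python
import random


def compute_backoff(
    retries: int, base: float = 1.0, cap: float = 300.0, jitter: bool = False
) -> float:
    """Return the delay in seconds to wait after `retries` earlier retries."""
    # Double the delay on every retry, but never exceed the cap.
    delay = min(cap, base * (2 ** retries))
    if jitter:
        # Pick uniformly between 0 and the computed delay so that many
        # failing tasks do not all retry at the same instant.
        delay = random.uniform(0.0, delay)
    return delay


print([compute_backoff(n) for n in range(5)])  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(compute_backoff(10))                     # 300.0 (capped)
```

The result is passed to self.retry as `countdown`, which Celery interprets as seconds to wait before the next attempt.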

Beat (periodic tasks)

from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))

add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))
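To make the two schedules above concrete, here is a small standard-library sketch of when each would next fire. It assumes the schedules are evaluated in UTC (as in the CeleryConfig example), that crontab(hour=2, minute=0) means "daily at 02:00" as in Celery, and that an interval schedule is measured from the previous run. The helper names are hypothetical and this is not how beat itself computes due times.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Next moment strictly after `now` at which the clock reads hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


def next_interval_run(last_run: datetime, interval: timedelta) -> datetime:
    """Next run of a fixed-interval schedule, measured from the last run."""
    return last_run + interval


now = datetime(2024, 1, 1, 12, 30, tzinfo=timezone.utc)
print(next_daily_run(now, hour=2, minute=0))       # 2024-01-02 02:00:00+00:00
print(next_interval_run(now, timedelta(hours=1)))  # 2024-01-01 13:30:00+00:00
```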

Context propagation

bind_context() carries a dict from the HTTP handler to the worker process via the task headers. Inside the task, call current_context() to read it back.

from hawkapi_celery import bind_context, current_context

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()                  # {"request_id": "…", "user_id": "…"}
    # `log` is assumed to be a structlog-style logger that accepts keyword fields.
    log.info("event", **ctx, **payload)


@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())

Propagation is wired automatically by init_celery(..., propagate_context=True), which is the default.
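The mechanism can be pictured with a toy model that uses only the standard library. This is a sketch of the general header-propagation pattern, not the hawkapi-celery implementation: the functions below are stand-ins that reuse the names bind_context and current_context for readability, while `enqueue`, `run_on_worker`, and the `"context"` header key are hypothetical.

```python
import contextvars
from contextlib import contextmanager
from typing import Any, Callable, Iterator

# Toy stand-ins, NOT the hawkapi_celery implementations.
_ctx = contextvars.ContextVar("ctx", default={})


@contextmanager
def bind_context(**values: Any) -> Iterator[None]:
    # Layer the new values over whatever is already bound, then restore.
    token = _ctx.set({**_ctx.get(), **values})
    try:
        yield
    finally:
        _ctx.reset(token)


def current_context() -> dict:
    return dict(_ctx.get())


def enqueue(args: tuple) -> dict:
    """Producer side: snapshot the bound context into the message headers."""
    return {"args": args, "headers": {"context": current_context()}}


def run_on_worker(message: dict, fn: Callable[..., Any]) -> Any:
    """Worker side: restore the context from the headers, then run the task."""
    with bind_context(**message["headers"]["context"]):
        return fn(*message["args"])


def log_event(payload: dict) -> dict:
    return {**current_context(), **payload}


with bind_context(request_id="r-1", user_id="u-7"):
    message = enqueue(({"kind": "signup"},))

# The producer's context is gone here; only the headers still carry it.
assert current_context() == {}
print(run_on_worker(message, log_event))
```

The key point is that the context is captured when the task is enqueued, not when it runs, so calling .delay() outside the with block would send an empty context in this model.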

Healthchecks

from hawkapi_celery import healthcheck


@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
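In the handler above, healthcheck is called without await, which suggests it is a blocking call that may wait up to `timeout` seconds. If that assumption holds, running it in a thread keeps the event loop free for other requests. The sketch below shows the pattern with stand-ins: `FakeReport` and `blocking_healthcheck` are hypothetical, and the field types are guesses based on the example.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeReport:
    # Hypothetical stand-in mirroring the fields used in the handler above.
    broker_ok: bool
    workers_alive: int
    workers: tuple


def blocking_healthcheck(timeout: float) -> FakeReport:
    time.sleep(min(timeout, 0.05))  # simulate waiting on the broker
    return FakeReport(broker_ok=True, workers_alive=1, workers=("worker@host",))


async def healthz() -> dict:
    # asyncio.to_thread runs the blocking call in a worker thread, so the
    # event loop can keep serving other requests while it waits.
    report = await asyncio.to_thread(blocking_healthcheck, 2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }


print(asyncio.run(healthz()))
```

With the real package, the same shape would presumably be `await asyncio.to_thread(healthcheck, celery_app, timeout=2.0)`; asyncio.to_thread requires Python 3.9 or later.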

Testing

from hawkapi_celery import eager_mode, record_tasks


def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)


def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process — assert their side-effects directly.

CeleryConfig

CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},          # forwarded to celery.conf.update
)

Development

git clone https://github.com/ashimov/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.

