Skip to main content

Celery integration for HawkAPI — async tasks, beat scheduler, context propagation, healthchecks, eager-mode fixtures

Project description

hawkapi-celery

Celery integration for HawkAPI. Async tasks, beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.

Install

pip install hawkapi-celery
pip install 'hawkapi-celery[redis]'         # adds redis client

Quickstart

from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)


celery_app: Celery  # populated below


def make_app() -> HawkAPI:
    app = HawkAPI()
    global celery_app
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )


    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic


    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app

Tasks

from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:        # async def — runs on a private event loop
    ...
    return x * 2


@task(celery_app, bind=True)
def slow_work(self, payload):          # sync — bound `self` for retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))

Beat (periodic tasks)

from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))

add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))

Context propagation

bind_context() carries a dict from the HTTP handler to the worker process via the task headers. Inside the task call current_context() to read it back.

from hawkapi_celery import bind_context, current_context

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()                  # {"request_id": "…", "user_id": "…"}
    log.info("event", **ctx, **payload)


@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())

Wired automatically by init_celery(..., propagate_context=True) (default).

Healthchecks

from hawkapi_celery import healthcheck


@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }

Testing

from hawkapi_celery import eager_mode, record_tasks


def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)


def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process — assert their side-effects directly.

CeleryConfig

CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},          # forwarded to celery.conf.update
)

Development

git clone https://github.com/Hawk-API/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

hawkapi_celery-0.2.1.tar.gz (27.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

hawkapi_celery-0.2.1-py3-none-any.whl (13.6 kB view details)

Uploaded Python 3

File details

Details for the file hawkapi_celery-0.2.1.tar.gz.

File metadata

  • Download URL: hawkapi_celery-0.2.1.tar.gz
  • Upload date:
  • Size: 27.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for hawkapi_celery-0.2.1.tar.gz
Algorithm Hash digest
SHA256 7c2a9ee716320b2a7ebac591c7a90ad02eb98b57d7145d868c4ae1cd9d5af102
MD5 ec329584e925347000c0721c9837fb96
BLAKE2b-256 dc566ff98e38a0b732ce3e65c40a686d17c3a955cf3f71ffb05173ca5d86de9b

See more details on using hashes here.

Provenance

The following attestation bundles were made for hawkapi_celery-0.2.1.tar.gz:

Publisher: release.yml on Hawk-API/hawkapi-celery

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file hawkapi_celery-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: hawkapi_celery-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 13.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for hawkapi_celery-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1fc2bbb8e841d252f6312d1675aae39ac1bed506a076cb251618259c43220416
MD5 004555d75b84653d0ae93e1a6a8246d7
BLAKE2b-256 14d62e98d9e16f855a8b4bea512e0a673be2155158f4ba61d10c4b395b7d77e6

See more details on using hashes here.

Provenance

The following attestation bundles were made for hawkapi_celery-0.2.1-py3-none-any.whl:

Publisher: release.yml on Hawk-API/hawkapi-celery

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page