Celery integration for HawkAPI — async tasks, beat scheduler, context propagation, healthchecks, eager-mode fixtures

hawkapi-celery

Celery integration for HawkAPI: async tasks, a beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.

Install

pip install hawkapi-celery
pip install 'hawkapi-celery[redis]'         # adds redis client

Quickstart

from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)


celery_app: Celery  # populated below


def make_app() -> HawkAPI:
    app = HawkAPI()
    global celery_app
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )


    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic


    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app

Tasks

from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:        # async def — runs on a private event loop
    ...
    return x * 2


@task(celery_app, bind=True)
def slow_work(self, payload):          # sync — bound `self` for retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))
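
The comment on the async task says it runs on a private event loop. The following is a minimal sketch of how a coroutine function can be driven from a synchronous worker call. It makes no claim about how hawkapi-celery implements this, and `run_on_private_loop` is a hypothetical name used only for illustration.

```python
import asyncio
import functools


def run_on_private_loop(async_fn):
    """Hypothetical helper: expose a coroutine function as a plain sync callable."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # asyncio.run() creates a fresh event loop, runs the coroutine to
        # completion, then closes the loop, so nothing leaks between calls.
        return asyncio.run(async_fn(*args, **kwargs))

    return wrapper


@run_on_private_loop
async def work(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real awaitable I/O
    return x * 2
```

Calling `work(21)` from synchronous code returns the awaited result. Note that `asyncio.run()` raises `RuntimeError` when called from a thread that already has a running loop, so a wrapper like this only suits synchronous callers such as a prefork worker.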

Beat (periodic tasks)

from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))

add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))
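
To make the nightly schedule concrete: assuming `crontab(hour=2, minute=0)` follows Celery's crontab semantics and the configured timezone is UTC, the task fires once a day at 02:00 UTC. The sketch below computes the next such moment with the standard library only. `next_daily_run` is a hypothetical helper, not part of hawkapi-celery or Celery.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Return the first hour:minute strictly after `now`, in now's timezone."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed (or is exactly now): use tomorrow's.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 1, 0, tzinfo=timezone.utc)
upcoming = next_daily_run(now)  # 02:00 UTC on the same day
```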

Context propagation

bind_context() carries a dict from the HTTP handler to the worker process via the task headers. Inside the task, call current_context() to read it back.

from hawkapi_celery import bind_context, current_context

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()                  # {"request_id": "…", "user_id": "…"}
    log.info("event", **ctx, **payload)


@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())

Propagation is wired automatically by init_celery(..., propagate_context=True), which is the default.

Healthchecks

from hawkapi_celery import healthcheck


@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
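
A health endpoint is often more useful to load balancers and orchestrators when it also signals failure through the HTTP status code. The sketch below maps a report onto a status code and body. `ReportSketch` is a hypothetical stand-in that mirrors only the three attributes used above, and the real types of those attributes are an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReportSketch:
    """Hypothetical stand-in for the object returned by healthcheck()."""

    broker_ok: bool
    workers_alive: int  # assumed to be a count; a bool would work as well
    workers: tuple[str, ...] = ()


def readiness(report) -> tuple[int, dict]:
    """Return (status_code, body): 200 only if the broker and a worker respond."""
    healthy = bool(report.broker_ok) and bool(report.workers_alive)
    body = {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
    return (200 if healthy else 503), body


status, body = readiness(ReportSketch(True, 2, ("w1", "w2")))
```

How the status code is attached to the response depends on the web framework, so that step is left out here.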

Testing

from hawkapi_celery import eager_mode, record_tasks


def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)


def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process — assert their side-effects directly.

CeleryConfig

CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},          # forwarded to celery.conf.update
)
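
In Celery, the soft time limit raises `SoftTimeLimitExceeded` inside the task so it can clean up, while the hard limit terminates the process running the task. The soft limit should therefore sit below the hard one, as in the values above (540 versus 600 seconds). A minimal sanity check could look like the sketch below. `time_limit_grace` is a hypothetical helper, and whether CeleryConfig performs such validation itself is not stated in this README.

```python
def time_limit_grace(task_time_limit: float, task_soft_time_limit: float) -> float:
    """Return the cleanup window, in seconds, between the soft and hard limits."""
    if task_time_limit <= 0 or task_soft_time_limit <= 0:
        raise ValueError("time limits must be positive")
    if task_soft_time_limit >= task_time_limit:
        raise ValueError("soft limit must be lower than the hard limit")
    return task_time_limit - task_soft_time_limit


grace = time_limit_grace(task_time_limit=600, task_soft_time_limit=540)
```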

Development

git clone https://github.com/ashimov/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.
