Celery integration for HawkAPI — async tasks, beat scheduler, context propagation, healthchecks, eager-mode fixtures
hawkapi-celery
Celery integration for HawkAPI. Async tasks, beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.
Install
pip install hawkapi-celery
pip install 'hawkapi-celery[redis]' # adds redis client
Quickstart
from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)

celery_app: Celery  # populated below

def make_app() -> HawkAPI:
    app = HawkAPI()
    global celery_app
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )

    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic

    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app
Tasks
from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:  # async def: runs on a private event loop
    ...
    return x * 2

@task(celery_app, bind=True)
def slow_work(self, payload):  # sync: bound `self` for retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))
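The comment on `work` says async tasks run on a private event loop. The sketch below shows one way that idea can be realized with the standard library alone. `run_on_private_loop` is a hypothetical name, not part of hawkapi-celery, and the library's actual wrapper may differ. The second helper is one plausible shape for the `compute_backoff` placeholder used in `slow_work` (capped exponential backoff); its base and cap values are assumptions.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine


def run_on_private_loop(
    fn: Callable[..., Coroutine[Any, Any, Any]],
) -> Callable[..., Any]:
    """Wrap a coroutine function so a synchronous worker can call it.

    Hypothetical helper: every call gets a fresh event loop that is closed
    afterwards, so no loop state leaks between task invocations.
    """

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.run creates a new loop, runs the coroutine, then closes the loop.
        return asyncio.run(fn(*args, **kwargs))

    return wrapper


def compute_backoff(retries: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Capped exponential backoff in seconds: base * 2**retries, at most cap."""
    return min(cap, base * (2 ** retries))


@run_on_private_loop
async def work(x: int) -> int:
    await asyncio.sleep(0)  # stands in for real awaitable I/O
    return x * 2


doubled = work(21)  # called like a plain synchronous function
```

Note that `asyncio.run` raises `RuntimeError` when called from a thread that already has a running loop, so a wrapper of this kind only suits synchronous worker processes.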
Beat (periodic tasks)
from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))
add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))
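To make the two schedule kinds concrete, here is a simplified model of when each entry would next fire: an interval schedule fires a fixed delta after the previous run, and `crontab(hour=2, minute=0)` fires once a day at 02:00. The functions `next_interval_run` and `next_daily_run` are hypothetical illustrations, not the beat scheduler's implementation, and they assume timezone-aware UTC datetimes.

```python
from datetime import datetime, timedelta, timezone


def next_interval_run(last_run: datetime, interval: timedelta) -> datetime:
    """Interval schedule: the next run is one interval after the last one."""
    return last_run + interval


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Daily wall-clock schedule: the next occurrence of hour:minute after now."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so roll over to tomorrow.
        candidate += timedelta(days=1)
    return candidate


utc = timezone.utc
hourly = next_interval_run(datetime(2024, 1, 1, 0, 0, tzinfo=utc), timedelta(hours=1))
before_slot = next_daily_run(datetime(2024, 1, 1, 1, 30, tzinfo=utc), hour=2, minute=0)
after_slot = next_daily_run(datetime(2024, 1, 1, 3, 0, tzinfo=utc), hour=2, minute=0)
```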
Context propagation
bind_context() carries a dict from the HTTP handler to the worker process via the task headers. Inside the task, call current_context() to read it back.
from hawkapi_celery import bind_context, current_context, task

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()  # {"request_id": "…", "user_id": "…"}
    # Merge first: passing **ctx and **payload separately raises TypeError
    # when both contain the same key (here request_id and user_id).
    log.info("event", **{**payload, **ctx})

@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())
Propagation is wired automatically by init_celery(..., propagate_context=True), which is the default.
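The mechanism described above (bind a dict in the handler, ship it in the task headers, restore it in the worker) can be modelled with `contextvars`. This is a toy re-implementation that reuses the names `bind_context` and `current_context` for readability only. The header key `x-context` and the helpers `outgoing_headers` and `run_with_headers` are assumptions for illustration, and the library's internals may look quite different.

```python
import contextvars
import json
from contextlib import contextmanager

HEADER_KEY = "x-context"  # hypothetical header name
_ctx = contextvars.ContextVar("ctx", default=None)


def current_context() -> dict:
    """Return a copy of the context bound in the current execution context."""
    return dict(_ctx.get() or {})


@contextmanager
def bind_context(**values):
    """Bind values for the duration of the with-block, then restore the old state."""
    token = _ctx.set({**current_context(), **values})
    try:
        yield
    finally:
        _ctx.reset(token)


def outgoing_headers() -> dict:
    """Producer side: snapshot the bound context into message headers."""
    return {HEADER_KEY: current_context()}


def run_with_headers(func, headers, *args, **kwargs):
    """Worker side: restore the context from the headers around the task body."""
    with bind_context(**headers.get(HEADER_KEY, {})):
        return func(*args, **kwargs)


def log_event(payload: dict) -> dict:
    return {**payload, **current_context()}


# "HTTP handler": the context is captured when the task is enqueued.
with bind_context(request_id="r-1", user_id="u-7"):
    wire = json.dumps(outgoing_headers())  # headers cross the process boundary

# "Worker": nothing is bound here until the headers are restored.
outside = current_context()
result = run_with_headers(log_event, json.loads(wire), {"kind": "click"})
```

Because the context travels as serialized headers in this model, only JSON-friendly values (strings, numbers, lists, dicts) would survive the trip.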
Healthchecks
from hawkapi_celery import healthcheck

@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
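The handler above calls `healthcheck` directly inside an `async def`. If that call blocks (an assumption; the source does not say), it could stall the event loop for up to `timeout` seconds. One common remedy is to push the probe onto a worker thread with `asyncio.to_thread` (Python 3.9+). The sketch uses a stand-in `fake_healthcheck` and a `Report` dataclass, both hypothetical, so that it runs without a broker. Treating `workers_alive` as a count is also an assumption.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Report:
    # Field names mirror the attributes read in the handler above.
    broker_ok: bool
    workers_alive: int  # assumed to be a count
    workers: Tuple[str, ...]


def fake_healthcheck(timeout: float) -> Report:
    """Stand-in for a blocking broker/worker probe."""
    time.sleep(min(timeout, 0.05))  # simulate network latency
    return Report(broker_ok=True, workers_alive=1, workers=("worker@host",))


async def healthz() -> dict:
    # Run the blocking probe in a thread so the event loop stays responsive.
    report = await asyncio.to_thread(fake_healthcheck, 2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }


result = asyncio.run(healthz())
```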
Testing
from hawkapi_celery import eager_mode, record_tasks

def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)

def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process; assert their side effects directly.
CeleryConfig
CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},  # forwarded to celery.conf.update
)
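The field names above match Celery's lowercase setting names, and `extra_kwargs` is described as forwarded to `celery.conf.update`. A minimal sketch of how such a config object could flatten into one settings dict follows. `ConfigSketch` and `to_conf` are hypothetical, only a subset of fields is shown, and both the precedence rule (extras override named fields) and the soft-below-hard check are assumptions rather than documented behavior.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class ConfigSketch:
    broker_url: str
    result_backend: str
    task_time_limit: int = 600       # hard limit, seconds
    task_soft_time_limit: int = 540  # soft limit, seconds
    extra_kwargs: Dict[str, Any] = field(default_factory=dict)

    def to_conf(self) -> Dict[str, Any]:
        """Flatten into a dict suitable for passing to a conf.update call."""
        # A soft limit only helps if it fires before the hard limit does.
        if self.task_soft_time_limit >= self.task_time_limit:
            raise ValueError("task_soft_time_limit should be below task_time_limit")
        conf = asdict(self)
        extra = conf.pop("extra_kwargs")
        conf.update(extra)  # assumption: extras win over the named fields
        return conf


conf = ConfigSketch(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    extra_kwargs={"task_acks_late": True},
).to_conf()
```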
Development
git clone https://github.com/ashimov/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/
License
MIT.