Asyncio plugin for Celery
Project description
acelery
A Celery plugin that enables seamless execution of asyncio coroutines as Celery tasks with proper event-loop management and asyncio-native result handling.
It lets you write your tasks as async def functions and consume their results from any asyncio caller without blocking the event loop.
Why?
Celery is synchronous by design: tasks are executed inside worker processes that don't know about asyncio, and AsyncResult.get() blocks the calling thread. That's painful when:
- Your task logic naturally
awaits I/O (HTTP, DB, queues, gRPC). - Your caller is an
asyncioapplication (e.g. FastAPI / aiohttp) and can't afford a blocking.get(). - You want to deduplicate "the same task with the same args" across concurrent callers so you only run the work once.
acelery addresses all three: a worker-side Runner that hosts a persistent event loop, an AIOResult that awaits results over Redis Pub/Sub instead of polling, and a trigger_or_join_task primitive that coalesces duplicate invocations.
Features
- Async task decorator — write
async defand decorate with@async_task()to run it inside a Celery worker. - Worker event-loop management — a shared
asynciorunner is set up onworker_process_initand torn down onworker_process_shutdown. - Dedicated runners — opt into a per-task
asyncio.runfor stronger isolation when you need it. - Asyncio-native result handling —
AIOResult.get()is a coroutine, backed by Redis Pub/Sub with periodic ticks to avoid races. - Task deduplication —
AIOResult.trigger_or_join_task(...)either starts a new task or joins an existing in-flight one keyed by(name, args, kwargs). - Leftover-coroutine cleanup — orphan tasks left behind by your coroutine are detected, logged, and cancelled.
- Retry on timeout — built on
tenacity, configurable per call. - Pydantic-typed results — declare
return_type=...and the raw backend payload is validated/parsed for you (Pydantic v1 and v2 supported). - Backends
- Redis — full support (Pub/Sub-driven awaiting).
- Other backends not currently supported.
Requirements
- Python 3.10+
- A Celery broker (RabbitMQ, Redis, etc. — Celery's defaults apply)
- Redis as the result backend (required by
AIOResult)
Installation
Using uv:
uv add acelery
Using poetry:
poetry add acelery
Using pip:
pip install acelery
Quick Start
Configure Celery
AIOResult reads task metadata over Redis Pub/Sub, so configure Celery with a Redis result backend:
from celery import Celery
app = Celery(
"myapp",
broker="amqp://guest:guest@localhost:5672//",
backend="redis://localhost:6379/0",
)
Define an async task
import asyncio
from acelery import async_task
@app.task()
@async_task()
async def greet(name: str) -> str:
await asyncio.sleep(0.1)
return f"Hello, {name}!"
The decorator order matters: @app.task() must wrap @async_task(). The inner decorator turns the coroutine function into a synchronous callable that Celery can run; the outer one registers it as a Celery task.
Dedicated runner (per-task isolation)
By default tasks share the worker's global asyncio runner. Pass dedicated_runner=True for tasks that need their own asyncio.run invocation (at the cost of ~10µs of overhead per call):
@app.task()
@async_task(dedicated_runner=True)
async def isolated_task() -> str:
await asyncio.sleep(1)
return "isolated"
Awaiting results from asyncio
Wrap the standard Celery AsyncResult in AIOResult to await it without blocking:
from acelery import AIOResult
async def main() -> None:
result = AIOResult(greet.delay("World"), return_type=str)
value = await result.get(timeout=10)
print(value) # "Hello, World!"
get() accepts:
| Argument | Default | Purpose |
|---|---|---|
timeout |
60 |
Hard upper bound for the whole wait. |
propagate |
True |
Re-raise exceptions stored in the backend instead of returning them. |
forget |
True |
Extend the result key's TTL to forget_cooldown after a successful read (so other late readers can still see it). |
forget_cooldown |
90 |
Seconds to retain the result before Redis evicts it. |
interrupt_every |
5.0 |
How often to fall back from Pub/Sub to a direct backend read (defends against missed messages). |
max_pending_interrupts |
3 |
Maximum consecutive "still pending" ticks before raising MaxPendingInterruptsError. |
Trigger-or-join: deduplicate concurrent calls
If multiple callers fire the same task with the same args, you usually want one execution and many readers. trigger_or_join_task does exactly that, keyed by name(args, kwargs):
import asyncio
from acelery import AIOResult
@app.task(track_started=True)
@async_task()
async def sleeping_task(sleep_time: float) -> str:
await asyncio.sleep(sleep_time)
return "OK"
async def main() -> None:
aio_result, value = await AIOResult.trigger_or_join_task(
sleeping_task.s(2.0),
timeout=30,
return_type=str,
track_started=True, # set this whenever the task itself sets track_started=True
)
print(value) # "OK"
Notes:
- The first caller publishes the task and stores
task_idunder the dedupe key. - Subsequent callers find the existing
task_idandawaitits result instead of re-publishing. - Retries on timeout are on by default (
retry_on_timeout=True,max_retries_on_timeout=3). - If the task declares
track_started=True, passtrack_started=Truehere too so aPENDINGstate isn't mis-read as "still alive".
Calling Celery primitives from asyncio
AIOExecutable wraps a Signature / Task and exposes async delay / apply_async / apply, executed in a thread pool so they don't block your event loop:
from acelery import AIOExecutable
async def main() -> None:
aio_result = await AIOExecutable(greet.s("World"), return_type=str).apply_async()
print(await aio_result.get(timeout=10))
How it works
- On
worker_process_init, a module-levelRunner(asyncio.Runneron 3.11+, polyfilled on 3.10) is started; onworker_process_shutdownit is closed cleanly. @async_taskruns the coroutine inside that shared runner, copies the currentcontextvarscontext for proper propagation, and after the coroutine returns it cancels anyasyncio.Tasks left behind by the user code (with a warning log).AIOResult.get()opens a Redis Pub/Sub subscription on the result key and races it against a periodic "tick" that reads the backend directly — this closes the race where the task finishes before the subscriber is fully attached.
License
See LICENSE.
Contributing
Issues, bug reports, and PRs are welcome. See CONTRIBUTING.md for development setup, testing, and submission guidelines.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file acelery-0.1.0a3.tar.gz.
File metadata
- Download URL: acelery-0.1.0a3.tar.gz
- Upload date:
- Size: 58.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
37d6ba1e0bda8fadad879ce2d611074f9231bca3594f728e5952e888be5eff2b
|
|
| MD5 |
70fc790a509c374b20810085a5380a3d
|
|
| BLAKE2b-256 |
f619caee7c054654b5aa86a9f1f4292fe77ed75b9b2b4e4e78c36d438c10d13d
|
Provenance
The following attestation bundles were made for acelery-0.1.0a3.tar.gz:
Publisher:
release.yml on omer9564/acelery
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
acelery-0.1.0a3.tar.gz -
Subject digest:
37d6ba1e0bda8fadad879ce2d611074f9231bca3594f728e5952e888be5eff2b - Sigstore transparency entry: 1674420919
- Sigstore integration time:
-
Permalink:
omer9564/acelery@ef0fa0633347183720dd3b808f7a2cfe63d8ce94 -
Branch / Tag:
refs/tags/0.1.0-alpha.3 - Owner: https://github.com/omer9564
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@ef0fa0633347183720dd3b808f7a2cfe63d8ce94 -
Trigger Event:
release
-
Statement type:
File details
Details for the file acelery-0.1.0a3-py3-none-any.whl.
File metadata
- Download URL: acelery-0.1.0a3-py3-none-any.whl
- Upload date:
- Size: 16.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
eeca85530a396e709d71f72b6f780ceaedc20f9263219242f5ccaba950b10262
|
|
| MD5 |
48d47607aa36fdab882fd4fcbc582189
|
|
| BLAKE2b-256 |
c595a1ce7f954c6204c60a06fcacaf64954be2390d422642e2b04d23f94c39db
|
Provenance
The following attestation bundles were made for acelery-0.1.0a3-py3-none-any.whl:
Publisher:
release.yml on omer9564/acelery
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
acelery-0.1.0a3-py3-none-any.whl -
Subject digest:
eeca85530a396e709d71f72b6f780ceaedc20f9263219242f5ccaba950b10262 - Sigstore transparency entry: 1674420947
- Sigstore integration time:
-
Permalink:
omer9564/acelery@ef0fa0633347183720dd3b808f7a2cfe63d8ce94 -
Branch / Tag:
refs/tags/0.1.0-alpha.3 - Owner: https://github.com/omer9564
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@ef0fa0633347183720dd3b808f7a2cfe63d8ce94 -
Trigger Event:
release
-
Statement type: