Asyncio plugin for Celery

acelery

A Celery plugin that enables seamless execution of asyncio coroutines as Celery tasks with proper event-loop management and asyncio-native result handling.

It lets you write your tasks as async def functions and consume their results from any asyncio caller without blocking the event loop.

Why?

Celery is synchronous by design: tasks are executed inside worker processes that don't know about asyncio, and AsyncResult.get() blocks the calling thread. That's painful when:

  • Your task logic naturally awaits I/O (HTTP, DB, queues, gRPC).
  • Your caller is an asyncio application (e.g. FastAPI / aiohttp) and can't afford a blocking .get().
  • You want to deduplicate "the same task with the same args" across concurrent callers so you only run the work once.

acelery addresses all three: a worker-side Runner that hosts a persistent event loop, an AIOResult that awaits results over Redis Pub/Sub instead of polling, and a trigger_or_join_task primitive that coalesces duplicate invocations.

Features

  • Async task decorator — write async def and decorate with @async_task() to run it inside a Celery worker.
  • Worker event-loop management — a shared asyncio runner is set up on worker_process_init and torn down on worker_process_shutdown.
  • Dedicated runners — opt into a per-task asyncio.run for stronger isolation when you need it.
  • Asyncio-native result handling: AIOResult.get() is a coroutine, backed by Redis Pub/Sub with periodic ticks to avoid races.
  • Task deduplication: AIOResult.trigger_or_join_task(...) either starts a new task or joins an existing in-flight one keyed by (name, args, kwargs).
  • Leftover-coroutine cleanup — orphan tasks left behind by your coroutine are detected, logged, and cancelled.
  • Retry on timeout — built on tenacity, configurable per call.
  • Pydantic-typed results — declare return_type=... and the raw backend payload is validated/parsed for you (Pydantic v1 and v2 supported).
  • Backends
    • Redis — full support (Pub/Sub-driven awaiting).
    • Other backends are not currently supported.

Requirements

  • Python 3.10+
  • A Celery broker (RabbitMQ, Redis, etc. — Celery's defaults apply)
  • Redis as the result backend (required by AIOResult)

Installation

Using uv:

uv add acelery

Using poetry:

poetry add acelery

Using pip:

pip install acelery

Quick Start

Configure Celery

AIOResult reads task metadata over Redis Pub/Sub, so configure Celery with a Redis result backend:

from celery import Celery

app = Celery(
    "myapp",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
)

Define an async task

import asyncio
from acelery import async_task

@app.task()
@async_task()
async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"

The decorator order matters: @app.task() must wrap @async_task(). The inner decorator turns the coroutine function into a synchronous callable that Celery can run; the outer one registers it as a Celery task.
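To make the ordering rule concrete, here is a minimal standard-library sketch of what "turning a coroutine function into a synchronous callable" can look like. The `to_sync` decorator is hypothetical and is not acelery's implementation; it simply drives each call with `asyncio.run`, which is closer in spirit to a dedicated runner than to a shared one.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def to_sync(coro_fn: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Hypothetical stand-in for the inner decorator (not acelery's code)."""

    @functools.wraps(coro_fn)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # Run the coroutine to completion and hand back a plain value,
        # which is the shape a synchronous task runner expects.
        return asyncio.run(coro_fn(*args, **kwargs))

    return wrapper


@to_sync
async def greet_sync(name: str) -> str:
    await asyncio.sleep(0)
    return f"Hello, {name}!"


def demo_sync_call() -> str:
    # Called like an ordinary function: no await, no coroutine object.
    return greet_sync("World")
```

An outer registration decorator applied on top of `to_sync` would receive the plain callable, which is why the registering decorator has to be the outermost one.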

Dedicated runner (per-task isolation)

By default tasks share the worker's global asyncio runner. Pass dedicated_runner=True for tasks that need their own asyncio.run invocation (at the cost of ~10µs of overhead per call):

@app.task()
@async_task(dedicated_runner=True)
async def isolated_task() -> str:
    await asyncio.sleep(1)
    return "isolated"

Awaiting results from asyncio

Wrap the standard Celery AsyncResult in AIOResult to await it without blocking:

from acelery import AIOResult

async def main() -> None:
    result = AIOResult(greet.delay("World"), return_type=str)
    value = await result.get(timeout=10)
    print(value)  # "Hello, World!"

get() accepts:

| Argument | Default | Purpose |
| --- | --- | --- |
| timeout | 60 | Hard upper bound for the whole wait. |
| propagate | True | Re-raise exceptions stored in the backend instead of returning them. |
| forget | True | Extend the result key's TTL to forget_cooldown after a successful read (so other late readers can still see it). |
| forget_cooldown | 90 | Seconds to retain the result before Redis evicts it. |
| interrupt_every | 5.0 | How often to fall back from Pub/Sub to a direct backend read (defends against missed messages). |
| max_pending_interrupts | 3 | Maximum consecutive "still pending" ticks before raising MaxPendingInterruptsError. |
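How timeout, interrupt_every, and max_pending_interrupts interact can be illustrated with a small in-memory sketch. It is an assumption-laden simplification rather than acelery's code: an `asyncio.Event` stands in for the Pub/Sub notification, a plain callable stands in for the direct backend read, every empty read counts as a "still pending" tick, and `wait_with_ticks` plus the locally defined `MaxPendingInterruptsError` are illustrative names.

```python
import asyncio
from typing import Callable, Dict, Optional


class MaxPendingInterruptsError(Exception):
    """Local stand-in for the error named above."""


async def wait_with_ticks(
    notified: asyncio.Event,
    read_backend: Callable[[], Optional[str]],
    *,
    timeout: float = 60,
    interrupt_every: float = 5.0,
    max_pending_interrupts: int = 3,
) -> str:
    async def _wait() -> str:
        pending_ticks = 0
        while True:
            try:
                # Wait for the push notification, but at most one tick interval.
                await asyncio.wait_for(notified.wait(), timeout=interrupt_every)
            except asyncio.TimeoutError:
                pass  # Tick elapsed: fall through to a direct read.
            notified.clear()
            value = read_backend()
            if value is not None:
                return value
            pending_ticks += 1
            if pending_ticks >= max_pending_interrupts:
                raise MaxPendingInterruptsError(f"{pending_ticks} empty reads")

    # The outer wait_for enforces the hard upper bound on the whole wait.
    return await asyncio.wait_for(_wait(), timeout=timeout)


async def demo_missed_message() -> str:
    store: Dict[str, str] = {}
    notified = asyncio.Event()  # Never set: simulates a lost notification.

    async def finish_later() -> None:
        await asyncio.sleep(0.01)
        store["result"] = "OK"

    writer = asyncio.create_task(finish_later())
    value = await wait_with_ticks(
        notified, lambda: store.get("result"), timeout=1, interrupt_every=0.02
    )
    await writer
    return value


async def demo_never_finishes() -> bool:
    try:
        await wait_with_ticks(
            asyncio.Event(), lambda: None,
            timeout=1, interrupt_every=0.01, max_pending_interrupts=2,
        )
    except MaxPendingInterruptsError:
        return True
    return False
```

In the first demo the notification never arrives, yet the periodic read still picks up the stored value; in the second, two consecutive empty reads end the wait well before the overall timeout.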

Trigger-or-join: deduplicate concurrent calls

If multiple callers fire the same task with the same args, you usually want one execution and many readers. trigger_or_join_task does exactly that, keyed by name(args, kwargs):

import asyncio
from acelery import AIOResult

@app.task(track_started=True)
@async_task()
async def sleeping_task(sleep_time: float) -> str:
    await asyncio.sleep(sleep_time)
    return "OK"

async def main() -> None:
    aio_result, value = await AIOResult.trigger_or_join_task(
        sleeping_task.s(2.0),
        timeout=30,
        return_type=str,
        track_started=True,  # set this whenever the task itself sets track_started=True
    )
    print(value)  # "OK"

Notes:

  • The first caller publishes the task and stores task_id under the dedupe key.
  • Subsequent callers find the existing task_id and await its result instead of re-publishing.
  • Retries on timeout are on by default (retry_on_timeout=True, max_retries_on_timeout=3).
  • If the task declares track_started=True, pass track_started=True here too so a PENDING state isn't mis-read as "still alive".
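The first two notes can be sketched in-process with the standard library alone. This is only an analogy under stated assumptions: acelery stores the task_id under a dedupe key so that separate callers can find it, whereas the hypothetical `trigger_or_join` helper below keeps in-flight `asyncio.Task` objects in a local dict, and the `dedupe_key` format is invented for illustration.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# In-process registry of in-flight work, keyed by (name, args, kwargs).
_in_flight: Dict[str, "asyncio.Task[Any]"] = {}


def dedupe_key(name: str, args: tuple, kwargs: dict) -> str:
    # Hypothetical key format; sort_keys makes kwargs order irrelevant.
    return f"{name}:{json.dumps([list(args), kwargs], sort_keys=True)}"


async def trigger_or_join(
    name: str, fn: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any
) -> Any:
    key = dedupe_key(name, args, kwargs)
    task = _in_flight.get(key)
    if task is None:
        # First caller: start the work and publish it under the key.
        task = asyncio.create_task(fn(*args, **kwargs))
        _in_flight[key] = task
        task.add_done_callback(lambda _t: _in_flight.pop(key, None))
    # Later callers join the same task; shield keeps one cancelled
    # caller from cancelling the shared work.
    return await asyncio.shield(task)


async def demo_dedupe() -> Tuple[int, List[str]]:
    calls = 0

    async def slow(sleep_time: float) -> str:
        nonlocal calls
        calls += 1
        await asyncio.sleep(sleep_time)
        return "OK"

    results = await asyncio.gather(
        *(trigger_or_join("sleeping_task", slow, 0.01) for _ in range(3))
    )
    return calls, list(results)
```

Three concurrent callers with identical arguments produce one execution and three identical results; a call with different arguments would get its own key and its own execution.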

Calling Celery primitives from asyncio

AIOExecutable wraps a Signature / Task and exposes async delay / apply_async / apply, executed in a thread pool so they don't block your event loop:

from acelery import AIOExecutable

async def main() -> None:
    aio_result = await AIOExecutable(greet.s("World"), return_type=str).apply_async()
    print(await aio_result.get(timeout=10))
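
The reason for off-loading these calls to a thread can be shown with the standard library alone. The sketch below is not AIOExecutable's implementation: `blocking_publish` is a hypothetical stand-in for a synchronous publish call, and `asyncio.to_thread` is used here as one common way to run it without stalling the loop.

```python
import asyncio
import time
from typing import Tuple


def blocking_publish(name: str) -> str:
    # Hypothetical synchronous call that blocks its thread for a while.
    time.sleep(0.05)
    return f"task-id-for-{name}"


async def demo_offload() -> Tuple[str, int]:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    # The blocking call runs in a worker thread; the loop stays responsive,
    # so the heartbeat keeps ticking in the meantime.
    task_id = await asyncio.to_thread(blocking_publish, "World")
    hb.cancel()
    await asyncio.gather(hb, return_exceptions=True)
    return task_id, ticks
```

Calling `blocking_publish` directly inside the coroutine would freeze the heartbeat for the full duration of the call.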

How it works

  • On worker_process_init, a module-level Runner (asyncio.Runner on 3.11+, polyfilled on 3.10) is started; on worker_process_shutdown it is closed cleanly.
  • @async_task runs the coroutine inside that shared runner, copies the current contextvars context for proper propagation, and after the coroutine returns it cancels any asyncio.Tasks left behind by the user code (with a warning log).
  • AIOResult.get() opens a Redis Pub/Sub subscription on the result key and races it against a periodic "tick" that reads the backend directly — this closes the race where the task finishes before the subscriber is fully attached.
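
The first two bullets can be approximated with a short standard-library sketch. It rests on assumptions: `SharedLoop` is a hypothetical stand-in for the module-level Runner, it uses `asyncio.new_event_loop()` rather than `asyncio.Runner` so that it also runs on Python 3.10, and the `request_id` context variable exists only for the demonstration.

```python
import asyncio
import contextvars
import logging
from typing import Any, Coroutine

logger = logging.getLogger("runner_sketch")
request_id = contextvars.ContextVar("request_id", default="-")


class SharedLoop:
    """Hypothetical persistent runner: one loop reused across calls."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self.cancelled = 0

    def run(self, coro: Coroutine[Any, Any, Any]) -> Any:
        # Create the task inside a copy of the caller's context so that
        # context variables set by the caller are visible to the coroutine.
        ctx = contextvars.copy_context()
        main_task = ctx.run(self.loop.create_task, coro)
        try:
            return self.loop.run_until_complete(main_task)
        finally:
            self._cancel_leftovers()

    def _cancel_leftovers(self) -> None:
        leftovers = [t for t in asyncio.all_tasks(self.loop) if not t.done()]
        for task in leftovers:
            logger.warning("cancelling leftover task %r", task.get_name())
            task.cancel()
        if leftovers:
            # Let the cancellations be delivered before the next call.
            self.loop.run_until_complete(
                asyncio.gather(*leftovers, return_exceptions=True)
            )
        self.cancelled += len(leftovers)

    def close(self) -> None:
        self.loop.close()


async def leaky_task() -> str:
    async def orphan() -> None:
        await asyncio.sleep(3600)

    asyncio.create_task(orphan())  # Deliberately never awaited.
    await asyncio.sleep(0)
    return request_id.get()


def demo_shared_loop() -> tuple:
    runner = SharedLoop()
    request_id.set("req-42")
    first = runner.run(leaky_task())
    second = runner.run(leaky_task())  # Same loop, reused.
    runner.close()
    return first, second, runner.cancelled
```

Both calls see the caller's context variable, and each call's orphaned task is cancelled before the next one starts, so nothing leaks between tasks that share the loop.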

License

See LICENSE.

Contributing

Issues, bug reports, and PRs are welcome. See CONTRIBUTING.md for development setup, testing, and submission guidelines.
