Asyncio plugin for Celery

acelery

A Celery plugin that enables seamless execution of asyncio coroutines as Celery tasks with proper event-loop management and asyncio-native result handling.

It lets you write your tasks as async def functions and consume their results from any asyncio caller without blocking the event loop.

Why?

Celery is synchronous by design: tasks are executed inside worker processes that don't know about asyncio, and AsyncResult.get() blocks the calling thread. That's painful when:

  • Your task logic naturally awaits I/O (HTTP, DB, queues, gRPC).
  • Your caller is an asyncio application (e.g. FastAPI / aiohttp) and can't afford a blocking .get().
  • You want to deduplicate "the same task with the same args" across concurrent callers so you only run the work once.

acelery addresses all three: a worker-side Runner that hosts a persistent event loop, an AIOResult that awaits results over Redis Pub/Sub instead of polling, and a trigger_or_join_task primitive that coalesces duplicate invocations.

Features

  • Async task decorator — write async def and decorate with @async_task() to run it inside a Celery worker.
  • Worker event-loop management — a shared asyncio runner is set up on worker_process_init and torn down on worker_process_shutdown.
  • Dedicated runners — opt into a per-task asyncio.run for stronger isolation when you need it.
  • Asyncio-native result handling — AIOResult.get() is a coroutine, backed by Redis Pub/Sub with periodic ticks to avoid races.
  • Task deduplication — AIOResult.trigger_or_join_task(...) either starts a new task or joins an existing in-flight one keyed by (name, args, kwargs).
  • Leftover-coroutine cleanup — orphan tasks left behind by your coroutine are detected, logged, and cancelled.
  • Retry on timeout — built on tenacity, configurable per call.
  • Pydantic-typed results — declare return_type=... and the raw backend payload is validated/parsed for you (Pydantic v1 and v2 supported).
  • Backends
    • Redis — full support (Pub/Sub-driven awaiting).
    • Other backends are not currently supported.

Requirements

  • Python 3.10+
  • A Celery broker (RabbitMQ, Redis, etc. — Celery's defaults apply)
  • Redis as the result backend (required by AIOResult)

Installation

Using uv:

uv add acelery

Using poetry:

poetry add acelery

Using pip:

pip install acelery

Quick Start

Configure Celery

AIOResult reads task metadata over Redis Pub/Sub, so configure Celery with a Redis result backend:

from celery import Celery

app = Celery(
    "myapp",
    broker="amqp://guest:guest@localhost:5672//",
    backend="redis://localhost:6379/0",
)

Define an async task

import asyncio
from acelery import async_task

@app.task()
@async_task()
async def greet(name: str) -> str:
    await asyncio.sleep(0.1)
    return f"Hello, {name}!"

The decorator order matters: @app.task() must wrap @async_task(). The inner decorator turns the coroutine function into a synchronous callable that Celery can run; the outer one registers it as a Celery task.
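
To make the role of the inner decorator concrete, here is a minimal sketch of the general idea: a coroutine function goes in and a plain synchronous callable comes out. The name to_sync is hypothetical and this is not acelery's implementation; in particular it uses asyncio.run for every call, whereas the library's default is a shared runner.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, TypeVar

T = TypeVar("T")


def to_sync(func: Callable[..., Coroutine[Any, Any, T]]) -> Callable[..., T]:
    """Hypothetical stand-in for the inner decorator (not acelery's code)."""

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        # Drive the coroutine to completion so the caller gets a plain value.
        return asyncio.run(func(*args, **kwargs))

    return wrapper


@to_sync
async def greet_sketch(name: str) -> str:
    await asyncio.sleep(0)
    return f"Hello, {name}!"


if __name__ == "__main__":
    # Called like any ordinary function, which is what a task registry expects.
    print(greet_sketch("World"))
```

Because the wrapper is an ordinary function, an outer registration decorator can treat it like any other synchronous task body.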

Dedicated runner (per-task isolation)

By default tasks share the worker's global asyncio runner. Pass dedicated_runner=True for tasks that need their own asyncio.run invocation (at the cost of ~10µs of overhead per call):

@app.task()
@async_task(dedicated_runner=True)
async def isolated_task() -> str:
    await asyncio.sleep(1)
    return "isolated"
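
The difference between the two modes can be sketched with the standard library alone. In the sketch below, a long-lived event loop stands in for the worker's shared runner (an assumption; the library's runner is not shown here), while asyncio.run plays the dedicated role by creating and closing a fresh loop per call. The function names are illustrative only.

```python
import asyncio

# Assumption: a loop kept open for the process lifetime stands in for the
# worker's shared runner.
shared_loop = asyncio.new_event_loop()


async def current_loop() -> asyncio.AbstractEventLoop:
    # Report which loop this coroutine is running on.
    return asyncio.get_running_loop()


def run_shared() -> asyncio.AbstractEventLoop:
    # Every call reuses the same long-lived loop.
    return shared_loop.run_until_complete(current_loop())


def run_dedicated() -> asyncio.AbstractEventLoop:
    # asyncio.run creates a fresh loop and closes it once the coroutine ends.
    return asyncio.run(current_loop())


if __name__ == "__main__":
    print(run_shared() is run_shared())        # same loop object each time
    print(run_dedicated() is run_dedicated())  # a different loop per call
    shared_loop.close()
```

A fresh loop per call means nothing scheduled by one task can leak into the next, which is the isolation the dedicated mode trades a little overhead for.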

Awaiting results from asyncio

Wrap the standard Celery AsyncResult in AIOResult to await it without blocking:

from acelery import AIOResult

async def main() -> None:
    result = AIOResult(greet.delay("World"), return_type=str)
    value = await result.get(timeout=10)
    print(value)  # "Hello, World!"

get() accepts:

| Argument | Default | Purpose |
| --- | --- | --- |
| timeout | 60 | Hard upper bound for the whole wait. |
| propagate | True | Re-raise exceptions stored in the backend instead of returning them. |
| forget | True | Extend the result key's TTL to forget_cooldown after a successful read (so other late readers can still see it). |
| forget_cooldown | 90 | Seconds to retain the result before Redis evicts it. |
| interrupt_every | 5.0 | How often to fall back from Pub/Sub to a direct backend read (defends against missed messages). |
| max_pending_interrupts | 3 | Maximum consecutive "still pending" ticks before raising MaxPendingInterruptsError. |
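
How timeout, interrupt_every, and max_pending_interrupts interact may be easier to see in a small simulation. The sketch below is not the library's code: an asyncio.Event stands in for the Pub/Sub notification, a plain callable stands in for the direct backend read, and TooManyPendingTicks is a hypothetical substitute for MaxPendingInterruptsError.

```python
import asyncio
from typing import Callable, Optional


class TooManyPendingTicks(Exception):
    """Hypothetical stand-in for MaxPendingInterruptsError."""


async def wait_for_result(
    notified: asyncio.Event,                    # stands in for the Pub/Sub message
    read_backend: Callable[[], Optional[str]],  # stands in for a direct backend read
    *,
    timeout: float = 60,
    interrupt_every: float = 5.0,
    max_pending_interrupts: int = 3,
) -> str:
    async def _loop() -> str:
        pending_ticks = 0
        while True:
            try:
                # Fast path: wake up as soon as the notification arrives.
                await asyncio.wait_for(notified.wait(), timeout=interrupt_every)
            except asyncio.TimeoutError:
                pass  # Tick: nothing heard yet, fall through to a direct read.
            value = read_backend()
            if value is not None:
                return value
            pending_ticks += 1
            if pending_ticks >= max_pending_interrupts:
                raise TooManyPendingTicks(f"still pending after {pending_ticks} ticks")

    # Hard upper bound for the whole wait, regardless of ticks.
    return await asyncio.wait_for(_loop(), timeout=timeout)


async def demo() -> str:
    store: dict = {}
    # The result lands in the "backend" but the notification is never sent,
    # so only the periodic tick can discover it.
    asyncio.get_running_loop().call_later(0.01, store.__setitem__, "v", "done")
    return await wait_for_result(
        asyncio.Event(), lambda: store.get("v"), timeout=2, interrupt_every=0.05
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this simulation a missed notification costs at most one interrupt_every interval, which is the trade-off the periodic fallback is there to make.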

Trigger-or-join: deduplicate concurrent calls

If multiple callers fire the same task with the same args, you usually want one execution and many readers. trigger_or_join_task does exactly that, keyed by the task name together with its args and kwargs:

import asyncio
from acelery import AIOResult

@app.task(track_started=True)
@async_task()
async def sleeping_task(sleep_time: float) -> str:
    await asyncio.sleep(sleep_time)
    return "OK"

async def main() -> None:
    aio_result, value = await AIOResult.trigger_or_join_task(
        sleeping_task.s(2.0),
        timeout=30,
        return_type=str,
        track_started=True,  # set this whenever the task itself sets track_started=True
    )
    print(value)  # "OK"

Notes:

  • The first caller publishes the task and stores task_id under the dedupe key.
  • Subsequent callers find the existing task_id and await its result instead of re-publishing.
  • Retries on timeout are on by default (retry_on_timeout=True, max_retries_on_timeout=3).
  • If the task declares track_started=True, pass track_started=True here too so a PENDING state isn't mis-read as "still alive".
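
The coalescing behaviour described in these notes can be illustrated in a single process. The sketch below is an assumption-heavy analogue, not acelery's implementation: it keeps in-flight work in a local dict instead of storing a task_id in Redis, and the key format produced by dedupe_key is invented for illustration.

```python
import asyncio
import hashlib
import json
from typing import Any, Callable, Coroutine, Dict


def dedupe_key(name: str, args: tuple, kwargs: dict) -> str:
    # Assumption: a stable hash of (name, args, kwargs); the real key may differ.
    payload = json.dumps([name, list(args), kwargs], sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


# In-process registry of running work, standing in for the shared store.
_in_flight: Dict[str, "asyncio.Task[Any]"] = {}


async def trigger_or_join(
    name: str,
    func: Callable[..., Coroutine[Any, Any, Any]],
    *args: Any,
    **kwargs: Any,
) -> Any:
    key = dedupe_key(name, args, kwargs)
    task = _in_flight.get(key)
    if task is None:
        # First caller: start the work and record it under the dedupe key.
        task = asyncio.create_task(func(*args, **kwargs))
        _in_flight[key] = task
        task.add_done_callback(lambda _t: _in_flight.pop(key, None))
    # Every caller awaits the same task; shield keeps one cancelled caller
    # from cancelling the shared work for everyone else.
    return await asyncio.shield(task)


async def demo() -> list:
    async def slow_double(x: int) -> int:
        await asyncio.sleep(0.01)
        return x * 2

    # Five concurrent callers with identical arguments share one execution.
    return await asyncio.gather(
        *(trigger_or_join("slow_double", slow_double, 2) for _ in range(5))
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Across processes the same pattern needs a shared store and an atomic "set if absent" step, which is where a backend such as Redis comes in.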

Calling Celery primitives from asyncio

AIOExecutable wraps a Signature / Task and exposes async delay / apply_async / apply, executed in a thread pool so they don't block your event loop:

from acelery import AIOExecutable

async def main() -> None:
    aio_result = await AIOExecutable(greet.s("World"), return_type=str).apply_async()
    print(await aio_result.get(timeout=10))
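
The thread-pool offloading mentioned above is a standard asyncio pattern and can be sketched without Celery. Here blocking_publish is a hypothetical stand-in for a blocking broker call; asyncio.to_thread is the standard-library helper that runs it in the default thread pool. Whether AIOExecutable uses this exact helper is not stated in this README.

```python
import asyncio
import time


def blocking_publish(payload: str) -> str:
    # Hypothetical stand-in for a blocking call such as publishing a task.
    time.sleep(0.05)
    return f"published:{payload}"


async def publish_async(payload: str) -> str:
    # The blocking call runs in a worker thread, so the event loop stays
    # free to run other coroutines while it waits.
    return await asyncio.to_thread(blocking_publish, payload)


async def demo() -> tuple:
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    result = await publish_async("hello")
    hb.cancel()
    # ticks > 0 shows the loop kept running during the blocking call.
    return result, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```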

How it works

  • On worker_process_init, a module-level Runner (asyncio.Runner on 3.11+, polyfilled on 3.10) is started; on worker_process_shutdown it is closed cleanly.
  • @async_task runs the coroutine inside that shared runner, copies the current contextvars context for proper propagation, and after the coroutine returns it cancels any asyncio.Tasks left behind by the user code (with a warning log).
  • AIOResult.get() opens a Redis Pub/Sub subscription on the result key and races it against a periodic "tick" that reads the backend directly — this closes the race where the task finishes before the subscriber is fully attached.
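
The leftover-task cleanup described above can be approximated with asyncio.all_tasks. This is a sketch of the general technique under stated assumptions, not the library's code: run_and_clean and the logger name are hypothetical, and context propagation is left out.

```python
import asyncio
import logging
from typing import Any, Coroutine

logger = logging.getLogger("acelery_sketch")  # hypothetical logger name


async def run_and_clean(coro: Coroutine[Any, Any, Any]) -> Any:
    # Snapshot the tasks that exist before the user code runs.
    before = asyncio.all_tasks()
    try:
        return await coro
    finally:
        current = asyncio.current_task()
        # Anything new and still running was left behind by the user code.
        leftovers = [
            t for t in asyncio.all_tasks() - before
            if t is not current and not t.done()
        ]
        for task in leftovers:
            logger.warning("cancelling leftover task %s", task.get_name())
            task.cancel()
        # Wait for the cancellations to land; return_exceptions=True keeps
        # their CancelledError from propagating to the caller.
        await asyncio.gather(*leftovers, return_exceptions=True)


async def demo() -> tuple:
    spawned = []

    async def user_code() -> str:
        # Fire-and-forget work that outlives the coroutine that started it.
        spawned.append(asyncio.create_task(asyncio.sleep(10)))
        return "done"

    value = await run_and_clean(user_code())
    return value, spawned[0].cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Cancelling leftovers matters most on a shared loop, where an orphaned task would otherwise keep running alongside later, unrelated tasks.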

License

See LICENSE.

Contributing

Issues, bug reports, and PRs are welcome. See CONTRIBUTING.md for development setup, testing, and submission guidelines.
