
Celery pool to run coroutine tasks

This project has been archived. The maintainers have marked it as archived and no new releases are expected.

Project description

Celery Pool AsyncIO


  • Free software: Apache Software License 2.0

Features

import asyncio
from celery import Celery

# Importing celery_pool_asyncio is optional.
# It is imported automatically when you run a worker or beat with the pool
# or scheduler defined, but it is NOT imported when you open a REPL or run
# a web application. If you want the monkey patches applied anyway, to get
# an identical environment in a REPL or web application, it's a good idea
# to import the celery_pool_asyncio module explicitly:
import celery_pool_asyncio  # noqa
# Sometimes noqa does not silence the linter (e.g. Spyder IDE)
celery_pool_asyncio.__package__


app = Celery()


@app.task(
    bind=True,
    soft_time_limit=42,  # raises celery.exceptions.SoftTimeLimitExceeded inside the coroutine
    time_limit=300,  # breaks coroutine execution with futures.TimeoutError
)
async def my_task(self, *args, **kwargs):
    await asyncio.sleep(5)


@app.task
async def my_simple_task(*args, **kwargs):
    await asyncio.sleep(5)
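The two time limits above behave differently: soft_time_limit raises SoftTimeLimitExceeded inside the coroutine, where the task can catch it, while time_limit cancels the coroutine from outside. A rough stdlib-only analogy of the hard limit, using asyncio.wait_for (plain asyncio for illustration, not the pool's actual mechanism):

```python
import asyncio

async def slow_task():
    # stands in for a long-running task body
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        # analogous to time_limit: the coroutine is cancelled from outside
        return await asyncio.wait_for(slow_task(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # → timed out
```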

Then run celery:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool

Monkey patching: wtf and why

Many monkey patches must be applied to make the application work, and some of them must be applied as early as possible. You can switch any of them off by setting the environment variable CPA_MONKEY_DENY, but you should have a good reason to do so.

Beyond the patches that are critical for basic operation, monkey patching also enables:

# await sending the task message to the broker
async_result = await my_simple_task.delay()

# await the AsyncResult
result = await async_result.get()

You can disable any of the patches by listing them, comma-separated, in CPA_MONKEY_DENY:

$ env CPA_MONKEY_DENY=CELERY.SEND_TASK,ALL_BACKENDS celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool

Disabling is available for:

  • CELERY.SEND_TASK
  • WORKCONTROLLER.USE_EVENTLOOP
  • BASERESULTCONSUMER.WAIT_FOR_PENDING
  • BASERESULTCONSUMER.DRAIN_EVENTS_UNTIL
  • ASYNCBACKENDMIXIN.WAIT_FOR_PENDING
  • ALL_BACKENDS
  • BEAT.SERVICE.START
  • BEAT.SERVICE.STOP
  • BUILD_TRACER
  • KOMBU.UTILS.COMPAT
  • RPC.RESULTCONSUMER.DRAIN_EVENTS
  • AMQPBACKEND.DRAIN_EVENTS
  • AMQPBACKEND.GET_MANY
  • AMQP_BACKEND
  • RPC_BACKEND
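The deny list is read from a single comma-separated environment variable. A minimal sketch of such parsing (a hypothetical helper for illustration, not the package's actual code):

```python
import os

def denied_patches(var="CPA_MONKEY_DENY"):
    """Return the set of monkey patch names denied via the environment."""
    raw = os.environ.get(var, "")
    # split on commas, trim whitespace, drop empty entries
    return {name.strip() for name in raw.split(",") if name.strip()}

os.environ["CPA_MONKEY_DENY"] = "CELERY.SEND_TASK,ALL_BACKENDS"
print(sorted(denied_patches()))  # → ['ALL_BACKENDS', 'CELERY.SEND_TASK']
```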

Scheduling

The default scheduler does not work with this pool. The provided PersistentScheduler is a subclass of the default celery scheduler.

Running celery with scheduler:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool --scheduler celery_pool_asyncio:PersistentScheduler
$ celery beat -A hello_async_celery.app --scheduler celery_pool_asyncio:PersistentScheduler
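For beat to have anything to dispatch, a schedule has to be configured. A sketch using Celery's standard beat_schedule configuration (the task name and timing here are illustrative, not from this project):

```python
from celery import Celery
from celery.schedules import crontab

app = Celery()
app.conf.beat_schedule = {
    # hypothetical entry: run my_task every night at 03:00
    "nightly-example": {
        "task": "hello_async_celery.my_task",
        "schedule": crontab(hour=3, minute=0),
    },
}
```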

Embedding the scheduler in the worker is also supported:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool --scheduler celery_pool_asyncio:PersistentScheduler -B

WARNING: embedded scheduler startup is not stable; it starts correctly in roughly 50% of cases, which looks like a race condition. After a correct startup it works well. For that reason it is a good idea to run the scheduler in a separate process.

More examples

There is an example project that uses celery-pool-asyncio: https://github.com/kai3341/celery-decorator-taskcls-example

Changelog

[0.1.12]

  • Finalize monkey patcher refactoring. You are now able to switch off any monkey patch. Remember: with great power comes great responsibility
  • Implement soft_time_limit
  • Implement revoke
  • Fix keywords

[0.1.11]

  • Total monkey patching refactor. It is now enabled by default, but you can manually disable some features using the environment variable CPA_MONKEY_DENY

[0.1.10]

  • Make Celery Beat working
    • Add async Celery Scheduler
    • More monkey patching
  • Move loop and loop_runner to their own module
    • Avoid creating multiple loops and loop_runners per application

[0.1.9]

  • Large rework of await AsyncResult.get()
    • Works much better than before, but still rough
    • Added a number of monkey patches
  • Fixed race condition on first microseconds of pool shutdown

[0.1.8]

  • Clean up the tracer; use the celery.app.trace namespace where possible

[0.1.7]

  • Refactor monkey, split it
  • Move patch_send_task to its own function
  • Add patch_result_get to await AsyncResult.get

[0.1.6]

  • Avoid building trace twice
  • Also this small performance optimization fixed AsyncResult.get

[0.1.5]

  • Fix graceful shutdown

[0.1.4]

  • Fix monkey: another function must be patched

[0.1.3]

  • Add changelog
  • Append documentation

[0.1.2]

  • Add monkey patcher to make broker IO operations non-blocking

[0.1.1]

  • Refactor code
  • Fix found errors

[0.1.0]

  • Initial commit

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-pool-asyncio-0.1.12.tar.gz (19.7 kB)

Uploaded Source

Built Distribution

celery_pool_asyncio-0.1.12-py2.py3-none-any.whl (17.9 kB)

Uploaded Python 2 Python 3

File details

Details for the file celery-pool-asyncio-0.1.12.tar.gz.

File metadata

  • Download URL: celery-pool-asyncio-0.1.12.tar.gz
  • Upload date:
  • Size: 19.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.2 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.8.0 tqdm/4.28.1 CPython/3.7.3

File hashes

Hashes for celery-pool-asyncio-0.1.12.tar.gz
Algorithm Hash digest
SHA256 9709469a7f289c77e337a75d67e52c2a2d0b6a32b808f3fef4d61ccc227d12f2
MD5 88ce824373a7c3016dc3a6fe92d6aedb
BLAKE2b-256 c24b5c821646cda9039af68838e127124327c2a1435a0f018c6115638ab7c6ab


File details

Details for the file celery_pool_asyncio-0.1.12-py2.py3-none-any.whl.

File metadata

  • Download URL: celery_pool_asyncio-0.1.12-py2.py3-none-any.whl
  • Upload date:
  • Size: 17.9 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.2 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.8.0 tqdm/4.28.1 CPython/3.7.3

File hashes

Hashes for celery_pool_asyncio-0.1.12-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 d5fce3b6c406435afa080d089becf12f956b907520411282a293c18f45bd224f
MD5 1c29f2cd1d4de6d2db532254aae15307
BLAKE2b-256 0eab3302d4b2df6e188c4425a4cde0e04dce09e249fd4bdfc0c912870051174e

