Skip to main content

Celery pool to run coroutine tasks

This project has been archived.

The maintainers of this project have marked this project as archived. No new releases are expected.

Project description

Celery Pool AsyncIO

Logo

  • Free software: Apache Software License 2.0

Features

import asyncio
from celery import Celery
from celery_pool_asyncio import monkey as cpa_monkey

# Apply monkey patches before creating celery app
cpa_monkey.patch()

app = Celery()


@app.task(
    bind=True,
    soft_time_limit=None,  # temporary unimplemented. You can help me
    time_limit=300,  # raises futures.TimeoutError on timeout
)
async def my_task(self, *args, **kwargs):
    await asyncio.sleep(5)


@app.task
async def my_simple_task(*args, **kwargs):
    await asyncio.sleep(5)

Then run celery:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool

Monkey patching: wtf and why

There are many monkey patches applies automatically, but some of them are optional or may change application behavior. It's ok in general, but exceptions are possible. That's why it's good idea to apply it manually.

Allows:

async_result = await my_simple_task.delay()
result = await async_result.get()

Manual targets:

  • celery.app.Celery.send_task
  • celery.worker.worker.WorkController.should_use_eventloop
  • celery.backends.asynchronous.BaseResultConsumer._wait_for_pending
  • celery.backends.asynchronous.BaseResultConsumer.drain_events_until
  • celery.backends.asynchronous.AsyncBackendMixin.wait_for_pending
  • celery.backends.amqp.AMQPBackend.drain_events
  • celery.backends.amqp.AMQPBackend.get_many
  • celery.backends.rpc.ResultConsumer.drain_events

Scheduling

Default scheduler doesn't work. PersistentScheduler is subclass of default celery scheduler.

Running celery with scheduler:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool --scheduler celery_pool_asyncio:PersistentScheduler
$ celery beat -A hello_async_celery.app --scheduler celery_pool_asyncio:PersistentScheduler

Embeding also supported:

$ celery worker -A hello_async_celery.app -P celery_pool_asyncio:TaskPool --scheduler celery_pool_asyncio:PersistentScheduler -B

WARNING: embeded scheduler startup is not stable. It starts correctly in ~50% of cases. It looks like race condition. But after correct startup it works well.

More examples

There is an example project uses celery-pool-asyncio: https://github.com/kai3341/celery-decorator-taskcls-example

Changelog

[0.1.10]

  • Make Celery Beat working
    • Add async Celery Scheduler
    • More monkey patching
  • Move loop and loop_runner to own module
    • Avoid creating multiple loops and loop_runners per application

[0.1.9]

  • Large rework of await AsyncResult.get()
    • Works much better than earlier, but it's crap still
    • Added outnumber of monkey-patches
  • Fixed race condition on first microseconds of pool shutdown

[0.1.8]

  • Cleanup tracer, use celery.app.trace namespase where it possible

[0.1.7]

  • Refactor monkey, split it
  • Move patch_send_task to own function
  • Add patch_result_get to await AsyncResult.get

[0.1.6]

  • Avoid building trace twice
  • Also this small performance optimization fixed AsyncResult.get

[0.1.5]

  • Fix graceful shutdown

[0.1.4]

  • Fix monkey: another function must be patched

[0.1.3]

  • Add changelog
  • Append documentation

[0.1.2]

  • Add monkey patcher to make brocker IO operations nonblocking

[0.1.1]

  • Refactor code
  • Fix found errors

[0.1.0]

  • Initial commit

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

celery-pool-asyncio-0.1.10.tar.gz (17.3 kB view details)

Uploaded Source

Built Distribution

celery_pool_asyncio-0.1.10-py2.py3-none-any.whl (15.2 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file celery-pool-asyncio-0.1.10.tar.gz.

File metadata

  • Download URL: celery-pool-asyncio-0.1.10.tar.gz
  • Upload date:
  • Size: 17.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.2 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.8.0 tqdm/4.28.1 CPython/3.7.3

File hashes

Hashes for celery-pool-asyncio-0.1.10.tar.gz
Algorithm Hash digest
SHA256 39f6b81627a1965071ce561b9bf4bfa39ffb338cb04e291bcde9192a3017d171
MD5 127513eeea6bc518cb9b05631ec1a565
BLAKE2b-256 78824707e93ba02717e81695c496043b57d0c89e628c5853dbaf1a6b467d1e1e

See more details on using hashes here.

File details

Details for the file celery_pool_asyncio-0.1.10-py2.py3-none-any.whl.

File metadata

  • Download URL: celery_pool_asyncio-0.1.10-py2.py3-none-any.whl
  • Upload date:
  • Size: 15.2 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.13.0 pkginfo/1.4.2 requests/2.21.0 setuptools/40.8.0 requests-toolbelt/0.8.0 tqdm/4.28.1 CPython/3.7.3

File hashes

Hashes for celery_pool_asyncio-0.1.10-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 4389546757c5b760bbab46dc1a92ee74c47859684882edd074ffdce9caacbd40
MD5 5e4e4e67de5f0c0186b8bc807c441f67
BLAKE2b-256 786f7ee8e64051d7dc7ca4b91605f37b63390e870551a811059129d22597d06a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page