An extension for Celery to dispatch a large number of subtasks within a main task
celery-dispatcher
An extension for Celery to dispatch a large number of subtasks within a main task and process their results separately.
Installation
pip install celery-dispatcher
Usage
NOTICE: celery-dispatcher uses thread-local storage (TLS) to store runtime information, so worker pools implemented with coroutines (such as eventlet or gevent) cannot be used.
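As a generic Python illustration of the notice above (not celery-dispatcher's own code), the sketch below runs two cooperative "tasks" round-robin inside a single OS thread, the way a coroutine pool interleaves work. Both write to the same threading.local object, so the first task reads back the second task's value. The names task and run_interleaved are hypothetical.

```python
import threading

# Per-thread storage: each OS thread sees its own attributes.
state = threading.local()


def task(name, log):
    """A cooperative task that stores its name in thread-local state,
    yields control (as a coroutine would), then reads the state back."""
    state.current = name
    yield  # another task may run here, in the same OS thread
    log.append((name, state.current))


def run_interleaved(names):
    """Run all tasks round-robin in the current thread."""
    log = []
    gens = [task(n, log) for n in names]
    for g in gens:
        next(g)  # run each task up to its first yield
    for g in gens:
        next(g, None)  # resume each task until it finishes
    return log
```

Here run_interleaved(["a", "b"]) returns [('a', 'b'), ('b', 'b')]: task "a" sees "b"'s value because both tasks share one thread's storage.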
First, from the main task, yield each subtask and its parameters as a tuple in the following order:
- task: signature
- args: tuple/list
- kwargs: dict
- options: dict
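The tuple order above can be sketched with a small normalizing helper. normalize_item is hypothetical and not part of celery-dispatcher; it assumes that omitted trailing fields default to empty values, which is consistent with the usage examples in this section yielding only a task and an args tuple.

```python
from typing import Any, Dict, Sequence, Tuple


def normalize_item(item: Sequence[Any]) -> Tuple[Any, tuple, Dict[str, Any], Dict[str, Any]]:
    """Pad a yielded (task, args, kwargs, options) tuple to all four fields.

    Hypothetical helper: assumes omitted trailing fields default to empty.
    """
    if not 1 <= len(item) <= 4:
        raise ValueError(f"expected 1 to 4 items, got {len(item)}")
    task, *rest = item
    args = tuple(rest[0]) if len(rest) > 0 else ()
    kwargs = dict(rest[1]) if len(rest) > 1 else {}
    options = dict(rest[2]) if len(rest) > 2 else {}
    return task, args, kwargs, options
```

For example, normalize_item((task, (3,))) returns (task, (3,), {}, {}), and a list of args is converted to a tuple.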
Then register a handler for the result of each subtask using the subtask_success signal:
```python
from celery import shared_task
from celery_dispatcher import dispatch
from celery_dispatcher.signals import subtask_success


@shared_task
def square(i):
    return i * i


@dispatch
@shared_task
def calc():
    # Yield (task, args) tuples; each one is dispatched as a subtask.
    for i in range(10):
        yield square, (i,)


@subtask_success.connect(sender=calc)
def handle_result(root_id, task_id, retval, **kwargs):
    # Receives the return value of each successful subtask.
    print(retval)
```
Alternatively, pass the handler to the decorator directly:
```python
from celery import shared_task
from celery_dispatcher import dispatch


@shared_task
def square(i):
    return i * i


def handle_result(root_id, task_id, retval, **kwargs):
    # Receives the return value of each subtask.
    print(retval)


@dispatch(receiver=handle_result)
@shared_task
def calc():
    for i in range(10):
        yield square, (i,)
```
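Because each handler call receives root_id, a single handler can group results by the main task that produced them. The in-memory store below is a hypothetical sketch: it is only meaningful if every call runs in the same process, which this README does not specify, and a lock guards against concurrent calls from worker threads. results_for is a hypothetical helper.

```python
import threading
from collections import defaultdict
from typing import Any, DefaultDict, List

# Results collected per main task (keyed by root_id); the lock guards concurrent calls.
_results: DefaultDict[str, List[Any]] = defaultdict(list)
_lock = threading.Lock()


def handle_result(root_id, task_id, retval, **kwargs):
    """Receiver with the documented signature that groups results by root_id."""
    with _lock:
        _results[root_id].append(retval)


def results_for(root_id):
    """Return a copy of the results collected so far for one main task."""
    with _lock:
        return list(_results.get(root_id, []))
```

For example, after handle_result("root-1", "t1", 0) and handle_result("root-1", "t2", 1), results_for("root-1") returns [0, 1].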
Options
The dispatch decorator accepts the following parameters:
- options: dict
- receiver: callable
- backend: str
- auto_ignore: bool
General settings
dispatcher_result_backend
Default: no result backend.
The backend used to store subtask information. Supported values:
- redis: Use Redis to store the results. See Redis backend settings.
dispatcher_batch_size
Default: 1000
The number of subtasks dispatched, or results retrieved, per batch.
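Batching of this kind can be sketched with itertools.islice. batched is a hypothetical helper, not celery-dispatcher's code; it only shows how a stream of subtasks splits into groups of at most dispatcher_batch_size items. (Python 3.12 also ships itertools.batched, which yields tuples.)

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 1000) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(items)
    # Take up to batch_size items at a time until the iterator is exhausted.
    while batch := list(islice(it, batch_size)):
        yield batch
```

With the default size, 2,500 subtasks would be split into batches of 1000, 1000 and 500.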
dispatcher_poll_size
Default: 1000
The size of the queue of subtasks used when retrieving results. celery-dispatcher puts unfinished subtasks into a queue and polls it continuously until they complete.
dispatcher_poll_timeout
Default: 1
The default timeout for polling subtasks.
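A minimal sketch of this polling loop, assuming dispatcher_poll_timeout is the pause between polling rounds in which nothing finished (its unit and exact role are not stated above). poll_completed and the is_ready callback are hypothetical; is_ready stands in for a result-backend lookup. The sketch has no per-subtask deadline, so a subtask that never finishes would keep it polling; in the library, dispatcher_subtask_timeout covers that case.

```python
import time
from collections import deque
from itertools import islice
from typing import Callable, Deque, Hashable, Iterable, Iterator


def poll_completed(
    task_ids: Iterable[Hashable],
    is_ready: Callable[[Hashable], bool],
    poll_size: int = 1000,
    poll_timeout: float = 1.0,
) -> Iterator[Hashable]:
    """Yield task ids as they finish, keeping at most poll_size ids queued."""
    if poll_size < 1:
        raise ValueError("poll_size must be at least 1")
    source = iter(task_ids)
    pending: Deque[Hashable] = deque()
    while True:
        # Top the queue up to poll_size from the subtasks not yet queued.
        pending.extend(islice(source, poll_size - len(pending)))
        if not pending:
            return  # every subtask has finished
        finished_any = False
        # One polling round: check every queued id exactly once.
        for _ in range(len(pending)):
            task_id = pending.popleft()
            if is_ready(task_id):
                finished_any = True
                yield task_id
            else:
                pending.append(task_id)  # still running: requeue at the back
        if not finished_any:
            time.sleep(poll_timeout)  # nothing finished; wait before the next round
```

Ids are yielded in completion order, not submission order, and new subtasks enter the queue only as earlier ones finish.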
dispatcher_subtask_timeout
Default: 3600
The default timeout, in seconds, after which celery-dispatcher gives up retrieving the result of a subtask.
dispatcher_failure_on_subtask_timeout
Default: False
Whether to raise an exception when a subtask times out.
dispatcher_failure_on_subtask_exception
Default: False
Whether to raise the exception thrown by a failed subtask.
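How the three settings above might combine for one polled subtask can be sketched as a small decision function. resolve_subtask is hypothetical; it assumes that, with the failure flags off, failed or timed-out subtasks are simply skipped, which the settings above do not state explicitly. "SUCCESS" and "FAILURE" are Celery's standard state names; any other status is treated as not finished.

```python
from typing import Any, Optional, Tuple


def resolve_subtask(
    status: str,
    value: Any,
    elapsed: float,
    *,
    subtask_timeout: float = 3600.0,
    failure_on_subtask_timeout: bool = False,
    failure_on_subtask_exception: bool = False,
) -> Tuple[str, Optional[Any]]:
    """Return ("success", retval), ("skip", None) or ("wait", None)."""
    if status == "SUCCESS":
        return "success", value
    if status == "FAILURE":
        if failure_on_subtask_exception:
            raise value  # propagate the subtask's exception
        return "skip", None  # assumption: the failure is ignored
    if elapsed >= subtask_timeout:
        if failure_on_subtask_timeout:
            raise TimeoutError(f"subtask still unfinished after {elapsed:.0f}s")
        return "skip", None  # assumption: give up on this subtask quietly
    return "wait", None  # keep polling
```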
dispatcher_progress_update_frequency
Default: 1
How often, in steps, progress is recorded: celery-dispatcher stores updated progress in the background at most once every N steps.
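A synchronous sketch of the "at most once every N steps" rule. ProgressRecorder and its store callback are hypothetical; the library writes progress in the background rather than inline as here.

```python
from typing import Callable


class ProgressRecorder:
    """Persist progress at most once every `frequency` steps (hypothetical helper)."""

    def __init__(self, store: Callable[[int], None], frequency: int = 1):
        if frequency < 1:
            raise ValueError("frequency must be at least 1")
        self.store = store  # stands in for a write to the result backend
        self.frequency = frequency
        self.done = 0

    def step(self) -> None:
        """Count one finished step; write only on every frequency-th step."""
        self.done += 1
        if self.done % self.frequency == 0:
            self.store(self.done)
```

With frequency=3 and seven steps, only steps 3 and 6 are written. Steps after the last multiple of N are not written by this rule alone; whether celery-dispatcher flushes a final update is not stated here.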
Hashes for celery_dispatcher-1.0.6-py3-none-any.whl

| Algorithm | Hash digest |
|---|---|
| SHA256 | 12d1cb085b29adfa648dee8ea5c64f90eabaa11036600e89cb9f306f93f1df87 |
| MD5 | 80869778f8bfa7c0e88936710680cd8e |
| BLAKE2b-256 | 52552e9e2052b539fd44398424b689a7a245968597f7f09732c7a3c8c81f05d8 |