
An extension for Celery to dispatch large numbers of subtasks within a main task


celery-dispatcher

An extension for Celery to dispatch large numbers of subtasks within a main task and process their results separately.

Installation

pip install celery-dispatcher

Usage

NOTICE: celery-dispatcher uses thread-local storage (TLS) to store running info, so pool implementations based on coroutines (such as eventlet or gevent) cannot be used.

First, yield each subtask and its parameters as a tuple from the main task, in the following order:

  1. task: signature
  2. args: tuple/list
  3. kwargs: dict
  4. options: dict
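To make the expected tuple shape concrete, here is a minimal pure-Python sketch of how yielded items could be unpacked into the four fields. The helper `normalize_item` is hypothetical and not part of celery-dispatcher; it assumes trailing items may be omitted (the examples yield only the task and its args) and fills them with empty defaults. The `countdown` option is only an illustrative value.

```python
from typing import Any, Dict, Iterator, Tuple

# Hypothetical helper, NOT part of celery-dispatcher: it only illustrates the
# (task, args, kwargs, options) order described above.
Item = Tuple[Any, tuple, Dict[str, Any], Dict[str, Any]]


def normalize_item(item: tuple) -> Item:
    """Pad a yielded tuple to (task, args, kwargs, options)."""
    if not 1 <= len(item) <= 4:
        raise ValueError(f"expected 1 to 4 items, got {len(item)}")
    # Append placeholders, then keep exactly four fields.
    task, args, kwargs, options = (tuple(item) + (None, None, None))[:4]
    return task, tuple(args or ()), dict(kwargs or {}), dict(options or {})


def maintask() -> Iterator[tuple]:
    # Strings stand in for Celery task signatures in this illustration.
    yield "square", (2,)
    yield "square", [3], {"verbose": True}
    yield "square", (4,), {}, {"countdown": 5}


normalized = [normalize_item(item) for item in maintask()]
for row in normalized:
    print(row)
```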

Then register a result handler for the subtasks using a signal:

from celery import shared_task
from celery_dispatcher import dispatch
from celery_dispatcher.signals import subtask_success

@shared_task
def square(i):
    return i * i

@dispatch
@shared_task
def calc():
    # Yield (task, args) for each subtask to dispatch
    for i in range(10):
        yield square, (i,)

@subtask_success.connect(sender=calc)
def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)

Or register the handler directly in the decorator:

from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def square(i):
    return i * i

def handle_result(root_id, task_id, retval, **kwargs):
    # Receives each subtask's return value as retval
    print(retval)

@dispatch(receiver=handle_result)
@shared_task
def calc():
    for i in range(10):
        yield square, (i,)

To revoke all subtasks, revoke the main task with terminate=True and the specific signal SIGUSR1:

import signal
from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def subtask(i):
    ...

@dispatch
@shared_task
def maintask():
    for i in range(10):
        yield subtask, (i,)

result = maintask.delay()
result.revoke(terminate=True, signal=signal.SIGUSR1)

Options

The dispatch decorator accepts the following parameters:

  • options: dict
  • receiver: callable
  • backend: str
  • auto_ignore: bool

General settings
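For reference, here is a sketch of a `celeryconfig.py`-style module that sets these options to the defaults listed below. It assumes celery-dispatcher reads them from the Celery app configuration, like Celery's own lowercase settings; `dispatcher_result_backend` is left out because it has no default.

```python
# celeryconfig.py (sketch): values mirror the documented defaults.
# Assumption: celery-dispatcher reads these keys from the Celery app config.
dispatcher_batch_size = 100
dispatcher_poll_size = 100
dispatcher_poll_timeout = 10
dispatcher_subtask_timeout = 3600  # seconds
dispatcher_failure_on_subtask_timeout = False
dispatcher_failure_on_subtask_exception = False
dispatcher_progress_update_frequency = 1
```

With Celery, such a module is typically loaded with `app.config_from_object("celeryconfig")`; the same keys can also be passed to `app.conf.update(...)`.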

dispatcher_result_backend

Default: no result backend is enabled.

The backend used to store subtask info. Can be one of the following:

dispatcher_batch_size

Default: 100

The number of subtasks dispatched, or results retrieved, in each batch.
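As an illustration of batch-wise dispatching, here is a minimal sketch that groups a stream of subtask items into batches of `dispatcher_batch_size`. The `batched` helper is hypothetical and is not celery-dispatcher's implementation, which may group items differently.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size items (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        # Pull up to batch_size items; an empty list means the stream is done.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 250 subtasks with the default batch size of 100 give batches of 100, 100, 50.
sizes = [len(batch) for batch in batched(range(250), batch_size=100)]
print(sizes)  # [100, 100, 50]
```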

dispatcher_poll_size

Default: 100

The size of the queue used when retrieving results. celery-dispatcher puts unfinished subtasks into a queue and polls it continuously until they complete.
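The polling scheme described above can be sketched as a bounded queue that is topped up to `dispatcher_poll_size` entries, with unfinished subtasks requeued after each check. This is a hypothetical model, not the library's code: `poll_results` and the simulated `is_ready` check stand in for real result-backend lookups.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def poll_results(task_ids: Iterable[T], is_ready: Callable[[T], bool],
                 poll_size: int = 100) -> Iterator[T]:
    """Yield task ids in completion order, keeping at most poll_size queued.

    Hypothetical sketch: `is_ready` stands in for a result-backend lookup.
    A real poller would also wait between rounds and enforce a subtask
    timeout; without one, a task that never finishes would loop forever.
    """
    source = iter(task_ids)
    pending: Deque[T] = deque()
    exhausted = False
    while True:
        # Top the queue up to poll_size from the subtasks not yet polled.
        while not exhausted and len(pending) < poll_size:
            try:
                pending.append(next(source))
            except StopIteration:
                exhausted = True
        if not pending:
            return
        task_id = pending.popleft()
        if is_ready(task_id):
            yield task_id
        else:
            pending.append(task_id)  # not finished yet: requeue it


# Simulated backend: even ids finish on their 1st check, odd ids on their 2nd.
checks = {}


def is_ready(task_id: int) -> bool:
    checks[task_id] = checks.get(task_id, 0) + 1
    return checks[task_id] >= task_id % 2 + 1


order = list(poll_results(range(5), is_ready, poll_size=2))
print(order)  # [0, 2, 1, 4, 3]
```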

dispatcher_poll_timeout

Default: 10

The default timeout for polling subtasks.

dispatcher_subtask_timeout

Default: 3600

The default timeout in seconds before celery-dispatcher gives up retrieving the result of each subtask.

dispatcher_failure_on_subtask_timeout

Default: False

Whether to raise an exception when a subtask times out.
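To illustrate how `dispatcher_subtask_timeout` and `dispatcher_failure_on_subtask_timeout` interact, here is a minimal sketch assuming an expired subtask is either given up on (the default) or turned into an error. `has_timed_out` and `SubtaskTimeout` are hypothetical names; the exception celery-dispatcher actually raises is not documented here.

```python
import time
from typing import Optional


class SubtaskTimeout(Exception):
    """Hypothetical exception type used only in this sketch."""


def has_timed_out(started_at: float, subtask_timeout: float = 3600,
                  failure_on_subtask_timeout: bool = False,
                  now: Optional[float] = None) -> bool:
    """Return True if the result of a subtask should be given up on.

    Raises SubtaskTimeout instead when failure_on_subtask_timeout is set.
    """
    now = time.monotonic() if now is None else now
    if now - started_at < subtask_timeout:
        return False
    if failure_on_subtask_timeout:
        raise SubtaskTimeout(f"subtask exceeded {subtask_timeout} s")
    return True


print(has_timed_out(0.0, now=10.0))    # False: still within 3600 s
print(has_timed_out(0.0, now=3600.0))  # True: give up on this subtask
```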

dispatcher_failure_on_subtask_exception

Default: False

Whether to propagate exceptions raised by subtasks.
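A minimal sketch of what this flag could control, assuming that with the flag off a failed subtask is only logged and skipped. `process_outcome` is a hypothetical helper, and treating any exception instance as a failure is a simplification of this sketch.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("dispatcher_sketch")


def process_outcome(task_id: str, outcome: Any,
                    failure_on_subtask_exception: bool = False) -> Optional[Any]:
    """Return a subtask's value, or handle the exception it raised.

    Hypothetical sketch: with the flag off, the exception is only logged.
    """
    if isinstance(outcome, BaseException):
        if failure_on_subtask_exception:
            raise outcome  # propagate the subtask's exception to the caller
        logger.warning("subtask %s failed: %r", task_id, outcome)
        return None
    return outcome


print(process_outcome("t1", 16))                  # 16
print(process_outcome("t2", ValueError("boom")))  # None, plus a logged warning
```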

dispatcher_progress_update_frequency

Default: 1

How often progress is stored, in steps. celery-dispatcher stores updated progress in the background at most once every N steps.
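The throttling described above can be sketched as a counter that persists progress only on every N-th step. `ThrottledProgress` and its `store` callback are hypothetical; always storing the final step, so that completion is recorded, is a design choice of this sketch rather than documented behavior.

```python
from typing import Callable, List, Tuple


class ThrottledProgress:
    """Persist progress at most once every `frequency` steps (hypothetical sketch)."""

    def __init__(self, total: int, store: Callable[[int, int], None],
                 frequency: int = 1) -> None:
        if frequency < 1:
            raise ValueError("frequency must be at least 1")
        self.total = total
        self.store = store
        self.frequency = frequency
        self.done = 0

    def step(self) -> None:
        self.done += 1
        # Store every N-th step, and always the final one so completion is recorded.
        if self.done % self.frequency == 0 or self.done == self.total:
            self.store(self.done, self.total)


saved: List[Tuple[int, int]] = []
progress = ThrottledProgress(total=10, store=lambda d, t: saved.append((d, t)),
                             frequency=4)
for _ in range(10):
    progress.step()
print(saved)  # [(4, 10), (8, 10), (10, 10)]
```

In a bound Celery task, `store` could wrap `self.update_state(...)`; whether celery-dispatcher records progress this way is not stated here.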


Download files


Source Distribution

celery-dispatcher-1.1.3.tar.gz (12.3 kB)

Built Distribution

celery_dispatcher-1.1.3-py3-none-any.whl (14.3 kB, Python 3)
