
An extension for Celery to dispatch a large number of subtasks within a main task


celery-dispatcher

An extension for Celery to dispatch a large number of subtasks within a main task and process their results separately.

Installation

pip install celery-dispatcher

Usage

NOTICE: celery-dispatcher uses thread-local storage (TLS) to store its running state, so coroutine-based pool implementations (such as eventlet or gevent) cannot be used.

First, yield each subtask and its parameters from the main task as a tuple, in the following order:

  1. task: signature
  2. args: tuple/list
  3. kwargs: dict
  4. options: dict
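To make the tuple order concrete, here is a minimal sketch that pads a yielded tuple to the full (task, args, kwargs, options) form. The helper `normalize_yielded` and the placeholder task name `"subtask"` are hypothetical and are not part of celery-dispatcher; the sketch also assumes that trailing items may be omitted, as the examples in this README yield only `(task, args)`. `countdown` is a standard Celery `apply_async` option, but whether celery-dispatcher forwards `options` to `apply_async` is an assumption.

```python
from typing import Any, Dict, Tuple


# Hypothetical helper, not part of celery-dispatcher: pads a yielded tuple
# to the documented (task, args, kwargs, options) order, filling missing
# trailing items with empty defaults.
def normalize_yielded(item: tuple) -> Tuple[Any, tuple, Dict[str, Any], Dict[str, Any]]:
    if not 1 <= len(item) <= 4:
        raise ValueError("expected (task, args, kwargs, options); trailing items optional")
    defaults = [None, (), {}, {}]
    task, args, kwargs, options = list(item) + defaults[len(item):]
    # args may be a tuple or a list; kwargs/options are copied so the
    # shared defaults are never handed out.
    return task, tuple(args), dict(kwargs), dict(options)


# A main-task body in the documented style; the string "subtask" stands in
# for a real task signature so this runs without Celery.
def calc_items():
    for i in range(3):
        yield "subtask", (i,)                      # task and args only
    yield "subtask", [3], {}, {"countdown": 5}     # all four items
```

Iterating `calc_items()` through `normalize_yielded` yields four uniform 4-tuples, whichever form the main task used.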

Then register a handler for the subtask results using the signal:

from celery import shared_task
from celery_dispatcher import dispatch
from celery_dispatcher.signals import subtask_success

@shared_task
def square(i):
    return i * i

# The main task yields (task, args) tuples; dispatch sends each one as a subtask.
@dispatch
@shared_task
def calc():
    for i in range(10):
        yield square, (i,)

# Called for each successful subtask dispatched by calc.
@subtask_success.connect(sender=calc)
def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)
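A handler usually does more than print. The sketch below collects return values keyed by `root_id` and `task_id`, using the handler signature shown above. It assumes that `root_id` identifies the main task that dispatched the subtask, as Celery's `root_id` normally does; the dict `results_by_root` is hypothetical. The plain function is shown without the `@subtask_success.connect(sender=calc)` decorator so it can run standalone.

```python
from collections import defaultdict

# Hypothetical in-memory store: {root_id: {task_id: retval}}.
# Assumption: root_id is the id of the main task that dispatched the subtask.
results_by_root = defaultdict(dict)


def handle_result(root_id, task_id, retval, **kwargs):
    # Record each subtask's return value under its own task id.
    results_by_root[root_id][task_id] = retval
```

Because the handler runs inside a worker process, this dict only lives in that process's memory; in practice you would persist the results somewhere shared, such as a database or cache.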

Or register the handler directly in the decorator:

from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def square(i):
    return i * i

# Plain function; passed to dispatch() as receiver= instead of connecting a signal.
def handle_result(root_id, task_id, retval, **kwargs):
    print(retval)

@dispatch(receiver=handle_result)
@shared_task
def calc():
    for i in range(10):
        yield square, (i,)

To revoke all subtasks, revoke the main task with terminate=True and the SIGUSR1 signal:

import signal
from celery import shared_task
from celery_dispatcher import dispatch

@shared_task
def subtask(i):
    ...

@dispatch
@shared_task
def maintask():
    for i in range(10):
        yield subtask, (i,)

result = maintask.delay()
# Revoking the main task with SIGUSR1 also revokes the subtasks it dispatched.
result.revoke(terminate=True, signal=signal.SIGUSR1)

Options

The dispatch decorator accepts the following parameters:

  • options: dict
  • receiver: callable, the result handler (as in the decorator example above)
  • backend: str
  • auto_ignore: bool

General settings
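The settings in this section can be collected in a Celery configuration module. The sketch below spells out each one with its documented default so it is easy to change; it assumes celery-dispatcher reads these keys from the Celery app configuration like other lowercase Celery settings, and the module name `celeryconfig.py` is only a convention.

```python
# celeryconfig.py: a sketch, assuming celery-dispatcher reads these keys
# from the Celery app configuration. Values are the documented defaults.
dispatcher_batch_size = 100
dispatcher_poll_size = 100
dispatcher_poll_timeout = 10
dispatcher_subtask_timeout = 3600          # seconds
dispatcher_failure_on_subtask_timeout = False
dispatcher_failure_on_subtask_exception = False
dispatcher_progress_update_frequency = 1

# dispatcher_result_backend is intentionally left unset: no backend is
# enabled by default, and the accepted values are not listed here.
```

With Celery, such a module is typically loaded via `app.config_from_object("celeryconfig")`; the same keys can also be passed to `app.conf.update(...)`.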

dispatcher_result_backend

Default: No result backend is enabled.

The backend used to store subtask info. Can be one of the following:

dispatcher_batch_size

Default: 100

The batch size used when dispatching subtasks and when retrieving their results.
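As a worked example: if dispatching were split into chunks of `dispatcher_batch_size`, then 250 subtasks with the default of 100 would go out in three rounds of 100, 100 and 50. The helper `batched` below is a hypothetical sketch of that chunking, not celery-dispatcher's internal code.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    # Yield consecutive chunks of at most batch_size items; the last one may be shorter.
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(items)
    while chunk := list(islice(it, batch_size)):
        yield chunk
```

Larger batches mean fewer round trips to the broker or backend, at the cost of more work per round.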

dispatcher_poll_size

Default: 100

The size of the queue of subtasks used during result retrieval. celery-dispatcher puts unfinished subtasks into this queue and polls it continuously until they complete.
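The polling described above can be pictured as a bounded window: at most `poll_size` unfinished subtasks are watched at once, finished ones are removed, and unfinished ones go to the back of the queue. The function `poll_until_done` and its `is_ready` callback are hypothetical; this is a sketch of the idea, not the library's implementation. It omits sleeping and timeouts, so it loops forever if a subtask never finishes (the `dispatcher_subtask_timeout` setting addresses that case in the real library).

```python
from collections import deque
from itertools import islice
from typing import Callable, Iterable, List


def poll_until_done(
    task_ids: Iterable[str],
    is_ready: Callable[[str], bool],
    poll_size: int,
) -> List[str]:
    """Return task ids in the order they were observed to finish."""
    pending = iter(task_ids)
    # Bounded window of subtasks currently being watched.
    queue = deque(islice(pending, poll_size))
    finished: List[str] = []
    while queue:
        task_id = queue.popleft()
        if is_ready(task_id):
            finished.append(task_id)
            # A slot freed up: admit the next not-yet-watched subtask, if any.
            queue.extend(islice(pending, 1))
        else:
            # Not finished yet: move it to the back and check again later.
            queue.append(task_id)
    return finished
```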

dispatcher_poll_timeout

Default: 10

The default timeout for polling subtasks.

dispatcher_subtask_timeout

Default: 3600

The default timeout, in seconds, after which celery-dispatcher gives up retrieving the result of a subtask.

dispatcher_failure_on_subtask_timeout

Default: False

Whether to raise an exception when a subtask times out.
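The interplay of `dispatcher_subtask_timeout` and `dispatcher_failure_on_subtask_timeout` can be sketched as a deadline loop: keep fetching until a result arrives or the deadline passes, then either raise or quietly give up. `wait_for_result`, `NOT_READY` and `SubtaskTimeout` are hypothetical names used only in this sketch.

```python
import time
from typing import Any, Callable

NOT_READY = object()  # sentinel: a subtask may legitimately return None


class SubtaskTimeout(Exception):
    """Hypothetical exception type used only in this sketch."""


def wait_for_result(
    fetch: Callable[[], Any],
    subtask_timeout: float,
    failure_on_timeout: bool,
    poll_interval: float = 0.01,
) -> Any:
    # fetch() returns the subtask's result, or NOT_READY if it hasn't finished.
    deadline = time.monotonic() + subtask_timeout
    while True:
        result = fetch()
        if result is not NOT_READY:
            return result
        if time.monotonic() >= deadline:
            if failure_on_timeout:
                raise SubtaskTimeout(f"no result after {subtask_timeout}s")
            return NOT_READY  # give up on this subtask without failing
        time.sleep(poll_interval)
```

With the default of `False`, a slow subtask is simply skipped; setting it to `True` turns the timeout into an error the main task sees.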

dispatcher_failure_on_subtask_exception

Default: False

Whether to re-raise an exception raised by a subtask.

dispatcher_progress_update_frequency

Default: 1

How often progress is recorded, in steps: celery-dispatcher only stores the updated progress in the background at most once every N steps.
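As a worked example, with a frequency of 10 and 95 completed steps, a recorder that writes on every 10th step performs 9 writes (at 10, 20, ..., 90) instead of 95 at the default of 1. The class `ProgressRecorder` is a hypothetical sketch; it assumes a "step" is one completed subtask and does not flush the final partial count, and whether celery-dispatcher does either is not stated here.

```python
from typing import Callable


class ProgressRecorder:
    """Hypothetical sketch: persist progress at most once every `frequency` steps."""

    def __init__(self, frequency: int, store: Callable[[int], None]) -> None:
        if frequency < 1:
            raise ValueError("frequency must be >= 1")
        self.frequency = frequency
        self.store = store  # e.g. a function that writes progress somewhere
        self.steps = 0

    def step(self) -> None:
        self.steps += 1
        # Only every N-th step triggers a (potentially expensive) write.
        if self.steps % self.frequency == 0:
            self.store(self.steps)
```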

Download files


Source Distribution

celery-dispatcher-1.1.3.tar.gz (12.3 kB)

Uploaded Source

Built Distribution

celery_dispatcher-1.1.3-py3-none-any.whl (14.3 kB)

Uploaded Python 3

File details

Details for the file celery-dispatcher-1.1.3.tar.gz.

File metadata

  • Download URL: celery-dispatcher-1.1.3.tar.gz
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for celery-dispatcher-1.1.3.tar.gz
Algorithm Hash digest
SHA256 906583aa51f63ac7a1d9c1bf749004b98743cd4312c52dcd8cb153be8ab9d533
MD5 e5492157421b247be15fc53b07bf40ae
BLAKE2b-256 e3149252676f3d46dd9eae29f639eafd00a14c20f2517a86a36bdaa725a0e6a6

File details

Details for the file celery_dispatcher-1.1.3-py3-none-any.whl.

File hashes

Hashes for celery_dispatcher-1.1.3-py3-none-any.whl
Algorithm Hash digest
SHA256 21bd7fdf3d76e27fab758344b38f07d5aac6babe9731b2b6daabc1cba401a064
MD5 49b32ef2fd29394ffd82f8c868209687
BLAKE2b-256 6a538023a4e743baff17fc18c03aaade45efcb7a7d52a7f33514e9ac9c347e68
