Skip to main content

concurrent execution in various flavors

Project description

godale

Debauchery tools for concurrent task execution.

pipeline status coverage report

Here you find wrappers around some concurrent.futures, multiprocessing and billiard functions to facilitate switching between parallelization implementations:

  • concurrent.futures multithreading: Python 3 standard tool for multithreading
  • concurrent.futures multiprocessing: Python 3 standard tool for multiprocessing
  • multiprocessing multiprocessing: Python 3 standard tool for multiprocessing
  • billiard multiprocessing: Multiprocessing for celery workers.

All four currently implemented wrappers handle parallel execution of the tasks and yield result (FinishedTask) objects having result() and exception() methods. This enables catching of exceptions without cancelling remaining tasks.

Usage:

from godale import Executor

# the first function argument will be replaced by items from the "items" argument below
# subsequent arguments and keyword arguments can be passed on using "fargs" and "fkwargs"
def _worker_function(a, b, some_kwarg=None):
    return a * b


# multiprocessing the Python 3 way
executor = Executor(executor="concurrent_processes")
for task in executor.as_completed(
    func=_worker_function,  # function to be executed
    iterable=range(100),  # items to be parallelized
    fargs=(10, ),  # other function arguments
    fkwargs={"a_kwarg"=True}  # function keyword arguments
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# multithreading the Python 3 way
executor = Executor(executor="concurrent_threads")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# multiprocessing within a celery worker
executor = Executor(executor="billiard")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# using the multiprocessing standard module
executor = Executor(executor="multiprocessing")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# use different start_method than "fork"
# NOTE: with concurrent.futures and Python 3.6 and earlier, "start_method" other than
# "fork" will raise an RuntimeError
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, ),
    start_method="spawn"
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for godale, version 0.3
Filename, size File type Python version Upload date Hashes
Filename, size godale-0.3-py3-none-any.whl (6.3 kB) File type Wheel Python version py3 Upload date Hashes View
Filename, size godale-0.3.tar.gz (4.7 kB) File type Source Python version None Upload date Hashes View

Supported by

AWS AWS Cloud computing Datadog Datadog Monitoring DigiCert DigiCert EV certificate Facebook / Instagram Facebook / Instagram PSF Sponsor Fastly Fastly CDN Google Google Object Storage and Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Salesforce Salesforce PSF Sponsor Sentry Sentry Error logging StatusPage StatusPage Status page