Skip to main content

concurrent execution in various flavors

Project description

godale

Debauchery tools for concurrent task execution.

pipeline status coverage report

Here you find wrappers around some concurrent.futures, multiprocessing and billiard functions to facilitate switching between parallelization implementations:

  • concurrent.futures multithreading: Python 3 standard tool for multithreading
  • concurrent.futures multiprocessing: Python 3 standard tool for multiprocessing
  • multiprocessing multiprocessing: Python 3 standard tool for multiprocessing
  • billiard multiprocessing: Multiprocessing for celery workers.

All four currently implemented wrappers handle parallel execution of the tasks and yield result (FinishedTask) objects having result() and exception() methods. This enables catching of exceptions without cancelling remaining tasks.

Usage:

from godale import Executor

# the first function argument will be replaced by items from the "items" argument below
# subsequent arguments and keyword arguments can be passed on using "fargs" and "fkwargs"
def _worker_function(a, b, some_kwarg=None):
    return a * b


# multiprocessing the Python 3 way
executor = Executor(executor="concurrent_processes")
for task in executor.as_completed(
    func=_worker_function,  # function to be executed
    iterable=range(100),  # items to be parallelized
    fargs=(10, ),  # other function arguments
    fkwargs={"a_kwarg"=True}  # function keyword arguments
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# multithreading the Python 3 way
executor = Executor(executor="concurrent_threads")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# multiprocessing within a celery worker
executor = Executor(executor="billiard")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# using the multiprocessing standard module
executor = Executor(executor="multiprocessing")
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, )
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")


# use different start_method than "fork"
# NOTE: with concurrent.futures and Python 3.6 and earlier, "start_method" other than
# "fork" will raise an RuntimeError
for task in executor.as_completed(
    func=_worker_function,
    iterable=range(100),
    fargs=(10, ),
    start_method="spawn"
):
    try:
        print(task.result())
    except ValueError:
        print("task failed")

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

godale-0.3.tar.gz (4.7 kB view hashes)

Uploaded Source

Built Distribution

godale-0.3-py3-none-any.whl (6.3 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page