Skip to main content

Parallel processing with progress bars

Project description

Parallelbar

PyPI version fury.io PyPI license PyPI download month

Table of contents

Parallelbar displays the progress of tasks in the process pool for Pool class methods such as map, starmap (since 1.2 version), imap and imap_unordered. Parallelbar is based on the tqdm module and the standard python multiprocessing library. Also, it is possible to handle exceptions that occur within a separate process, as well as set a timeout for the execution of a task by a process.

Installation

pip install parallelbar
or
pip install --user git+https://github.com/dubovikmaster/parallelbar.git

Usage

from parallelbar import progress_imap, progress_map, progress_imapu
from parallelbar.tools import cpu_bench, fibonacci

Let's create a list of 100 numbers and test progress_map with default parameters on a toy function cpu_bench:

tasks = range(10000)
%%time
list(map(cpu_bench, tasks))
Wall time: 52.6 s

Ok, by default this works on one core of my i7-9700F and it took 52 seconds. Let's parallelize the calculations for all 8 cores and look at the progress. This can be easily done by replacing standart function map with progress_map.

if __name__=='__main__':
    progress_map(cpu_bench, tasks)

Core progress:

You can also easily use progress_imap and progress_imapu analogs of the imap and imap_unordered methods of the Pool() class

%%time
if __name__=='__main__':
    tasks = [20 + i for i in range(15)]
    result = progress_imap(fibonacci, tasks, chunk_size=1, core_progress=False)

Exception handling

You can handle exceptions and set timeouts for the execution of tasks by the process.
Consider the following toy example:

def foo(n):
    if n==5 or n==17:
        1/0
    elif n==10:
        time.sleep(2)
    else:
        time.sleep(1)
    return n
if __name__=='__main__':
	res = progress_map(foo, range(20), process_timeout=5, n_cpu=8)

As you can see, under the main progress bar, another progress bar has appeared that displays the number of tasks that ended unsuccessfully. At the same time, the main bar turned orange, as if signaling something went wrong

print(res)
	[0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 10, 11, 12,
     13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]

In the resulting array, we have exceptions in the corresponding places. Also, we can see the exception traceback:

print(res[5].traceback)
Traceback (most recent call last):
  File "/home/padu/anaconda3/envs/work/lib/python3.9/site-packages/pebble/common.py", line 174, in process_execute
    return function(*args, **kwargs)
  File "/home/padu/anaconda3/envs/work/lib/python3.9/site-packages/parallelbar/parallelbar.py", line 48, in _process
    result = func(task)
  File "/tmp/ipykernel_70395/285585760.py", line 3, in foo
    1/0
ZeroDivisionError: division by zero

From which concept at what place in the code the exception occurred. Let's add a timeout of 1.5 seconds for each process. If the process execution time exceeds 1.5 seconds, an appropriate exception will be raised and handled. In this case, the process will restart and continue to work (thanks to pebble)

if __name__=='__main__':
	res = progress_map(foo, range(20), process_timeout=1.5, n_cpu=8)

print(res)
	[0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 'function foo took longer than 1.5 s.', 
	11, 12, 13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]

Exception handling has also been added to methods progress_imap and progress_imapu.

Changelog

New in version 1.3

  • added maxtaskperchild keyword parameter to the progress_map/starmap/imap/imapu function (default=None)

New in version 1.2

  • Added progress_starmap function. An extension of the starmap method of the Pool class.
  • Improved documentation.

New in version 1.1

  1. The bar_step keyword argument is no longer used and will be removed in a future version
  2. Added need_serialize boolean keyword argument to the progress_map/imap/imapu function (default False). Requires dill to be installed. If True the target function is serialized using dill library. Thus, as a target function, you can now use lambda functions, class methods and other callable objects that pickle cannot serialize
  3. Added dynamic optimization of the progress bar refresh rate. This can significantly improve the performance of the progress_map/imap/imapu functions ror very long iterables and small execution time of one task by the objective function.

New in version 1.0

  1. The "ignore" value of the error_behavior key parameter is no longer supported.
  2. Default value of key parameter error_behavior changed to "raise".
  3. The pebble module is no longer used.
  4. Added key parameter executor in the functions progress_map, progress_imap and progress_imapu. Must be one of the values:
    • "threads" - use thread pool
    • "processes" - use processes pool (default)

New in version 0.3.0

  1. The error_behavior keyword argument has been added to the progress_map, progress_imap and progress_imapu methods. Must be one of the values: "raise", "ignore", "coerce".
    • "raise" - raise an exception thrown in the process pool.
    • "ignore" - ignore the exceptions that occur. Do not add anything to the result
    • "coerce" - handle the exception. The result will include the value set by the parameter set_error_value (by default None - the traceback of the raised exception will be added to the result)
  2. The set_error_value keyword argument has been added to the progress_map, progress_imap and progress_imapu methods.

Example of usage

import time
import resource as rs
from parallelbar import progress_imap


def memory_limit(limit):
    soft, hard = rs.getrlimit(rs.RLIMIT_AS)
    rs.setrlimit(rs.RLIMIT_AS, (limit, hard))


def my_awesome_foo(n):
    if n == 0:
        s = 'a' * 10000000
    elif n == 20:
        time.sleep(100)
    else:
        time.sleep(1)
    return n


if __name__ == '__main__':
    tasks = range(30)
    start = time.monotonic()
    result = progress_imap(my_awesome_foo, tasks, 
                           process_timeout=1.5, 
                           initializer=memory_limit, 
                           initargs=(100,),
                           n_cpu=4,
                           error_behavior='coerce',
                           set_error_value=None,
                           )
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)

time took: 8.2
[MemoryError(), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17, 18, 19, TimeoutError('function "my_awesome_foo" took longer than 1.5 s.'), 21, 22, 23, 24, 25, 26, 27, 28, 29]

Set NaN instead of tracebacks to the result of the pool operation:

if __name__ == '__main__':
    tasks = range(30)
    start = time.monotonic()
    result = progress_imap(my_awesome_foo, tasks, 
                           process_timeout=1.5, 
                           initializer=memory_limit, 
                           initargs=(100,),
                           n_cpu=4,
                           error_behavior='coerce',
                           set_error_value=float('nan'),
                           )
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)

time took: 8.0
[nan, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17, 18, 19, nan, 21, 22, 23, 24, 25, 26, 27, 28, 29]

Let's ignore exception:

if __name__ == '__main__':
    tasks = range(30)
    start = time.monotonic()
    result = progress_imap(my_awesome_foo, tasks, 
                           process_timeout=1.5, 
                           initializer=memory_limit, 
                           initargs=(100,),
                           n_cpu=4,
                           error_behavior='ignore',
                           set_error_value=None,
                           )
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)

time took: 8.0
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
16, 17, 18, 19, 21, 22, 23, 24, 25, 26, 27, 28, 29]

Problems of the naive approach

Why can't I do something simpler? Let's take the standard imap method and run through it in a loop with tqdm and take the results from the processes:

from multiprocessing import Pool
from tqdm.auto import tqdm
if __name__=='__main__':
    with Pool() as p:
        tasks = [20 + i for i in range(15)]
        pool = p.imap(fibonacci, tasks)
        result = []
        for i in tqdm(pool, total=len(tasks)):
            result.append(i)

It looks good, doesn't it? But let's do the following, make the first task very difficult for the core. To do this, I will insert the number 38 at the beginning of the tasks list. Let's see what happens

if __name__=='__main__':
    with Pool() as p:
        tasks = [20 + i for i in range(15)]
        tasks.insert(0, 39)
        pool = p.imap_unordered(fibonacci, tasks)
        result = []
        for i in tqdm(pool, total=len(tasks)):
            result.append(i)

This is a fiasco. Our progress hung on the completion of the first task and then at the end showed 100% progress. Let's try to do the same experiment only for the progress_imap function:

if __name__=='__main__':
    tasks = [20 + i for i in range(15)]
    tasks.insert(0, 39)
    result = progress_imap(fibonacci, tasks)

The progress_imap function takes care of collecting the result and closing the process pool for you. In fact, the naive approach described above will work for the standard imap_unordered method. But it does not guarantee the order of the returned result. This is often critically important.

License

MIT license

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

parallelbar-1.3.1.tar.gz (12.7 kB view details)

Uploaded Source

Built Distribution

parallelbar-1.3.1-py3-none-any.whl (9.8 kB view details)

Uploaded Python 3

File details

Details for the file parallelbar-1.3.1.tar.gz.

File metadata

  • Download URL: parallelbar-1.3.1.tar.gz
  • Upload date:
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for parallelbar-1.3.1.tar.gz
Algorithm Hash digest
SHA256 0a4d82e46f0f7b5f9605fecce0ecaf2e7dad1003ff4a5d6dd47a6d746935771b
MD5 a730e9d818626726b7e007a6dfa2708c
BLAKE2b-256 1c1c22b407ac10a540e5e53f8df9af249245cb40895e69e20595c437edb7a8f6

See more details on using hashes here.

File details

Details for the file parallelbar-1.3.1-py3-none-any.whl.

File metadata

  • Download URL: parallelbar-1.3.1-py3-none-any.whl
  • Upload date:
  • Size: 9.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.10.6

File hashes

Hashes for parallelbar-1.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6f8d8f8986722a8b596b72586525f29fb3cab7fdd37d9d2817730bf44235be93
MD5 c4fb4d7f659ac275bcae6275756fa906
BLAKE2b-256 a3d20389e2cda94cdebccad4ad515316f5dd080fe99e645a8a07d49fb2229be0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page