
Parallel processing with progress bars


Parallelbar

Parallelbar displays the progress of tasks in a process pool for methods such as map, imap and imap_unordered. Parallelbar is based on the tqdm module and the standard Python multiprocessing library. Starting from version 0.2.0, the ProcessPool class of the pebble library is used to implement the map method. Thanks to this, it became possible to handle exceptions that occur within a separate process, and also to set a timeout for the execution of a task.

Installation

pip install parallelbar
or
pip install --user git+https://github.com/dubovikmaster/parallelbar.git

Usage

from parallelbar import progress_imap, progress_map, progress_imapu
from parallelbar.tools import cpu_bench, fibonacci

Let's create a list of 100 numbers and test progress_map with default parameters on a toy function cpu_bench:

tasks = [1_000_000 + i for i in range(100)]
%%time
list(map(cpu_bench, tasks))
Wall time: 52.6 s

Ok, by default this runs on one core of my i7-9700F and took 52 seconds. Let's parallelize the calculations across all 8 cores and watch the progress. This is easily done by replacing the standard map function with progress_map.

if __name__=='__main__':
    progress_map(cpu_bench, tasks)

Core progress:

Great! We got a 6x speedup, and we could watch the progress as it ran. What about the progress on the individual cores of your CPU?

if __name__=='__main__':
    tasks = [500_000 + i for i in range(100)]
    progress_map(cpu_bench, tasks, n_cpu=4, chunk_size=1, core_progress=True)

You can also easily use progress_imap and progress_imapu, analogs of the imap and imap_unordered methods of the Pool() class.

%%time
if __name__=='__main__':
    tasks = [20 + i for i in range(15)]
    result = progress_imap(fibonacci, tasks, chunk_size=1, core_progress=False)

New in version 0.2.0

Thanks to pebble, it is now possible to handle exceptions and set timeouts for the execution of tasks in the progress_map function.
Consider the following toy example:

import time

def foo(n):
    if n == 5 or n == 17:
        1/0
    elif n == 10:
        time.sleep(2)
    else:
        time.sleep(1)
    return n

if __name__ == '__main__':
    res = progress_map(foo, range(20), process_timeout=5, n_cpu=8)

As you can see, under the main progress bar another progress bar has appeared that displays the number of tasks that ended unsuccessfully. At the same time, the main bar turned orange, signaling that something went wrong.

print(res)
	[0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 10, 11, 12,
     13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]

In the resulting array, we have exceptions in the corresponding places. Also, we can see the exception traceback:

print(res[5].traceback)
Traceback (most recent call last):
  File "/home/padu/anaconda3/envs/work/lib/python3.9/site-packages/pebble/common.py", line 174, in process_execute
    return function(*args, **kwargs)
  File "/home/padu/anaconda3/envs/work/lib/python3.9/site-packages/parallelbar/parallelbar.py", line 48, in _process
    result = func(task)
  File "/tmp/ipykernel_70395/285585760.py", line 3, in foo
    1/0
ZeroDivisionError: division by zero

This tells us exactly where in the code the exception occurred. Let's add a timeout of 1.5 seconds for each task. If a task's execution time exceeds 1.5 seconds, an appropriate exception will be raised and handled; the process is then restarted and continues working (thanks to pebble).

if __name__ == '__main__':
    res = progress_map(foo, range(20), process_timeout=1.5, n_cpu=8)

print(res)
	[0, 1, 2, 3, 4, ZeroDivisionError('division by zero'), 6, 7, 8, 9, 'function foo took longer than 1.5 s.', 
	11, 12, 13, 14, 15, 16, ZeroDivisionError('division by zero'), 18, 19]

Exception handling has also been added to methods progress_imap and progress_imapu.
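Since the resulting list mixes ordinary values with exception objects (and, when process_timeout is set, timeout message strings), it can be handy to post-process it. The split_results helper below is a hypothetical sketch, not part of parallelbar:

```python
def split_results(results):
    """Separate successful results from failures, keeping original indices.

    Hypothetical helper: assumes failures appear in the list either as
    exception objects or as 'function ... took longer than ...' strings,
    as in the progress_map output shown above.
    """
    ok, failed = [], []
    for i, r in enumerate(results):
        if isinstance(r, Exception) or (isinstance(r, str) and 'took longer' in r):
            failed.append((i, r))
        else:
            ok.append((i, r))
    return ok, failed

# A result list shaped like the one in the example above.
res = [0, 1, ZeroDivisionError('division by zero'),
       'function foo took longer than 1.5 s.', 4]
good, bad = split_results(res)
print(good)  # [(0, 0), (1, 1), (4, 4)]
print([i for i, _ in bad])  # [2, 3]
```

Note that a legitimate string result containing "took longer" would be misclassified; for real code you would want a more robust sentinel.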

New in version 0.2.10

  1. The process_timeout keyword argument has been added to the progress_imap and progress_imapu methods (can be used only if chunk_size=1!)
  2. The stopit_after_timeout decorator has been added to the tools module.
  3. Fixed a bug that caused the progress_imap and progress_imapu methods to hang if chunk_size > 1.

Example of using a decorator

from parallelbar.tools import stopit_after_timeout
import time

# abort function execution after timeout in seconds
@stopit_after_timeout(1, raise_exception=False)
def foo():
    for _ in range(5):
        time.sleep(2)
        
if __name__ == '__main__':
    start = time.time()
    print(foo())
    print(f'time took {time.time()-start} s.')
function foo took longer than 1 s.
time took 1.010124 s.

As you can see, instead of running for the full 10 seconds (five 2-second sleeps), the function was interrupted after the 1-second timeout. If raise_exception=True, a TimeoutError exception is raised instead.
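For comparison, a similar decorator can be sketched with the standard library alone. The stop_after decorator below is a hypothetical illustration, not parallelbar's implementation: on timeout it abandons the worker thread rather than interrupting the function the way stopit_after_timeout does.

```python
import functools
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def stop_after(seconds, raise_exception=False):
    """Hypothetical stand-in for stopit_after_timeout, built on concurrent.futures."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            pool = ThreadPoolExecutor(max_workers=1)
            future = pool.submit(func, *args, **kwargs)
            try:
                # Wait at most `seconds` for the result.
                return future.result(timeout=seconds)
            except FutureTimeout:
                msg = f'function {func.__name__} took longer than {seconds} s.'
                if raise_exception:
                    raise TimeoutError(msg)
                return msg
            finally:
                # The timed-out thread is abandoned, not killed.
                pool.shutdown(wait=False)
        return wrapper
    return decorator

@stop_after(0.5)
def snail():
    time.sleep(2)
    return 'done'

print(snail())  # function snail took longer than 0.5 s.
```

The design trade-off: a thread-based timeout cannot forcibly stop the running function, which is exactly why parallelbar relies on pebble's process-level timeouts instead.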

Problems of the naive approach

Why can't I do something simpler? Let's take the standard imap method, loop over it with tqdm, and collect the results from the processes:

from multiprocessing import Pool
from tqdm.auto import tqdm
if __name__=='__main__':
    with Pool() as p:
        tasks = [20 + i for i in range(15)]
        pool = p.imap(fibonacci, tasks)
        result = []
        for i in tqdm(pool, total=len(tasks)):
            result.append(i)

It looks good, doesn't it? But let's do the following: make the first task very heavy for its core. To do this, I will insert the number 39 at the beginning of the tasks list. Let's see what happens.

if __name__=='__main__':
    with Pool() as p:
        tasks = [20 + i for i in range(15)]
        tasks.insert(0, 39)
        pool = p.imap(fibonacci, tasks)
        result = []
        for i in tqdm(pool, total=len(tasks)):
            result.append(i)

This is a fiasco. Our progress bar hung on the completion of the first task and only jumped to 100% at the very end. Let's run the same experiment with the progress_imap function:

if __name__=='__main__':
    tasks = [20 + i for i in range(15)]
    tasks.insert(0, 39)
    result = progress_imap(fibonacci, tasks)

The progress_imap function takes care of collecting the result and closing the process pool for you. In fact, the naive approach described above will work for the standard imap_unordered method. But it does not guarantee the order of the returned result. This is often critically important.
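If you nevertheless want to build on the naive loop, one common workaround is to tag each task with its index and restore the order once all results have arrived. A minimal stdlib-only sketch (the names imap_unordered_ordered and _indexed_apply are made up for illustration; tqdm could wrap the iterator exactly as above):

```python
from multiprocessing import Pool

def _indexed_apply(args):
    # Unpack (index, func, task); the index travels with the result
    # so the unordered output can be re-sorted afterwards.
    i, func, x = args
    return i, func(x)

def imap_unordered_ordered(func, tasks, n_cpu=None):
    """Run tasks through imap_unordered, but return results in input order."""
    with Pool(n_cpu) as p:
        pairs = p.imap_unordered(_indexed_apply,
                                 [(i, func, x) for i, x in enumerate(tasks)])
        # Consume inside the pool context, then sort by the original index.
        return [r for _, r in sorted(pairs)]

def square(n):
    # Stand-in for a heavy task such as fibonacci.
    return n * n

if __name__ == '__main__':
    print(imap_unordered_ordered(square, range(5)))  # [0, 1, 4, 9, 16]
```

This keeps the progress bar responsive even when an early task is slow, at the cost of shipping the index alongside every task and buffering all results before returning.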

License

MIT license
