An extension of the multiprocessing/thread map that supports stateful workers and size-aware iterators.
Project description
Important note
This is a preliminary version: we plan to redesign the API to make it easier to use.
Overview
This is a missing piece of the Python multitasking API (both threads and processes), which:
- Permits enjoyable parallelization of iteration over both unsizeable and sizeable collections, with the possibility to display a progress bar of your choice (i.e., not necessarily tqdm).
- Supports stateful worker pools, which can be initialized directly in a worker process/thread.
This package is dependency-free and supports both threads and processes. Due to GIL limitations, threads currently provide only limited parallelization, but this will change in the foreseeable future.
PS: Note that the use of this package (and multiprocessing map in general) in a Jupyter notebook may require changing the process starting behavior (see this sample notebook)
Summary of advantages
A standard worker pool combined with map-style distribution of tasks is a convenient abstraction, but it lacks support for initializing stateful workers (inside each process, without copying them from the main process) and it treats sizeable and unsizeable iterators identically. Stateful initialization matters, for example, in deep learning: each worker can load an embedding-generation model onto a specific GPU only once.
Because standard map functions do not use the __len__ function even if the input iterable provides it, one cannot easily display a nice progress bar (going from 0% to 100%). The nice pqdm package does solve this problem, but it does not support lazy (memory-efficient) iteration: it has to load all inputs into memory. py_stateful_map fixes all these issues without directly incorporating tqdm.
To summarize, py_stateful_map overcomes the following shortcomings of the standard API and/or pqdm without requiring any additional dependencies:
- Pain-free support for stateful workers, which are initialized separately in each process using a worker-specific set of arguments (stateless workers are supported as well).
- Support for lazy, memory-efficient iterators (unlike pqdm).
- The package simulates an iterator that has the __len__ function (unlike standard Python map functions) if the input iterable is sizeable.
- Pain-free handling of exceptions: they can be stored in the return object, raised immediately, or raised after all concurrent tasks have finished.
- Support for both ordered and unordered return of results (unlike pqdm). Unordered return of results is sometimes more efficient.
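The combination of lazy iteration and an optional __len__ can be illustrated with a minimal, library-independent sketch. The class name SizeAwareMap is hypothetical and is not part of py_stateful_map; it runs sequentially and only shows the idea of forwarding the input size when it is known.

```python
from typing import Callable, Iterable, Iterator


class SizeAwareMap:
    """Hypothetical helper: lazily applies `func` and forwards len() of the input."""

    def __init__(self, func: Callable, items: Iterable):
        self._func = func
        self._items = items

    def __iter__(self) -> Iterator:
        # Results are produced one at a time; the inputs are never copied into a list.
        return (self._func(item) for item in self._items)

    def __len__(self) -> int:
        # Works for sized inputs (lists, ranges, ...); for unsized inputs such as
        # generators, len() raises TypeError and the caller knows the size is unknown.
        return len(self._items)


sized = SizeAwareMap(lambda x: x * x, [1, 2, 3])
print(len(sized), list(sized))
```

A progress bar can query len() on such an object to obtain a total, while a generator input still streams through without being materialized.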
Install & Use
To install
pip install py_stateful_map
and use (duplicated from this sample file):
from py_stateful_map import StateFullWorkerPool, WorkerResultWrapper, ExceptionBehavior, ArgumentPassing
# Use of tqdm is optional!
from tqdm import tqdm


class SimpleArithmeticClassWorker:
    def __init__(self, proc_id):
        print(f'Init. process {proc_id}')
        self.proc_id = proc_id

    def __call__(self, input_arg):
        ret_val = input_arg
        for t in range(100_000):
            ret_val = (ret_val * ret_val) % 337
        return ret_val


if __name__ == '__main__':
    N_TASKS = 1000
    N_WORKERS = 4
    # One kwargs dictionary per worker
    worker_kwarg_dict_arr = []
    for pid in range(N_WORKERS):
        worker_kwarg_dict_arr.append(dict(proc_id=pid))
    input_arr = [k * 10 for k in range(N_TASKS)]
    tot_res = 0
    with StateFullWorkerPool(num_workers=N_WORKERS,
                             worker_class_or_func=SimpleArithmeticClassWorker,
                             worker_kwarg_dict_arr=worker_kwarg_dict_arr,
                             argument_passing=ArgumentPassing.AS_SINGLE_ARG,
                             exception_behavior=ExceptionBehavior.DEFERRED,
                             join_timeout=1) as proc_pool:
        # just marking the type
        result: WorkerResultWrapper
        for result in tqdm(proc_pool.map(input_arr, is_unordered=False)):
            # With deferred exceptions, they are simply returned as a part of the result object
            if result.exception_obj is not None:
                print('Error:', result.exception_obj)
            else:
                tot_res += result.ret_val
            # This sample worker never raises, so no exception is expected here
            assert result.exception_obj is None
    print('Total:', tot_res)
A wrapper example file that can be used to play with all possible options can be found here.
Usage in detail
Passing arguments
Python functions support two types of arguments: positional and keyword (kwargs, passed as a key-value dictionary).
The function StateFullWorkerPool.map reads elements from an input iterable and converts them into arguments for a worker accordingly. This can be done in three ways (similar to pqdm):
- A single positional argument. This is the default behavior, which can be enabled explicitly by specifying the argument passing type as ArgumentPassing.AS_SINGLE_ARG. In this case, each input element is passed directly to the worker function (as its only argument).
- Multiple positional arguments. In this case, we assume that the input iterable contains lists or tuples of equal length. These lists/tuples are interpreted as positional arguments.
- Named arguments (KWARGS). In this case, the input iterable should provide dictionaries whose keys correspond to the worker function's argument names.
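The three conventions above correspond to three ordinary Python calling forms. The sketch below is library-independent: the enum PassingMode and the helper call_worker are hypothetical stand-ins for illustration, not the package's ArgumentPassing enum or its internals.

```python
from enum import Enum, auto


class PassingMode(Enum):
    """Hypothetical stand-in for the three argument-passing conventions."""
    SINGLE_ARG = auto()
    POSITIONAL = auto()
    KEYWORD = auto()


def call_worker(worker, element, mode):
    """Call `worker` on one input element according to the chosen convention."""
    if mode is PassingMode.SINGLE_ARG:
        return worker(element)      # the element is the only argument
    if mode is PassingMode.POSITIONAL:
        return worker(*element)     # the list/tuple is unpacked into positional arguments
    if mode is PassingMode.KEYWORD:
        return worker(**element)    # dictionary keys must match the argument names
    raise ValueError(f'Unknown mode: {mode}')


def power(base, exponent=2):
    return base ** exponent


print(call_worker(power, 3, PassingMode.SINGLE_ARG))                          # 9
print(call_worker(power, (3, 3), PassingMode.POSITIONAL))                     # 27
print(call_worker(power, {'base': 2, 'exponent': 5}, PassingMode.KEYWORD))    # 32
```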
Implementation of the stateful workers
Stateful workers are implemented as class objects that implement a __call__ function. Worker arguments are passed to the class __init__ function. For simplicity, we support passing only KWARG-style arguments, i.e., these arguments should be provided as dictionaries. A shared argument is passed to all workers during initialization. If you specify a worker-specific list of arguments, the number of elements must be equal to the number of workers.
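The following sketch shows what such a worker class and its per-worker keyword arguments might look like. It does not use the package: the workers are created in the main process purely for illustration, and merging a shared dictionary (shared_kwargs, a hypothetical name) with each worker-specific dictionary is an assumption about how shared and worker-specific arguments could be combined.

```python
class PrefixWorker:
    """A stateful worker: configured once in __init__, then called once per task."""

    def __init__(self, worker_id, prefix):
        self.worker_id = worker_id
        self.prefix = prefix
        self.n_calls = 0  # state that persists between calls

    def __call__(self, text):
        self.n_calls += 1
        return f'{self.prefix}[{self.worker_id}] {text}'


N_WORKERS = 2
shared_kwargs = dict(prefix='worker')  # hypothetical: the same for every worker
worker_kwarg_dict_arr = [dict(worker_id=wid) for wid in range(N_WORKERS)]
# The worker-specific list must have exactly one entry per worker.
assert len(worker_kwarg_dict_arr) == N_WORKERS

# Stand-in for the initialization a pool would perform inside each process/thread.
workers = [PrefixWorker(**shared_kwargs, **kwargs) for kwargs in worker_kwarg_dict_arr]

# Round-robin dispatch, only to show that each worker keeps its own state.
outputs = [workers[i % N_WORKERS](text) for i, text in enumerate(['a', 'b', 'c'])]
print(outputs)
print([w.n_calls for w in workers])
```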
Exception handling
All exceptions generated by the worker are caught. There are three options to handle these further:
- ExceptionBehavior.DEFERRED : just save the exception object and return it to the main process.
- ExceptionBehavior.IMMEDIATE : raise the exception in the main process without waiting for other tasks to finish.
- ExceptionBehavior.IMMEDIATE_WAIT_OTHERS : wait for the currently running tasks and raise the exception once they are finished.
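To make the difference between the three policies concrete, here is a rough sketch built on the standard concurrent.futures thread pool rather than on this package. The names Behavior, Result and run_tasks are hypothetical. As a simplification, the wait-others branch waits for all submitted tasks, not only for the ones currently running. It assumes Python 3.9+ for the cancel_futures parameter.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Optional


class Behavior(Enum):
    """Hypothetical stand-in for the three exception-handling policies."""
    DEFERRED = auto()
    IMMEDIATE = auto()
    IMMEDIATE_WAIT_OTHERS = auto()


@dataclass
class Result:
    ret_val: Any = None
    exception_obj: Optional[BaseException] = None


def run_tasks(func, inputs, behavior, num_workers=2):
    pool = ThreadPoolExecutor(max_workers=num_workers)
    futures = [pool.submit(func, x) for x in inputs]
    results = []
    try:
        for fut in futures:
            exc = fut.exception()  # blocks until this task has finished
            if exc is None:
                results.append(Result(ret_val=fut.result()))
            elif behavior is Behavior.DEFERRED:
                results.append(Result(exception_obj=exc))  # store it, keep going
            else:
                if behavior is Behavior.IMMEDIATE_WAIT_OTHERS:
                    wait(futures)  # simplification: let every submitted task finish
                raise exc
    finally:
        # Do not block here; tasks that have not started yet are dropped.
        pool.shutdown(wait=False, cancel_futures=True)
    return results


def flaky(x):
    if x == 2:
        raise ValueError('bad input 2')
    return x * 10


for res in run_tasks(flaky, [1, 2, 3], Behavior.DEFERRED):
    print(res.ret_val, repr(res.exception_obj))
```

With the deferred policy the caller inspects exception_obj on every result, which mirrors the loop in the usage example; with the other two policies the first failure surfaces as a regular Python exception.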
Project details
Hashes for py_stateful_map-0.3-py3-none-any.whl
Algorithm | Hash digest
---|---
SHA256 | fdf8921e0cfe10a8a0fae3dd5d0d669dbd3c46de27aa5317e2151063d7f9ca71
MD5 | a0711b757ce28df34ef620a963dc5d1a
BLAKE2b-256 | 53e017c04e1fa06b7e1614428f096021dd4ffe527b8fe4c9ee5fd8cee574a453