Parallel batch processing on top of regular python functions
pbatch
Installation
Requires Python 3.7+
pip install pbatch
Usage
pbatch.pmap
Similar to the built-in map, but executes the function in parallel. The number of concurrent executions can be limited through the chunk_size keyword argument.
import time
import pbatch
def long_square(x):
    time.sleep(1)
    print(x)
    return x ** 2
list(map(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 3 seconds)
list(pbatch.pmap(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 1 second)
list(pbatch.pmap(long_square, [1, 2, 3], chunk_size=2))
# 1
# 2
# 3
# => [1, 4, 9] (after 2 seconds)
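To make the chunk_size timing concrete, here is a minimal sketch of a pmap-like helper built on the standard library's concurrent.futures. It is not pbatch's implementation; the name chunked_pmap and the use of threads are assumptions for illustration. Each chunk of up to chunk_size calls runs concurrently, and the next chunk starts only after the current one has finished, which is why three one-second calls with chunk_size=2 take about two seconds.

```python
import itertools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator, Optional


def chunked_pmap(
    func: Callable[..., Any],
    *iterables: Iterable[Any],
    chunk_size: Optional[int] = None,
) -> Iterator[Any]:
    """Hypothetical pmap-like helper (not pbatch's code): threads, one chunk at a time."""
    args = zip(*iterables)  # stops at the shortest iterable, like map
    while True:
        # Take the next chunk of argument tuples; chunk_size=None takes everything.
        chunk = list(itertools.islice(args, chunk_size))
        if not chunk:
            return
        # One worker per item, so every call in the chunk runs concurrently.
        with ThreadPoolExecutor(max_workers=len(chunk)) as executor:
            futures = [executor.submit(func, *call_args) for call_args in chunk]
            # Gather in input order; result() re-raises the first failed call's error.
            results = [future.result() for future in futures]
        # Only after the whole chunk is done are its results handed to the caller.
        yield from results


def long_square(x: int) -> int:
    time.sleep(0.1)
    return x ** 2


if __name__ == "__main__":
    start = time.monotonic()
    print(list(chunked_pmap(long_square, [1, 2, 3], chunk_size=2)))  # [1, 4, 9]
    print(f"took about {time.monotonic() - start:.1f}s")  # two chunks of ~0.1s each
```

Because results are yielded chunk by chunk, nothing runs until the generator is consumed; wrapping the call in list() is what drives every chunk to completion.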
Supports multiple-arity functions exactly as map does:
import time
import pbatch
def multiple_args(a, b, c):
    print(f"a={a}, b={b}, c={c}")
    time.sleep(1)
    return c
list(map(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)
list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 1 second)
list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000], chunk_size=1))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)
Note that if one iterable is shorter than the rest, the remaining elements in the other iterables will be ignored.
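This is the same truncation rule that the built-in map and zip follow. A quick check using only built-ins (pbatch is not imported here):

```python
def multiple_args(a, b, c):
    return c


# The third iterable has only two elements, so only two calls are made.
print(list(map(multiple_args, [1, 2, 3], [60, 70, 80], [1000, 2000])))  # [1000, 2000]

# zip pairs up arguments the same way, stopping at the shortest input.
print(list(zip([1, 2, 3], [60, 70, 80], [1000, 2000])))  # [(1, 60, 1000), (2, 70, 2000)]
```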
If an exception is raised while processing an item, the remaining elements in the current chunk will still be completed, and then a pbatch.PMapException will be raised containing the results and exceptions from the current chunk:
def raise_on_two(x):
    if x == 2:
        raise ValueError("Number is two")
    return x
try:
    list(pbatch.pmap(raise_on_two, [1, 2, 3]))
except pbatch.PMapException as e:
    e.results
    # => [1, ValueError("Number is two"), 3]
    e.exceptions
    # => [ValueError("Number is two")]
    str(e)
    # => "[1, ValueError('Number is two'), 3]"
    repr(e)
    # => "[1, ValueError('Number is two'), 3]"
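One way to collect results and exceptions side by side like this is to run a chunk through concurrent.futures and inspect each future once the chunk has finished. The sketch below is hypothetical: ChunkException and run_chunk are illustrative names, not part of pbatch, and pbatch may gather errors differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


class ChunkException(Exception):
    """Hypothetical analogue of PMapException: keeps per-item results and errors."""

    def __init__(self, results: List[Any], exceptions: List[BaseException]) -> None:
        super().__init__(results)
        self.results = results
        self.exceptions = exceptions

    def __str__(self) -> str:
        return repr(self.results)


def run_chunk(func: Callable[[Any], Any], chunk: Sequence[Any]) -> List[Any]:
    """Run one chunk concurrently; raise ChunkException if any call failed."""
    with ThreadPoolExecutor(max_workers=max(len(chunk), 1)) as executor:
        futures = [executor.submit(func, item) for item in chunk]
    # Leaving the with block waited for every call, so all futures are done here.
    errors = [future.exception() for future in futures]
    # Failed items are represented by their exception object, in input order.
    results = [
        error if error is not None else future.result()
        for future, error in zip(futures, errors)
    ]
    if any(error is not None for error in errors):
        raise ChunkException(results, [err for err in errors if err is not None])
    return results


def raise_on_two(x: int) -> int:
    if x == 2:
        raise ValueError("Number is two")
    return x


if __name__ == "__main__":
    try:
        run_chunk(raise_on_two, [1, 2, 3])
    except ChunkException as e:
        print(e.results)     # [1, ValueError('Number is two'), 3]
        print(e.exceptions)  # [ValueError('Number is two')]
```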
If you convert the results directly to a list, as above, and an exception is raised after the first chunk has completed successfully, the results from that first chunk are lost. If such results matter, consume the generator item by item instead, collecting results as each chunk is produced:
results = []
try:
    for result in pbatch.pmap(...):
        results.append(result)
except pbatch.PMapException as e:
    results.extend(e.results)
After this runs, results will contain every result yielded before the failure, followed by the failing chunk's e.results, in which any item that raised appears as its exception object.
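The difference between the two ways of consuming the generator can be reproduced without pbatch. In this sketch, fake_pmap and PartialFailure are hypothetical stand-ins for a two-chunk pmap run whose second chunk fails:

```python
from typing import Any, Iterator, List


class PartialFailure(Exception):
    """Hypothetical stand-in for PMapException, carrying the failing chunk's results."""

    def __init__(self, results: List[Any]) -> None:
        super().__init__(results)
        self.results = results


def fake_pmap() -> Iterator[int]:
    """Hypothetical two-chunk run: the first chunk succeeds, the second one fails."""
    yield from [1, 4]  # chunk 1: yielded normally
    raise PartialFailure([9, ValueError("boom")])  # chunk 2: one item raised


def collect_with_list() -> List[Any]:
    try:
        return list(fake_pmap())
    except PartialFailure as e:
        return list(e.results)  # list() never returned, so chunk 1's values are gone


def collect_item_by_item() -> List[Any]:
    results: List[Any] = []
    try:
        for result in fake_pmap():
            results.append(result)  # kept as soon as it is yielded
    except PartialFailure as e:
        results.extend(e.results)  # then add the failing chunk's entries
    return results


if __name__ == "__main__":
    print(collect_with_list())     # [9, ValueError('boom')]
    print(collect_item_by_item())  # [1, 4, 9, ValueError('boom')]
```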
Alternatively, wrap the body of the function being mapped in a try/except block to have finer control over when a PMapException will be raised.
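A minimal sketch of such a wrapper, assuming you want certain exception types returned as values rather than raised. The swallow decorator is a hypothetical helper, not part of pbatch. Since a PMapException is raised when a subtask raises, a function wrapped this way should only trigger one for exception types it does not list; the demo below uses the built-in map so it runs without pbatch.

```python
import functools
from typing import Any, Callable, Type

Func = Callable[..., Any]


def swallow(*exc_types: Type[BaseException]) -> Callable[[Func], Func]:
    """Hypothetical decorator: return the listed exception types instead of raising them."""

    def decorator(func: Func) -> Func:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except exc_types as error:  # only the listed types are caught
                return error

        return wrapper

    return decorator


@swallow(ValueError)
def raise_on_two(x: int) -> int:
    if x == 2:
        raise ValueError("Number is two")
    return x


if __name__ == "__main__":
    # The wrapped function no longer raises ValueError; it returns it instead.
    print(list(map(raise_on_two, [1, 2, 3])))  # [1, ValueError('Number is two'), 3]
```

Callers can then tell failures apart with isinstance(result, Exception) while any unlisted exception type still propagates.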
pbatch.postpone
Begins executing a function without blocking further code execution (until .wait() is called).
import time
import pbatch
def long_function(x, power=2):
    time.sleep(1)
    return x ** power
postponement = pbatch.postpone(long_function, 3, power=3)
time.sleep(1)
result = postponement.wait()  # => 27, without waiting another second
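The same start-now, block-later pattern can be sketched with a single-worker ThreadPoolExecutor. postpone_sketch and Pending are hypothetical names, and running the call on a thread is an assumption made for illustration, not a description of how pbatch.postpone works internally.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class Pending:
    """Hypothetical handle: wait() blocks until the background call has finished."""

    def __init__(self, future: Future) -> None:
        self._future = future

    def wait(self) -> Any:
        # Returns the call's result, or re-raises the exception it raised.
        return self._future.result()


def postpone_sketch(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Pending:
    """Hypothetical postpone-like helper: start func in a thread and return immediately."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    executor.shutdown(wait=False)  # accept no new work; the submitted call still runs
    return Pending(future)


def long_function(x: int, power: int = 2) -> int:
    time.sleep(0.2)
    return x ** power


if __name__ == "__main__":
    pending = postpone_sketch(long_function, 3, power=3)
    time.sleep(0.2)  # long_function runs in the background meanwhile
    print(pending.wait())  # 27
```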
pbatch.partition
Splits an iterable into fixed-size chunks (the final chunk may be smaller).
Returns a generator that yields lists of elements (chunks).
import pbatch
partitions = list(pbatch.partition([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], chunk_size=4))
# => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
Chunks are lazily generated:
def print_return(x):
    print(x)
    return x
next(pbatch.partition(map(print_return, range(10)), 4))
# 0
# 1
# 2
# 3
# => [0, 1, 2, 3]
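Lazy chunking like this can be sketched with itertools.islice, which pulls at most chunk_size items from the source per chunk. partition_sketch is a hypothetical name; it reproduces the documented behaviour but is not pbatch's source.

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def partition_sketch(iterable: Iterable[T], chunk_size: int) -> Iterator[List[T]]:
    """Hypothetical lazy chunker mirroring the documented behaviour of pbatch.partition."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice pulls at most chunk_size items, so only one chunk is built at a time.
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    print(list(partition_sketch(range(1, 11), 4)))  # [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
```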
Development
Clone the repo, then from the project directory:
python3.7 -m venv .venv
. .venv/bin/activate
make install-dev
To run tests (and show coverage):
make test
Before making changes, fix formatting and run the checks (isort, black, flake8, mypy):
make format check
0.2.0 2020-09-22
(not backwards compatible)
- Make pmap return a generator, so that the entire result set does not necessarily have to be in memory at the same time
0.1.1 2020-09-20
- Remove deprecated loop argument from asyncio.wait (fixes warning)
0.1.0 2020-09-20
- Initial release
File details
Details for the file pbatch-0.2.0.tar.gz.
File metadata
- Download URL: pbatch-0.2.0.tar.gz
- Upload date:
- Size: 6.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.0 requests-toolbelt/0.9.1 tqdm/4.49.0 CPython/3.7.9
File hashes
Algorithm | Hash digest
---|---
SHA256 | 56ee87c309f21a5c2968fbe5be21ec65cb029ea5b707e9e3c86e8540fa312361
MD5 | 16df5ef5dc389e938bf4be82479ccfd5
BLAKE2b-256 | 47ec3029d1e16cb1e3866bab0720033c5a28c16ec6f63f46e29492da9ed888bc
File details
Details for the file pbatch-0.2.0-py3-none-any.whl.
File metadata
- Download URL: pbatch-0.2.0-py3-none-any.whl
- Upload date:
- Size: 6.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.2.0 pkginfo/1.5.0.1 requests/2.24.0 setuptools/50.3.0 requests-toolbelt/0.9.1 tqdm/4.49.0 CPython/3.7.9
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0edfa0d6e6c960636b6ab9c2b15fb70e906318a205648ba5849009102b000528
MD5 | 84929406e6d7a70adf977adf25a69dc0
BLAKE2b-256 | 8d4a4156d87f4682e1048183cad493b244b0ecad5f012dc324a7301c46b278cf