Parallel batch processing on top of regular Python functions

pbatch

Installation

Requires Python 3.7+

pip install pbatch

Usage

pbatch.pmap

Similar to the built-in map, but executes the function calls in parallel. The number of concurrent executions can be limited with the chunk_size keyword argument.

import time
import pbatch

def long_square(x):
    time.sleep(1)
    print(x)
    return x ** 2

list(map(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 3 seconds)

list(pbatch.pmap(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 1 second)

list(pbatch.pmap(long_square, [1, 2, 3], chunk_size=2))
# 1
# 2
# 3
# => [1, 4, 9] (after 2 seconds)
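
The timings in the comments above follow from the chunking: if each call takes about the same time and chunks run one after another, the total is roughly the number of chunks times the per-call time. A rough sketch of that arithmetic follows. The helper name estimated_seconds is hypothetical and not part of pbatch, and it assumes that omitting chunk_size means all items run at once.

```python
import math

def estimated_seconds(n_items, chunk_size=None, seconds_per_item=1.0):
    """Rough wall-clock estimate for a chunked parallel map (illustrative only)."""
    if n_items == 0:
        return 0.0
    if chunk_size is None:
        # Assumption: no limit means every item runs in a single chunk.
        chunk_size = n_items
    # Each chunk takes about as long as one call, and chunks run in sequence.
    return math.ceil(n_items / chunk_size) * seconds_per_item

estimated_seconds(3)                # => 1.0
estimated_seconds(3, chunk_size=2)  # => 2.0
```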

Supports multiple-arity functions exactly as map does:

import time
import pbatch

def multiple_args(a, b, c):
    print(f"a={a}, b={b}, c={c}")
    time.sleep(1)
    return c

list(map(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)

list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 1 second)

list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000], chunk_size=1))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)

Note that if one iterable is shorter than the rest, the remaining elements in the other iterables are ignored.
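
The built-in map truncates in the same way, which can be checked without pbatch installed. The sketch below uses only map; the function add is a made-up example, and pbatch.pmap is expected to give the same result per the note above.

```python
def add(a, b):
    return a + b

# The second iterable has only two elements, so the 3 is never used.
result = list(map(add, [1, 2, 3], [10, 20]))
# => [11, 22]
```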

If an exception is raised while processing an item, the remaining items in the current chunk are still run to completion, and then a pbatch.PMapException is raised. The exception carries both the results and the exceptions from that chunk:

def raise_on_two(x):
    if x == 2:
        raise ValueError("Number is two")
    return x

try:
    list(pbatch.pmap(raise_on_two, [1, 2, 3]))
except pbatch.PMapException as e:
    e.results
    # => [1, ValueError("Number is two"), 3]

    e.exceptions
    # => [ValueError("Number is two")]

    str(e)
    # => "[1, ValueError('Number is two'), 3]"

    repr(e)
    # => "[1, ValueError('Number is two'), 3]"

If the results are converted directly to a list, as above, and an exception is raised after the first chunk has completed successfully, the results from the first chunk are lost. If those results matter, consume the generator item by item instead, so that results are collected as each chunk is produced:

results = []
try:
    for result in pbatch.pmap(...):
        results.append(result)
except pbatch.PMapException as e:
    results.extend(e.results)

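After executing, results will contain everything from the chunks that completed, plus e.results from the chunk that failed. As in the example above, e.results holds the exception object in place of each item that raised.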

Alternatively, wrap the function being mapped in a try/except block for finer control over when a PMapException is raised.
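
A minimal sketch of that wrapping approach: the decorator capture_exceptions is a hypothetical helper, not part of pbatch, that returns the exception object instead of raising it. The built-in map is used here so the example runs without pbatch; the assumption is that the wrapped function can be passed to pbatch.pmap in the same way.

```python
import functools

def capture_exceptions(func):
    """Return a version of func that returns exceptions instead of raising them."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Hand the exception back as an ordinary value.
            return exc
    return wrapper

def raise_on_two(x):
    if x == 2:
        raise ValueError("Number is two")
    return x

safe_raise_on_two = capture_exceptions(raise_on_two)
results = list(map(safe_raise_on_two, [1, 2, 3]))

# Separate successes from failures after the fact.
successes = [r for r in results if not isinstance(r, Exception)]
failures = [r for r in results if isinstance(r, Exception)]
```

With this pattern no item causes the mapping to abort, so the caller decides afterwards how to treat the failures.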

pbatch.postpone

Begins executing a function immediately without blocking the caller. Call .wait() to block until the result is available.

import time
import pbatch

def long_function(x, power=2):
    time.sleep(1)
    return x ** power

postponement = pbatch.postpone(long_function, 3, power=3)
time.sleep(1)
result = postponement.wait()  # does not wait 1 second anymore
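
For comparison, readers familiar with the standard library may recognise the same start-now, collect-later pattern in concurrent.futures. The sketch below is only an analogy and says nothing about how pbatch.postpone is implemented; the sleep is shortened to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def long_function(x, power=2):
    time.sleep(0.2)
    return x ** power

with ThreadPoolExecutor(max_workers=1) as executor:
    # submit() starts the call in a worker thread and returns immediately.
    future = executor.submit(long_function, 3, power=3)
    time.sleep(0.2)           # other work happens here in the meantime
    result = future.result()  # blocks only if the call has not finished yet
# result == 27
```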

pbatch.partition

Splits an iterable into fixed-size chunks. The final chunk is shorter when the number of elements is not a multiple of chunk_size.

Returns a generator that yields lists of elements (chunks).

import pbatch

partitions = list(pbatch.partition([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], chunk_size=4))
# => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

Chunks are lazily generated:

def print_return(x):
    print(x)
    return x

next(pbatch.partition(map(print_return, range(10)), 4))
# 0
# 1
# 2
# 3
# => [0, 1, 2, 3]
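
To illustrate why only the first four elements are evaluated, here is one way lazy chunking can be written with itertools.islice. The name partition_sketch is hypothetical, and this is not necessarily how pbatch.partition is implemented.

```python
from itertools import islice

def partition_sketch(iterable, chunk_size):
    """Yield lists of up to chunk_size elements, pulling from the input lazily."""
    iterator = iter(iterable)
    while True:
        # islice consumes at most chunk_size elements from the shared iterator.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk

chunks = list(partition_sketch(range(1, 11), 4))
# => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

seen = []

def record(x):
    seen.append(x)
    return x

first = next(partition_sketch(map(record, range(10)), 4))
# first == [0, 1, 2, 3], and seen == [0, 1, 2, 3]: nothing past the first chunk ran
```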

Development

Clone the repo, then from the project directory:

python3.7 -m venv .venv
. .venv/bin/activate

make install-dev

To run tests (and show coverage):

make test

Before submitting changes, fix formatting and run the checks (isort, black, flake8, mypy):

make format check

0.2.0 2020-09-22

(not backwards compatible)

  • Make pmap return a generator, so that the entire result set does not have to be held in memory at once

0.1.1 2020-09-20

  • Remove deprecated loop argument from asyncio.wait (fixes warning)

0.1.0 2020-09-20

  • Initial release
