Parallel batch processing on top of regular python functions

pbatch

Installation

Requires python 3.7+

pip install pbatch

Usage

pbatch.pmap

Similar to the built-in map, but executes the function calls in parallel. The number of concurrent executions can be limited with the chunk_size keyword argument.

import time
import pbatch

def long_square(x):
    time.sleep(1)
    print(x)
    return x ** 2

list(map(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 3 seconds)

list(pbatch.pmap(long_square, [1, 2, 3]))
# 1
# 2
# 3
# => [1, 4, 9] (after 1 second)

list(pbatch.pmap(long_square, [1, 2, 3], chunk_size=2))
# 1
# 2
# 3
# => [1, 4, 9] (after 2 seconds)
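
To make the timing above easier to reason about, here is a minimal sketch of chunked parallel mapping built only on the standard library. It is not pbatch's implementation (the changelog suggests pbatch uses asyncio internally); chunked_pmap and slow_square are hypothetical names used for illustration, and threads are an assumed stand-in for whatever concurrency pbatch uses.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def chunked_pmap(func, iterable, chunk_size=None):
    """Hypothetical stand-in for pmap: run func over each chunk in parallel threads."""
    iterator = iter(iterable)
    while True:
        # With no chunk_size, everything that is left forms a single chunk.
        if chunk_size is None:
            chunk = list(iterator)
        else:
            chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        with ThreadPoolExecutor(max_workers=len(chunk)) as pool:
            # pool.map preserves input order; a chunk finishes before the next starts.
            yield from pool.map(func, chunk)


def slow_square(x):
    time.sleep(0.2)
    return x ** 2


start = time.perf_counter()
squares = list(chunked_pmap(slow_square, [1, 2, 3], chunk_size=2))
elapsed = time.perf_counter() - start
print(squares)  # [1, 4, 9]
print(f"{elapsed:.1f}s")  # two chunks, so roughly two sleep periods
```

With three items and chunk_size=2 there are two chunks, which is why the total time is about two sleep periods rather than one or three.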

Supports multiple-arity functions exactly as map does:

import time
import pbatch

def multiple_args(a, b, c):
    print(f"a={a}, b={b}, c={c}")
    time.sleep(1)
    return c

list(map(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)

list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000]))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 1 second)

list(pbatch.pmap(multiple_args, [1, 2], [60, 70], [1000, 2000], chunk_size=1))
# a=1, b=60, c=1000
# a=2, b=70, c=2000
# => [1000, 2000] (after 2 seconds)

Note that if one iterable is shorter than the rest, the remaining elements in the other iterables are ignored.
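
The truncation rule is the same one the built-in map follows, so it can be checked without pbatch. The snippet below uses only map; add is a hypothetical helper for illustration.

```python
def add(a, b):
    return a + b


# The second iterable has only two elements, so the 3 in the first is never used.
truncated = list(map(add, [1, 2, 3], [10, 20]))
print(truncated)  # [11, 22]
```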

If an exception is raised while processing an item, the remaining items in the current chunk are still completed, and then a pbatch.PMapException is raised. The exception carries the results and the exceptions from the current chunk:

def raise_on_two(x):
    if x == 2:
        raise ValueError("Number is two")
    return x

try:
    list(pbatch.pmap(raise_on_two, [1, 2, 3]))
except pbatch.PMapException as e:
    e.results
    # => [1, ValueError("Number is two"), 3]

    e.exceptions
    # => [ValueError("Number is two")]

    str(e)
    # => "[1, ValueError('Number is two'), 3]"

    repr(e)
    # => "[1, ValueError('Number is two'), 3]"

If the results are converted directly to a list, as above, and an exception is raised after the first chunk has completed successfully, the results from the first chunk are lost. If those results matter, consume the generator item by item so that completed chunks are kept as they are produced:

results = []
try:
    for result in pbatch.pmap(...):
        results.append(result)
except pbatch.PMapException as e:
    results.extend(e.results)

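After this runs, results contains everything from the chunks that completed, followed by e.results from the failing chunk. As the earlier example shows, e.results holds the exception object in place of each failed item.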

Alternatively, wrap the function being mapped in a try/except block for finer control over when a PMapException is raised.
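
One way to do this wrapping is with a small decorator that turns exceptions into return values. This is a sketch: capture_errors is a hypothetical helper, not part of pbatch, and the built-in map is used so the example runs without pbatch installed. The decorated function could be passed to pbatch.pmap in the same way.

```python
import functools


def capture_errors(func):
    """Hypothetical helper: return the exception as a value instead of raising it."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return exc
    return wrapper


@capture_errors
def raise_on_two(x):
    if x == 2:
        raise ValueError("Number is two")
    return x


# Built-in map keeps the sketch runnable without pbatch installed.
outcomes = list(map(raise_on_two, [1, 2, 3]))
successes = [o for o in outcomes if not isinstance(o, Exception)]
failures = [o for o in outcomes if isinstance(o, Exception)]
print(successes)  # [1, 3]
print(failures)   # [ValueError('Number is two')]
```

Because the wrapped function never raises, no item can abort a chunk, and the caller decides afterwards how to treat each failure.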

pbatch.postpone

Begin executing a function without blocking the calling code until .wait() is called.

import time
import pbatch

def long_function(x, power=2):
    time.sleep(1)
    return x ** power

postponement = pbatch.postpone(long_function, 3, power=3)
time.sleep(1)
result = postponement.wait()  # does not wait 1 second anymore
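
For readers who want a mental model of this start-now, wait-later pattern, here is a rough standard-library analogue. It is an assumption-laden sketch, not pbatch's implementation: the Postponement class below is a hypothetical name, and a background thread stands in for whatever pbatch uses internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class Postponement:
    """Hypothetical analogue of the object returned by postpone."""

    def __init__(self, func, *args, **kwargs):
        self._pool = ThreadPoolExecutor(max_workers=1)
        # submit() starts the call in a background thread right away.
        self._future = self._pool.submit(func, *args, **kwargs)

    def wait(self):
        # Block until the call finishes, then return its result
        # (or re-raise whatever exception it raised).
        try:
            return self._future.result()
        finally:
            self._pool.shutdown(wait=False)


def long_function(x, power=2):
    time.sleep(0.2)
    return x ** power


postponement = Postponement(long_function, 3, power=3)
time.sleep(0.2)  # other work happens here while the call runs
result = postponement.wait()
print(result)  # 27
```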

pbatch.partition

Split an iterable into fixed-size chunks. The final chunk is shorter when the number of elements is not a multiple of chunk_size.

Returns a generator that yields lists of elements (chunks).

import pbatch

partitions = list(pbatch.partition([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], chunk_size=4))
# => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]

Chunks are lazily generated:

def print_return(x):
    print(x)
    return x

next(pbatch.partition(map(print_return, range(10)), 4))
# 0
# 1
# 2
# 3
# => [0, 1, 2, 3]
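
The lazy behaviour shown above can be reproduced with itertools.islice. The following is a sketch of the technique under that assumption, not pbatch's source; partition_sketch and record are hypothetical names.

```python
from itertools import islice


def partition_sketch(iterable, chunk_size):
    """Hypothetical re-implementation: yield lists of up to chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(iterable)
    while True:
        # islice pulls at most chunk_size items, so nothing is read ahead.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


consumed = []


def record(x):
    consumed.append(x)
    return x


first = next(partition_sketch(map(record, range(10)), 4))
print(first)     # [0, 1, 2, 3]
print(consumed)  # [0, 1, 2, 3]
```

Only the four items needed for the first chunk are pulled from the underlying iterator, which is what makes the approach usable on large or unbounded inputs.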

Development

Clone the repo, then from the project directory:

python3.7 -m venv .venv
. .venv/bin/activate

make install-dev

To run tests (and show coverage):

make test

Before committing changes, fix formatting and run the checks (isort, black, flake8, mypy):

make format check

0.2.0 2020-09-22

(not backwards compatible)

  • Make pmap return a generator, so that the entire result set does not necessarily have to be in memory at the same time

0.1.1 2020-09-20

  • Remove deprecated loop argument from asyncio.wait (fixes warning)

0.1.0 2020-09-20

  • Initial release
