Provides a @distribute decorator that enables concurrent execution of functions without boilerplate code.
Overview
Haven't designed your functions for large workloads, or don't feel comfortable with the various options for concurrency?
Don't be sad - just distribute.
Installation
pip install just-distribute
TL;DR
from just_distribute import distribute
@distribute(job='compute', workers=8) # job in ('compute', 'io', 'web', 'ray')
def your_time_consuming_func(*args):
...
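Conceptually, the decorator fans the function out over its zipped positional arguments while forwarding keyword arguments to every call unchanged. The sketch below is a hypothetical, thread-based approximation built only on the standard library; `naive_distribute` is an illustrative assumption, not the library's actual implementation (which presumably selects an executor based on `job`):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

def naive_distribute(workers: int = 8):
    """Hypothetical sketch of a distribute-style decorator:
    positional arguments are iterables fanned out across workers,
    keyword arguments are passed to every call as constants."""
    def decorator(func):
        @wraps(func)
        def wrapper(*iterables, **constants):
            with ThreadPoolExecutor(max_workers=workers) as pool:
                return list(pool.map(
                    lambda args: func(*args, **constants),
                    zip(*iterables),
                ))
        return wrapper
    return decorator

@naive_distribute(workers=4)
def add(x: int, y: int) -> int:
    return x + y

@naive_distribute(workers=4)
def scale(x: int, factor: int = 1) -> int:
    return x * factor

print(add(range(3), range(3)))      # [0, 2, 4]
print(scale(range(4), factor=10))   # [0, 10, 20, 30] -> factor stays constant
```

Note how `factor` is passed as a keyword argument and therefore applied to every element, mirroring the "keyword arguments are treated as constants" rule above.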
Getting Started
Always make sure the function you want to distribute has proper type hints, because just_distribute makes some assumptions based on type annotations. Also, the data to be distributed must be passed as positional arguments; keyword arguments are treated as constants.
CPU intensive tasks
Instead of:
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
...
# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
results.append(
some_existing_cpu_intensive_function(const1, const2)
)
Do:
from just_distribute import distribute
@distribute(job='compute', workers=8)
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
...
# <happy CPU fan noises>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
I/O intensive tasks
Instead of:
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
...
# slow, probably need to rewrite it ;(
data_store: dict = ... # some processed data to save
for name, data in data_store.items():
some_existing_io_intensive_function(data, name)
Do:
from just_distribute import distribute
@distribute(job='io', workers=8)
def some_existing_io_intensive_function(data_to_write: bytes, filename: str, verbose: bool = True):
...
data_store: dict = ... # some processed data to save
# <happy HDD noises???>
# any keyword arguments are not distributed :)
some_existing_io_intensive_function(data_store.values(), data_store.keys(), verbose=False)
Somewhere over the network :guitar:
Instead of:
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
...
# slow, probably need to rewrite it ;(
data_store: list[dict] = ... # some data to process on a remote service
for data in data_store:
some_existing_web_requesting_function(data, url="https://some_web_api.com/process", api_key="***")
Do:
from just_distribute import distribute
@distribute(job='web', workers=8)
def some_existing_web_requesting_function(data_to_send: dict, url: str, api_key: str):
...
data_store: list[dict] = ... # some data to process on a remote service
# <happy router blinking>
some_existing_web_requesting_function(data_store, url="https://some_web_api.com/process", api_key="***")
Or on an existing Ray cluster
Instead of:
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
...
# slow, probably need to rewrite it ;(
results = []
for const1, const2 in zip(range(1000), range(4000, 2000, -2)):
results.append(
some_existing_cpu_intensive_function(const1, const2)
)
Do:
from just_distribute import distribute
@distribute(job='ray')
def some_existing_cpu_intensive_function(x: int, y: int) -> int:
...
# <happy CPU fan noises on the cluster's host>
results = some_existing_cpu_intensive_function(range(1000), range(4000, 2000, -2))
For instructions on how to set up a Ray cluster on bare metal or in the cloud, see the Ray documentation.
More advanced cases
When the wrapped function already takes an iterable, autobatch takes care of it:
from just_distribute import distribute
@distribute(job='compute', workers=8, autobatch=True) # default autobatch is True, so you can just omit this parameter
def intensive_computation(numbers: list[int]):
...
a: list[int] = ...
intensive_computation(a) # works fine
When the wrapped function takes equal-length iterables:
from just_distribute import distribute
@distribute(job='compute', workers=8, autobatch=False) # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
for n1, n2 in zip(numbers1, numbers2):
...
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b) # TypeError: 'int' object is not iterable -> because autobatch is off
# and wrapped function takes iterables as an input
# manually batched
a: list[list[int]] = ...
b: list[list[int]] = ...
assert len(a) == len(b) # True
assert all([len(_a) == len(_b) for _a, _b in zip(a, b)]) # True -> properly, manually batched data
intensive_computation(a, b) # works fine
# or just use default autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, b) # works fine
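With `autobatch=False`, splitting the data into batches is the caller's job (the `...` placeholders above elide it). A simple chunking helper like the one below is one way to do it; `chunked` is a hypothetical utility, not part of the library:

```python
from itertools import islice

def chunked(data: list, size: int) -> list[list]:
    """Split a list into consecutive batches of at most `size` items."""
    it = iter(data)
    # iter(callable, sentinel) keeps slicing until an empty batch appears
    return [batch for batch in iter(lambda: list(islice(it, size)), [])]

a = chunked(list(range(10)), 4)        # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
b = chunked(list(range(100, 110)), 4)  # same shape as a
assert len(a) == len(b)                # equal batch counts, ready to distribute
```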
When the wrapped function takes iterables of possibly different lengths:
from just_distribute import distribute
from itertools import product
@distribute(job='compute', workers=8, autobatch=False) # default autobatch is True
def intensive_computation(numbers1: list[int], numbers2: list[int]):
for n1, n2 in product(numbers1, numbers2):
...
# manually batched
a: list[list[int]] = ...
b: list[list[int]] = ...
intensive_computation(a, b) # works fine
# or autobatch=True
a: list[int] = ...
b: list[int] = ...
intensive_computation(a, numbers2=b) # works in this particular example: autobatch takes care of numbers1,
# and numbers2, passed as a keyword argument, is treated as a constant
When the wrapped function has non-constant (in the distribute sense) parameters of mixed types:
from just_distribute import distribute
@distribute(job='compute', workers=8)
def intensive_computation(numbers: list[int], power: int, verbose: bool = True):
...
a = list(range(1000)) * 100
b = range(100)
assert len(a) > len(b)
assert len(a) % len(b) == 0 # for every element in b there is N elements in a
intensive_computation(a, b, verbose=False) # works fine
# or autobatch=False and data manually batched
a: list[list[int]] = ...
b: list[int] = ...
intensive_computation(a, b, verbose=False) # works fine
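One plausible reading of the `len(a) % len(b) == 0` requirement above is that `a` gets split into `len(b)` equal chunks, each paired with the corresponding element of `b`. The helper below sketches that grouping; `group_for` is a hypothetical illustration of the pairing, not the library's actual batching logic:

```python
def group_for(a: list, b: list) -> list[tuple[list, int]]:
    """Pair each element of b with an equal-sized slice of a.
    Assumes len(a) is a multiple of len(b)."""
    assert len(a) % len(b) == 0
    size = len(a) // len(b)
    return [(a[i * size:(i + 1) * size], b[i]) for i in range(len(b))]

pairs = group_for(list(range(6)), [10, 20, 30])
# [([0, 1], 10), ([2, 3], 20), ([4, 5], 30)]
```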