Easy Parallel Multiprocessing
ez-parallel
Installation
With pip or pip3:
pip install -U ez-parallel
or
pip install ez-parallel
With Poetry:
poetry add ez-parallel
Usage
- Process a list of items with parallel workers
- Define what a worker does
- Define how to iterate over the data
- Just run
- Display a global progress bar
- The same API is available for multithreading
Multithreading vs. Multiprocessing
With multiprocessing, new processes are launched that do not share memory. The user must implement a way to store each worker's results and gather them after multiprocess() returns.
With multithreading, new threads are launched that all share the memory of the parent process. However, CPython's Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so there is no performance improvement when the distributed work is CPU-bound.
How to choose? (guidelines)
- CPU-heavy (data transformation, data preprocessing): multiprocessing.
- IO-heavy (DB requests, File I/O): multithreading.
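The same rule of thumb can be sketched with the standard library's concurrent.futures executors. This is independent of ez-parallel; pick_executor is a hypothetical helper used only to illustrate the guideline:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def pick_executor(io_bound: bool):
    """Illustrative rule of thumb: threads for IO-bound work, processes for CPU-bound work."""
    # IO-bound work spends its time waiting (and releases the GIL while doing so),
    # so threads are enough. CPU-bound Python code needs separate processes
    # to use more than one core.
    return ThreadPoolExecutor if io_bound else ProcessPoolExecutor

# DB requests, file I/O -> threads
executor_for_io = pick_executor(io_bound=True)
# data transformation, preprocessing -> processes
executor_for_cpu = pick_executor(io_bound=False)
```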
Examples
How to process a list?
import time

from ez_parallel import list_iterator, queue_worker, multiprocess

@queue_worker
def work_one_thing(x: int) -> int:
    # do something
    a = x + 2
    time.sleep(0.1)
    # Worked on ONE thing = return 1
    return 1

# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
iter_fn, nb_things = list_iterator(things_to_process)

# Process all the things in parallel with 20 processes
multiprocess(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description='Process the things'
)
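The queue-worker pattern behind this API can be roughly sketched with the standard library alone: items go on a queue, workers pull until it drains, and each call's return value feeds the progress total. This is an illustration under assumptions, not ez-parallel's actual implementation; run_workers is a hypothetical helper:

```python
import queue
import threading

def run_workers(items, worker_fn, nb_workers=4):
    # Hypothetical sketch of the queue-worker pattern:
    # fill a queue, let worker threads drain it, sum the per-item counts.
    q = queue.Queue()
    for item in items:
        q.put(item)
    counts = []  # one entry per processed item (drives the progress bar)

    def loop():
        while True:
            try:
                item = q.get_nowait()
            except queue.Empty:
                return
            counts.append(worker_fn(item))

    threads = [threading.Thread(target=loop) for _ in range(nb_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(counts)

# Each worker call reports "1 thing processed", like work_one_thing above
processed = run_workers(range(100), lambda x: 1, nb_workers=4)
```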
How to process a list in batches?
import time

from typing import List
from ez_parallel import batch_iterator_from_sliceable, queue_worker, multiprocess

@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something
    a = [y + 2 for y in x]
    time.sleep(0.1)
    # Worked on len(x) things
    return len(x)

# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
nb_things = len(things_to_process)

# Process all the things in parallel with 20 processes
multiprocess(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description='Process the things'
)
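Judging by its name, batch_iterator_from_sliceable presumably yields consecutive slices of the input. Its behavior can be approximated like this (a sketch of the assumed semantics, not the library's actual code):

```python
from typing import Iterator, List, Sequence

def batches_of(items: Sequence[int], batch_size: int) -> Iterator[List[int]]:
    # Yield consecutive slices of at most batch_size items,
    # approximating what batch_iterator_from_sliceable is assumed to do.
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])

result = list(batches_of(list(range(10)), 4))
# The last batch may be shorter than batch_size
```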
How to process a list in batches with multithreading?
import time

from typing import List
from ez_parallel import batch_iterator_from_sliceable, queue_worker, multithread

@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something
    a = [y + 2 for y in x]
    time.sleep(0.1)
    # Worked on len(x) things
    return len(x)

# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
nb_things = len(things_to_process)

# Process all the things in parallel with 20 threads
multithread(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description='Process the things'
)
How to collect results in multiprocessing?
(Suggestion using temporary files) In this scenario, workers record their results in a shared JSONL file, and the final result is read back from that file once multiprocess() returns.
import json
import os
import random
import string
import tempfile

from typing import List
from ez_parallel import batch_iterator_from_sliceable, queue_worker, multiprocess

def random_file_name() -> str:
    return ''.join(random.choices(string.ascii_letters, k=32))

# All processes write to the same file
# The OS will deal with concurrent access
tmp_file = os.path.join(tempfile.gettempdir(), random_file_name())

@queue_worker
def work_one_thing(x: List[int]) -> int:
    # This call blocks until the file can be written
    with open(tmp_file, 'a') as out:
        for number in x:
            out.write(json.dumps({"number": number, "square": number ** 2}) + '\n')
    # Worked on len(x) things
    return len(x)

# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
nb_things = len(things_to_process)

# Process all the things in parallel with 20 processes
multiprocess(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description='Process the things'
)

# Collect all the data
with open(tmp_file, 'r') as src:
    data = [json.loads(line) for line in src]

# Delete temporary file
os.remove(tmp_file)
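The JSONL round-trip itself, independent of the parallel machinery, is just one JSON object per line on the way out and one json.loads per line on the way back. A minimal stdlib-only sketch:

```python
import json
import os
import tempfile

records = [{"number": n, "square": n ** 2} for n in range(5)]

# Write one JSON object per line (JSONL)
fd, path = tempfile.mkstemp(suffix=".jsonl")
with os.fdopen(fd, "w") as out:
    for rec in records:
        out.write(json.dumps(rec) + "\n")

# Read the records back, one line at a time
with open(path) as src:
    loaded = [json.loads(line) for line in src]

os.remove(path)
```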
How to collect results in multithreading?
Much easier and more straightforward, because all the threads share the same memory.
from typing import List

from ez_parallel import batch_iterator, queue_worker, multithread

# Appending to a list is thread-safe in CPython
results = []

@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something
    results.extend({"number": y, "square": y ** 2} for y in x)
    # Worked on len(x) things
    return len(x)

# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn, nb_things = batch_iterator(items=things_to_process, batch_size=128)

# Process all the things in parallel with 20 threads
multithread(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description='Process the things'
)

print(len(results))
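The shared-memory behavior this example relies on can be demonstrated with plain threading, independent of ez-parallel: all threads append to one list, and every item is accounted for after join():

```python
import threading

results = []

def worker(chunk):
    # Appending to a list is thread-safe in CPython (protected by the GIL)
    for y in chunk:
        results.append({"number": y, "square": y ** 2})

# Ten chunks of 100 numbers each, one thread per chunk
chunks = [list(range(i, i + 100)) for i in range(0, 1000, 100)]
threads = [threading.Thread(target=worker, args=(c,)) for c in chunks]
for t in threads:
    t.start()
for t in threads:
    t.join()
# All 1000 items are present once every thread has joined
```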
🛡 License
This project is licensed under the terms of the MIT
license. See LICENSE for more details.
📃 Citation
@misc{ez-parallel,
  author = {Julien Rossi},
  title = {Easy Parallel Multiprocessing},
  year = {2021},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/j-rossi-nl/ez-parallel}}
}
Credits
This project was generated with python-package-template.