Easy Parallel Multiprocessing

ez-parallel

Installation

pip install -U ez-parallel

or install with Poetry

poetry add ez-parallel

Usage

  • Process a list of items by using parallel workers
    • Define what a worker does
    • Define how to iterate through the data
    • Just run
  • Display a global progress bar
  • Does the same with multithreading
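The steps listed above can be sketched with the standard library alone, to show the general queue-worker idea. This is not ez-parallel's implementation: `run_with_queue`, its sentinel-based shutdown and the printed counter (standing in for the progress bar) are assumptions made for illustration. Each worker pulls items from a shared queue, calls the user function, and adds the count it returns to a global total.

```python
import queue
import threading
from typing import Any, Callable, Iterable

_STOP = object()  # hypothetical sentinel telling a worker thread to exit


def run_with_queue(
    worker_fn: Callable[[Any], int], items: Iterable[Any], total: int, nb_workers: int
) -> int:
    """Hypothetical helper: feed items to nb_workers threads and return the summed counts."""
    tasks: "queue.Queue[Any]" = queue.Queue(maxsize=2 * nb_workers)
    done = 0
    lock = threading.Lock()

    def worker_loop() -> None:
        nonlocal done
        while True:
            item = tasks.get()
            if item is _STOP:
                return
            count = worker_fn(item)  # the user function reports how many things it handled
            with lock:
                done += count
                print(f"\rprogress: {done}/{total}", end="", flush=True)

    threads = [threading.Thread(target=worker_loop) for _ in range(nb_workers)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)  # blocks while the queue is full, which bounds memory use
    for _ in threads:
        tasks.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    print()  # end the progress line
    return done


if __name__ == "__main__":
    print(run_with_queue(len, [[1, 2], [3], [4, 5, 6]], total=6, nb_workers=2))
```

For example, `run_with_queue(len, [[1, 2], [3]], total=3, nb_workers=2)` returns 3. Error handling is deliberately left out: if `worker_fn` raises, that thread stops.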

Multithreading vs Multiprocessing

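In multiprocessing, new processes are launched and they do not share memory with the parent process. Changes a worker makes to a Python object (for example, appending to a global list) are not visible to the parent, so the user must implement a way to store the results of each worker and gather them once multiprocess() returns.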
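The memory isolation described above can be observed with the standard `multiprocessing` module directly (not ez-parallel). This sketch assumes a POSIX system where the "fork" start method is available; `square_and_report` and `collect_squares` are hypothetical names. Appending to a module-level list in the child is lost, while a result sent back through a `multiprocessing` queue reaches the parent.

```python
import multiprocessing as mp
from typing import List

# Module-level list: every child process works on its own copy of it.
shared_results: List[int] = []


def square_and_report(x: int, out_queue) -> None:
    shared_results.append(x * x)  # only modifies the child's copy; the parent never sees it
    out_queue.put(x * x)          # explicitly sends the result back to the parent


def collect_squares(numbers: List[int]) -> List[int]:
    # Assumption: a POSIX system where the "fork" start method is available.
    ctx = mp.get_context("fork")
    out_queue = ctx.Queue()
    procs = [ctx.Process(target=square_and_report, args=(n, out_queue)) for n in numbers]
    for p in procs:
        p.start()
    # Read every result before joining, so no child blocks on a full pipe.
    gathered = sorted(out_queue.get() for _ in numbers)
    for p in procs:
        p.join()
    return gathered


if __name__ == "__main__":
    print(collect_squares([1, 2, 3]))  # [1, 4, 9]
    print(shared_results)              # []: the parent's list was never modified
```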

With multithreading, new threads are launched and they all share the memory of the parent process. However, in CPython the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so there is no performance improvement when the distributed work is CPU-bound. Threads still help with I/O-bound work, because the GIL is released while a thread waits on I/O.
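A small standard-library illustration of both points: threads append to the same list, and their waits overlap. Here `time.sleep(0.1)` is an assumed stand-in for a DB request or file read (like real blocking I/O, it releases the GIL); `fake_io` and `run_threads` are hypothetical names, not part of ez-parallel.

```python
import threading
import time
from typing import List

results: List[int] = []  # one list, visible to every thread
results_lock = threading.Lock()


def fake_io(x: int) -> None:
    time.sleep(0.1)  # stands in for an I/O wait; other threads run meanwhile
    with results_lock:
        results.append(x)


def run_threads(n: int) -> float:
    """Start n threads doing fake I/O and return the elapsed wall-clock time."""
    threads = [threading.Thread(target=fake_io, args=(i,)) for i in range(n)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = run_threads(10)
    # elapsed is close to 0.1 s rather than 10 x 0.1 s, since the waits overlap
    print(len(results), f"{elapsed:.2f}s")
```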

How to choose? (guidelines)

  • CPU-heavy (data transformation, data preprocessing): multiprocessing.
  • I/O-heavy (DB requests, file I/O): multithreading.
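The guideline above can be written down as a tiny helper on top of the standard `concurrent.futures` module. This is an illustration of the rule of thumb, not an ez-parallel API; `make_executor` and the "cpu"/"io" labels are hypothetical.

```python
import time
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(workload: str, nb_workers: int) -> Executor:
    """Hypothetical helper: processes for CPU-heavy work, threads for I/O-heavy work."""
    if workload == "cpu":
        # Separate processes each have their own interpreter and GIL, so CPU work runs in parallel.
        return ProcessPoolExecutor(max_workers=nb_workers)
    if workload == "io":
        # Threads are cheaper to start and share memory; I/O waits overlap.
        return ThreadPoolExecutor(max_workers=nb_workers)
    raise ValueError(f"unknown workload kind: {workload!r}")


if __name__ == "__main__":
    with make_executor("io", nb_workers=8) as pool:
        # time.sleep stands in for a DB request; the 8 waits overlap
        list(pool.map(time.sleep, [0.1] * 8))
```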

Examples

How to process a list?

import time

from ez_parallel import list_iterator, queue_worker, multiprocess


@queue_worker
def work_one_thing(x: int) -> int:
    # do something
    a = x + 2
    time.sleep(0.1)

    # Worked on ONE thing = return 1
    return 1


# The __main__ guard is required when processes are started with "spawn"
# (the default on Windows and macOS), since each child re-imports this module.
if __name__ == "__main__":
    # Data
    things_to_process = list(range(1000000))

    # Create the iterator over the things to process
    iter_fn, nb_things = list_iterator(things_to_process)

    # Process all the things in parallel with 20 processes
    multiprocess(
        worker_fn=work_one_thing,
        input_iterator_fn=iter_fn,
        total=nb_things,
        nb_workers=20,
        description="Process the things",
    )
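The example hands `multiprocess()` something that produces the items, plus the item count used as the progress bar total. A minimal sketch of what such a pair could look like, assuming `input_iterator_fn` is a zero-argument callable; `make_list_iterator` is a hypothetical stand-in, and ez-parallel's `list_iterator` may work differently inside.

```python
from typing import Callable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def make_list_iterator(items: List[T]) -> Tuple[Callable[[], Iterator[T]], int]:
    """Hypothetical: return a factory of fresh iterators over items, and the number of items."""

    def iter_fn() -> Iterator[T]:
        # Each call creates a new generator, so the data can be traversed again
        yield from items

    return iter_fn, len(items)


iter_fn, nb_things = make_list_iterator(["a", "b", "c"])
print(nb_things, list(iter_fn()))  # 3 ['a', 'b', 'c']
```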

How to process a list by batch?

import time
from typing import List

from ez_parallel import batch_iterator_from_sliceable, queue_worker, multiprocess


@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something with the whole batch
    a = [y + 2 for y in x]
    time.sleep(0.1)

    # Worked on len(x) things = return len(x)
    return len(x)


# The __main__ guard is required when processes are started with "spawn"
# (the default on Windows and macOS), since each child re-imports this module.
if __name__ == "__main__":
    # Data
    things_to_process = list(range(1000000))

    # Create the iterator over the things to process
    # This will yield batches of 128 things
    iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
    nb_things = len(things_to_process)

    # Process all the things in parallel with 20 processes
    multiprocess(
        worker_fn=work_one_thing,
        input_iterator_fn=iter_fn,
        total=nb_things,
        nb_workers=20,
        description="Process the things",
    )
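Batching a sliceable sequence amounts to taking consecutive slices. With 1,000,000 items and `batch_size=128` that gives 7,813 batches (the last one holds 64 items), while the progress bar still counts up to 1,000,000 because each worker call returns `len(x)`. A sketch of the slicing step; `batches_from_sliceable` is a hypothetical name and ez-parallel's `batch_iterator_from_sliceable` may be implemented differently.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")


def batches_from_sliceable(items: Sequence[T], batch_size: int) -> Iterator[Sequence[T]]:
    """Hypothetical: yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]  # the last batch may be shorter


things = list(range(10))
batches = list(batches_from_sliceable(things, batch_size=4))
print(batches)                       # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
print(sum(len(b) for b in batches))  # 10, the value to pass as total
```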

How to process a list by batch with multithreading?

import time
from typing import List

from ez_parallel import batch_iterator_from_sliceable, queue_worker, multithread


@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something with the whole batch
    a = [y + 2 for y in x]
    time.sleep(0.1)

    # Worked on len(x) things = return len(x)
    return len(x)


# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
nb_things = len(things_to_process)

# Process all the things in parallel with 20 threads
multithread(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description="Process the things",
)

How to collect results in multiprocessing?

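A suggestion using temporary files: each worker call writes its results to its own JSONL file, and the final result is the concatenation of all these files. Workers finish in no particular order, so the lines of the final file are not in input order.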

import glob
import json
import os
import shutil
import tempfile
import uuid
from typing import List

from ez_parallel import batch_iterator_from_sliceable, queue_worker, multiprocess

# Fixed path, so that every worker process computes the same folder,
# even when processes are started with "spawn" (Windows, macOS)
TMP_FOLDER = os.path.join(tempfile.gettempdir(), "ez_parallel_results")


@queue_worker
def work_one_thing(x: List[int]) -> int:
    # One uniquely named file per batch, so two workers never write to the same file
    tmp_file = os.path.join(TMP_FOLDER, uuid.uuid4().hex + ".jsonl")
    with open(tmp_file, "w") as out:
        for number in x:
            out.write(json.dumps({"number": number, "square": number ** 2}) + "\n")

    # Worked on len(x) things = return len(x)
    return len(x)


if __name__ == "__main__":
    # Start from an empty temporary folder
    shutil.rmtree(TMP_FOLDER, ignore_errors=True)
    os.makedirs(TMP_FOLDER)

    # Data
    things_to_process = list(range(1000000))

    # Create the iterator over the things to process
    # This will yield batches of 128 things
    iter_fn = batch_iterator_from_sliceable(items=things_to_process, batch_size=128)
    nb_things = len(things_to_process)

    # Process all the things in parallel with 20 processes
    multiprocess(
        worker_fn=work_one_thing,
        input_iterator_fn=iter_fn,
        total=nb_things,
        nb_workers=20,
        description="Process the things",
    )

    # Concatenate all the temporary files (line order is arbitrary)
    with open("final_results.jsonl", "w") as out:
        for f in glob.glob(os.path.join(TMP_FOLDER, "*.jsonl")):
            with open(f) as src:
                out.write(src.read())

    # Delete temporary files / folder
    shutil.rmtree(TMP_FOLDER)
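The concatenation step above copies files in whatever order `glob.glob` returns them, which depends on the file system. If the output must follow the input order, one option is to sort records while merging. A sketch using only the standard library; `merge_jsonl_sorted` is a hypothetical helper, and it loads every record into memory, which is fine for moderate result sizes.

```python
import glob
import json
import os
from typing import List


def merge_jsonl_sorted(folder: str, out_path: str, key: str) -> int:
    """Hypothetical: merge every JSONL file in folder into out_path, sorted by record[key]."""
    records: List[dict] = []
    for path in glob.glob(os.path.join(folder, "*")):
        with open(path) as src:
            records.extend(json.loads(line) for line in src if line.strip())
    records.sort(key=lambda r: r[key])  # restores a deterministic order
    with open(out_path, "w") as out:
        for record in records:
            out.write(json.dumps(record) + "\n")
    return len(records)  # number of records written
```

It would be called just before the temporary folder is deleted, for example `merge_jsonl_sorted(folder, "final_results.jsonl", key="number")`, where `folder` is the temporary folder used by the workers. The output file should live outside that folder, otherwise the glob pattern would pick it up.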

How to collect results in multithreading?

This is much simpler, because all the threads share the same memory.

import threading
from typing import List

from ez_parallel import batch_iterator, queue_worker, multithread

# All threads share this list. Individual list operations are effectively
# atomic under CPython's GIL, but a lock makes the intent explicit.
results = []
results_lock = threading.Lock()


@queue_worker
def work_one_thing(x: List[int]) -> int:
    # do something
    squares = [{"number": y, "square": y ** 2} for y in x]
    with results_lock:
        results.extend(squares)

    # Worked on len(x) things = return len(x)
    return len(x)


# Data
things_to_process = list(range(1000000))

# Create the iterator over the things to process
# This will yield batches of 128 things
iter_fn, nb_things = batch_iterator(items=things_to_process, batch_size=128)

# Process all the things in parallel with 20 threads
multithread(
    worker_fn=work_one_thing,
    input_iterator_fn=iter_fn,
    total=nb_things,
    nb_workers=20,
    description="Process the things",
)

print(len(results))
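An alternative to a shared list plus lock is the standard library's `queue.Queue`, which is documented as thread-safe. The sketch below uses plain `threading` rather than ez-parallel so that it runs on its own; `square_batch` and `drain` are hypothetical names, and `square_batch` keeps the convention of returning the number of things handled, so it could take the place of the worker body above.

```python
import queue
import threading
from typing import Dict, List

collected: "queue.Queue[Dict[str, int]]" = queue.Queue()  # put()/get() are thread-safe


def square_batch(batch: List[int]) -> int:
    for y in batch:
        collected.put({"number": y, "square": y ** 2})
    return len(batch)  # number of things handled


def drain(q: "queue.Queue[Dict[str, int]]") -> List[Dict[str, int]]:
    """Empty the queue into a list (call once all producers have finished)."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


threads = [threading.Thread(target=square_batch, args=([i, i + 100],)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
results = drain(collected)
print(len(results))  # 8
```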

🛡 License

This project is licensed under the terms of the MIT license. See LICENSE for more details.

📃 Citation

@misc{ez-parallel,
  author = {Julien Rossi},
  title = {Easy Parallel Multiprocessing},
  year = {2021},
  publisher = {GitHub},
  journal = {GitHub repository},
  howpublished = {\url{https://github.com/j-rossi-nl/ez-parallel}}
}

Credits

This project was generated with python-package-template.


Download files

Source Distribution

ez-parallel-0.1.4.tar.gz (8.7 kB)

Built Distribution

ez_parallel-0.1.4-py3-none-any.whl (8.2 kB)

File details

Details for the file ez-parallel-0.1.4.tar.gz.

File metadata

  • Download URL: ez-parallel-0.1.4.tar.gz
  • Size: 8.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.6 CPython/3.7.10 Linux/5.4.0-74-generic

File hashes

Hashes for ez-parallel-0.1.4.tar.gz
Algorithm Hash digest
SHA256 8c70da4482ac7b0cc9467e3d09a2f0b6843d6d8288d4d58e07cbe8f716e9b7e0
MD5 a2a07c98db819c300b44497920ca02d0
BLAKE2b-256 14a56ed4aa1951661c91831b66dff0d3e7b5f6edc30d7d65f6350be708d6ceb9

File details

Details for the file ez_parallel-0.1.4-py3-none-any.whl.

File metadata

  • Download URL: ez_parallel-0.1.4-py3-none-any.whl
  • Size: 8.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.6 CPython/3.7.10 Linux/5.4.0-74-generic

File hashes

Hashes for ez_parallel-0.1.4-py3-none-any.whl
Algorithm Hash digest
SHA256 d3324c6acb81d0d91a40feb6decfb6b0564a22af8a62b4be02922ebdc537b38c
MD5 063f4752bddea85897ff4ccea9ded0e2
BLAKE2b-256 78d55d6e24e50e46105a8490777525a733a1b74aba692d0a8e964fc431c9c91b
