Skip to main content

Joblib-like interface for parallel GPU computations (e.g. data preprocessing)

Project description

Build Status codecov Documentation Status PyPI version

GPUParallel

Joblib-like interface for parallel GPU computations (e.g. data preprocessing).

import torch
from gpuparallel import GPUParallel, delayed

def perform(idx, device_id, **kwargs):
    tensor = torch.Tensor([idx]).to(device_id)
    return (tensor * tensor).item()

result = GPUParallel(n_gpu=2)(delayed(perform)(idx) for idx in range(5))
print(list(result))  # result: [0.0, 1.0, 4.0, 9.0, 16.0], ordered in accordance with input parameters

Features:

Install

python3 -m pip install gpuparallel
# or
python3 -m pip install git+git://github.com/vlivashkin/gpuparallel.git

Examples

Initialize networks once on worker init

Function init_fn is called on init of every worker. All common resources (e.g. networks) can be initialized here.

from gpuparallel import GPUParallel, delayed

def init(device_id=None, **kwargs):
    global model
    model = load_model().to(device_id)

def perform(img, device_id=None, **kwargs):
    global model
    return model(img.to(device_id))
    
gp = GPUParallel(n_gpu=16, n_workers_per_gpu=2, init_fn=init)
results = gp(delayed(perform)(img) for img in fnames)

Reuse initialized workers

Once workers are initialized, they keep live until GPUParallel object exist. You can perform several queues of tasks without reinitializing worker resources:

gp = GPUParallel(n_gpu=16, n_workers_per_gpu=2, init_fn=init)
overall_results = []
for folder_images in folders:
    folder_results = gp(delayed(perform)(img) for img in folder_images)
    overall_results.extend(folder_results)
del gp  # this will close process pool to free memory

Result is a generator

GPUParallel call returns a generator to use results during caclulations (e.g. for sequential saving ordered results)

import h5py

gp = GPUParallel(n_gpu=16, n_workers_per_gpu=2, preserve_order=True)
result = gp(delayed(perform)(img) for img in images)

with h5py.File('output.h5') as f:
    result_dataset = f.create_dataset('result', shape=(300, 224, 224, 3))

    for idx, result in enumerate(result):
        result_dataset[idx] = result

Auto batching

Use class BatchGPUParallel for auto spliting tensor to workers. flat_result flag de-batches results (works only if single array/tensor returned)

arr = np.zeros((102, 103))
bgpup = BatchGPUParallel(task_fn=task, batch_size=3, flat_result=True, n_gpu=2)
flat_results = np.array(list(bgpup(arr)))

Simple logging from workers

print() inside a worker won't be seen in the main process, but you still can use logging to stderr of the main process. Use log_to_stderr() call to init logging, and log.info(message) to log info from workers

from gpuparallel import GPUParallel, delayed, log_to_stderr, log

log_to_stderr('INFO')

def perform(idx, worker_id=None, device_id=None):
    hi = f'Hello world #{idx} from worker #{worker_id} with {device_id}!'
    log.info(hi)

GPUParallel(n_gpu=2)(delayed(perform)(idx) for idx in range(2))

It will return:

[INFO/Worker-1(cuda:1)]:Hello world #1 from worker #1 with cuda:1!
[INFO/Worker-0(cuda:0)]:Hello world #0 from worker #0 with cuda:0!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gpuparallel-0.2.2.tar.gz (8.6 kB view hashes)

Uploaded Source

Built Distribution

gpuparallel-0.2.2-py3-none-any.whl (8.6 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page