dask-pytorch has been renamed dask-pytorch-ddp

dask-pytorch-ddp

NOTE: dask-pytorch has been renamed to dask-pytorch-ddp!

We issued this final release of dask-pytorch v0.1.2, which is identical to dask-pytorch-ddp v0.2.0, to ease this transition. You can find dask-pytorch-ddp on PyPI at https://pypi.python.org/pypi/dask-pytorch-ddp/.

dask-pytorch-ddp is a Python package that makes it easy to train PyTorch models on Dask clusters using distributed data parallel. The intended scope of the project is:

  • bootstrapping PyTorch workers on top of a Dask cluster
  • using distributed data stores (e.g., S3) as normal PyTorch datasets
  • mechanisms for tracking and logging intermediate results, training statistics, and checkpoints

At this point, this library and the examples provided are tailored to computer vision tasks, but the library is intended to be useful for any sort of PyTorch task. The only thing really specific to image processing is the S3ImageFolder dataset class. Implementing a PyTorch dataset for other kinds of data (assuming map-style random access) currently requires implementing __getitem__(self, idx: int) and __len__(self). We plan to add more varied examples for other use cases in the future, and welcome PRs extending functionality.
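
As a rough sketch of what those two methods look like for non-image data, the class below wraps an in-memory list of labelled text samples. The class name, its fields, and the sample data are hypothetical and not part of dask-pytorch-ddp. In practice you would normally subclass torch.utils.data.Dataset; that is left out here so the sketch runs without PyTorch installed.

```python
from typing import List, Tuple


class InMemoryTextDataset:
    """Hypothetical map-style dataset: random access by integer index."""

    def __init__(self, texts: List[str], labels: List[int]):
        if len(texts) != len(labels):
            raise ValueError("texts and labels must have the same length")
        self.texts = texts
        self.labels = labels

    def __getitem__(self, idx: int) -> Tuple[str, int]:
        # Return one (sample, label) pair for the requested index.
        return self.texts[idx], self.labels[idx]

    def __len__(self) -> int:
        # Samplers call this to learn which indices are valid.
        return len(self.texts)


dataset = InMemoryTextDataset(["good", "bad", "fine"], [1, 0, 1])
```

A dataset that reads from S3 or another remote store would follow the same shape, fetching the object for idx inside __getitem__.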

Typical non-dask workflow

A typical example of non-dask PyTorch usage is as follows:

Loading Data

Create a dataset (ImageFolder), and wrap it in a DataLoader:

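import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision import transforms
from torchvision.datasets import ImageFolder

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(250),
    transforms.ToTensor()
])

# `path` is the root directory of the image folder
whole_dataset = ImageFolder(path, transform=transform)

batch_size = 100
num_workers = 64
# `num` is the number of samples in each of the train and test subsets
indices = list(range(len(whole_dataset)))
np.random.shuffle(indices)
train_idx = indices[:num]
test_idx = indices[num:num+num]

train_sampler = SubsetRandomSampler(train_idx)
train_loader = DataLoader(whole_dataset, sampler=train_sampler, batch_size=batch_size, num_workers=num_workers)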

Training a Model

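Loop over the dataset, and train the model by stepping the optimizer:

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

device = torch.device(0)
net = models.resnet18(pretrained=False)
model = net.to(device)
device_ids = [0]  # GPU indices for this process (a single GPU here)

criterion = nn.CrossEntropyLoss().cuda()
lr = 0.001
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
count = 0
# `n_epochs` is the number of passes over the training data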
for epoch in range(n_epochs):
    model.train()  # Set model to training mode
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        # zero the parameter gradients
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        count += 1

Now on Dask

With dask_pytorch_ddp and PyTorch Distributed Data Parallel, we can train on multiple workers as follows:

Loading Data

Load the dataset from S3, and explicitly set the multiprocessing context (Dask defaults to spawn, but PyTorch is generally configured to use fork).

import multiprocessing as mp

import torch
from dask_pytorch_ddp.data import S3ImageFolder

whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)
train_loader = torch.utils.data.DataLoader(
    whole_dataset,
    sampler=train_sampler,
    batch_size=batch_size,
    num_workers=num_workers,
    multiprocessing_context=mp.get_context('fork')
)

Training in Parallel

Wrap the training loop in a function, and add metrics logging (not necessary, but very useful). Convert the model into a PyTorch Distributed Data Parallel (DDP) model, which knows how to sync gradients across workers.

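import datetime
import json
import logging
import pickle
import uuid

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from dask_pytorch_ddp.results import DaskResultsHandler


key = uuid.uuid4().hex
rh = DaskResultsHandler(key)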

def run_transfer_learning(bucket, prefix, samplesize, n_epochs, batch_size, num_workers, train_sampler):
    worker_rank = int(dist.get_rank())
    device = torch.device(0)
    net = models.resnet18(pretrained=False)
    model = net.to(device)
    model = DDP(model, device_ids=[0])

    criterion = nn.CrossEntropyLoss().cuda()
    lr = 0.001
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    whole_dataset = S3ImageFolder(bucket, prefix, transform=transform)

    train_loader = torch.utils.data.DataLoader(
        whole_dataset,
        sampler=train_sampler,
        batch_size=batch_size,
        num_workers=num_workers,
        multiprocessing_context=mp.get_context('fork')
    )

    count = 0
    for epoch in range(n_epochs):
        # Each epoch here runs a training phase only
        model.train()  # Set model to training mode
        for inputs, labels in train_loader:
            dt = datetime.datetime.now().isoformat()
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # zero the parameter gradients
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            count += 1

            # statistics
            rh.submit_result(
                f"worker/{worker_rank}/data-{dt}.json",
                json.dumps({'loss': loss.item(), 'epoch': epoch, 'count': count, 'worker': worker_rank})
            )
            if (count % 100) == 0 and worker_rank == 0:
                rh.submit_result(f"checkpoint-{dt}.pkl", pickle.dumps(model.state_dict()))

How does it work?

dask-pytorch-ddp is largely a wrapper around existing PyTorch functionality. torch.distributed provides the infrastructure for Distributed Data Parallel (DDP).

In DDP, you create N workers. The 0th worker is the "master" and coordinates the synchronization of buffers and gradients. In SGD, gradients are normally averaged over all data points in a batch. By running batches on multiple workers and averaging the gradients across them, DDP lets you run SGD with a much larger effective batch size (N * batch_size).
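
The batch-size claim can be checked with a little arithmetic. The sketch below is plain Python rather than PyTorch, and uses a made-up one-parameter model (y_hat = w * x with squared error) and made-up data. It shows that averaging the per-worker mean gradients gives the same value as the mean gradient over the combined batch, provided every worker processes the same number of samples.

```python
def mean_gradient(w, batch):
    """Gradient of mean squared error for y_hat = w * x, averaged over a batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


w = 0.5
# Two hypothetical workers, each holding a batch of two (x, y) samples.
worker_batches = [
    [(1.0, 2.0), (2.0, 4.0)],  # worker 0
    [(3.0, 6.0), (4.0, 8.0)],  # worker 1
]

# What DDP computes: each worker's mean gradient, then the average across workers.
per_worker = [mean_gradient(w, batch) for batch in worker_batches]
ddp_gradient = sum(per_worker) / len(per_worker)

# What a single process would compute on one batch of N * batch_size samples.
combined = [sample for batch in worker_batches for sample in batch]
big_batch_gradient = mean_gradient(w, combined)
```

With unequal per-worker batch sizes the two values would generally differ, since a plain average weights every worker equally.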

dask-pytorch-ddp sets some environment variables to configure the "master" host and port, and then calls init_process_group before training, and calls destroy_process_group after training. This is the same process normally done manually by the data scientist.
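
For reference, a minimal sketch of that manual process is shown below. MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE are the variables read by torch.distributed's default environment-based initialization; the helper names, the nccl backend choice, and the way the values are passed in are assumptions for illustration, not the library's actual implementation.

```python
import os


def ddp_environment(master_host, master_port, rank, world_size):
    """Build the environment variables used by torch.distributed's env:// setup."""
    return {
        "MASTER_ADDR": str(master_host),
        "MASTER_PORT": str(master_port),
        "RANK": str(rank),
        "WORLD_SIZE": str(world_size),
    }


def run_with_process_group(train_fn, env, backend="nccl"):
    """Hypothetical wrapper: set up the process group, train, then tear down."""
    import torch.distributed as dist  # imported lazily; requires PyTorch

    os.environ.update(env)
    dist.init_process_group(backend=backend)
    try:
        return train_fn()
    finally:
        # Always release the process group, even if training raised.
        dist.destroy_process_group()


env = ddp_environment("10.0.0.1", 23456, rank=2, world_size=8)
```

Each worker would call run_with_process_group with its own rank, while the host, port, and world size stay the same across workers.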

Multi GPU machines

dask_cuda_worker automatically rotates CUDA_VISIBLE_DEVICES for each worker it creates (typically one per GPU). As a result, your PyTorch code should always start with the 0th GPU.

For example, if I have an 8 GPU machine, the 3rd worker will have CUDA_VISIBLE_DEVICES set to 2,3,4,5,6,7,0,1. On that worker, if I call torch.device(0), I will get GPU 2.
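
The rotation in that example can be reproduced with a few lines of Python. This is only an illustration of the ordering described above; the function name is hypothetical and this is not dask-cuda's own code.

```python
def rotated_visible_devices(worker_index: int, n_gpus: int) -> str:
    """Return a CUDA_VISIBLE_DEVICES string that starts at this worker's GPU."""
    order = [(worker_index + offset) % n_gpus for offset in range(n_gpus)]
    return ",".join(str(gpu) for gpu in order)


# The 3rd worker has index 2 on an 8 GPU machine.
third_worker = rotated_visible_devices(2, 8)
```

Because the worker's own GPU is always listed first, torch.device(0) resolves to a different physical GPU on each worker without any change to the training code.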

What else?

dask-pytorch-ddp also implements an S3-based ImageFolder. More distributed-friendly datasets are planned. dask-pytorch-ddp also implements a basic results aggregation framework so that it is easy to collect training metrics across different workers. Currently, only DaskResultsHandler, which leverages Dask pub-sub communication protocols, is implemented, but an S3-based result handler is planned.
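
Once the per-step records have been collected, summarizing them is ordinary Python. The sketch below assumes the JSON payloads have the same fields as the ones submitted in the training function above (loss, epoch, count, worker); the function name and the sample payloads are hypothetical, and how the payloads are retrieved from the results handler is not shown.

```python
import json
from collections import defaultdict


def mean_loss_per_worker(payloads):
    """Average the reported loss for each worker rank."""
    totals = defaultdict(lambda: [0.0, 0])  # worker -> [sum of losses, count]
    for payload in payloads:
        record = json.loads(payload)
        entry = totals[record["worker"]]
        entry[0] += record["loss"]
        entry[1] += 1
    return {worker: total / n for worker, (total, n) in totals.items()}


payloads = [
    json.dumps({"loss": 1.0, "epoch": 0, "count": 1, "worker": 0}),
    json.dumps({"loss": 0.5, "epoch": 0, "count": 2, "worker": 0}),
    json.dumps({"loss": 2.0, "epoch": 0, "count": 1, "worker": 1}),
]
summary = mean_loss_per_worker(payloads)
```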

Some Notes

Dask generally spawns processes. PyTorch generally forks. When using a multiprocessing-enabled data loader, it is a good idea to pass the fork multiprocessing context to force the use of forking in the data loader.
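
A small, hedged sketch of selecting that context with the standard library follows. The helper name is hypothetical; it falls back to None on platforms where fork is not available (for example Windows), which leaves the data loader on its default start method.

```python
import multiprocessing as mp


def fork_context_or_none():
    """Return the fork multiprocessing context if this platform supports it."""
    if "fork" in mp.get_all_start_methods():
        return mp.get_context("fork")
    return None


ctx = fork_context_or_none()
```

The result can then be passed as multiprocessing_context=ctx when constructing the DataLoader.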

Some Dask deployments do not permit spawning processes. To override this, you can change the distributed.worker.daemon setting.

Environment variables are a convenient way to do this:

DASK_DISTRIBUTED__WORKER__DAEMON=False
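
The variable name follows Dask's documented convention for configuration through the environment: the DASK_ prefix is dropped, the rest is lower-cased, double underscores mark nesting, and the value is parsed as a Python literal where possible. The function below is a simplified illustration of that rule with a hypothetical name, not Dask's own parser.

```python
import ast


def dask_env_to_config(name: str, value: str):
    """Map a DASK_* environment variable to a dotted config key and parsed value."""
    if not name.startswith("DASK_"):
        raise ValueError("not a Dask configuration variable: " + name)
    key = name[len("DASK_"):].lower().replace("__", ".")
    try:
        parsed = ast.literal_eval(value)
    except (ValueError, SyntaxError):
        parsed = value  # keep plain strings as they are
    return key, parsed


key, value = dask_env_to_config("DASK_DISTRIBUTED__WORKER__DAEMON", "False")
```

Setting the same key from Python with dask.config.set({"distributed.worker.daemon": False}) before the workers are created should have the same effect.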
