
An implementation of decentralized data parallel based on PyTorch


Decentralized Data Parallel

The package is a PyTorch extension that facilitates multi-GPU decentralized data parallel training.

Install

pip install decent-dp

How to use

Basic Usage

First, a distributed environment must be available, which means the script should be launched with torchrun.
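
For example, assuming the training script is named train.py (a placeholder) and the node has 4 GPUs, the launch command could be:

torchrun --nproc_per_node=4 train.py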

Before wrapping the model, initialize the distributed process group:

import torch.distributed as dist
dist.init_process_group()

Then wrap the model with DecentralizedDataParallel. Since the optimizer and the learning rate scheduler are fused into the backward pass, two functions must be provided: (1) optim_fn (Callable[[List[Tuple[Tensor, str]]], Optimizer]): constructs the optimizer from a list of parameters paired with their names; (2) lr_scheduler_fn (Callable[[Optimizer], LRScheduler], optional): constructs the learning rate scheduler from the provided optimizer. Ready-made examples of both functions can be found at decent_dp.optim.optim_fn_adam and decent_dp.optim.lr_scheduler_fn_cosine_with_warmup, and a minimal sketch is shown after the wrapping example below.

from decent_dp import DecentralizedDataParallel as DDP
model = ...
model = DDP(model,
            optim_fn=...,
            lr_scheduler_fn=...,
            topology=...)

Arguments of DecentralizedDataParallel

  • optim_fn (Callable[[List[Tuple[Tensor, str]]], Optimizer]): the function constructs the optimizer based on the list of parameters with their names.
  • lr_scheduler_fn (Callable[[Optimizer], LRScheduler], optional, default to None): the function constructs the learning rate scheduler based on the provided optimizer.
  • topology (str, optional, default to complete): the name of the communication topology. Provided: complete (fully connected graph), ring (one-peer ring graph), one-peer-exp (one-peer exponential graph), and alternating-exp-ring (alternating exponential ring). It can also be the name of a customized communication topology, as introduced in the Customized Communication Topology section below.
  • scaler (GradScaler, optional, default to None): gradient scaler for automatic mixed precision training.
  • param_as_bucket_view (bool, optional, default to True): If set to True, the parameters in one bucket will be stored in a contiguous memory block.
  • sync_buffer_in_global_avg (bool, optional, default to False): If set to True, it will also synchronize the buffers (like the moving average in batch normalization layers) when calling .global_avg().
  • bucket_size_in_mb (int, optional, default to 25): the size of each bucket in MB. 25 MB is also the default bucket size of PyTorch's DDP.
  • profile_mode (bool, optional, default to False): If set to True, it will record the per-iteration runtime and the non-overlapped communication time over the most recent 1000 iterations. To get the statistics, call .get_time_stats().
  • local_world_size (int, optional, default to None): The local world size is acquired from the environment variable by default. One can override the value to simulate different local world sizes. Note that it should be less than or equal to the global world size, and the global world size should be divisible by the local world size.
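
Putting several of these arguments together, a configuration might look like the sketch below (my_optim_fn and my_lr_scheduler_fn refer to the hypothetical functions sketched earlier; all values are illustrative):

from torch.cuda.amp import GradScaler
from decent_dp import DecentralizedDataParallel as DDP

model = DDP(model,
            optim_fn=my_optim_fn,
            lr_scheduler_fn=my_lr_scheduler_fn,
            topology='one-peer-exp',
            scaler=GradScaler(),       # enable gradient scaling for mixed precision
            bucket_size_in_mb=25,
            profile_mode=True)         # record per-iteration time statistics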

Public Methods of DecentralizedDataParallel

  • Delegation methods:
    • train(self, mode: bool = True): same as torch.nn.Module's train.
    • eval(self): same as torch.nn.Module's eval.
    • parameters(self, recurse: bool = True) -> Iterator[Parameter]: same as torch.nn.Module's parameters.
    • named_parameters(self, prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) -> Iterator[Tuple[str, Parameter]]: same as torch.nn.Module's named_parameters.
  • Others:
    • get_time_stats(self) -> Dict[str, deque]: If in profile mode, return the time statistics. The keys of the returned dictionary are compute, non_overlap_comm, iter standing for the time taken by computation, non-overlapped communication, and the whole iteration, respectively. (compute + non_overlap_comm = iter)
    • reset_time_stats(self): Clear the profile statistics.
    • global_avg(self): Globally average the parameters. Also globally average the buffers if sync_buffer_in_global_avg=True. Typically, the function should be called at the end of each epoch and before the validation steps or saving the checkpoints.
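
As a usage sketch of these methods (num_epochs, train_loader, loss_fn, and validate are placeholders; gradient zeroing and the optimizer/LR scheduler steps are assumed to be handled by the fused backward pass as described above):

for epoch in range(num_epochs):
    model.train()
    for inputs, targets in train_loader:
        loss = loss_fn(model(inputs), targets)
        loss.backward()      # optimizer and LR scheduler steps are fused into backward
    model.global_avg()       # average parameters (and buffers if sync_buffer_in_global_avg=True)
    model.eval()
    validate(model)          # validate or save a checkpoint on the averaged model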

Supported Schema

The decentralized algorithm schema should follow $$x_{i}^{(t)}=d_i^{(t)}+\sum_{j\in\mathcal{N}(i)}W_{ij}x_j^{(t-1)},\qquad d_i^{(t)}=G\left(F_i,x_j^{(t-1)}\right),$$ which means that the local update doesn't depend on the neighbors' models in the same iteration, but it can depend on the models from the previous iteration.
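
For example, decentralized SGD fits this schema by taking the local update to be a gradient step on the local objective F_i, $$d_i^{(t)}=-\eta\,\nabla F_i\left(x_i^{(t-1)}\right),$$ which gives $$x_i^{(t)}=\sum_{j\in\mathcal{N}(i)}W_{ij}x_j^{(t-1)}-\eta\,\nabla F_i\left(x_i^{(t-1)}\right).$$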

Customized Communication Topology

Currently, the provided communication topologies are complete, ring, one-peer-exp and alternating-exp-ring.

One can introduce customized topologies by registering them.

from decent_dp.topo import Topology, TopologyReg, Edge

@TopologyReg.register('custom-topology')
class CustomTopology(Topology):
    def _get_topo_edges(self) -> List[List[Edge]]:
        ...

One should override the _get_topo_edges method to provide the edges used in each iteration of the loop. The current version performs sanity checks to make sure that (1) every worker is involved in every iteration, and (2) every worker is involved in exactly one edge in every iteration.
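
As a rough structural sketch (the topology name is made up, and the Edge constructor arguments are left as ... because they are not shown here; see the preset implementations mentioned below for how to build them), a topology alternating between two pairings of four workers could be laid out as:

from typing import List
from decent_dp.topo import Topology, TopologyReg, Edge

@TopologyReg.register('alternating-pairs')
class AlternatingPairsTopology(Topology):
    def _get_topo_edges(self) -> List[List[Edge]]:
        # Return one inner list per iteration of the loop; each inner list holds
        # the edges used in that iteration, and every worker must appear in
        # exactly one edge per iteration (this is what the sanity check enforces).
        edges_iter_0 = [Edge(...), Edge(...)]  # e.g. pair workers (0, 1) and (2, 3)
        edges_iter_1 = [Edge(...), Edge(...)]  # e.g. pair workers (0, 3) and (1, 2)
        return [edges_iter_0, edges_iter_1]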

The preset topologies are good examples; they can be found at decent_dp.topo.CompleteTopology, decent_dp.topo.RingTopology, etc.
