An implementation of decentralized data parallel based on PyTorch
Decentralized Data Parallel
This package is a PyTorch extension that facilitates multi-GPU decentralized data-parallel training.
Install
```
pip install decent-dp
```
How to use
Basic Usage
First, a distributed environment must be available, which means the script should be launched with `torchrun`.

Before making the model distributed, initialize the distributed environment:
```python
import torch.distributed as dist
dist.init_process_group()
```
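When training on GPUs, each process is typically also pinned to the device that `torchrun` assigned to it. This is a standard `torchrun` pattern rather than something specific to this package:

```python
import os

import torch

# torchrun sets LOCAL_RANK for every process it spawns;
# bind this process to its assigned GPU.
torch.cuda.set_device(int(os.environ['LOCAL_RANK']))
```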
Then wrap the model with `DecentralizedDataParallel`. Since the optimizer and the learning rate scheduler are fused into the backward pass, one will need to provide two functions:

- `optim_fn` (`Callable[[List[Tuple[Tensor, str]]], Optimizer]`): constructs the optimizer from the list of parameters with their names.
- `lr_scheduler_fn` (`Callable[[Optimizer], LRScheduler]`, optional): constructs the learning rate scheduler from the provided optimizer.

Examples of both functions can be found at `decent_dp.optim.optim_fn_adam` and `decent_dp.optim.lr_scheduler_fn_cosine_with_warmup`.
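As a rough sketch, an `optim_fn` matching the expected signature could look like the following (the choice of Adam and the learning rate are illustrative, not package defaults):

```python
from typing import List, Tuple

from torch import Tensor
from torch.optim import Adam, Optimizer

def optim_fn(params: List[Tuple[Tensor, str]]) -> Optimizer:
    # `params` pairs each trainable tensor with its name; the names are
    # ignored here, but could be used for per-parameter settings
    # (e.g. no weight decay for biases).
    return Adam([p for p, _ in params], lr=1e-3)
```

With these functions in hand, wrap the model: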
```python
from decent_dp import DecentralizedDataParallel as DDP

model = ...
model = DDP(model,
            optim_fn=...,
            lr_scheduler_fn=...,
            topology=...)
```
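Training then looks like an ordinary loop. Since the optimizer and scheduler are fused into the backward pass, the sketch below assumes that no explicit `optimizer.step()` or `lr_scheduler.step()` call is needed; the data loader and loss function are placeholders:

```python
for x, y in train_loader:        # placeholder DataLoader
    loss = loss_fn(model(x), y)  # placeholder loss function
    loss.backward()              # optimizer and scheduler updates happen in here
```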
Arguments of `DecentralizedDataParallel`
- `optim_fn` (`Callable[[List[Tuple[Tensor, str]]], Optimizer]`): constructs the optimizer from the list of parameters with their names.
- `lr_scheduler_fn` (`Callable[[Optimizer], LRScheduler]`, optional, default `None`): constructs the learning rate scheduler from the provided optimizer.
- `topology` (`str`, optional, default `'complete'`): name of the communication topology. Provided: `complete` (fully connected graph), `ring` (one-peer ring graph), `one-peer-exp` (one-peer exponential graph), and `alternating-exp-ring` (alternating exponential ring). It can also be the name of a customized communication topology, as introduced in the Customized Communication Topology section below.
- `scaler` (`GradScaler`, optional, default `None`): gradient scaler for automatic mixed precision training.
- `param_as_bucket_view` (`bool`, optional, default `True`): if set to `True`, the parameters in one bucket are stored in a contiguous memory block.
- `sync_buffer_in_global_avg` (`bool`, optional, default `False`): if set to `True`, buffers (such as the moving averages in batch normalization layers) are also synchronized when calling `.global_avg()`.
- `bucket_size_in_mb` (`int`, optional, default `25`): the size of each bucket in MB. `25` is also the default value of PyTorch's DDP.
- `profile_mode` (`bool`, optional, default `False`): if set to `True`, records the per-iteration runtime and the non-overlapped communication time over the most recent 1000 iterations. Call `.get_time_stats()` to retrieve the statistics.
- `local_world_size` (`int`, optional, default `None`): by default, the local world size is read from the environment variables. One can override the value to simulate different local world sizes. Note that it must be less than or equal to the global world size, and the global world size must be divisible by the local world size.
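For instance, a non-default configuration could look like this (the values are illustrative, not recommendations):

```python
model = DDP(model,
            optim_fn=optim_fn,
            topology='one-peer-exp',   # one-peer exponential graph
            bucket_size_in_mb=50,      # larger buckets than the 25 MB default
            profile_mode=True)         # record timing statistics
```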
Public Methods of `DecentralizedDataParallel`
- Delegation methods:
  - `train(self, mode: bool = True)`: same as `torch.nn.Module`'s `train`.
  - `eval(self)`: same as `torch.nn.Module`'s `eval`.
  - `parameters(self, recurse: bool = True) -> Iterator[Parameter]`: same as `torch.nn.Module`'s `parameters`.
  - `named_parameters(self, prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) -> Iterator[Tuple[str, Parameter]]`: same as `torch.nn.Module`'s `named_parameters`.
- Others:
  - `get_time_stats(self) -> Dict[str, deque]`: if in profile mode, returns the time statistics. The keys of the returned dictionary are `compute`, `non_overlap_comm`, and `iter`, standing for the time taken by computation, non-overlapped communication, and the whole iteration, respectively (`compute` + `non_overlap_comm` = `iter`).
  - `reset_time_stats(self)`: clears the profile statistics.
  - `global_avg(self)`: globally averages the parameters, and also the buffers if `sync_buffer_in_global_avg=True`. Typically, this function should be called at the end of each epoch, before the validation steps or saving checkpoints.
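As a sketch of how these methods fit together at the end of an epoch (`validate` is a placeholder, and the recorded statistics are assumed to be per-iteration times):

```python
# End of each epoch: average across workers before evaluating.
model.global_avg()
model.eval()
validate(model)  # placeholder evaluation routine
model.train()

# If the model was constructed with profile_mode=True:
stats = model.get_time_stats()
print(sum(stats['iter']) / len(stats['iter']))  # mean recent iteration time
```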
Supported Schema
The decentralized algorithm schema should follow $$x_i^{(t)} = d_i^{(t)} + \sum_{j\in\mathcal{N}(i)} W_{ij}\, x_j^{(t-1)}, \qquad d_i^{(t)} = G\left(F_i,\, x_i^{(t-1)}\right)$$ which means that the local update $d_i^{(t)}$ doesn't depend on the neighbors' models in the same iteration, but it can depend on the models from the previous iteration.
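For example, plain decentralized SGD fits this schema by taking the local update to be a gradient step on the previous local iterate: $$d_i^{(t)} = -\eta\, \nabla F_i\left(x_i^{(t-1)}\right)$$ where $\eta$ is the learning rate.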
Customized Communication Topology
Currently, the provided communication topologies are `complete`, `ring`, `one-peer-exp`, and `alternating-exp-ring`. One can introduce customized topologies by registering additional ones:
```python
from typing import List

from decent_dp.topo import Topology, TopologyReg, Edge

@TopologyReg.register('custom-topology')
class CustomTopology(Topology):
    def _get_topo_edges(self) -> List[List[Edge]]:
        ...
```
One should override the `_get_topo_edges` method to provide the edges used in each iteration of the loop. In the current version, sanity checks are performed to make sure that (1) every worker is involved in every iteration, and (2) every worker is involved in exactly one edge in every iteration.
The preset topologies are good examples; they can be found at `decent_dp.topo.CompleteTopology`, `decent_dp.topo.RingTopology`, etc. A hypothetical registration is sketched below.
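Purely to illustrate the registration mechanism, a hypothetical topology could be sketched as follows. The `Edge` constructor arguments used here are an assumption, not the package's documented API; consult the preset topologies for the actual interface:

```python
from typing import List

import torch.distributed as dist

from decent_dp.topo import Topology, TopologyReg, Edge

@TopologyReg.register('pairwise')
class PairwiseTopology(Topology):
    """Hypothetical topology: worker 2k exchanges with worker 2k + 1."""

    def _get_topo_edges(self) -> List[List[Edge]]:
        world_size = dist.get_world_size()  # assumes an even world size
        # A single iteration in the loop; every worker appears in exactly
        # one edge, satisfying the sanity checks described above.
        return [[Edge(ranks=[2 * k, 2 * k + 1])  # assumed Edge field
                 for k in range(world_size // 2)]]
```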