Parallelism in Deep Learning

Data Parallelism

We would like to process more data faster, and each of our GPUs is able to calculate the full result by itself, i.e., the whole model fits on a single GPU.

DDP - Distributed Data Parallel

We spawn multiple processes across machines and create a single instance of the model per process. DDP registers an autograd hook for each trainable parameter.

Note: if applying torch.compile, wrap the model with DDP first and then compile.

Steps:

  1. DDP broadcasts the model's state_dict from the rank 0 process to all other processes, so every replica starts from the same weights.
  2. Each DDP process creates a local Reducer that is responsible for syncing gradients during the backward pass. The Reducer uses the AllReduce collective (Collective Operations) and organizes parameters into buckets. AllReduce is triggered for a bucket as soon as all of that bucket's gradients have been computed, and while that communication runs, backward keeps computing gradients for the remaining parameters. Note that parameters are allocated to buckets in roughly the reverse order of model.parameters(), since that is roughly the order in which their gradients become ready (a toy sketch of this bucketing-and-overlap mechanism follows these steps).
  3. DDP takes the input and runs the forward pass on the local model. If find_unused_parameters is enabled, DDP then traverses the autograd graph from the model's output and marks parameters that will not receive gradients as ready for reduction immediately.
  4. In the backward pass, the autograd hook attached to each parameter marks it as ready for reduction once its gradient has been computed. When all the parameters within a bucket become ready, DDP fires off an asynchronous AllReduce for that bucket, and the resulting gradients are averaged across processes.
  5. The optimizer step then simply uses these averaged gradients, which are identical in all the local copies.
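
As referenced in step 2, here is a toy sketch of the bucketing-and-overlap idea, assuming an initialized process group (e.g. launched with torchrun plus init_process_group) and PyTorch 2.1+ for Tensor.register_post_accumulate_grad_hook. It is not DDP's real Reducer: real DDP flattens each bucket into a single tensor and issues one AllReduce per bucket, sizes buckets in bytes rather than by parameter count, and handles unused parameters (step 3); all of that is omitted here.

```python
import torch
import torch.distributed as dist


class ToyReducer:
    """Toy stand-in for DDP's Reducer: static buckets, async all-reduce per bucket."""

    def __init__(self, params, bucket_size=2):
        # Like DDP, bucket parameters in (roughly) reverse order, since that is
        # roughly the order in which gradients become ready during backward.
        params = list(reversed(list(params)))
        self.buckets = [params[i:i + bucket_size]
                        for i in range(0, len(params), bucket_size)]
        self.ready = set()      # ids of params whose gradient is ready
        self.fired = set()      # indices of buckets whose all-reduce has launched
        self.handles = []       # in-flight async all-reduce work handles
        for p in params:
            p.register_post_accumulate_grad_hook(self._on_grad_ready)

    def _on_grad_ready(self, param):
        # Step 4: mark this parameter ready; once every parameter in a bucket is
        # ready, launch an asynchronous all-reduce so the communication overlaps
        # with the remaining backward computation.
        self.ready.add(id(param))
        for i, bucket in enumerate(self.buckets):
            if i not in self.fired and all(id(p) in self.ready for p in bucket):
                self.fired.add(i)
                for p in bucket:
                    work = dist.all_reduce(p.grad, op=dist.ReduceOp.SUM, async_op=True)
                    self.handles.append((work, p))

    def finalize(self):
        # Call after loss.backward(): wait for all communication, then average.
        world_size = dist.get_world_size()
        for work, p in self.handles:
            work.wait()
            p.grad /= world_size
        self.ready.clear()
        self.fired.clear()
        self.handles.clear()
```

After `loss.backward()`, calling `reducer.finalize()` before `optimizer.step()` leaves every rank with the same averaged gradients, which is what step 5 relies on.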

Much of DDP's performance advantage comes from overlapping the AllReduce collectives with the backward computation. To keep this advantage under torch.compile, the graph has to be broken at bucket boundaries, which is what TorchDynamo's DDPOptimizer does.
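
Putting the steps together, here is a minimal usage sketch, assuming it is launched with `torchrun --nproc_per_node=<num_gpus>` and that NCCL is available; the model, data, and hyperparameters are placeholders.

```python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    dist.init_process_group(backend="nccl")
    local_rank = int(os.environ["LOCAL_RANK"])  # set by torchrun
    torch.cuda.set_device(local_rank)

    model = torch.nn.Sequential(
        torch.nn.Linear(128, 256), torch.nn.ReLU(), torch.nn.Linear(256, 10)
    ).to(local_rank)

    # Step 1: DDP broadcasts rank 0's parameters so every replica starts identical.
    model = DDP(model, device_ids=[local_rank])
    # Per the note above: wrap with DDP first, then compile, so TorchDynamo's
    # DDPOptimizer can split the graph at bucket boundaries and keep the
    # AllReduce / backward overlap.
    model = torch.compile(model)

    opt = torch.optim.AdamW(model.parameters(), lr=1e-3)
    for _ in range(10):
        x = torch.randn(32, 128, device=local_rank)
        y = torch.randint(0, 10, (32,), device=local_rank)
        loss = torch.nn.functional.cross_entropy(model(x), y)
        opt.zero_grad()
        loss.backward()  # steps 2-4: hooks fire, buckets are all-reduced asynchronously
        opt.step()       # step 5: every rank applies the same averaged gradients

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```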

Fully Sharded Data Parallel (FSDP2)

FSDP improves upon DDP by sharding the model parameters. In DDP, the model is replicated in storage and each replica's computation is also replicated; in FSDP, the model is sharded in storage, but the computation is still replicated. As long as the computation is replicated, it still falls under Data Parallelism.

Pasted image 20251127215758.png
(Image Source - https://docs.pytorch.org/tutorials/intermediate/FSDP_tutorial.html)
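
In code, applying FSDP2 looks roughly like the following sketch, assuming a recent PyTorch (2.6+) where `fully_shard` is importable from torch.distributed.fsdp and a torchrun launch; the model and sizes are placeholders.

```python
import os

import torch
import torch.distributed as dist
from torch.distributed.fsdp import fully_shard  # FSDP2 API


def build_sharded_model():
    dist.init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

    model = torch.nn.Sequential(
        *[torch.nn.Linear(1024, 1024) for _ in range(8)]
    ).cuda()

    # Shard each sub-module (an "FSDP unit") and then the root module: parameters
    # are stored as shards and gathered unit by unit during forward and backward.
    for layer in model:
        fully_shard(layer)
    fully_shard(model)

    # The training loop is unchanged (forward, backward, optimizer step); the
    # gathering and freeing described below happen inside each unit.
    return model
```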

Before the forward pass and the backward pass of each FSDP unit, every machine all-gathers that unit's full weights, which are otherwise kept sharded across the machines.

Once the forward computation is done, the gathered weights are freed. In the backward pass, the weights are all-gathered again, the local backward pass is run, and the gradients are reduce-scattered across the machines, so that each machine keeps only the gradient shard for the parameters it owns. After this, the gathered weights are freed again.

Because the gradients end up sharded the same way as the parameters, every machine can then run the optimizer step locally on just its own shard of the weights.
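
To make the gather / compute / free / reduce-scatter flow concrete, here is a toy single-layer sketch of the same idea. It is not the real FSDP implementation; it assumes one process per GPU with an initialized NCCL process group and a weight whose rows are sharded evenly across ranks, and all names and shapes are illustrative.

```python
import torch
import torch.distributed as dist


def fsdp_style_step(x_local, y_local, w_shard, lr=1e-2):
    # x_local:  (batch, d_in)                 -- this rank's micro-batch
    # y_local:  (batch, d_out)                -- targets for the local micro-batch
    # w_shard:  (d_out // world_size, d_in)   -- this rank's shard of the weight
    world_size = dist.get_world_size()

    # 1. All-gather the sharded weight into the full weight on every rank.
    w_full = torch.empty(
        w_shard.shape[0] * world_size, w_shard.shape[1],
        device=w_shard.device, dtype=w_shard.dtype,
    )
    dist.all_gather_into_tensor(w_full, w_shard.detach().contiguous())
    w_full.requires_grad_(True)

    # 2. Forward and backward with the full weight. (Real FSDP frees the gathered
    #    weights after each unit's forward and re-gathers them for that unit's
    #    backward; with a single layer there is nothing to free in between.)
    out = x_local @ w_full.t()
    loss = torch.nn.functional.mse_loss(out, y_local)
    loss.backward()  # produces w_full.grad on every rank

    # 3. Reduce-scatter the full gradient: sum across ranks, keep only the rows
    #    this rank owns, then free the gathered weight.
    grad_shard = torch.empty_like(w_shard)
    dist.reduce_scatter_tensor(grad_shard, w_full.grad, op=dist.ReduceOp.SUM)
    grad_shard /= world_size
    del w_full

    # 4. The optimizer step touches only the local shard.
    with torch.no_grad():
        w_shard -= lr * grad_shard
    return loss.detach()
```

In a real model this happens per FSDP unit, which is what makes freeing the gathered weights between forward and backward worthwhile.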

Tensor Parallelism

What if we can't fit even one of the model's layers into a single GPU's memory? That's the problem Tensor Parallelism tries to solve.

Megatron-LM

The paper [@shoeybiMegatronLMTrainingMultiBillion2020] deals with parallelizing the large layers of the Transformer model.

MLP

First, let's look at the MLP layer.
The MLP corresponds to the following,

$$Y = \text{GeLU}(XA)$$

If we split the weight matrix $A$ along its rows and the input $X$ along its columns, the partial results from the individual devices have to be synced and summed before being passed to the GeLU, because GeLU is non-linear. However, when the weight matrix is partitioned along its columns, each device can compute the GeLU independently of the results on the other devices. This allows us to remove the GPU synchronization call before the GeLU.

## Multi Head Attention

This is pretty simple: the matrix multiplies corresponding to one head can be done on a single machine, so we just distribute the heads across machines.

# Output Embedding Layer

The output embedding layer is huge in modern Transformers due to the massive vocabulary size, and in most LLMs the output embedding shares weights with the input embedding, so we need to parallelize both. There are two approaches. The first parallelizes the GEMM that produces the logits, gathers the logits, and then computes the cross-entropy loss; but gathering the vocabulary-sized logit vectors takes a very long time for a large vocabulary. Instead, we can fuse the GEMM of the output embedding with the cross-entropy loss and gather only the scalar loss values.

# Pipeline Parallelism

While Tensor Parallelism splits an individual model layer's operations across machines, Pipeline Parallelism splits the model across machines by assigning certain layers to specific machines: GPU 1 might hold layers 0-4, GPU 2 layers 5-9, and so on. However, given a training batch, while GPU 1 is processing data, the later GPUs just stay idle. How do you solve this?

# GPipe

GPipe partitions consecutive groups of layers into cells, and each cell is assigned to a separate device. A mini-batch of training examples is split into smaller micro-batches.

![Pasted image 20251202195911.png](/img/user/Assets/Pasted%20image%2020251202195911.png)
(Image taken from [@huangGPipeEfficientTraining2019])

Now GPU 1 processes micro-batch 1 and sends it to GPU 2; GPU 1 then processes micro-batch 2 while GPU 2 processes micro-batch 1, and so on down the pipeline. Gradients are accumulated, and the update happens only after all the micro-batches have been processed.

In massive models, we run out of memory from storing activations. To solve this, during the forward pass we store only the inputs to the layers at the cell boundaries instead of all the intermediate activations; during the backward pass, the activations inside a cell are recomputed on the fly.

# To Be Continued

- Tensor Parallelism
- Pipeline Parallelism
- Sequence Parallelism is already covered in Ring, Ulysses and Unified Attention
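
Appendix: going back to the Megatron-LM MLP section above, here is a toy sketch of the column-parallel split, assuming one process per GPU and an initialized NCCL process group. The tensor names and shapes are illustrative, and the row-parallel second GEMM, which the Megatron-LM paper pairs with the column-parallel first one, is included only to show where the single all-reduce lands; this is not Megatron-LM's actual API.

```python
import torch
import torch.distributed as dist


def column_parallel_mlp(x, a_col_shard, b_row_shard):
    # x:           (batch, d_model)                 -- replicated on every rank
    # a_col_shard: (d_model, d_ff // world_size)    -- this rank's columns of A
    # b_row_shard: (d_ff // world_size, d_model)    -- this rank's rows of B
    #
    # Because A is split along its columns, each rank owns complete columns of
    # X @ A, so GeLU can be applied locally with no synchronization beforehand.
    h_local = torch.nn.functional.gelu(x @ a_col_shard)
    # The second GEMM is row-parallel: every rank produces a partial output of
    # the full shape, and a single all-reduce sums them at the end of the MLP.
    y_partial = h_local @ b_row_shard
    dist.all_reduce(y_partial, op=dist.ReduceOp.SUM)
    return y_partial
```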