반응형
Distributed Data Parallel (DDP)
Parallel의 종류
-
- model을 쪼개서 여러 개의 GPU로 뿌려주는 경우
- model이 너무 클 때 사용
- 특정 layer는 part_1이 하고, 또 다른 특정 layer는 part_2가 하는 식임Model Parallel
- 예시 코드
class ModelParallel(nn.Module):
def __init__(self, *args, **kwargs):
super(ModelParallel, self).__init__()
self.part_1 = nn.Sequential(...)
self.part_2 = nn.Sequential(...)
# put each part on a different device
self.part_1.to(torch.device('cuda:0'))
self.part_2.to(torch.device('cuda:1'))
def forward(self, x):
x = x.to(torch.device('cuda:0'))
x1 = self.part_1(x)
x1 = x1.to(torch.device('cuda:1'))
y = self.part_2(x1)
return y
- 위와 같은 model parallel 방식은 model을 여러 부분으로 쪼개고, 각 디바이스에 일일이으 할당해줘야 하는 번거로움이 있음
2. Data Parallel
- 모델을 쪼개는 게 아니라 데이터를 여러 부분으로 쪼갬
- 데이터가 너무 커서 하나의 GPU memory에 할당할 수 없을 떄 유용
- 동일한 model copy를 두고, 각 다른 data sub-batch를 각 copy에 넣어주는 형식임.
- 즉 이는 여러 개의 model에서 병렬적으로 forwarding/backwarding이 이루어지기 때문에 반드시 동기화 해주는 과정이 필요
- 서로 다른 copy에서 각각 다른 데이터를 forwarding하기 때문에, backpropagation 시 gradient도 다르게 발생할 것임.
- 따라서 copy_1에서 발생한 gradient를 모든 copy에 반영하고,
- copy_2에서 발생한 gradient도 모든 copy에 반영해야 함
2-1. nn.DataParallel
- 내부적으로 각 iteration마다 model copy를 각 GPU에 올려주고, data를 batch 단위로 쪼개서 각 GPU에 할당함.
model = MyModel()
model = nn.DataParallel(model) # make it parallel
- 코드는 매우 간단.
- 하나의 머신에서만 work. (single process)
- 하지만 “최적화”가 되어있지 않기 때문에, synchronization 과정에서 overhead 발생
- 예를 들어, 동기화 시 각 copy에 있던 데이터들이 하나로 모이게 되고, 이 과정에서 메모리 불균형이 발생
- 멀티 프로세스가 아닌, 멀티스레드 방식의 병렬 처리를 지원하므로, 다소 느림
2-2.⭐️ DistributedDataParallel : DDP
- 다시 말하지만, 데이터 병렬화 과정임
- 스레드가 아닌, 프로세스 단위로 동작 → 더 빠르게 동작
- DDP의 경우 속도 빠르고, 여러 개의 머신에도 확장 가능 (GPU 클러스터에도 적용 가능)
- 병렬 처리 작업을 유연하게 구성 가능
- 다만 torch.multiprocessing.spawn 과 같은 초기 작업은 다소 번거로움
⇒ Multiple worker들이 하나의 global model을, 큰 dataset의 일부를 학습시켜서, 각각의 local gradient를 계산한 뒤, AllReduce 와 같은 Primitive 사용해서 collectively synchronize 진행함
cf) Node란 ? : 각각의 node는 다수의 GPU로 구성되며, node는 간단하게 컴퓨터 한 대 (또는 storage 한 대) 라고 생각하면 됨
- N : application이 실행되고 있는 node의 개수 (2개)
- G : 노드 당 GPU 개수 (각 node에는 4개의 GPU있음)
- World Size : 모든 node에 걸쳐 실행되는 application(=worker = process)의 총 개수
- Local World SIze( : 각 node에서 실행되는 process의 개수
- 위 그림에서,
- 클러스터는 노드의 집합으로써, 2개의 node로 구성됨
- 각 process당 2개의 GPU를 가지며, 각 process는 rank로 식별됨
- Rank
- global rank : 클러스터 전체 기준으로, 프로세스가 몇 번째 프로세스인지 식별
- local rank : 각 node 내에 해당 프로세스가 몇 번째 프로세스인지 식별
- 기본적으로 Model을 GPU에 올릴 때 model rank를 사용
- 다만 관습적으로는 하나의 프로세스가 하나의 GPU를 가지는 것이 좋은데, 이는 DDP 사용 시 parallel한 reader stream을 가능한 많이 가질 수 있고, 이는 I/O와 computational cost 측면에서 좋다고 알려짐
- (단, 2개의 process가 하나의 GPU를 공유하는 것은 불가능)
- 참고) DDP는 model parallel 방식(하나의 모델에 multi GPU 사용하는 방식)과도 결합하여 쓰일 수 있는데, 이 경우 각 DDP process는 model parallel을 사용하며, 모든 process에 대해서는 collectively하게 data parallel을 사용함
DDP application preparation and launch
- 각각의 process는 각 process에 해당하는 global / local rank를 알아야 함
- 이걸 알면, ProcessGroup 을 생성/init할 수 있고, AllReduce Primitive와 같은 collective communication을 가능하게 할 수 있음
Basic Use Case (Example)
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
위 code는 process group setting initialization 임
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running basic DDP example on rank {rank}.")
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
- 위 코드에서도 보이듯이, Gradient Synchronization이 backward computation동안 진행되는 것을 볼 수 있음
- backward() 값 return 시 이미 synchronization된 gradient tensor을 가지고 있음
- (중요) spawn 은 cluster 내 node가 1개 있는 상황에서만 동작함.
- 따라서, process 개수는 node 내 GPU의 개수로 지정해주면 됨.
Skewed Processing Speeds
- process를 돌리다 보면 어떤 프로세스는 빨리 끝나고, 어떤 프로세스는 늦게 끝나는 시간차가 발생할 수 있다.
- 이는 network delay, resource contention, unpredictable workload spike 등에서 비롯된다.
- 이러한 timeout들을 피하기 위해서, init_process_group을 통해 충분히 긴 시간의 timeout 을 준다.
Save and Load Checkpoints
- 통상적으로 torch.save , torch.load 를 통해 checkpoint module 진행
- DDP 사용 시, 한 가지 최적화 방법은 model을 단 하나의 프로세스에만 저장한 다음, 모든 프로세스에 로드하여 write overhead를 줄이는 것임.
- Example Code
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process -> 프로세스 끝난 이후에 model load 진행
# -> 각 프로세스 간의 균형을 맞춰주는 작업임 (gradient계산 후 weight update 시에 먼저 진행되면 안되기 때문에)
# 0 saves it.
dist.barrier()
# configure map_location properly : map_location 제대로 지정
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
print(f"Finished running DDP checkpoint example on rank {rank}.")
Combining DDP with Model Parallelism
- DDP 는 multi-GPU model과도 함께 작업 가능
- large model + large dataset
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
위 코드처럼 model parallel은 각 part에 대해 device(GPU)를 수동적으로 부여해야 함.
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
# 한 프로세스 당 GPU 2개 할당
dev0 = rank * 2 # 0, 2, 4, 6, 8 ...
dev1 = rank * 2 + 1 # 1, 3, 5, 7, 9 ...
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
print(f"Finished running DDP with model parallel example on rank {rank}.")
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus # 총 GPU 개수
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
Initialize DDP with torch.distributed.run/torchrun
- DDP code와 job initialization 더 쉽게 leverage하는 방법
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
dist.init_process_group("nccl")
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.cuda.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
print(f"Finished running basic DDP example on rank {rank}.")
if __name__ == "__main__":
demo_basic()
아래 command로 실행가능
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
- two host(nodes)
- 각 host 당 8개의 process 실행
- 즉, 총 16 GPUs 사용
- torchrun :
- 환경 변수에서 local_rank를 읽어오기 때문에 특별히 arg parsing을 추가로 해줄 필요가 없어 편리함 (local rank assign 바로 가능)
반응형