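"""Benchmark a small GPT-2-style transformer language model trained with
pipeline parallelism via torch.distributed.pipeline.sync.Pipe.

The model is built as an nn.Sequential, partitioned across the available CUDA
devices, and trained for a few batches while throughput (words per second),
loss, and peak GPU memory are reported.
"""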
import argparse
import math
import os
import time
from benchmark_dataset import BenchmarkLMDataset, collate_sentences_lm
import torch
from torch.distributed import rpc
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.utils import partition_model
from torch.optim import Adam
def sizeof_fmt(num, suffix='B'):
    # Human-readable byte sizes, e.g. 1536 -> "1.50KiB".
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti']:
        if abs(num) < 1024.0:
            return "%3.2f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%3.2f%s%s" % (num, 'Pi', suffix)
def init_random_seed(seed: int):
    # Seed torch, CUDA, and numpy RNGs so runs are reproducible.
    import numpy
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    numpy.random.seed(seed)
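# Global counter incremented by TransformerDecoderLayer.forward on every call.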
iteration_count = 0
class EmbeddingLayer(nn.Embedding):
def __init__(self, ntoken, ninp, initrange):
super().__init__(ntoken, ninp)
self.ninp = ninp
nn.init.uniform_(self.weight, -initrange, initrange)
    def forward(self, src):
        # Scale embeddings by sqrt(d_model), as in "Attention Is All You Need".
        return super().forward(src) * math.sqrt(self.ninp)
class PositionalEncodingLayer(nn.Module):
    """Adds the standard sinusoidal positional encoding to its input."""
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[: x.size(0), :]
return self.dropout(x)
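# Illustrative shape check (not part of the benchmark): the encoding expects
# sequence-first input of shape (seq_len, batch, d_model) and preserves it.
#   pe = PositionalEncodingLayer(d_model=16)
#   assert pe(torch.zeros(10, 2, 16)).shape == (10, 2, 16)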
class TransformerDecoderLayer(nn.TransformerEncoderLayer):
"""Though this class inherits from torch.nn.TransformerEncoderLayer,
it functions as a decoder in this model"""
    def __init__(self, ninp, nhead, nhid, dropout):
        super().__init__(ninp, nhead, nhid, dropout)
self.src_mask = None
def forward(self, src):
global iteration_count
iteration_count += 1
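        # Cache a causal (square subsequent) mask; rebuild it only when the
        # sequence length changes.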
if self.src_mask is None or self.src_mask.size(0) != len(src):
device = src.device
mask = nn.Transformer.generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask
return super().forward(src, self.src_mask)
class LinearLayer(nn.Linear):
def __init__(self, ninp, ntoken, initrange):
super().__init__(ninp, ntoken)
nn.init.zeros_(self.bias)
nn.init.uniform_(self.weight, -initrange, initrange)
class TransformerLMSequential(nn.Sequential):
"""A small language model based on the design of GPT-2 using nn.Sequential
for compatibility with Pipe"""
def __init__(self, ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder):
layers = [
EmbeddingLayer(ntokens, ninp, initrange),
PositionalEncodingLayer(ninp, dropout),
]
for _ in range(ndecoder):
layers.append(TransformerDecoderLayer(ninp, nhead, nhid, dropout))
layers.append(LinearLayer(ninp, ntokens, initrange))
        super().__init__(*layers)
def make_model(args, device, ntokens):
ninp = 2048 # embedding dimension
nhid = 2048 # the dimension of the feedforward network model in nn.TransformerEncoder
nhead = 32 # the number of heads in the multiheadattention models
dropout = 0
initrange = 0.1
ndecoder = args.num_decoder_layers
model = TransformerLMSequential(ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder).to(device)
criterion = nn.CrossEntropyLoss()
lr = 0.01 # learning rate
    def make_adam(model):
        return Adam(model.parameters(), lr=lr)
    # Return a factory rather than an instance so the optimizer can later be
    # bound to the parameters of the Pipe-partitioned model, in train().
    optimizer = make_adam
return model, criterion, optimizer
def train(lm_dataloader, model, criterion, optimizer, vocab_size, args):
    model.train()
    total_loss = 0.0
    start_time = time.time()
    word_counter = 0
    optimizer = optimizer(model)
    def get_first_device(model):
        if model.devices:
            return model.devices[0]
        return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    def get_last_device(model):
        if model.devices:
            return model.devices[-1]
        return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print('Number of parameters for model: {}'.format(sum(p.numel() for p in model.parameters())))
    for i, batch in enumerate(lm_dataloader):
        if args.max_batch and i > args.max_batch:
            break
optimizer.zero_grad()
try:
tmp = batch["input"].to(get_first_device(model))
output = model(tmp).local_value()
except Exception as e:
raise RuntimeError(f"training failed on {torch.distributed.get_rank()}") from e
target = batch["target"].to(get_last_device(model))
output = output.to(target.device)
loss = criterion(output.view(-1, vocab_size), target.view(-1))
loss.backward()
del target
del output
torch.nn.utils.clip_grad_value_(model.parameters(), 0.05)
optimizer.step()
total_loss += loss.item()
log_interval = 1
word_counter += batch["ntokens"]
if i % log_interval == 0 and i > 0:
cur_loss = total_loss / log_interval
elapsed = time.time() - start_time
print(
"| batch {:5d} | wps {:5.2f} | loss {:5.2f} | ppl {:8.2f}".format(
i, word_counter / elapsed, cur_loss, math.exp(cur_loss)
)
)
word_counter = 0
total_loss = 0
start_time = time.time()
print('Peak memory usage for GPUs: ', end='')
for i in range(len(model.devices)):
print("cuda:{}: {}, ".format(
i,
sizeof_fmt(torch.cuda.memory_stats(i)["allocated_bytes.all.peak"])), end='')
print()
def generate_balance(num_devices, num_layers):
balance = []
layers_assigned = 0
for i in range(num_devices):
x = (num_layers - layers_assigned) / (num_devices - i)
if x.is_integer():
balance.append(int(x))
layers_assigned += x
else:
balance.append(math.ceil(x))
layers_assigned += math.ceil(x)
return balance
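# Worked example: generate_balance(4, 10) spreads 10 layers over 4 devices as
# [3, 3, 2, 2] (earlier devices take the ceiling of the remaining average).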
def make_model_and_data(args, device):
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
vocab_size = 10000
model, criterion, optimizer = make_model(args, device, vocab_size)
lm_dataset = BenchmarkLMDataset()
lm_dataloader = DataLoader(
lm_dataset, batch_size=args.batch_size, shuffle=True, num_workers=0, collate_fn=collate_sentences_lm
)
return {
"model": model,
"criterion": criterion,
"optimizer": optimizer,
"data": lm_dataloader,
"vocab_size": vocab_size,
}
def bench_single_process(args):
    os.environ["MASTER_ADDR"] = args.host
    os.environ["MASTER_PORT"] = "10638"
rpc.init_rpc(
"worker",
rank=0,
world_size=1,
)
num_devices = torch.cuda.device_count() if torch.cuda.is_available() else 1
num_devices = min(args.num_devices, num_devices)
assert num_devices > 0
init_random_seed(0)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    blob = make_model_and_data(args, device)
model = blob["model"]
balance = generate_balance(num_devices, len(model))
model = partition_model(model, balance)
p = Pipe(
model, chunks=args.chunks, checkpoint=args.checkpoint
)
del model
del blob["model"]
train(blob["data"], p, blob["criterion"], blob["optimizer"], blob["vocab_size"], args)
parser = argparse.ArgumentParser(description="benchmark")
parser.add_argument("--host", "-o", type=str, default="localhost", help="hostname")
parser.add_argument("--chunks", type=int, default=4, help="number of microbatches per batch")
parser.add_argument("--batch-size", type=int, default=8, help="size of a batch")
parser.add_argument("--max-batch", type=int, default=10, help="Max number of batches")
parser.add_argument("--num-decoder-layers", type=int, default=10, help="Number of decoder layers in the model")
parser.add_argument(
"--checkpoint", default="except_last", choices=["always", "except_last", "never"],
help="Checkpointing strategy for pipe"
)
parser.add_argument(
"--num-devices", type=int, default=4, help="Number of GPU devices to use"
)
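# Example invocation (script name and device count are illustrative; adjust to
# your setup):
#   python pipe.py --num-devices 2 --num-decoder-layers 10 --batch-size 8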
if __name__ == "__main__":
args = parser.parse_args()
print(f"Running benchmark with args: {args}")
bench_single_process(args)