| import argparse |
| import math |
| import os |
| import time |
| |
import torch
import torch.nn as nn
from torch.distributed import rpc
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.utils import partition_model
from torch.optim import Adam
from torch.utils.data import DataLoader

from benchmark_dataset import BenchmarkLMDataset, collate_sentences_lm

| |
| def sizeof_fmt(num, suffix='B'): |
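    """Format a byte count as a human-readable string, e.g. 1536 -> '1.50KiB'."""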
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti']:
        if abs(num) < 1024.0:
            return "%3.2f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%3.2f%s%s" % (num, 'Pi', suffix)
| |
| |
| def init_random_seed(seed: int): |
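    """Seed torch, CUDA, and numpy RNGs so runs are reproducible."""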
| import numpy |
| |
| torch.manual_seed(seed) |
| torch.cuda.manual_seed(seed) |
| numpy.random.seed(seed) |
| |
| |
# Global counter of TransformerDecoderLayer.forward calls (one per micro-batch per layer).
iteration_count = 0
| |
| |
| class EmbeddingLayer(nn.Embedding): |
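    """Token embedding whose output is scaled by sqrt(ninp), as in the original Transformer."""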
| def __init__(self, ntoken, ninp, initrange): |
| super().__init__(ntoken, ninp) |
| self.ninp = ninp |
| nn.init.uniform_(self.weight, -initrange, initrange) |
| |
| def forward(self, src): |
| return super().forward(src) * math.sqrt(self.ninp) |
| |
| |
| class PositionalEncodingLayer(nn.Module): |
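    """Adds fixed sinusoidal positional encodings to the input, followed by dropout."""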
| def __init__(self, d_model, dropout=0.1, max_len=5000): |
        super().__init__()
| self.dropout = nn.Dropout(p=dropout) |
| |
| pe = torch.zeros(max_len, d_model) |
| position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) |
| div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) |
| pe[:, 0::2] = torch.sin(position * div_term) |
| pe[:, 1::2] = torch.cos(position * div_term) |
| pe = pe.unsqueeze(0).transpose(0, 1) |
| self.register_buffer("pe", pe) |
| |
| def forward(self, x): |
| x = x + self.pe[: x.size(0), :] |
| return self.dropout(x) |
| |
| |
| class TransformerDecoderLayer(nn.TransformerEncoderLayer): |
| """Though this class inherits from torch.nn.TransformerEncoderLayer, |
| it functions as a decoder in this model""" |
| |
    def __init__(self, ninp, nhead, nhid, dropout):
        super().__init__(ninp, nhead, nhid, dropout)
| self.src_mask = None |
| |
| def forward(self, src): |
| global iteration_count |
| iteration_count += 1 |
| |
        if self.src_mask is None or self.src_mask.size(0) != len(src):
            # Lazily build and cache a causal mask matching the current sequence length.
            device = src.device
            mask = nn.Transformer.generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask
| |
| return super().forward(src, self.src_mask) |
| |
| |
| class LinearLayer(nn.Linear): |
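    """Output projection from the embedding dimension back to vocabulary logits."""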
| def __init__(self, ninp, ntoken, initrange): |
| super().__init__(ninp, ntoken) |
| nn.init.zeros_(self.bias) |
| nn.init.uniform_(self.weight, -initrange, initrange) |
| |
| |
| class TransformerLMSequential(nn.Sequential): |
| """A small language model based on the design of GPT-2 using nn.Sequential |
| for compatibility with Pipe""" |
| |
| def __init__(self, ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder): |
| layers = [ |
| EmbeddingLayer(ntokens, ninp, initrange), |
| PositionalEncodingLayer(ninp, dropout), |
| ] |
| for _ in range(ndecoder): |
| layers.append(TransformerDecoderLayer(ninp, nhead, nhid, dropout)) |
| |
| layers.append(LinearLayer(ninp, ntokens, initrange)) |
        super().__init__(*layers)
| |
| |
| def make_model(args, device, ntokens): |
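    """Build the transformer LM, the loss criterion, and a factory for the Adam optimizer."""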
| ninp = 2048 # embedding dimension |
| nhid = 2048 # the dimension of the feedforward network model in nn.TransformerEncoder |
| nhead = 32 # the number of heads in the multiheadattention models |
| dropout = 0 |
| initrange = 0.1 |
| ndecoder = args.num_decoder_layers |
| |
| model = TransformerLMSequential(ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder).to(device) |
| |
| criterion = nn.CrossEntropyLoss() |
| lr = 0.01 # learning rate |
| |
    def make_adam(model):
        # Returned as a factory rather than an instance: train() constructs the
        # optimizer only after the model has been wrapped in Pipe, so it sees
        # the final, partitioned parameters.
        return Adam(model.parameters(), lr=lr)

    return model, criterion, make_adam
| |
| |
| def train(lm_dataloader, model, criterion, optimizer, vocab_size, args): |
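    """Run the training loop, logging throughput (words/sec), loss, and perplexity per batch."""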
| model.train() |
| |
| total_loss = 0.0 |
| start_time = time.time() |
| word_counter = 0 |
| |
| optimizer = optimizer(model) |
| |
    def get_first_device(model):
        # Pipe exposes the devices its partitions live on; fall back to the
        # default device for a non-partitioned model.
        if model.devices:
            return model.devices[0]
        return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    def get_last_device(model):
        if model.devices:
            return model.devices[-1]
        return torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    print('Number of parameters for model: {}'.format(sum(p.numel() for p in model.parameters())))
    for i, batch in enumerate(lm_dataloader):
        if args.max_batch and i > args.max_batch:
            break
        optimizer.zero_grad()
        try:
            tmp = batch["input"].to(get_first_device(model))
            # Pipe returns an RRef; local_value() fetches the output tensor.
            output = model(tmp).local_value()
        except Exception as e:
            raise RuntimeError(f"training failed on {torch.distributed.get_rank()}") from e
| |
| target = batch["target"].to(get_last_device(model)) |
| output = output.to(target.device) |
| |
| loss = criterion(output.view(-1, vocab_size), target.view(-1)) |
| loss.backward() |
| del target |
| del output |
| |
| torch.nn.utils.clip_grad_value_(model.parameters(), 0.05) |
| optimizer.step() |
| |
| total_loss += loss.item() |
| log_interval = 1 |
| word_counter += batch["ntokens"] |
| if i % log_interval == 0 and i > 0: |
| cur_loss = total_loss / log_interval |
| elapsed = time.time() - start_time |
| print( |
| "| batch {:5d} | wps {:5.2f} | loss {:5.2f} | ppl {:8.2f}".format( |
| i, word_counter / elapsed, cur_loss, math.exp(cur_loss) |
| ) |
| ) |
| word_counter = 0 |
| total_loss = 0 |
| start_time = time.time() |
| |
    print('Peak memory usage for GPUs: ', end='')
    for device in model.devices:
        print("{}: {}, ".format(
            device,
            sizeof_fmt(torch.cuda.memory_stats(device)["allocated_bytes.all.peak"])), end='')
    print()
| |
| |
| def generate_balance(num_devices, num_layers): |
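    """Split num_layers across num_devices as evenly as possible, giving earlier devices the remainder."""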
    balance = []
    layers_assigned = 0
    for i in range(num_devices):
        # Ceiling division spreads the remaining layers over the remaining devices.
        layers_for_device = math.ceil((num_layers - layers_assigned) / (num_devices - i))
        balance.append(layers_for_device)
        layers_assigned += layers_for_device
    return balance
| |
| |
| def make_model_and_data(args, device): |
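    """Build the model, criterion, and optimizer factory together with the benchmark dataloader."""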
| vocab_size = 10000 |
| model, criterion, optimizer = make_model(args, device, vocab_size) |
| lm_dataset = BenchmarkLMDataset() |
| lm_dataloader = DataLoader( |
| lm_dataset, batch_size=args.batch_size, shuffle=True, num_workers=0, collate_fn=collate_sentences_lm |
| ) |
| return { |
| "model": model, |
| "criterion": criterion, |
| "optimizer": optimizer, |
| "data": lm_dataloader, |
| "vocab_size": vocab_size, |
| } |
| |
| |
| def bench_single_process(args): |
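    """Benchmark a pipeline-parallel transformer LM within a single process, using up to args.num_devices GPUs."""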
    os.environ["MASTER_ADDR"] = args.host
    os.environ["MASTER_PORT"] = "10638"
| |
| rpc.init_rpc( |
| "worker", |
| rank=0, |
| world_size=1, |
| ) |
| |
| num_devices = torch.cuda.device_count() if torch.cuda.is_available() else 1 |
| num_devices = min(args.num_devices, num_devices) |
| assert num_devices > 0 |
| init_random_seed(0) |
| device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") |
| |
    blob = make_model_and_data(args, device)
| model = blob["model"] |
| |
| balance = generate_balance(num_devices, len(model)) |
| model = partition_model(model, balance) |
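    # Pipe splits each input batch into `chunks` micro-batches and pipelines
    # them across the model partitions on different devices.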
| p = Pipe( |
| model, chunks=args.chunks, checkpoint=args.checkpoint |
| ) |
| del model |
| del blob["model"] |
| |
    train(blob["data"], p, blob["criterion"], blob["optimizer"], blob["vocab_size"], args)
    rpc.shutdown()
| |
| parser = argparse.ArgumentParser(description="benchmark") |
| parser.add_argument("--host", "-o", type=str, default="localhost", help="hostname") |
| parser.add_argument("--chunks", type=int, default=4, help="number of microbatches per batch") |
| parser.add_argument("--batch-size", type=int, default=8, help="size of a batch") |
| parser.add_argument("--max-batch", type=int, default=10, help="Max number of batches") |
| parser.add_argument("--num-decoder-layers", type=int, default=10, help="Number of decoder layers in the model") |
| parser.add_argument( |
| "--checkpoint", default="except_last", choices=["always", "except_last", "never"], |
| help="Checkpointing strategy for pipe" |
| ) |
| parser.add_argument( |
| "--num-devices", type=int, default=4, help="Number of GPU devices to use" |
| ) |
| |
| if __name__ == "__main__": |
| args = parser.parse_args() |
| print(f"Running benchmark with args: {args}") |
| bench_single_process(args) |