| from typing import Dict, List, Optional |
| |
| import torch |
| import torch.optim._functional as F |
| |
| from torch import Tensor |
| |
| __all__: List[str] = [] |
| |
# Define a TorchScript-compatible functional SGD optimizer,
# where the optimizer is used in a functional way.
# Instead of using `param.grad` when updating parameters,
# we explicitly allow the distributed optimizer to pass gradients to
# the `step` function. In this way, we can separate the gradients
# from the parameters and allow a multithreaded trainer to update the
# parameters without data races on accumulating into the same .grad.
# NOTE: This should only be used by distributed optimizer internals
# and is not meant to be exposed to the user.
| @torch.jit.script |
| class _FunctionalSGD: |
| def __init__( |
| self, |
| params: List[Tensor], |
| lr: float = 1e-2, |
| momentum: float = 0.0, |
| dampening: float = 0.0, |
| weight_decay: float = 0.0, |
| nesterov: bool = False, |
| maximize: bool = False, |
| foreach: bool = False, |
| _allow_empty_param_list: bool = False, |
| ): |
| self.defaults = { |
| "lr": lr, |
| "momentum": momentum, |
| "dampening": dampening, |
| "weight_decay": weight_decay, |
| } |
| self.nesterov = nesterov |
| self.maximize = maximize |
| self.foreach = foreach |
| self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {}) |
| |
| if len(params) == 0 and not _allow_empty_param_list: |
| raise ValueError("optimizer got an empty parameter list") |
| |
| # NOTE: we only have one param_group and don't allow user to add additional |
| # param group as it's not a common use case. |
| self.param_group = {"params": params} |
| |
| def step_param(self, param: Tensor, grad: Optional[Tensor]): |
| """Similar to self.step, but operates on a single parameter and |
| its gradient. |
| """ |
| # TODO: Once step_param interface is robust, refactor step to call |
| # step param on each param. |
| weight_decay = self.defaults["weight_decay"] |
| momentum = self.defaults["momentum"] |
| dampening = self.defaults["dampening"] |
| lr = self.defaults["lr"] |
| params = [param] |
| momentum_buffer_list: List[Optional[Tensor]] = [] |
| grads = [] |
| |
| has_sparse_grad = False |
| if grad is not None: |
| grads.append(grad) |
| if grad.is_sparse: |
| has_sparse_grad = True |
| if param not in self.state: |
| self.state[param] = {} |
| state = self.state[param] |
| if "momentum_buffer" not in state: |
| momentum_buffer_list.append(None) |
| else: |
| momentum_buffer_list.append(state["momentum_buffer"]) |
| |
| with torch.no_grad(): |
| F.sgd( |
| params, |
| grads, |
| momentum_buffer_list, |
| weight_decay=weight_decay, |
| momentum=momentum, |
| lr=lr, |
| dampening=dampening, |
| nesterov=self.nesterov, |
| maximize=self.maximize, |
| has_sparse_grad=has_sparse_grad, |
| foreach=self.foreach, |
| ) |
| # update momentum_buffer in state |
| state = self.state[param] |
| momentum_buffer = momentum_buffer_list[0] |
| if momentum_buffer is not None: |
| state["momentum_buffer"] = momentum_buffer |
| |
| def step(self, gradients: List[Optional[Tensor]]): |
| params = self.param_group["params"] |
| params_with_grad = [] |
| grads = [] |
| momentum_buffer_list: List[Optional[Tensor]] = [] |
| lr = self.defaults["lr"] |
| weight_decay = self.defaults["weight_decay"] |
| momentum = self.defaults["momentum"] |
| dampening = self.defaults["dampening"] |
| |
| if len(params) != len(gradients): |
| raise ValueError( |
| "the gradients passed in does not equal to the size of the parameters!" |
| + f"Params length: {len(params)}. " |
| + f"Gradients length: {len(gradients)}" |
| ) |
| |
| has_sparse_grad = False |
| for param, gradient in zip(params, gradients): |
| if gradient is not None: |
| params_with_grad.append(param) |
| grads.append(gradient) |
| if gradient.is_sparse: |
| has_sparse_grad = True |
| |
| if param not in self.state: |
| self.state[param] = {} |
| |
| state = self.state[param] |
| if "momentum_buffer" not in state: |
| momentum_buffer_list.append(None) |
| else: |
| momentum_buffer_list.append(state["momentum_buffer"]) |
| |
| with torch.no_grad(): |
| F.sgd( |
| params_with_grad, |
| grads, |
| momentum_buffer_list, |
| weight_decay=weight_decay, |
| momentum=momentum, |
| lr=lr, |
| dampening=dampening, |
| nesterov=self.nesterov, |
| maximize=self.maximize, |
| has_sparse_grad=has_sparse_grad, |
| foreach=self.foreach, |
| ) |
| |
| # update momentum_buffers in state |
| for i, p in enumerate(params_with_grad): |
| state = self.state[p] |
| momentum_buffer = momentum_buffer_list[i] |
| if momentum_buffer is not None: |
| state["momentum_buffer"] = momentum_buffer |