| import itertools |
| import collections.abc |
| import contextlib |
| import io |
| import logging |
| import os |
| import pickle |
| import time |
| import warnings |
| from collections import namedtuple |
| from datetime import timedelta |
| from typing import Any, Dict, Optional, Tuple, Union |
| |
| import torch |
| from torch._C._distributed_c10d import ( |
| AllreduceCoalescedOptions, |
| AllreduceOptions, |
| AllToAllOptions, |
| _DistributedBackendOptions, |
| BarrierOptions, |
| BroadcastOptions, |
| GatherOptions, |
| PrefixStore, |
| ProcessGroup, |
| ReduceOp, |
| ReduceOptions, |
| ReduceScatterOptions, |
| ScatterOptions, |
| Store, |
| DebugLevel, |
| get_debug_level, |
| Work |
| ) |
| from torch._six import string_classes |
| |
| from .constants import default_pg_timeout |
| from .rendezvous import register_rendezvous_handler, rendezvous # noqa: F401 |
| |
| __all__ = [ |
| 'Backend', 'GroupMember', 'P2POp', 'all_gather', 'all_gather_coalesced', |
| 'all_gather_multigpu', 'all_gather_object', 'all_reduce', |
| 'all_reduce_coalesced', 'all_reduce_multigpu', 'all_to_all', |
| 'all_to_all_single', 'barrier', 'batch_isend_irecv', 'broadcast', |
| 'broadcast_multigpu', 'broadcast_object_list', 'destroy_process_group', |
| 'dist_backend', 'gather', 'gather_object', 'get_backend', 'get_rank', |
| 'get_world_size', 'group', 'init_process_group', 'irecv', |
| 'is_gloo_available', 'is_initialized', 'is_mpi_available', |
| 'is_nccl_available', 'is_torchelastic_launched', 'is_ucc_available', |
| 'isend', 'monitored_barrier', 'new_group', 'new_subgroups', |
| 'new_subgroups_by_enumeration', 'recv', 'reduce', 'reduce_multigpu', |
| 'reduce_scatter', 'reduce_scatter_multigpu', 'scatter', |
| 'scatter_object_list', 'send', 'supports_complex', |
| 'AllreduceCoalescedOptions', 'AllreduceOptions', 'AllToAllOptions', |
| 'BarrierOptions', 'BroadcastOptions', 'GatherOptions', 'PrefixStore', |
| 'ProcessGroup', 'ReduceOp', 'ReduceOptions', 'ReduceScatterOptions', |
| 'ScatterOptions', 'Store', 'DebugLevel', 'get_debug_level', 'Work', |
| 'default_pg_timeout', 'get_group_rank', 'get_global_rank', 'get_process_group_ranks', |
| 'reduce_op', 'all_gather_into_tensor', 'reduce_scatter_tensor', |
| ] |
| |
| _MPI_AVAILABLE = True |
| _NCCL_AVAILABLE = True |
| _GLOO_AVAILABLE = True |
| _UCC_AVAILABLE = True |
| |
| _pickler = pickle.Pickler |
| _unpickler = pickle.Unpickler |
| |
| # Change __module__ of all imported types from torch._C._distributed_c10d that are public |
| def _export_c_types(): |
| _public_types_to_change_module = [ |
| AllreduceCoalescedOptions, |
| AllreduceOptions, |
| AllToAllOptions, |
| BarrierOptions, |
| BroadcastOptions, |
| GatherOptions, |
| PrefixStore, |
| ProcessGroup, |
| ReduceOp, |
| ReduceOptions, |
| ReduceScatterOptions, |
| ScatterOptions, |
| Store, |
| DebugLevel, |
| get_debug_level, |
| Work |
| ] |
    for _type in _public_types_to_change_module:
        _type.__module__ = "torch.distributed.distributed_c10d"
| _export_c_types() |
| |
| try: |
| from torch._C._distributed_c10d import ProcessGroupMPI |
| ProcessGroupMPI.__module__ = "torch.distributed.distributed_c10d" |
| __all__ += ["ProcessGroupMPI"] |
| except ImportError: |
| _MPI_AVAILABLE = False |
| |
| try: |
| from torch._C._distributed_c10d import ProcessGroupNCCL |
| ProcessGroupNCCL.__module__ = "torch.distributed.distributed_c10d" |
| __all__ += ["ProcessGroupNCCL"] |
| except ImportError: |
| _NCCL_AVAILABLE = False |
| |
| try: |
| from torch._C._distributed_c10d import ProcessGroupGloo |
| from torch._C._distributed_c10d import _ProcessGroupWrapper |
| ProcessGroupGloo.__module__ = "torch.distributed.distributed_c10d" |
| __all__ += ["ProcessGroupGloo"] |
| except ImportError: |
| _GLOO_AVAILABLE = False |
| |
| try: |
| from torch._C._distributed_c10d import ProcessGroupUCC |
| ProcessGroupUCC.__module__ = "torch.distributed.distributed_c10d" |
| __all__ += ["ProcessGroupUCC"] |
| except ImportError: |
| _UCC_AVAILABLE = False |
| |
| logger = logging.getLogger(__name__) |
| |
| PG_WRAPPER_STORE_PREFIX = "pg_wrapper" |
| |
| |
# Some reduce ops are not mathematically well defined for complex numbers.
# We currently provide complex support to the distributed API by viewing
# complex tensors as real (torch.view_as_real), meaning that calling
# these unsupported ops would return garbage values rather than error out
# (e.g. max(2+3i, 3+2i) = 3+3i).
| # We'd like calls to unsupported ops to error out accordingly, |
| # rather than returning garbage values. |
| def supports_complex(reduceOp: ReduceOp) -> bool: |
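    """
    Returns ``True`` if the given ``reduceOp`` is well defined for complex
    tensors, ``False`` otherwise. For example, ``supports_complex(ReduceOp.SUM)``
    is ``True`` while ``supports_complex(ReduceOp.MAX)`` is ``False``.
    """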
| denyList = [ |
| ReduceOp.MAX, |
| ReduceOp.MIN, |
| ReduceOp.PRODUCT, |
| ReduceOp.BAND, |
| ReduceOp.BOR, |
| ReduceOp.BXOR, |
| ] |
| return reduceOp not in denyList |
| |
| |
| class Backend(object): |
| """ |
| An enum-like class of available backends: GLOO, NCCL, UCC, MPI, and other registered |
| backends. |
| |
| The values of this class are lowercase strings, e.g., ``"gloo"``. They can |
| be accessed as attributes, e.g., ``Backend.NCCL``. |
| |
| This class can be directly called to parse the string, e.g., |
| ``Backend(backend_str)`` will check if ``backend_str`` is valid, and |
| return the parsed lowercase string if so. It also accepts uppercase strings, |
| e.g., ``Backend("GLOO")`` returns ``"gloo"``. |
| |
| .. note:: The entry ``Backend.UNDEFINED`` is present but only used as |
| initial value of some fields. Users should neither use it directly |
| nor assume its existence. |
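
    As an illustration of the parsing behavior described above::

        >>> Backend("GLOO")
        'gloo'
        >>> Backend("nccl")
        'nccl'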
| """ |
| |
| UNDEFINED = "undefined" |
| GLOO = "gloo" |
| NCCL = "nccl" |
| UCC = "ucc" |
| MPI = "mpi" |
| TCP = "tcp" |
| |
| _BackendPlugin = namedtuple("_BackendPlugin", ["creator_fn", "extended_api"]) |
| |
| _plugins: Dict[str, _BackendPlugin] = {} |
| |
| def __new__(cls, name: str): |
| if not isinstance(name, string_classes): |
| raise ValueError("Backend name must be a string, but got: {}".format(name)) |
| value = getattr(Backend, name.upper(), Backend.UNDEFINED) |
| |
| if value == Backend.TCP: |
| raise ValueError( |
| "TCP backend has been deprecated. Please use " |
| "Gloo or MPI backend for collective operations " |
| "on CPU tensors." |
| ) |
| elif value == Backend.UNDEFINED: |
| raise ValueError("Invalid backend: '{}'".format(name)) |
| elif value != Backend.GLOO and value != Backend.NCCL and value != Backend.UCC and value != Backend.MPI: |
| value = name.lower() |
| return value |
| |
| @classmethod |
| def register_backend(cls, name, func, extended_api=False): |
| """ |
| Registers a new backend with the given name and instantiating function. |
| |
        This class method is used by 3rd party ``ProcessGroup`` extensions to
        register new backends.
| |
| Args: |
| name (str): Backend name of the ``ProcessGroup`` extension. It |
| should match the one in ``init_process_group()``. |
| func (function): Function handler that instantiates the backend. |
| The function should be implemented in the backend |
| extension and takes four arguments, including |
| ``store``, ``rank``, ``world_size``, and ``timeout``. |
| extended_api (bool, optional): Whether the backend supports extended argument structure. |
| Default: ``False``. If set to ``True``, the backend |
| will get an instance of ``c10d::DistributedBackendOptions``, and |
| a process group options object as defined by the backend implementation. |
| |
| .. note:: This support of 3rd party backend is experimental and subject to change. |
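
        Example (a minimal sketch; ``mybackend`` and ``_create_mybackend_pg``
        are hypothetical names for a third-party extension)::

            >>> # xdoctest: +SKIP("requires a third-party ProcessGroup extension")
            >>> def _create_mybackend_pg(store, rank, world_size, timeout):
            ...     # The extension constructs and returns its ProcessGroup here.
            ...     return mybackend.create_process_group(store, rank, world_size, timeout)
            >>> Backend.register_backend("mybackend", _create_mybackend_pg)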
| |
| """ |
        # Allow the UCC plugin if PyTorch is not built with native UCC support.
        # TODO: remove this exception once the UCC plugin is fully deprecated.
        if name != Backend.UCC or is_ucc_available():
            assert not hasattr(Backend, name.upper()), (
                f"{name.upper()} c10d backend already exists"
            )
            assert name.upper() not in Backend._plugins, (
                f"{name.upper()} c10d backend creator function already exists"
            )
| |
| setattr(Backend, name.upper(), name.upper()) |
| Backend._plugins[name.upper()] = Backend._BackendPlugin(func, extended_api) |
| |
| |
| # `_backend`, `dist_backend`, and `reduce_op` are here to maintain backward |
| # compatibility with pre-c10d distributed package. |
| # TODO: remove them when users are ready to take a hard dependency on PyTorch 1. |
| _backend: str = Backend.UNDEFINED |
| dist_backend = Backend |
| |
| |
| class _reduce_op(object): |
| r""" |
| Deprecated enum-like class for reduction operations: ``SUM``, ``PRODUCT``, |
| ``MIN``, and ``MAX``. |
| |
    Use :class:`~torch.distributed.ReduceOp` instead.
| """ |
| |
| def __init__(self): |
| # __members__ is a dict storing key-value pairs for enum classes |
| for k, v in ReduceOp.RedOpType.__members__.items(): |
| setattr(self, k, v) |
| self.__members__ = ReduceOp.RedOpType.__members__ |
| |
| def __getattribute__(self, key): |
| warnings.warn( |
| "torch.distributed.reduce_op is deprecated, please use " |
| "torch.distributed.ReduceOp instead" |
| ) |
| return object.__getattribute__(self, key) |
| |
| |
| reduce_op = _reduce_op() |
| |
| |
| class group(object): |
| # Points to the default PG once initialized. |
| WORLD: Optional[ProcessGroup] = None |
| |
| |
| class GroupMember(object): |
| # Alias to group.WORLD for backward compatibility |
| WORLD = group.WORLD |
| NON_GROUP_MEMBER = object() |
| |
| |
| # Cached process groups |
| # For NCCL and GLOO pg, it is a map from ProcessGroup to (Backend, Store) |
| # For MPI pg, it is a map from ProcessGroup to (Backend, None) |
| _pg_map: Dict[ProcessGroup, Tuple[str, Optional[Store]]] = {} |
| # Process group's names, map from ProcessGroup to str |
| _pg_names: Dict[ProcessGroup, str] = {} |
| # Process group's global rank to local rank mapping |
| _pg_group_ranks: Dict[ProcessGroup, Dict[int, int]] = {} |
| |
| # Default process group state |
| _default_pg_init_method = None |
| |
| # Process group count for default naming |
| _group_count = 0 |
| |
| STORE_BASED_BARRIER_PREFIX = "store_based_barrier_key" |
| |
| |
| def _get_pg_device(group: ProcessGroup): |
| """ |
| Returns the device to use with ``group``. |
    This is CUDA for the NCCL backend and CPU for everything else.
| """ |
| if _check_for_nccl_backend(group): |
| return torch.device("cuda", torch.cuda.current_device()) |
| return torch.device("cpu") |
| |
| |
| def _store_based_barrier(rank, store, timeout): |
| """ |
| Barrier based on store which is used for synchronizing processes after |
| ``init_process_group`` or ``new_group``. Intended to be used only with |
| those two methods and is not a generic alternative to ``barrier()``. |
| """ |
| store_key = "{}:{}".format(STORE_BASED_BARRIER_PREFIX, _group_count) |
| store.add(store_key, 1) |
| logger.info("Added key: {} to store for rank: {}".format(store_key, rank)) |
| |
| # Now wait for all workers to check in with the store. |
| world_size = get_world_size() |
    # Use 'add' instead of 'get' since for some store implementations 'add'
    # doesn't work well with 'get'. Ideally the store implementations should
    # be fixed, but for backward compatibility reasons it is risky to change
    # the store implementations. Once we completely migrate away from these
    # legacy stores, we can use 'get' here instead.
| worker_count = store.add(store_key, 0) |
| start = time.time() |
| log_time = time.time() |
| while worker_count != world_size: |
| time.sleep(0.01) |
| worker_count = store.add(store_key, 0) |
| |
| # Print status periodically to keep track. |
| if timedelta(seconds=(time.time() - log_time)) > timedelta(seconds=10): |
| logger.info( |
| "Waiting in store based barrier to initialize process group for " |
| "rank: {}, key: {} (world_size={}, worker_count={}, timeout={})".format( |
| rank, store_key, world_size, worker_count, timeout |
| ) |
| ) |
| log_time = time.time() |
| |
| if timedelta(seconds=(time.time() - start)) > timeout: |
| raise RuntimeError( |
| "Timed out initializing process group in store based barrier on " |
| "rank: {}, for key: {} (world_size={}, worker_count={}, timeout={})".format( |
| rank, store_key, world_size, worker_count, timeout |
| ) |
| ) |
| |
| logger.info( |
| f"Rank {rank}: Completed store-based barrier for key:{store_key} with {world_size} nodes." |
| ) |
| |
| |
| def _rank_not_in_group(group: ProcessGroup): |
| """ |
| Helper that checks if the current process's rank is not in a given group. |
| """ |
| if group is None: |
| return False |
| return group == GroupMember.NON_GROUP_MEMBER |
| |
| |
| def _warn_not_in_group(op_name): |
| global_rank = -1 if GroupMember.WORLD is None else GroupMember.WORLD.rank() |
| warnings.warn( |
| f"Running {op_name} on global rank {global_rank} which does not " |
| "belong to the given group." |
| ) |
| |
| |
| def get_group_rank(group: ProcessGroup, global_rank: int) -> int: |
| """ |
| Translate a global rank into a group rank. |
| |
    ``global_rank`` must be part of ``group``, otherwise this raises RuntimeError.
| |
| Args: |
| group (ProcessGroup): ProcessGroup to find the relative rank. |
| global_rank (int): Global rank to query. |
| |
| Returns: |
| Group rank of ``global_rank`` relative to ``group`` |
| |
| N.B. calling this function on the default process group returns identity |
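
    Example (illustrative; run on a member of a subgroup built from global
    ranks 2 and 3, in a job with at least 4 ranks)::

        >>> # xdoctest: +SKIP("requires an initialized default process group")
        >>> sub = torch.distributed.new_group(ranks=[2, 3])
        >>> torch.distributed.get_group_rank(sub, 3)
        1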
| """ |
| if group is GroupMember.WORLD: |
| return global_rank |
| if group not in _pg_group_ranks: |
| raise RuntimeError(f"Group {group} is not registered, please create group with torch.distributed.new_group API") |
| group_ranks = _pg_group_ranks[group] |
| if global_rank not in group_ranks: |
| raise RuntimeError(f"Global rank {global_rank} is not part of group {group}") |
| |
| return group_ranks[global_rank] |
| |
| def get_global_rank(group: ProcessGroup, group_rank: int) -> int: |
| """ |
| Translate a group rank into a global rank. |
| |
    ``group_rank`` must be part of ``group``, otherwise this raises RuntimeError.
| |
| Args: |
| group (ProcessGroup): ProcessGroup to find the global rank from. |
| group_rank (int): Group rank to query. |
| |
| Returns: |
| Global rank of ``group_rank`` relative to ``group`` |
| |
| N.B. calling this function on the default process group returns identity |
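
    Example (illustrative; the inverse of the ``get_group_rank`` example)::

        >>> # xdoctest: +SKIP("requires an initialized default process group")
        >>> sub = torch.distributed.new_group(ranks=[2, 3])
        >>> torch.distributed.get_global_rank(sub, 1)
        3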
| """ |
| if group is GroupMember.WORLD: |
| return group_rank |
| if group not in _pg_group_ranks: |
| raise RuntimeError(f"Group {group} is not registered, please create group with torch.distributed.new_group API") |
| for rank, grp_rank in _pg_group_ranks[group].items(): |
| if grp_rank == group_rank: |
| return rank |
| raise RuntimeError(f"Group rank {group_rank} is not part of group {group}") |
| |
| # TODO: remove this once the ecosystem moves away from it. |
| def _get_global_rank(group, rank): |
| """ |
| This method is deprecated, please use get_global_rank. |
| """ |
| warnings.warn( |
| "torch.distributed.distributed_c10d._get_global_rank is deprecated " |
| "please use torch.distributed.distributed_c10d.get_global_rank instead" |
| ) |
| return get_global_rank(group, rank) |
| |
| |
| def get_process_group_ranks(group: ProcessGroup): |
| """ |
| Get all ranks associated with ``group``. |
| |
| Args: |
| group (ProcessGroup): ProcessGroup to get all ranks from. |
| |
| Returns: |
| List of global ranks ordered by group rank. |
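
    Example (illustrative; assumes a job with at least 4 ranks)::

        >>> # xdoctest: +SKIP("requires an initialized default process group")
        >>> sub = torch.distributed.new_group(ranks=[1, 3])
        >>> torch.distributed.get_process_group_ranks(sub)
        [1, 3]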
| """ |
| return list(_pg_group_ranks[group].keys()) |
| |
| def _get_group_size(group): |
| """ |
| Helper that gets a given group's world size. |
| """ |
| if group is GroupMember.WORLD or group is None: |
| default_pg = _get_default_group() |
| return default_pg.size() |
| return group.size() |
| |
| |
| def _check_single_tensor(param, param_name): |
| """ |
| Helper to check that the parameter ``param_name`` is a single tensor. |
| """ |
| if not isinstance(param, torch.Tensor): |
| raise RuntimeError( |
| "Invalid function argument. Expected parameter `{}` " |
| "to be of type torch.Tensor.".format(param_name) |
| ) |
| |
| |
| def _check_tensor_list(param, param_name): |
| """ |
| Helper to check that the parameter ``param_name`` is a list of tensors. |
| """ |
| if not isinstance(param, list) or not all( |
| isinstance(p, torch.Tensor) for p in param |
| ): |
| raise RuntimeError( |
| "Invalid function argument. Expected parameter `{}` " |
| "to be of type List[torch.Tensor].".format(param_name) |
| ) |
| |
| def _as_iterable(obj) -> collections.abc.Iterable: |
| return obj if isinstance(obj, list) else (obj,) |
| |
| def _ensure_all_tensors_same_dtype(*tensors) -> None: |
| last_dtype = None |
| for tensor in itertools.chain(*map(_as_iterable, tensors)): |
| tensor_dtype = tensor.dtype |
| # Mixing complex and its element type is allowed |
| if tensor_dtype.is_complex: |
            tensor_dtype = torch.float32 if tensor_dtype == torch.complex64 else torch.float64
| |
| if last_dtype is None: |
| last_dtype = tensor_dtype |
| else: |
| if last_dtype != tensor_dtype: |
| raise RuntimeError( |
| "Invalid usage of tensors with different dtypes" |
| f"Found {last_dtype} and {tensor.dtype}" |
| ) |
| |
| |
| def _check_op(op): |
| """ |
| Helper to check that the ``op`` is either isend or irecv. |
| """ |
| if op not in [isend, irecv]: |
| raise RuntimeError( |
| "Invalid ``op``. Expected ``op`` " |
| "to be of type ``torch.distributed.isend`` or " |
| "``torch.distributed.irecv``." |
| ) |
| |
| |
| def _check_p2p_op_list(p2p_op_list): |
| """ |
| Helper to check that the ``p2p_op_list`` is a list of P2POp instances and |
| all ops use the same group. |
| """ |
| if not isinstance(p2p_op_list, list) or not all( |
| isinstance(p2p_op, P2POp) for p2p_op in p2p_op_list |
| ): |
| raise RuntimeError( |
| "Invalid ``p2p_op_list``. Each op is expected to " |
| "to be of type ``torch.distributed.P2POp``." |
| ) |
| |
| group = p2p_op_list[0].group |
| if not all(group == p2p_op.group for p2p_op in p2p_op_list): |
| raise RuntimeError("All ops need to use the same group.") |
| |
| |
| def is_mpi_available() -> bool: |
| """ |
| Checks if the MPI backend is available. |
| """ |
| return _MPI_AVAILABLE |
| |
| |
| def is_nccl_available() -> bool: |
| """ |
| Checks if the NCCL backend is available. |
| """ |
| return _NCCL_AVAILABLE |
| |
| |
| def is_gloo_available() -> bool: |
| """ |
| Checks if the Gloo backend is available. |
| """ |
| return _GLOO_AVAILABLE |
| |
| |
| def is_ucc_available() -> bool: |
| """ |
| Checks if the UCC backend is available. |
| """ |
| return _UCC_AVAILABLE |
| |
| |
| def is_initialized() -> bool: |
| """ |
    Checks whether the default process group has been initialized.
| """ |
| return GroupMember.WORLD is not None |
| |
| |
| def is_torchelastic_launched() -> bool: |
| """ |
| Checks whether this process was launched with ``torch.distributed.elastic`` |
| (aka torchelastic). The existence of ``TORCHELASTIC_RUN_ID`` environment |
| variable is used as a proxy to determine whether the current process |
| was launched with torchelastic. This is a reasonable proxy since |
| ``TORCHELASTIC_RUN_ID`` maps to the rendezvous id which is always a |
    non-null value indicating the job id for peer discovery purposes.
| """ |
| return os.getenv("TORCHELASTIC_RUN_ID") is not None |
| |
| |
| def _get_default_group(): |
| """ |
    Returns the default process group created by ``init_process_group``.
| """ |
| if not is_initialized(): |
| raise RuntimeError( |
| "Default process group has not been initialized, " |
| "please make sure to call init_process_group." |
| ) |
| return GroupMember.WORLD |
| |
| |
| def _get_default_store(): |
| """ |
    Returns the default store created by ``init_process_group``.
| """ |
| if not is_initialized(): |
| raise RuntimeError( |
| "Default process group has not been initialized, " |
| "please make sure to call init_process_group." |
| ) |
| default_pg = _get_default_group() |
| _, default_store = _pg_map[default_pg] |
| return default_store |
| |
| |
| def _update_default_pg(pg): |
| GroupMember.WORLD = group.WORLD = pg |
| |
| |
| def get_backend(group: Optional[ProcessGroup] = None) -> str: |
| """ |
| Returns the backend of the given process group. |
| |
| Args: |
| group (ProcessGroup, optional): The process group to work on. The |
| default is the general main process group. If another specific group |
| is specified, the calling process must be part of :attr:`group`. |
| |
| Returns: |
| The backend of the given process group as a lower case string. |
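
    Example (illustrative; the output depends on how the group was initialized)::

        >>> # xdoctest: +SKIP("requires an initialized default process group")
        >>> torch.distributed.get_backend()
        'nccl'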
| |
| """ |
| if group is None: |
| pg = _get_default_group() |
| else: |
| pg = group |
| if _rank_not_in_group(pg): |
| raise RuntimeError("Invalid process group specified") |
| pg_store = _pg_map.get(pg, None) |
| assert pg_store is not None |
| return pg_store[0] |
| |
| |
| def init_process_group( |
| backend: Union[str, Backend], |
| init_method: Optional[str] = None, |
| timeout: timedelta = default_pg_timeout, |
| world_size: int = -1, |
| rank: int = -1, |
| store: Optional[Store] = None, |
| group_name: str = "", |
| pg_options: Optional[Any] = None, |
| ): |
| """ |
| Initializes the default distributed process group, and this will also |
| initialize the distributed package. |
| |
| There are 2 main ways to initialize a process group: |
| 1. Specify ``store``, ``rank``, and ``world_size`` explicitly. |
| 2. Specify ``init_method`` (a URL string) which indicates where/how |
| to discover peers. Optionally specify ``rank`` and ``world_size``, |
| or encode all required parameters in the URL and omit them. |
| |
| If neither is specified, ``init_method`` is assumed to be "env://". |
| |
| |
| Args: |
| backend (str or Backend): The backend to use. Depending on |
| build-time configurations, valid values include ``mpi``, ``gloo``, |
| ``nccl``, and ``ucc``. This field should be given as a lowercase |
| string (e.g., ``"gloo"``), which can also be accessed via |
| :class:`Backend` attributes (e.g., ``Backend.GLOO``). If using |
| multiple processes per machine with ``nccl`` backend, each process |
| must have exclusive access to every GPU it uses, as sharing GPUs |
| between processes can result in deadlocks. ``ucc`` backend is |
| experimental. |
| init_method (str, optional): URL specifying how to initialize the |
| process group. Default is "env://" if no |
| ``init_method`` or ``store`` is specified. |
| Mutually exclusive with ``store``. |
| world_size (int, optional): Number of processes participating in |
| the job. Required if ``store`` is specified. |
| rank (int, optional): Rank of the current process (it should be a |
| number between 0 and ``world_size``-1). |
| Required if ``store`` is specified. |
| store(Store, optional): Key/value store accessible to all workers, used |
| to exchange connection/address information. |
| Mutually exclusive with ``init_method``. |
| timeout (timedelta, optional): Timeout for operations executed against |
| the process group. Default value equals 30 minutes. |
| This is applicable for the ``gloo`` backend. For ``nccl``, this is |
| applicable only if the environment variable ``NCCL_BLOCKING_WAIT`` |
| or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When |
| ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the |
| process will block and wait for collectives to complete before |
| throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set, |
| this is the duration after which collectives will be aborted |
| asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT`` |
| will provide errors to the user which can be caught and handled, |
| but due to its blocking nature, it has a performance overhead. On |
| the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little |
| performance overhead, but crashes the process on errors. This is |
| done since CUDA execution is async and it is no longer safe to |
| continue executing user code since failed async NCCL operations |
| might result in subsequent CUDA operations running on corrupted |
| data. Only one of these two environment variables should be set. |
| For ``ucc``, blocking wait is supported similar to NCCL. However, |
| async error handling is done differently since with UCC we have |
| progress thread and not watch-dog thread. |
| group_name (str, optional, deprecated): Group name. |
| pg_options (ProcessGroupOptions, optional): process group options |
| specifying what additional options need to be passed in during |
| the construction of specific process groups. As of now, the only |
| options we support is ``ProcessGroupNCCL.Options`` for the ``nccl`` |
| backend, ``is_high_priority_stream`` can be specified so that |
| the nccl backend can pick up high priority cuda streams when |
| there're compute kernels waiting. |
| |
| .. note:: To enable ``backend == Backend.MPI``, PyTorch needs to be built from source |
| on a system that supports MPI. |
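
    Example (a minimal sketch; assumes the launcher, e.g. ``torchrun``, provides
    ``MASTER_ADDR``, ``MASTER_PORT``, ``RANK``, and ``WORLD_SIZE``)::

        >>> # xdoctest: +SKIP("requires multiple processes")
        >>> import torch.distributed as dist
        >>> dist.init_process_group(backend="gloo", init_method="env://")
        >>> dist.get_rank(), dist.get_world_size()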
| |
| """ |
| global _pg_group_ranks |
| global _backend |
| global _default_pg_init_method |
| |
| if not isinstance(timeout, timedelta): |
| raise RuntimeError( |
| "Expected timeout argument to be of type" "datetime.timedelta" |
| ) |
| |
| if GroupMember.WORLD is not None: |
| raise RuntimeError("trying to initialize the default process group " "twice!") |
| |
| assert (store is None) or ( |
| init_method is None |
| ), "Cannot specify both init_method and store." |
| |
| if store is not None: |
| assert world_size > 0, "world_size must be positive if using store" |
| assert rank >= 0, "rank must be non-negative if using store" |
| elif init_method is None: |
| init_method = "env://" |
| |
| backend = Backend(backend) |
| |
| if backend == Backend.MPI: |
| if world_size != -1 or rank != -1: |
| warnings.warn( |
| "For MPI backend, world_size ({}) and rank ({}) " |
| "are ignored since they are assigned by the " |
| "MPI runtime.".format(world_size, rank) |
| ) |
| |
| default_pg = _new_process_group_helper( |
| -1, -1, [], Backend.MPI, None, group_name=group_name, timeout=timeout |
| ) |
| _update_default_pg(default_pg) |
| else: |
| # backward compatible API |
| if store is None: |
| rendezvous_iterator = rendezvous( |
| init_method, rank, world_size, timeout=timeout |
| ) |
| store, rank, world_size = next(rendezvous_iterator) |
| store.set_timeout(timeout) |
| |
| # Use a PrefixStore to avoid accidental overrides of keys used by |
| # different systems (e.g. RPC) in case the store is multi-tenant. |
| store = PrefixStore("default_pg", store) |
| |
| default_pg = _new_process_group_helper( |
| world_size, |
| rank, |
| [], |
| backend, |
| store, |
| pg_options=pg_options, |
| group_name=group_name, |
| timeout=timeout, |
| ) |
| _update_default_pg(default_pg) |
| |
| _pg_group_ranks[GroupMember.WORLD] = {i: i for i in range(GroupMember.WORLD.size())} # type: ignore[attr-defined, index] |
| _backend = _pg_map[GroupMember.WORLD][0] # type: ignore[index] |
| _default_pg_init_method = init_method |
| |
| # barrier at the end to ensure that once we return from this method, all |
| # process groups including global variables are updated correctly on all |
| # ranks. |
| if backend == Backend.MPI: |
| # MPI backend doesn't use store. |
| barrier() |
| else: |
| # Use store based barrier here since barrier() used a bunch of |
| # default devices and messes up NCCL internal state. |
| _store_based_barrier(rank, store, timeout) |
| # Set sequence numbers for gloo and nccl process groups. |
| if get_backend(default_pg) in [Backend.GLOO, Backend.NCCL]: |
| default_pg._set_sequence_number_for_group() |
| |
| |
| def _new_process_group_helper( |
| group_size, |
| group_rank, |
| global_ranks_in_group, |
| backend, |
| store, |
| pg_options=None, |
| group_name=None, |
| timeout=default_pg_timeout, |
| ): |
| """ |
| Create a new distributed process group. |
| |
| This function must be called by ALL processes in the global group, even if |
| the calling process is not part of the newly created group. In that case, |
| this function returns GroupMember.NON_GROUP_MEMBER. |
| |
    This function is called with ``global_ranks_in_group == []`` for the default group.
| """ |
| global _pg_map |
| global _group_count |
| global _pg_names |
| |
| if not group_name: |
| group_name = str(_group_count) |
| _group_count += 1 |
| |
| if group_name in _pg_names.values(): |
| raise RuntimeError( |
| "The specified group name has already been " |
| "created, please use a different group name" |
| ) |
| |
| if not isinstance(timeout, timedelta): |
| raise RuntimeError( |
| "Expected timeout argument to be of type" "datetime.timedelta" |
| ) |
| |
| # The list of group ranks is empty if we're creating the default group. |
| is_default_group = len(global_ranks_in_group) == 0 |
| |
| backend = Backend(backend) |
| pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL, ProcessGroupUCC] |
| if backend == Backend.MPI: |
| if not is_mpi_available(): |
| raise RuntimeError( |
| "Distributed package doesn't have MPI built in." |
| " MPI is only included if you build PyTorch from" |
| " source on a host that has MPI installed." |
| ) |
| pg = ProcessGroupMPI.create(global_ranks_in_group) |
| if not pg: |
| return GroupMember.NON_GROUP_MEMBER |
| _pg_map[pg] = (Backend.MPI, None) |
| _pg_names[pg] = group_name |
| else: |
        # If this is a subgroup (which means global_ranks_in_group is specified),
        # we check if the current process is a member of the new group.
| if not is_default_group: |
| global_rank = _get_default_group().rank() |
| if global_rank not in global_ranks_in_group: |
| return GroupMember.NON_GROUP_MEMBER |
| |
| # Use the group name as prefix in the default store, such that |
| # a single store can be reused by multiple groups. |
| prefix_store = PrefixStore(group_name, store) |
| |
| if backend == Backend.GLOO: |
| if pg_options is not None: |
| raise RuntimeError("GLOO options not supported") |
| pg = ProcessGroupGloo(prefix_store, group_rank, group_size, timeout=timeout) |
            # In debug mode and if GLOO is available, wrap in a wrapper PG that
            # enables enhanced collective checking for debuggability.
| if get_debug_level() == DebugLevel.DETAIL: |
| if not _GLOO_AVAILABLE: |
| logger.info( |
| """TORCH_DISTRIBUTED_DEBUG was set to DETAIL, but |
| GLOO is not available. Build with Gloo to |
| create a wrapper process group in debug mode |
| to aid collective desynchronization debugging.""" |
| ) |
| else: |
| pg = _create_process_group_wrapper( |
| wrapped_pg=pg, |
| store_prefix=group_name, |
| store=store, |
| rank=group_rank, |
| world_size=group_size, |
| timeout=timeout, |
| ) |
| _pg_map[pg] = (Backend.GLOO, store) |
| _pg_names[pg] = group_name |
| elif backend == Backend.NCCL: |
| if not is_nccl_available(): |
| raise RuntimeError("Distributed package doesn't have NCCL " "built in") |
| if pg_options is not None: |
| assert isinstance( |
| pg_options, ProcessGroupNCCL.Options |
| ), "Expected pg_options argument to be of type ProcessGroupNCCL.Options" |
| else: |
| # default pg_options for NCCL |
| pg_options = ProcessGroupNCCL.Options() |
| pg_options.is_high_priority_stream = False |
| pg_options._timeout = timeout |
| |
| pg = ProcessGroupNCCL(prefix_store, group_rank, group_size, pg_options) |
            # In debug mode and if GLOO is available, wrap in a wrapper PG that
            # enables enhanced collective checking for debuggability.
| if get_debug_level() == DebugLevel.DETAIL: |
| if not _GLOO_AVAILABLE: |
| logger.info( |
| """TORCH_DISTRIBUTED_DEBUG was set to DETAIL, but |
| GLOO is not available. Build with Gloo to |
| create a wrapper process group in debug mode |
| to aid collective desynchronization debugging.""" |
| ) |
| else: |
| pg = _create_process_group_wrapper( |
| wrapped_pg=pg, |
| store_prefix=group_name, |
| store=store, |
| rank=group_rank, |
| world_size=group_size, |
| timeout=timeout, |
| ) |
| _pg_map[pg] = (Backend.NCCL, store) |
| _pg_names[pg] = group_name |
| elif backend == Backend.UCC and is_ucc_available(): |
| # TODO: once UCC plugin is fully deprecated, remove |
| # is_ucc_available() from above elif-condition and raise |
| # RuntimeError if is_ucc_available() returns false. |
| |
| pg = ProcessGroupUCC(prefix_store, group_rank, group_size, timeout=timeout) |
            # In debug mode and if GLOO is available, wrap in a wrapper PG that
            # enables enhanced collective checking for debuggability.
| if get_debug_level() == DebugLevel.DETAIL: |
| if not _GLOO_AVAILABLE: |
| logger.info( |
| """TORCH_DISTRIBUTED_DEBUG was set to DETAIL, but |
| GLOO is not available. Build with Gloo to |
| create a wrapper process group in debug mode |
| to aid collective desynchronization debugging.""" |
| ) |
| else: |
| pg = _create_process_group_wrapper( |
| wrapped_pg=pg, |
| store_prefix=group_name, |
| store=store, |
| rank=group_rank, |
| world_size=group_size, |
| timeout=timeout, |
| ) |
| _pg_map[pg] = (Backend.UCC, store) |
| _pg_names[pg] = group_name |
| else: |
| assert backend.upper() in Backend._plugins, ( |
| f"Unknown c10d backend type {backend.upper()}" |
| ) |
| |
| backend_plugin = Backend._plugins[backend.upper()] |
| creator_fn = backend_plugin.creator_fn |
| extended_api = backend_plugin.extended_api |
| |
| if not extended_api: |
| pg = creator_fn(prefix_store, group_rank, group_size, timeout) |
| else: |
| dist_backend_opts = _DistributedBackendOptions() |
| dist_backend_opts.store = prefix_store |
| dist_backend_opts.group_rank = group_rank |
| dist_backend_opts.group_size = group_size |
| dist_backend_opts.timeout = timeout |
| dist_backend_opts.group_id = group_name |
| dist_backend_opts.global_ranks_in_group = global_ranks_in_group |
| |
| pg = creator_fn(dist_backend_opts, pg_options) |
| _pg_map[pg] = (backend, store) |
| _pg_names[pg] = group_name |
| |
| return pg |
| |
| |
| def destroy_process_group(group: Optional[ProcessGroup] = None): |
| """ |
    Destroy a given process group, and deinitialize the distributed package.

    Args:
        group (ProcessGroup, optional): The process group to be destroyed. If
                                        ``group.WORLD`` is given, all process
                                        groups including the default one will
                                        be destroyed.
| """ |
| global _pg_map |
| global _pg_names |
| global _pg_group_ranks |
| global _default_pg_init_method |
| global _group_count |
| |
| if group == GroupMember.NON_GROUP_MEMBER: |
| return |
| |
| if group is None: |
| pg = GroupMember.WORLD |
| else: |
| pg = group |
| |
| assert pg is not None |
| if _pg_map.get(pg, None) is None: |
| raise RuntimeError("Invalid process group specified") |
| |
| if group is None or group == GroupMember.WORLD: |
| _update_default_pg(None) |
| _default_pg_init_method = None |
| _pg_map.clear() |
| _pg_names.clear() |
| _pg_group_ranks.clear() |
| |
| # when process group doesn't have an explicit name (only WORLD (default) |
| # process group can have an explicit name), we use global _group_counter |
| # to generate the name. We need to reset the counter on destruction to |
| # allow consistent value to be generated when we re-create process |
| # groups after some trainers recover from failure |
| # |
| # We only reset this when WORLD is being destroyed because if this |
| # process group is in good state, we aren't dealing with failures. |
| _group_count = 0 |
| else: |
| del _pg_map[pg] |
| del _pg_names[pg] |
| del _pg_group_ranks[pg] |
| |
| |
| def get_rank(group: Optional[ProcessGroup] = None) -> int: |
| """ |
| Returns the rank of the current process in the provided ``group`` or the |
| default group if none was provided. |
| |
| Rank is a unique identifier assigned to each process within a distributed |
| process group. They are always consecutive integers ranging from 0 to |
    ``world_size - 1``.
| |
| Args: |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| |
| Returns: |
| The rank of the process group |
| -1, if not part of the group |
| |
| """ |
| if _rank_not_in_group(group): |
| return -1 |
| |
| default_pg = _get_default_group() |
| if group is None or group is GroupMember.WORLD: |
| return default_pg.rank() |
| |
| return get_group_rank(group, default_pg.rank()) |
| |
| |
| def get_world_size(group: Optional[ProcessGroup] = None) -> int: |
| """ |
| Returns the number of processes in the current process group |
| |
| Args: |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| |
| Returns: |
| The world size of the process group |
| -1, if not part of the group |
| |
| """ |
| if _rank_not_in_group(group): |
| return -1 |
| |
| return _get_group_size(group) |
| |
| |
| def isend(tensor: torch.Tensor, dst: int, group: Optional[ProcessGroup] = None, tag: int = 0) -> Work: |
| """ |
| Sends a tensor asynchronously. |
| |
| .. warning:: |
| Modifying ``tensor`` before the request completes causes undefined |
| behavior. |
| |
| Args: |
| tensor (Tensor): Tensor to send. |
| dst (int): Destination rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| tag (int, optional): Tag to match send with remote recv |
| |
| Returns: |
| A distributed request object. |
| None, if not part of the group |
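
    Example (illustrative; assumes two ranks and an initialized default group)::

        >>> # xdoctest: +SKIP("requires multiple processes")
        >>> buf = torch.zeros(2)
        >>> if dist.get_rank() == 0:
        ...     req = dist.isend(torch.ones(2), dst=1)
        ... else:
        ...     req = dist.irecv(buf, src=0)
        >>> req.wait()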
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("isend") |
| return |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| return default_pg.send([tensor], dst, tag) |
| else: |
| group_dst_rank = get_group_rank(group, dst) |
| return group.send([tensor], group_dst_rank, tag) |
| |
| |
| def irecv(tensor: torch.Tensor, src: Optional[int] = None, group: Optional[ProcessGroup] = None, tag: int = 0) -> Work: |
| """ |
| Receives a tensor asynchronously. |
| |
| Args: |
| tensor (Tensor): Tensor to fill with received data. |
| src (int, optional): Source rank. Will receive from any |
| process if unspecified. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| tag (int, optional): Tag to match recv with remote send |
| |
| Returns: |
| A distributed request object. |
| None, if not part of the group |
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("irecv") |
| return |
| |
| if group is None or group is GroupMember.WORLD: |
| pg = _get_default_group() |
| else: |
| pg = group |
| |
| if src is None: |
| return pg.recv_anysource([tensor], tag) |
| else: |
| if pg is GroupMember.WORLD: |
| return pg.recv([tensor], src, tag) |
| else: |
| group_src_rank = get_group_rank(pg, src) |
| return pg.recv([tensor], group_src_rank, tag) |
| |
| |
def send(tensor: torch.Tensor, dst: int, group: Optional[ProcessGroup] = None, tag: int = 0) -> None:
| """ |
| Sends a tensor synchronously. |
| |
| Args: |
| tensor (Tensor): Tensor to send. |
| dst (int): Destination rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| tag (int, optional): Tag to match send with remote recv |
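
    Example (illustrative; assumes two ranks and an initialized default group)::

        >>> # xdoctest: +SKIP("requires multiple processes")
        >>> t = torch.zeros(2)
        >>> if dist.get_rank() == 0:
        ...     dist.send(torch.ones(2), dst=1)
        ... else:
        ...     dist.recv(t, src=0)  # after this, t holds rank 0's data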
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("send") |
| return |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| default_pg.send([tensor], dst, tag).wait() |
| else: |
| group_dst_rank = get_group_rank(group, dst) |
| group.send([tensor], group_dst_rank, tag).wait() |
| |
| |
def recv(tensor: torch.Tensor, src: Optional[int] = None, group: Optional[ProcessGroup] = None, tag: int = 0) -> int:
| """ |
| Receives a tensor synchronously. |
| |
| Args: |
| tensor (Tensor): Tensor to fill with received data. |
| src (int, optional): Source rank. Will receive from any |
| process if unspecified. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| tag (int, optional): Tag to match recv with remote send |
| |
| Returns: |
| Sender rank |
| -1, if not part of the group |
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("recv") |
| return -1 |
| |
| if group is None: |
| pg = _get_default_group() |
| else: |
| pg = group |
| |
| if src is None: |
| work = pg.recv_anysource([tensor], tag) |
| work.wait() |
| src_rank = work._source_rank() |
| if group is None or group is GroupMember.WORLD: |
| return src_rank |
| else: |
| return get_global_rank(pg, src_rank) |
| else: |
| if group is None or group is GroupMember.WORLD: |
| pg.recv([tensor], src, tag).wait() |
| else: |
| group_src_rank = get_group_rank(pg, src) |
| pg.recv([tensor], group_src_rank, tag).wait() |
| return src |
| |
| |
| class P2POp(object): |
| """ |
| A class to build point-to-point operations for ``batch_isend_irecv``. |
| |
| This class builds the type of P2P operation, communication buffer, peer rank, |
    process group, and tag. Instances of this class will be passed to
| ``batch_isend_irecv`` for point-to-point communications. |
| |
| Args: |
| op (Callable): A function to send data to or receive data from a peer process. |
| The type of ``op`` is either ``torch.distributed.isend`` or |
| ``torch.distributed.irecv``. |
| tensor (Tensor): Tensor to send or receive. |
| peer (int): Destination or source rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| tag (int, optional): Tag to match send with recv. |
| """ |
| |
| def __init__(self, op, tensor, peer, group=None, tag=0): |
| self.op = op |
| self.tensor = tensor |
| self.peer = peer |
| self.group = group |
| self.tag = tag |
| |
| def __new__(cls, op, tensor, peer, group=None, tag=0): |
| _check_op(op) |
| _check_single_tensor(tensor, "tensor") |
| return object.__new__(cls) |
| |
| |
| @contextlib.contextmanager |
| def _coalescing_manager(group, reqs): |
| if group is None: |
| group = _get_default_group() |
| group._start_coalescing() |
| try: |
| yield |
| finally: |
| group._end_coalescing(reqs) |
| |
| |
| def batch_isend_irecv(p2p_op_list): |
| """ |
    Send or receive a batch of tensors asynchronously and return a list of requests.
| |
| Process each of the operations in ``p2p_op_list`` and return the corresponding |
    requests. The NCCL, Gloo, and UCC backends are currently supported.
| |
| Args: |
        p2p_op_list: A list of point-to-point operations (the type of each operator is
| ``torch.distributed.P2POp``). The order of the isend/irecv in the list |
| matters and it needs to match with corresponding isend/irecv on the |
| remote end. |
| |
| Returns: |
| A list of distributed request objects returned by calling the corresponding |
| op in the op_list. |
| |
| Examples: |
| >>> # xdoctest: +SKIP("no rank") |
| >>> send_tensor = torch.arange(2) + 2 * rank |
| >>> recv_tensor = torch.randn(2) |
| >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) |
| >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) |
| >>> reqs = batch_isend_irecv([send_op, recv_op]) |
| >>> for req in reqs: |
| >>> req.wait() |
| >>> recv_tensor |
| tensor([2, 3]) # Rank 0 |
| tensor([0, 1]) # Rank 1 |
| |
| .. note:: Note that when this API is used with the NCCL PG backend, users must set |
        the current GPU device with ``torch.cuda.set_device``, otherwise it will
| lead to unexpected hang issues. |
| |
| In addition, if this API is the first collective call in the ``group`` |
| passed to ``dist.P2POp``, all ranks of the ``group`` must participate in |
| this API call; otherwise, the behavior is undefined. If this API call is |
| not the first collective call in the ``group``, batched P2P operations |
| involving only a subset of ranks of the ``group`` are allowed. |
| """ |
| _check_p2p_op_list(p2p_op_list) |
| group = p2p_op_list[0].group |
| reqs = [] |
| with _coalescing_manager(group, reqs): |
| for p2p_op in p2p_op_list: |
| op = p2p_op.op |
| tensor = p2p_op.tensor |
| peer = p2p_op.peer |
| curr_group = p2p_op.group |
| tag = p2p_op.tag |
| |
| ret = op(tensor, peer, curr_group, tag) |
| |
| if ret is not None: |
| reqs.append(ret) |
| return reqs |
| |
| |
| def broadcast_multigpu(tensor_list, src, group=None, async_op=False, src_tensor=0): |
| """ |
| Broadcasts the tensor to the whole group with multiple GPU tensors |
| per node. |
| |
    Each tensor in ``tensor_list`` must have the same number of elements in all
    the GPUs from all processes participating in the collective, and each tensor
    in the list must be on a different GPU.

    Only the NCCL and Gloo backends are currently supported;
    tensors should only be GPU tensors.
| |
| Args: |
| tensor_list (List[Tensor]): Tensors that participate in the collective |
| operation. If ``src`` is the rank, then the specified ``src_tensor`` |
| element of ``tensor_list`` (``tensor_list[src_tensor]``) will be |
| broadcast to all other tensors (on different GPUs) in the src process |
| and all tensors in ``tensor_list`` of other non-src processes. |
| You also need to make sure that ``len(tensor_list)`` is the same |
| for all the distributed processes calling this function. |
| |
| src (int): Source rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| src_tensor (int, optional): Source tensor rank within ``tensor_list`` |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| """ |
| warnings.warn( |
| "torch.distributed.broadcast_multigpu will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#multi-gpu-collective-functions" |
| ) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("broadcast_multigpu") |
| return |
| |
| opts = BroadcastOptions() |
| opts.rootRank = src |
| opts.rootTensor = src_tensor |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.broadcast(tensor_list, opts) |
| else: |
| group_src_rank = get_group_rank(group, src) |
| opts.rootRank = group_src_rank |
| work = group.broadcast(tensor_list, opts) |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def broadcast(tensor, src, group=None, async_op=False): |
| """ |
| Broadcasts the tensor to the whole group. |
| |
| ``tensor`` must have the same number of elements in all processes |
| participating in the collective. |
| |
| Args: |
| tensor (Tensor): Data to be sent if ``src`` is the rank of current |
| process, and tensor to be used to save received data otherwise. |
| src (int): Source rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
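
    Example (illustrative; every rank ends up with rank 0's data)::

        >>> # xdoctest: +SKIP("no rank")
        >>> data = torch.arange(3.) if dist.get_rank() == 0 else torch.zeros(3)
        >>> dist.broadcast(data, src=0)
        >>> data
        tensor([0., 1., 2.])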
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("broadcast") |
| return |
| |
| opts = BroadcastOptions() |
| opts.rootRank = src |
| opts.rootTensor = 0 |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.broadcast([tensor], opts) |
| else: |
| group_src_rank = get_group_rank(group, src) |
| opts.rootRank = group_src_rank |
| work = group.broadcast([tensor], opts) |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_reduce_multigpu(tensor_list, op=ReduceOp.SUM, group=None, async_op=False): |
| r""" |
| Reduces the tensor data across all machines in such a way that all get |
| the final result. This function reduces a number of tensors on every node, |
    while each tensor resides on a different GPU.
    Therefore, the input tensors in the tensor list need to be GPU tensors.
| Also, each tensor in the tensor list needs to reside on a different GPU. |
| |
    After the call, all tensors in ``tensor_list`` are going to be bitwise
    identical in all processes.
| |
| Complex tensors are supported. |
| |
    Only the NCCL and Gloo backends are currently supported;
    tensors should only be GPU tensors.
| |
| Args: |
| tensor_list (List[Tensor]): List of input and output tensors of |
            the collective. The function operates in-place and requires each
            tensor to be a GPU tensor, with each tensor residing on a different GPU.
| You also need to make sure that ``len(tensor_list)`` is the same for |
| all the distributed processes calling this function. |
| op (optional): One of the values from |
| ``torch.distributed.ReduceOp`` |
| enum. Specifies an operation used for element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If |
| ``None``, the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| """ |
| warnings.warn( |
| "torch.distributed.all_reduce_multigpu will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#multi-gpu-collective-functions" |
| ) |
| |
| if _rank_not_in_group(group): |
| return |
| |
| tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in tensor_list |
| ] |
| |
| opts = AllreduceOptions() |
| opts.reduceOp = op |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allreduce(tensor_list, opts) |
| else: |
| work = group.allreduce(tensor_list, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_reduce(tensor, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| Reduces the tensor data across all machines in such a way that all get |
| the final result. |
| |
| After the call ``tensor`` is going to be bitwise identical in all processes. |
| |
| Complex tensors are supported. |
| |
| Args: |
| tensor (Tensor): Input and output of the collective. The function |
| operates in-place. |
| op (optional): One of the values from |
| ``torch.distributed.ReduceOp`` |
| enum. Specifies an operation used for element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| Examples: |
| >>> # xdoctest: +SKIP("no rank") |
| >>> # All tensors below are of torch.int64 type. |
| >>> # We have 2 process groups, 2 ranks. |
| >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank |
| >>> tensor |
| tensor([1, 2]) # Rank 0 |
| tensor([3, 4]) # Rank 1 |
| >>> dist.all_reduce(tensor, op=ReduceOp.SUM) |
| >>> tensor |
| tensor([4, 6]) # Rank 0 |
| tensor([4, 6]) # Rank 1 |
| |
| >>> # All tensors below are of torch.cfloat type. |
| >>> # We have 2 process groups, 2 ranks. |
| >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) |
| >>> tensor |
| tensor([1.+1.j, 2.+2.j]) # Rank 0 |
| tensor([3.+3.j, 4.+4.j]) # Rank 1 |
| >>> dist.all_reduce(tensor, op=ReduceOp.SUM) |
| >>> tensor |
| tensor([4.+4.j, 6.+6.j]) # Rank 0 |
| tensor([4.+4.j, 6.+6.j]) # Rank 1 |
| |
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_reduce") |
| return |
| |
| if tensor.is_complex(): |
| if not supports_complex(op): |
| raise RuntimeError(f"all_reduce does not support {op} on complex tensors") |
| tensor = torch.view_as_real(tensor) |
| |
| opts = AllreduceOptions() |
| opts.reduceOp = op |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allreduce([tensor], opts) |
| else: |
| work = group.allreduce([tensor], opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_reduce_coalesced(tensors, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| WARNING: at this time individual shape checking is not implemented across nodes. |
| For example, if the rank 0 node passes [torch.rand(4), torch.rand(2)] and the |
| rank 1 node passes [torch.rand(2), torch.rand(2), torch.rand(2)], the allreduce |
| operation will proceed without complaint and return erroneous outputs. This lack |
| of shape checking results in significant performance improvements but users of this |
| function should take extra care to ensure that each node passes in tensors whose |
| shapes match across nodes. |
| |
| Reduces each tensor in tensors (residing on the same device) across all machines |
| in such a way that all get the final result. |
| |
    After the call, each tensor in ``tensors`` is going to be bitwise identical
    in all processes.
| |
| Complex tensors are supported. |
| |
| Args: |
| tensors (List[Tensor]): Input and output of the collective. The function |
| operates in-place. |
| op (Optional[ReduceOp]): One of the values from |
| ``torch.distributed.ReduceOp`` enum. Specifies an operation used for |
| element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (Optional[bool]): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
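
    Example (illustrative, with 2 ranks; both tensors are reduced in one call)::

        >>> # xdoctest: +SKIP("no rank")
        >>> a = torch.ones(2) * (dist.get_rank() + 1)
        >>> b = torch.ones(3) * (dist.get_rank() + 1)
        >>> dist.all_reduce_coalesced([a, b], op=ReduceOp.SUM)
        >>> a, b  # every rank sees (tensor([3., 3.]), tensor([3., 3., 3.]))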
| |
| """ |
| warnings.warn( |
| "torch.distributed.all_reduce_coalesced will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#collective-functions" |
| ) |
| _check_tensor_list(tensors, "tensor") |
| _ensure_all_tensors_same_dtype(tensors) |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_reduce_coalesced") |
| return |
| |
    if any(t.is_complex() for t in tensors) and not supports_complex(op):
| raise RuntimeError(f"all_reduce does not support {op} on complex tensors") |
| |
| tensors = [t if not t.is_complex() else torch.view_as_real(t) for t in tensors] |
| |
| opts = AllreduceCoalescedOptions() |
| opts.reduceOp = op |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allreduce_coalesced(tensors, opts) |
| else: |
| work = group.allreduce_coalesced(tensors, opts) |
| |
| if async_op: |
| return work.get_future() |
| else: |
| work.wait() |
| |
| |
| def reduce_multigpu( |
| tensor_list, dst, op=ReduceOp.SUM, group=None, async_op=False, dst_tensor=0 |
| ): |
| """ |
| Reduces the tensor data on multiple GPUs across all machines. Each tensor |
| in ``tensor_list`` should reside on a separate GPU |
| |
| Only the GPU of ``tensor_list[dst_tensor]`` on the process with rank ``dst`` |
| is going to receive the final result. |
| |
    Only the NCCL backend is currently supported;
    tensors should only be GPU tensors.
| |
| Args: |
| tensor_list (List[Tensor]): Input and output GPU tensors of the |
| collective. The function operates in-place. |
| You also need to make sure that ``len(tensor_list)`` is the same for |
| all the distributed processes calling this function. |
| dst (int): Destination rank |
| op (optional): One of the values from |
| ``torch.distributed.ReduceOp`` |
| enum. Specifies an operation used for element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| dst_tensor (int, optional): Destination tensor rank within |
| ``tensor_list`` |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, otherwise |
| |
| """ |
| warnings.warn( |
| "torch.distributed.reduce_multigpu will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#multi-gpu-collective-functions" |
| ) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("reduce_multigpu") |
| return |
| |
| opts = ReduceOptions() |
| opts.reduceOp = op |
| opts.rootRank = dst |
| opts.rootTensor = dst_tensor |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.reduce(tensor_list, opts) |
| else: |
| group_dst_rank = get_group_rank(group, dst) |
| opts.rootRank = group_dst_rank |
| work = group.reduce(tensor_list, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def reduce(tensor, dst, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| Reduces the tensor data across all machines. |
| |
| Only the process with rank ``dst`` is going to receive the final result. |
| |
| Args: |
| tensor (Tensor): Input and output of the collective. The function |
| operates in-place. |
| dst (int): Destination rank |
| op (optional): One of the values from |
| ``torch.distributed.ReduceOp`` |
| enum. Specifies an operation used for element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
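| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # A minimal sketch assuming a 2-rank default process group. |
| >>> import torch.distributed as dist |
| >>> tensor = torch.ones(2) * (dist.get_rank() + 1) |
| >>> dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM) |
| >>> # Only rank 0 is guaranteed to hold the reduced result. |
| >>> tensor |
| tensor([3., 3.])  # Rank 0 |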
| """ |
| _check_single_tensor(tensor, "tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("reduce") |
| return |
| |
| opts = ReduceOptions() |
| opts.reduceOp = op |
| opts.rootRank = dst |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.reduce([tensor], opts) |
| else: |
| group_dst_rank = get_group_rank(group, dst) |
| opts.rootRank = group_dst_rank |
| work = group.reduce([tensor], opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_gather_multigpu( |
| output_tensor_lists, input_tensor_list, group=None, async_op=False |
| ): |
| """ |
| Gathers tensors from the whole group in a list. |
| Each tensor in ``tensor_list`` should reside on a separate GPU |
| |
| Only the NCCL backend is currently supported; |
| tensors should only be GPU tensors. |
| |
| Complex tensors are supported. |
| |
| Args: |
| output_tensor_lists (List[List[Tensor]]): Output lists. It should |
| contain correctly-sized tensors on each GPU to be used for output |
| of the collective, e.g. ``output_tensor_lists[i]`` contains the |
| all_gather result that resides on the GPU of |
| ``input_tensor_list[i]``. |
| |
| Note that each element of ``output_tensor_lists`` has the size of |
| ``world_size * len(input_tensor_list)``, since the function all |
| gathers the result from every single GPU in the group. To interpret |
| each element of ``output_tensor_lists[i]``, note that |
| ``input_tensor_list[j]`` of rank k will appear in |
| ``output_tensor_lists[i][k * world_size + j]`` |
| |
| Also note that ``len(output_tensor_lists)``, and the size of each |
| element in ``output_tensor_lists`` (each element is a list, |
| therefore ``len(output_tensor_lists[i])``) need to be the same |
| for all the distributed processes calling this function. |
| |
| input_tensor_list (List[Tensor]): List of tensors(on different GPUs) to |
| be broadcast from current process. |
| Note that ``len(input_tensor_list)`` needs to be the same for |
| all the distributed processes calling this function. |
| |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| """ |
| warnings.warn( |
| "torch.distributed.all_gather_multigpu will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#multi-gpu-collective-functions" |
| ) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_gather_multigpu") |
| return |
| |
| output_tensor_lists = [ |
| [t if not t.is_complex() else torch.view_as_real(t) for t in l] |
| for l in output_tensor_lists |
| ] |
| input_tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list |
| ] |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allgather(output_tensor_lists, input_tensor_list) |
| else: |
| work = group.allgather(output_tensor_lists, input_tensor_list) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def _object_to_tensor(obj, device): |
| f = io.BytesIO() |
| _pickler(f).dump(obj) |
| byte_storage = torch.ByteStorage.from_buffer(f.getvalue()) # type: ignore[attr-defined] |
| # Do not replace `torch.ByteTensor` or `torch.LongTensor` with torch.tensor and specifying dtype. |
| # Otherwise, it will cause a 100X slowdown. |
| # See: https://github.com/pytorch/pytorch/issues/65696 |
| byte_tensor = torch.ByteTensor(byte_storage).to(device) |
| local_size = torch.LongTensor([byte_tensor.numel()]).to(device) |
| return byte_tensor, local_size |
| |
| |
| def _tensor_to_object(tensor, tensor_size): |
| tensor = tensor.cpu() |
| buf = tensor.numpy().tobytes()[:tensor_size] |
| return _unpickler(io.BytesIO(buf)).load() |
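| |
| # Illustrative round-trip of the two helpers above (a sketch; CPU device assumed): |
| #   byte_tensor, size = _object_to_tensor({"a": 1}, torch.device("cpu")) |
| #   assert _tensor_to_object(byte_tensor, size.item()) == {"a": 1} |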
| |
| def _check_for_nccl_backend(group): |
| pg = group or _get_default_group() |
| # Gate PG wrapper check on Gloo availability. |
| if _GLOO_AVAILABLE: |
| # It is not expected for PG to be wrapped many times, but support it just |
| # in case |
| while isinstance(pg, _ProcessGroupWrapper): |
| pg = pg.wrapped_pg |
| |
| return ( |
| is_nccl_available() and |
| isinstance(pg, ProcessGroupNCCL) |
| ) |
| |
| def all_gather_object(object_list, obj, group=None): |
| """ |
| Gathers picklable objects from the whole group into a list. Similar to |
| :func:`all_gather`, but Python objects can be passed in. Note that the object |
| must be picklable in order to be gathered. |
| |
| Args: |
| object_list (list[Any]): Output list. It should be correctly sized as the |
| size of the group for this collective and will contain the output. |
| obj (Any): Picklable Python object to be broadcast from current process. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. Default is ``None``. |
| |
| Returns: |
| None. If the calling rank is part of this group, the output of the |
| collective will be populated into the input ``object_list``. If the |
| calling rank is not part of the group, the passed in ``object_list`` will |
| be unmodified. |
| |
| .. note:: Note that this API differs slightly from the :func:`all_gather` |
| collective since it does not provide an ``async_op`` handle and thus |
| will be a blocking call. |
| |
| .. note:: For NCCL-based process groups, internal tensor representations |
| of objects must be moved to the GPU device before communication takes |
| place. In this case, the device used is given by |
| ``torch.cuda.current_device()`` and it is the user's responsibility to |
| ensure that this is set so that each rank has an individual GPU, via |
| ``torch.cuda.set_device()``. |
| |
| .. warning:: |
| :func:`all_gather_object` uses ``pickle`` module implicitly, which is |
| known to be insecure. It is possible to construct malicious pickle data |
| which will execute arbitrary code during unpickling. Only call this |
| function with data you trust. |
| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # Note: Process group initialization omitted on each rank. |
| >>> import torch.distributed as dist |
| >>> # Assumes world_size of 3. |
| >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object |
| >>> output = [None for _ in gather_objects] |
| >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) |
| >>> output |
| ['foo', 12, {1: 2}] |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_gather_object") |
| return |
| |
| current_device = _get_pg_device(group) |
| input_tensor, local_size = _object_to_tensor(obj, current_device) |
| |
| # Gather all local sizes. This is so that we can find the max size, and index |
| # until the correct size when deserializing the tensors. |
| group_size = get_world_size(group=group) |
| object_sizes_tensor = torch.zeros( |
| group_size, dtype=torch.long, device=current_device |
| ) |
| object_size_list = [ |
| object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size) |
| ] |
| # Allgather tensor sizes |
| all_gather(object_size_list, local_size, group=group) |
| max_object_size = int(max(object_size_list).item()) # type: ignore[type-var] |
| # Resize tensor to max size across all ranks. |
| input_tensor.resize_(max_object_size) |
| coalesced_output_tensor = torch.empty( |
| max_object_size * group_size, dtype=torch.uint8, device=current_device |
| ) |
| # Output tensors are nonoverlapping views of coalesced_output_tensor |
| output_tensors = [ |
| coalesced_output_tensor[max_object_size * i : max_object_size * (i + 1)] |
| for i in range(group_size) |
| ] |
| all_gather(output_tensors, input_tensor, group=group) |
| # Deserialize outputs back to object. |
| for i, tensor in enumerate(output_tensors): |
| tensor = tensor.type(torch.uint8) |
| if tensor.device != torch.device("cpu"): |
| tensor = tensor.cpu() |
| tensor_size = object_size_list[i] |
| object_list[i] = _tensor_to_object(tensor, tensor_size) |
| |
| |
| def gather_object(obj, object_gather_list=None, dst=0, group=None): |
| """ |
| Gathers picklable objects from the whole group in a single process. |
| Similar to :func:`gather`, but Python objects can be passed in. Note that the |
| object must be picklable in order to be gathered. |
| |
| Args: |
| obj (Any): Input object. Must be picklable. |
| object_gather_list (list[Any]): Output list. On the ``dst`` rank, it |
| should be correctly sized as the size of the group for this |
| collective and will contain the output. Must be ``None`` on non-dst |
| ranks. (default is ``None``) |
| dst (int, optional): Destination rank. (default is 0) |
| group: (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. Default is ``None``. |
| |
| Returns: |
| None. On the ``dst`` rank, ``object_gather_list`` will contain the |
| output of the collective. |
| |
| .. note:: Note that this API differs slightly from the gather collective |
| since it does not provide an async_op handle and thus will be a blocking |
| call. |
| |
| .. note:: For NCCL-based process groups, internal tensor representations |
| of objects must be moved to the GPU device before communication takes |
| place. In this case, the device used is given by |
| ``torch.cuda.current_device()`` and it is the user's responsibility to |
| ensure that this is set so that each rank has an individual GPU, via |
| ``torch.cuda.set_device()``. |
| |
| .. warning:: |
| :func:`gather_object` uses ``pickle`` module implicitly, which is |
| known to be insecure. It is possible to construct malicious pickle data |
| which will execute arbitrary code during unpickling. Only call this |
| function with data you trust. |
| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # Note: Process group initialization omitted on each rank. |
| >>> import torch.distributed as dist |
| >>> # Assumes world_size of 3. |
| >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object |
| >>> output = [None for _ in gather_objects] |
| >>> dist.gather_object( |
| ... gather_objects[dist.get_rank()], |
| ... output if dist.get_rank() == 0 else None, |
| ... dst=0 |
| ... ) |
| >>> # On rank 0 |
| >>> output |
| ['foo', 12, {1: 2}] |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("gather_object") |
| return |
| |
| # Ensure object_gather_list is specified appropriately. |
| my_rank = get_rank() |
| _validate_output_list_for_rank(my_rank, dst, object_gather_list) |
| current_device = _get_pg_device(group) |
| input_tensor, local_size = _object_to_tensor(obj, current_device) |
| |
| # Gather all local sizes. This is so that we can find the max size, and index |
| # until the correct size when deserializing the tensors. |
| group_size = get_world_size(group=group) |
| object_sizes_tensor = torch.zeros( |
| group_size, dtype=torch.long, device=current_device |
| ) |
| object_size_list = [ |
| object_sizes_tensor[i].unsqueeze(dim=0) for i in range(group_size) |
| ] |
| # Allgather tensor sizes. An all-gather is needed here despite this being a |
| # gather, since each rank needs to broadcast a tensor of the same (maximal) |
| # size. |
| all_gather(object_size_list, local_size, group=group) |
| max_object_size = int(max(object_size_list).item()) # type: ignore[type-var] |
| # Resize tensor to max size across all ranks. |
| input_tensor.resize_(max_object_size) |
| # Avoid populating output tensors if the result won't be gathered on this rank. |
| if my_rank == dst: |
| coalesced_output_tensor = torch.empty( |
| max_object_size * group_size, dtype=torch.uint8, device=current_device |
| ) |
| # Output tensors are nonoverlapping views of coalesced_output_tensor |
| output_tensors = [ |
| coalesced_output_tensor[max_object_size * i : max_object_size * (i + 1)] |
| for i in range(group_size) |
| ] |
| # All ranks call gather with equal-sized tensors. |
| gather( |
| input_tensor, |
| gather_list=output_tensors if my_rank == dst else None, |
| dst=dst, |
| group=group, |
| ) |
| if my_rank != dst: |
| return |
| for i, tensor in enumerate(output_tensors): |
| tensor = tensor.type(torch.uint8) |
| tensor_size = object_size_list[i] |
| object_gather_list[i] = _tensor_to_object(tensor, tensor_size) |
| |
| |
| def broadcast_object_list(object_list, src=0, group=None, device=None): |
| """ |
| Broadcasts picklable objects in ``object_list`` to the whole group. Similar |
| to :func:`broadcast`, but Python objects can be passed in. |
| Note that all objects in ``object_list`` must be picklable in order to be |
| broadcasted. |
| |
| Args: |
| object_list (List[Any]): List of input objects to broadcast. |
| Each object must be picklable. Only objects on the ``src`` rank will |
| be broadcast, but each rank must provide lists of equal sizes. |
| src (int): Source rank from which to broadcast ``object_list``. |
| group: (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. Default is ``None``. |
| device (``torch.device``, optional): If not None, the objects are |
| serialized and converted to tensors which are moved to the |
| ``device`` before broadcasting. Default is ``None``. |
| |
| Returns: |
| ``None``. If rank is part of the group, ``object_list`` will contain the |
| broadcasted objects from ``src`` rank. |
| |
| .. note:: For NCCL-based process groups, internal tensor representations |
| of objects must be moved to the GPU device before communication takes |
| place. In this case, the device used is given by |
| ``torch.cuda.current_device()`` and it is the user's responsibility to |
| ensure that this is set so that each rank has an individual GPU, via |
| ``torch.cuda.set_device()``. |
| |
| .. note:: Note that this API differs slightly from the :func:`all_gather` |
| collective since it does not provide an ``async_op`` handle and thus |
| will be a blocking call. |
| |
| .. warning:: |
| :func:`broadcast_object_list` uses ``pickle`` module implicitly, which |
| is known to be insecure. It is possible to construct malicious pickle |
| data which will execute arbitrary code during unpickling. Only call this |
| function with data you trust. |
| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # Note: Process group initialization omitted on each rank. |
| >>> import torch.distributed as dist |
| >>> if dist.get_rank() == 0: |
| >>> # Assumes world_size of 3. |
| >>> objects = ["foo", 12, {1: 2}] # any picklable object |
| >>> else: |
| >>> objects = [None, None, None] |
| >>> # Assumes backend is not NCCL |
| >>> device = torch.device("cpu") |
| >>> dist.broadcast_object_list(objects, src=0, device=device) |
| >>> objects |
| ['foo', 12, {1: 2}] |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("broadcast_object_list") |
| return |
| |
| # Current device selection. |
| # To preserve backwards compatibility, ``device`` defaults to ``None``, |
| # in which case we run the current logic of device selection, i.e. |
| # ``current_device`` is CUDA if the backend is NCCL, otherwise the CPU device. If |
| # ``device`` is not ``None``, we move the size and object tensors to be |
| # broadcast to this device. |
| current_device = device or _get_pg_device(group) |
| my_rank = get_rank() |
| # Serialize object_list elements to tensors on src rank. |
| if my_rank == src: |
| tensor_list, size_list = zip(*[_object_to_tensor(obj, current_device) for obj in object_list]) |
| object_sizes_tensor = torch.cat(size_list) |
| else: |
| object_sizes_tensor = torch.empty(len(object_list), dtype=torch.long, device=current_device) |
| |
| # Broadcast object sizes |
| broadcast(object_sizes_tensor, src=src, group=group) |
| |
| # Concatenate and broadcast serialized object tensors |
| if my_rank == src: |
| object_tensor = torch.cat(tensor_list) |
| else: |
| object_tensor = torch.empty( # type: ignore[call-overload] |
| torch.sum(object_sizes_tensor).item(), # type: ignore[arg-type] |
| dtype=torch.uint8, |
| device=current_device |
| ) |
| |
| broadcast(object_tensor, src=src, group=group) |
| # Deserialize objects using their stored sizes. |
| offset = 0 |
| if my_rank != src: |
| for i, obj_size in enumerate(object_sizes_tensor): |
| obj_view = object_tensor[offset : offset + obj_size] |
| obj_view = obj_view.type(torch.uint8) |
| if obj_view.device != torch.device("cpu"): |
| obj_view = obj_view.cpu() |
| offset += obj_size |
| object_list[i] = _tensor_to_object(obj_view, obj_size) |
| |
| |
| def scatter_object_list( |
| scatter_object_output_list, scatter_object_input_list, src=0, group=None |
| ): |
| """ |
| Scatters picklable objects in ``scatter_object_input_list`` to the whole |
| group. Similar to :func:`scatter`, but Python objects can be passed in. On |
| each rank, the scattered object will be stored as the first element of |
| ``scatter_object_output_list``. Note that all objects in |
| ``scatter_object_input_list`` must be picklable in order to be scattered. |
| |
| Args: |
| scatter_object_output_list (List[Any]): Non-empty list whose first |
| element will store the object scattered to this rank. |
| scatter_object_input_list (List[Any]): List of input objects to scatter. |
| Each object must be picklable. Only objects on the ``src`` rank will |
| be scattered, and the argument can be ``None`` for non-src ranks. |
| src (int): Source rank from which to scatter |
| ``scatter_object_input_list``. |
| group: (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. Default is ``None``. |
| |
| Returns: |
| ``None``. If rank is part of the group, ``scatter_object_output_list`` |
| will have its first element set to the scattered object for this rank. |
| |
| .. note:: Note that this API differs slightly from the scatter collective |
| since it does not provide an ``async_op`` handle and thus will be a |
| blocking call. |
| |
| .. warning:: |
| :func:`scatter_object_list` uses ``pickle`` module implicitly, which |
| is known to be insecure. It is possible to construct malicious pickle |
| data which will execute arbitrary code during unpickling. Only call this |
| function with data you trust. |
| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # Note: Process group initialization omitted on each rank. |
| >>> import torch.distributed as dist |
| >>> if dist.get_rank() == 0: |
| >>> # Assumes world_size of 3. |
| >>> objects = ["foo", 12, {1: 2}] # any picklable object |
| >>> else: |
| >>> # Can be any list on non-src ranks, elements are not used. |
| >>> objects = [None, None, None] |
| >>> output_list = [None] |
| >>> dist.scatter_object_list(output_list, objects, src=0) |
| >>> # Rank i gets objects[i]. For example, on rank 2: |
| >>> output_list |
| [{1: 2}] |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("scatter_object_list") |
| return |
| |
| if ( |
| not isinstance(scatter_object_output_list, list) |
| or len(scatter_object_output_list) < 1 |
| ): |
| raise RuntimeError( |
| "Expected argument scatter_object_output_list to be a list of size at least 1." |
| ) |
| |
| my_rank = get_rank(group) |
| pg_device = _get_pg_device(group) |
| if my_rank == src: |
| tensor_list, tensor_sizes = zip( |
| *[_object_to_tensor(obj, pg_device) for obj in scatter_object_input_list] |
| ) |
| tensor_list, tensor_sizes = list(tensor_list), list(tensor_sizes) |
| |
| # Src rank broadcasts the maximum tensor size. This is because all ranks are |
| # expected to call into scatter() with equal-sized tensors. |
| if my_rank == src: |
| max_tensor_size = max(tensor_sizes) |
| for tensor in tensor_list: |
| tensor.resize_(max_tensor_size) |
| else: |
| max_tensor_size = torch.tensor([0], dtype=torch.long, device=pg_device) |
| broadcast(max_tensor_size, src=src, group=group) |
| |
| # Scatter actual serialized objects |
| output_tensor = torch.empty(max_tensor_size.item(), dtype=torch.uint8, device=pg_device) |
| scatter( |
| output_tensor, |
| scatter_list=None if my_rank != src else tensor_list, |
| src=src, |
| group=group, |
| ) |
| |
| # Scatter per-object sizes to trim tensors when deserializing back to object |
| obj_tensor_size = torch.tensor([0], dtype=torch.long, device=pg_device) |
| scatter( |
| obj_tensor_size, |
| scatter_list=None if my_rank != src else tensor_sizes, |
| src=src, |
| group=group, |
| ) |
| |
| # Deserialize back to object |
| scatter_object_output_list[0] = _tensor_to_object(output_tensor, obj_tensor_size) |
| |
| |
| def all_gather(tensor_list, tensor, group=None, async_op=False): |
| """ |
| Gathers tensors from the whole group in a list. |
| |
| Complex tensors are supported. |
| |
| Args: |
| tensor_list (list[Tensor]): Output list. It should contain |
| correctly-sized tensors to be used for output of the collective. |
| tensor (Tensor): Tensor to be broadcast from current process. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| Examples: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # All tensors below are of torch.int64 dtype. |
| >>> # We have 2 process groups, 2 ranks. |
| >>> tensor_list = [torch.zeros(2, dtype=torch.int64) for _ in range(2)] |
| >>> tensor_list |
| [tensor([0, 0]), tensor([0, 0])] # Rank 0 and 1 |
| >>> tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * rank |
| >>> tensor |
| tensor([1, 2]) # Rank 0 |
| tensor([3, 4]) # Rank 1 |
| >>> dist.all_gather(tensor_list, tensor) |
| >>> tensor_list |
| [tensor([1, 2]), tensor([3, 4])] # Rank 0 |
| [tensor([1, 2]), tensor([3, 4])] # Rank 1 |
| |
| >>> # All tensors below are of torch.cfloat dtype. |
| >>> # We have 2 process groups, 2 ranks. |
| >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat) for _ in range(2)] |
| >>> tensor_list |
| [tensor([0.+0.j, 0.+0.j]), tensor([0.+0.j, 0.+0.j])] # Rank 0 and 1 |
| >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat) + 2 * rank * (1+1j) |
| >>> tensor |
| tensor([1.+1.j, 2.+2.j]) # Rank 0 |
| tensor([3.+3.j, 4.+4.j]) # Rank 1 |
| >>> dist.all_gather(tensor_list, tensor) |
| >>> tensor_list |
| [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 0 |
| [tensor([1.+1.j, 2.+2.j]), tensor([3.+3.j, 4.+4.j])] # Rank 1 |
| |
| """ |
| _check_tensor_list(tensor_list, "tensor_list") |
| _check_single_tensor(tensor, "tensor") |
| _ensure_all_tensors_same_dtype(tensor_list, tensor) |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_gather") |
| return |
| |
| tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in tensor_list |
| ] |
| tensor = tensor if not tensor.is_complex() else torch.view_as_real(tensor) |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allgather([tensor_list], [tensor]) |
| else: |
| work = group.allgather([tensor_list], [tensor]) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False): |
| """ |
| Gather tensors from all ranks and put them in a single output tensor. |
| |
| Args: |
| output_tensor (Tensor): Output tensor to accommodate tensor elements |
| from all ranks. It must be correctly sized to have one of the |
| following forms: |
| (i) a concatenation of all the input tensors along the primary |
| dimension; for definition of "concatenation", see ``torch.cat()``; |
| (ii) a stack of all the input tensors along the primary dimension; |
| for definition of "stack", see ``torch.stack()``. |
| Examples below may better explain the supported output forms. |
| input_tensor (Tensor): Tensor to be gathered from current rank. |
| Different from the ``all_gather`` API, the input tensors in this |
| API must have the same size across all ranks. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| Examples: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # All tensors below are of torch.int64 dtype and on CUDA devices. |
| >>> # We have two ranks. |
| >>> device = torch.device(f'cuda:{rank}') |
| >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank |
| >>> tensor_in |
| tensor([1, 2], device='cuda:0') # Rank 0 |
| tensor([3, 4], device='cuda:1') # Rank 1 |
| >>> # Output in concatenation form |
| >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) |
| >>> dist.all_gather_into_tensor(tensor_out, tensor_in) |
| >>> tensor_out |
| tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 |
| tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 |
| >>> # Output in stack form |
| >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) |
| >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) |
| >>> tensor_out2 |
| tensor([[1, 2], |
| [3, 4]], device='cuda:0') # Rank 0 |
| tensor([[1, 2], |
| [3, 4]], device='cuda:1') # Rank 1 |
| |
| .. warning:: |
| The Gloo backend does not support this API. |
| |
| """ |
| _check_single_tensor(input_tensor, "input_tensor") |
| _check_single_tensor(output_tensor, "output_tensor") |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_gather_into_tensor") |
| return |
| |
| output_tensor = ( |
| output_tensor |
| if not output_tensor.is_complex() |
| else torch.view_as_real(output_tensor) |
| ) |
| input_tensor = ( |
| input_tensor |
| if not input_tensor.is_complex() |
| else torch.view_as_real(input_tensor) |
| ) |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg._allgather_base(output_tensor, input_tensor) |
| else: |
| work = group._allgather_base(output_tensor, input_tensor) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def _all_gather_base(output_tensor, input_tensor, group=None, async_op=False): |
| """ |
| Single tensor all gather. Gathers a single tensor from all ranks, and puts them in a single output tensor. |
| |
| Args: |
| output_tensor (Tensor): Output tensor. It should contain |
| correctly-sized tensors to be used for output of the collective. |
| input_tensor (Tensor): Tensor to be broadcast from current process. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| .. warning:: |
| `_all_gather_base` is a private function. Users should use |
| `all_gather_into_tensor` instead. |
| |
| """ |
| warnings.warn( |
| "torch.distributed._all_gather_base is a private function and will be " |
| "deprecated. Please use torch.distributed.all_gather_into_tensor " |
| "instead." |
| ) |
| return all_gather_into_tensor(output_tensor, input_tensor, group, async_op) |
| |
| |
| def all_gather_coalesced( |
| output_tensor_lists, input_tensor_list, group=None, async_op=False |
| ): |
| """ |
| Gathers input tensors from the whole group in a list in a coalesced manner. |
| |
| Complex tensors are supported. |
| |
| Args: |
| output_tensor_lists (list[list[Tensor]]): Output list. It should contain |
| correctly-sized tensors to be used for output of the collective. |
| input_tensor_list (list[Tensor]): Tensors to be broadcast from |
| current process. At least one tensor has to be non-empty. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
| Example: |
| we have 2 process groups, 2 ranks. |
| rank 0 passes: |
| input_tensor_list = [[[1, 1], [1, 1]], [2], [3, 3]] |
| output_tensor_lists = |
| [[[[-1, -1], [-1, -1]], [-1], [-1, -1]], |
| [[[-1, -1], [-1, -1]], [-1], [-1, -1]]] |
| rank 1 passes: |
| input_tensor_list = [[[3, 3], [3, 3]], [5], [1, 1]] |
| output_tensor_lists = |
| [[[[-1, -1], [-1, -1]], [-1], [-1, -1]], |
| [[[-1, -1], [-1, -1]], [-1], [-1, -1]]] |
| both rank 0 and 1 get: |
| output_tensor_lists = |
| [[[[1, 1], [1, 1]], [2], [3, 3]], |
| [[[3, 3], [3, 3]], [5], [1, 1]]]. |
| |
| WARNING: at this time individual shape checking is not implemented across nodes. |
| For example, if the rank 0 node passes [torch.rand(4), torch.rand(2)] and the |
| rank 1 node passes [torch.rand(2), torch.rand(2), torch.rand(2)], the |
| all_gather_coalesced operation will proceed without complaint and return |
| erroneous outputs. This lack of shape checking results in significant |
| performance improvements but users of this function should take extra care |
| to ensure that each node passes in tensors whose shapes match across nodes. |
| """ |
| warnings.warn( |
| "torch.distributed.all_gather_coalesced will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#collective-functions" |
| ) |
| # We only check basic compatibility with C++ params here, C++ code will |
| # do shape and type checking. |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_gather_coalesced") |
| return |
| _check_tensor_list(input_tensor_list, "tensor_list") |
| _ensure_all_tensors_same_dtype(input_tensor_list) |
| if not isinstance(output_tensor_lists, list): |
| raise RuntimeError( |
| "Invalid function argument: " "output_tensor_lists should be a list" |
| ) |
| for output_tensor_list in output_tensor_lists: |
| _check_tensor_list(output_tensor_list, "output_tensor_lists") |
| _ensure_all_tensors_same_dtype(output_tensor_list) |
| |
| output_tensor_lists = [ |
| [t if not t.is_complex() else torch.view_as_real(t) for t in l] |
| for l in output_tensor_lists |
| ] |
| input_tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list |
| ] |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.allgather_coalesced(output_tensor_lists, input_tensor_list) |
| else: |
| work = group.allgather_coalesced(output_tensor_lists, input_tensor_list) |
| |
| if async_op: |
| return work.get_future() |
| else: |
| work.wait() |
| |
| |
| def _validate_output_list_for_rank(my_rank, dst, gather_list): |
| if dst == my_rank: |
| if not gather_list: |
| raise ValueError( |
| "Argument ``gather_list`` must be specified on destination rank." |
| ) |
| elif gather_list: |
| raise ValueError( |
| "Argument ``gather_list`` must NOT be specified " |
| "on non-destination ranks." |
| ) |
| |
| |
| def gather(tensor, gather_list=None, dst=0, group=None, async_op=False): |
| """ |
| Gathers a list of tensors in a single process. |
| |
| Args: |
| tensor (Tensor): Input tensor. |
| gather_list (list[Tensor], optional): List of appropriately-sized |
| tensors to use for gathered data (default is None, must be specified |
| on the destination rank) |
| dst (int, optional): Destination rank (default is 0) |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
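| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # A minimal sketch assuming a 2-rank default process group. |
| >>> import torch.distributed as dist |
| >>> tensor = torch.ones(2) * (dist.get_rank() + 1) |
| >>> # ``gather_list`` is only required on the destination rank. |
| >>> gather_list = [torch.zeros(2) for _ in range(2)] if dist.get_rank() == 0 else None |
| >>> dist.gather(tensor, gather_list=gather_list, dst=0) |
| >>> # On rank 0 |
| >>> gather_list |
| [tensor([1., 1.]), tensor([2., 2.])] |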
| """ |
| _check_single_tensor(tensor, "tensor") |
| |
| # Parameter ``gather_list`` may be left unspecified on non-dst ranks. |
| if gather_list: |
| _check_tensor_list(gather_list, "gather_list") |
| else: |
| gather_list = [] |
| _ensure_all_tensors_same_dtype(tensor, gather_list) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("gather") |
| return |
| |
| my_rank = get_rank() |
| _validate_output_list_for_rank(my_rank, dst, gather_list) |
| output_tensors = [gather_list] if dst == my_rank else [] |
| input_tensors = [tensor] |
| |
| opts = GatherOptions() |
| opts.rootRank = dst |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.gather(output_tensors, input_tensors, opts) |
| else: |
| group_dst_rank = get_group_rank(group, dst) |
| opts.rootRank = group_dst_rank |
| work = group.gather(output_tensors, input_tensors, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def scatter(tensor, scatter_list=None, src=0, group=None, async_op=False): |
| """ |
| Scatters a list of tensors to all processes in a group. |
| |
| Each process will receive exactly one tensor and store its data in the |
| ``tensor`` argument. |
| |
| Complex tensors are supported. |
| |
| Args: |
| tensor (Tensor): Output tensor. |
| scatter_list (list[Tensor]): List of tensors to scatter (default is |
| None, must be specified on the source rank) |
| src (int): Source rank (default is 0) |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
| |
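| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # A minimal sketch assuming a 2-rank default process group. |
| >>> import torch.distributed as dist |
| >>> tensor = torch.zeros(2) |
| >>> # ``scatter_list`` is only required on the source rank. |
| >>> scatter_list = [torch.ones(2), torch.ones(2) * 2] if dist.get_rank() == 0 else None |
| >>> dist.scatter(tensor, scatter_list=scatter_list, src=0) |
| >>> # Rank i receives scatter_list[i]. |
| >>> tensor |
| tensor([1., 1.])  # Rank 0 |
| tensor([2., 2.])  # Rank 1 |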
| """ |
| _check_single_tensor(tensor, "tensor") |
| |
| # Parameter ``scatter_list`` may be left unspecified on non-src ranks. |
| if scatter_list: |
| _check_tensor_list(scatter_list, "scatter_list") |
| else: |
| scatter_list = [] |
| _ensure_all_tensors_same_dtype(tensor, scatter_list) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("scatter") |
| return |
| scatter_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in scatter_list |
| ] |
| tensor = tensor if not tensor.is_complex() else torch.view_as_real(tensor) |
| |
| my_rank = get_rank() |
| if src == my_rank: |
| if not scatter_list: |
| raise ValueError( |
| "Argument ``scatter_list`` must be specified " "on source rank." |
| ) |
| input_tensors = [scatter_list] |
| output_tensors = [tensor] |
| else: |
| if scatter_list: |
| raise ValueError( |
| "Argument ``scatter_list`` must NOT be specified " |
| "on non-source ranks." |
| ) |
| input_tensors = [] |
| output_tensors = [tensor] |
| |
| opts = ScatterOptions() |
| opts.rootRank = src |
| |
| if group is None or group is GroupMember.WORLD: |
| default_pg = _get_default_group() |
| work = default_pg.scatter(output_tensors, input_tensors, opts) |
| else: |
| group_src_rank = get_group_rank(group, src) |
| opts.rootRank = group_src_rank |
| work = group.scatter(output_tensors, input_tensors, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def reduce_scatter_multigpu( |
| output_tensor_list, input_tensor_lists, op=ReduceOp.SUM, group=None, async_op=False |
| ): |
| """ |
| Reduce and scatter a list of tensors to the whole group. Only nccl backend |
| is currently supported. |
| |
| Each tensor in ``output_tensor_list`` should reside on a separate GPU, as |
| should each list of tensors in ``input_tensor_lists``. |
| |
| Args: |
| output_tensor_list (List[Tensor]): Output tensors (on different GPUs) |
| to receive the result of the operation. |
| |
| Note that ``len(output_tensor_list)`` needs to be the same for all |
| the distributed processes calling this function. |
| |
| input_tensor_lists (List[List[Tensor]]): Input lists. It should |
| contain correctly-sized tensors on each GPU to be used for input of |
| the collective, e.g. ``input_tensor_lists[i]`` contains the |
| reduce_scatter input that resides on the GPU of |
| ``output_tensor_list[i]``. |
| |
| Note that each element of ``input_tensor_lists`` has the size of |
| ``world_size * len(output_tensor_list)``, since the function |
| scatters the result from every single GPU in the group. To |
| interpret each element of ``input_tensor_lists[i]``, note that |
| ``output_tensor_list[j]`` of rank k receives the reduce-scattered |
| result from ``input_tensor_lists[i][k * world_size + j]`` |
| |
| Also note that ``len(input_tensor_lists)``, and the size of each |
| element in ``input_tensor_lists`` (each element is a list, |
| therefore ``len(input_tensor_lists[i])``) need to be the same for |
| all the distributed processes calling this function. |
| |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
| """ |
| warnings.warn( |
| "torch.distributed.reduce_scatter_multigpu will be deprecated. If you must " |
| "use it, please revisit our documentation later at " |
| "https://pytorch.org/docs/master/distributed.html#multi-gpu-collective-functions" |
| ) |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("reduce_scatter_multigpu") |
| return |
| |
| opts = ReduceScatterOptions() |
| opts.reduceOp = op |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.reduce_scatter(output_tensor_list, input_tensor_lists, opts) |
| else: |
| work = group.reduce_scatter(output_tensor_list, input_tensor_lists, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def reduce_scatter(output, input_list, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| Reduces, then scatters a list of tensors to all processes in a group. |
| |
| Args: |
| output (Tensor): Output tensor. |
| input_list (list[Tensor]): List of tensors to reduce and scatter. |
| op (optional): One of the values from |
| ``torch.distributed.ReduceOp`` |
| enum. Specifies an operation used for element-wise reductions. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
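| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # A minimal sketch assuming two ranks and a backend that supports |
| >>> # ``reduce_scatter`` (e.g. NCCL with one GPU per rank). |
| >>> device = torch.device(f'cuda:{rank}') |
| >>> input_list = [torch.ones(2, device=device) * (rank + 1) for _ in range(2)] |
| >>> output = torch.zeros(2, device=device) |
| >>> dist.reduce_scatter(output, input_list, op=dist.ReduceOp.SUM) |
| >>> # Rank j receives the sum over ranks of input_list[j]. |
| >>> output |
| tensor([3., 3.], device='cuda:0')  # Rank 0 |
| tensor([3., 3.], device='cuda:1')  # Rank 1 |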
| """ |
| _check_single_tensor(output, "output") |
| _check_tensor_list(input_list, "input_list") |
| _ensure_all_tensors_same_dtype(output, input_list) |
| if _rank_not_in_group(group): |
| _warn_not_in_group("reduce_scatter") |
| return |
| |
| opts = ReduceScatterOptions() |
| opts.reduceOp = op |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.reduce_scatter([output], [input_list], opts) |
| else: |
| work = group.reduce_scatter([output], [input_list], opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def reduce_scatter_tensor(output, input, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| Reduces, then scatters a tensor to all ranks in a group. |
| |
| Args: |
| output (Tensor): Output tensor. It should have the same size across all |
| ranks. |
| input (Tensor): Input tensor to be reduced and scattered. Its size |
| should be output tensor size times the world size. The input tensor |
| can have one of the following shapes: |
| (i) a concatenation of the output tensors along the primary |
| dimension, or |
| (ii) a stack of the output tensors along the primary dimension. |
| For definition of "concatenation", see ``torch.cat()``. |
| For definition of "stack", see ``torch.stack()``. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
| Examples: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # All tensors below are of torch.int64 dtype and on CUDA devices. |
| >>> # We have two ranks. |
| >>> device = torch.device(f'cuda:{rank}') |
| >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) |
| >>> # Input in concatenation form |
| >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) |
| >>> tensor_in |
| tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 |
| tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 |
| >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) |
| >>> tensor_out |
| tensor([0, 2], device='cuda:0') # Rank 0 |
| tensor([4, 6], device='cuda:1') # Rank 1 |
| >>> # Input in stack form |
| >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) |
| >>> tensor_in |
| tensor([[0, 1], |
| [2, 3]], device='cuda:0') # Rank 0 |
| tensor([[0, 1], |
| [2, 3]], device='cuda:1') # Rank 1 |
| >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) |
| >>> tensor_out |
| tensor([0, 2], device='cuda:0') # Rank 0 |
| tensor([4, 6], device='cuda:1') # Rank 1 |
| |
| .. warning:: |
| The Gloo backend does not support this API. |
| |
| """ |
| _check_single_tensor(output, "output") |
| _check_single_tensor(input, "input") |
| |
| if _rank_not_in_group(group): |
| _warn_not_in_group("reduce_scatter_tensor") |
| return |
| |
| opts = ReduceScatterOptions() |
| opts.reduceOp = op |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg._reduce_scatter_base(output, input, opts) |
| else: |
| work = group._reduce_scatter_base(output, input, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def _reduce_scatter_base(output, input, op=ReduceOp.SUM, group=None, async_op=False): |
| """ |
| Reduces, then scatters a flattened tensor to all processes in a group. |
| |
| Args: |
| output (Tensor): Output tensor. |
| input (Tensor): Input tensor that is of size output tensor size times world size |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
| .. warning:: |
| `_reduce_scatter_base` is a private function. Users should use |
| `reduce_scatter_tensor` instead. |
| |
| """ |
| warnings.warn( |
| "torch.distributed._reduce_scatter_base is a private function and will " |
| "be deprecated. Please use torch.distributed.reduce_scatter_tensor " |
| "instead." |
| ) |
| return reduce_scatter_tensor(output, input, op, group, async_op) |
| |
| |
| def all_to_all_single( |
| output, |
| input, |
| output_split_sizes=None, |
| input_split_sizes=None, |
| group=None, |
| async_op=False, |
| ): |
| """ |
| Each process splits the input tensor and then scatters the split list |
| to all processes in a group. It then concatenates the received tensors from all |
| the processes in the group and returns a single output tensor. |
| |
| Complex tensors are supported. |
| |
| Args: |
| output (Tensor): Gathered concatenated output tensor. |
| input (Tensor): Input tensor to scatter. |
| output_split_sizes: (list[Int], optional): Output split sizes for dim 0. |
| If specified as ``None`` or empty, dim 0 of the ``output`` tensor must |
| be evenly divisible by ``world_size``. |
| input_split_sizes: (list[Int], optional): Input split sizes for dim 0. |
| If specified as ``None`` or empty, dim 0 of the ``input`` tensor must |
| be evenly divisible by ``world_size``. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
| .. warning:: |
| `all_to_all_single` is experimental and subject to change. |
| |
| Examples: |
| >>> # xdoctest: +SKIP("Undefined rank") |
| >>> input = torch.arange(4) + rank * 4 |
| >>> input |
| tensor([0, 1, 2, 3]) # Rank 0 |
| tensor([4, 5, 6, 7]) # Rank 1 |
| tensor([8, 9, 10, 11]) # Rank 2 |
| tensor([12, 13, 14, 15]) # Rank 3 |
| >>> output = torch.empty([4], dtype=torch.int64) |
| >>> dist.all_to_all_single(output, input) |
| >>> output |
| tensor([0, 4, 8, 12]) # Rank 0 |
| tensor([1, 5, 9, 13]) # Rank 1 |
| tensor([2, 6, 10, 14]) # Rank 2 |
| tensor([3, 7, 11, 15]) # Rank 3 |
| |
| >>> # Essentially, it is similar to following operation: |
| >>> scatter_list = list(input.chunk(world_size)) |
| >>> gather_list = list(output.chunk(world_size)) |
| >>> for i in range(world_size): |
| >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) |
| |
| >>> # Another example with uneven split |
| >>> input |
| tensor([0, 1, 2, 3, 4, 5]) # Rank 0 |
| tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 |
| tensor([20, 21, 22, 23, 24]) # Rank 2 |
| tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 |
| >>> input_splits |
| [2, 2, 1, 1] # Rank 0 |
| [3, 2, 2, 2] # Rank 1 |
| [2, 1, 1, 1] # Rank 2 |
| [2, 2, 2, 1] # Rank 3 |
| >>> output_splits |
| [2, 3, 2, 2] # Rank 0 |
| [2, 2, 1, 2] # Rank 1 |
| [1, 2, 1, 2] # Rank 2 |
| [1, 2, 1, 1] # Rank 3 |
| >>> output = ... |
| >>> dist.all_to_all_single(output, input, output_splits, input_splits) |
| >>> output |
| tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 |
| tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 |
| tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 |
| tensor([ 5, 17, 18, 24, 36]) # Rank 3 |
| |
| |
| >>> # Another example with tensors of torch.cfloat type. |
| >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) |
| >>> input |
| tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 |
| tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 |
| tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 |
| tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 |
| >>> output = torch.empty([4], dtype=torch.cfloat) |
| >>> dist.all_to_all_single(output, input) |
| >>> output |
| tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 |
| tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 |
| tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 |
| tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3 |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_to_all_single") |
| return |
| |
| opts = AllToAllOptions() |
| _check_single_tensor(output, "output") |
| _check_single_tensor(input, "input") |
| _ensure_all_tensors_same_dtype(output, input) |
| |
| if input.is_complex(): |
| input = torch.view_as_real(input) |
| if output.is_complex(): |
| output = torch.view_as_real(output) |
| |
| output_split_sizes = [] if output_split_sizes is None else output_split_sizes |
| input_split_sizes = [] if input_split_sizes is None else input_split_sizes |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.alltoall_base( |
| output, input, output_split_sizes, input_split_sizes, opts |
| ) |
| else: |
| work = group.alltoall_base( |
| output, input, output_split_sizes, input_split_sizes, opts |
| ) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False): |
| """ |
| Each process scatters a list of input tensors to all processes in a group and |
| returns a gathered list of tensors in the output list. |
| |
| Complex tensors are supported. |
| |
| Args: |
| output_tensor_list (list[Tensor]): List of tensors to be gathered one |
| per rank. |
| input_tensor_list (list[Tensor]): List of tensors to scatter one per rank. |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group. |
| |
| .. warning:: |
| `all_to_all` is experimental and subject to change. |
| |
| Examples: |
| >>> # xdoctest: +SKIP("Undefined rank") |
| >>> input = torch.arange(4) + rank * 4 |
| >>> input = list(input.chunk(4)) |
| >>> input |
| [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 |
| [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 |
| [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 |
| [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 |
| >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) |
| >>> dist.all_to_all(output, input) |
| >>> output |
| [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 |
| [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 |
| [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 |
| [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3 |
| |
| >>> # Essentially, it is similar to following operation: |
| >>> scatter_list = input |
| >>> gather_list = output |
| >>> for i in range(world_size): |
| >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i) |
| |
| >>> input |
| tensor([0, 1, 2, 3, 4, 5]) # Rank 0 |
| tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 |
| tensor([20, 21, 22, 23, 24]) # Rank 2 |
| tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 |
| >>> input_splits |
| [2, 2, 1, 1] # Rank 0 |
| [3, 2, 2, 2] # Rank 1 |
| [2, 1, 1, 1] # Rank 2 |
| [2, 2, 2, 1] # Rank 3 |
| >>> output_splits |
| [2, 3, 2, 2] # Rank 0 |
| [2, 2, 1, 2] # Rank 1 |
| [1, 2, 1, 2] # Rank 2 |
| [1, 2, 1, 1] # Rank 3 |
| >>> input = list(input.split(input_splits)) |
| >>> input |
| [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 |
| [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 |
| [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 |
| [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 |
| >>> output = ... |
| >>> dist.all_to_all(output, input) |
| >>> output |
| [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 |
| [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 |
| [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 |
| [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3 |
| |
| >>> # Another example with tensors of torch.cfloat type. |
| >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) |
| >>> input = list(input.chunk(4)) |
| >>> input |
| [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 |
| [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 |
| [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 |
| [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 |
| >>> output = list(torch.empty([4], dtype=torch.cfloat).chunk(4)) |
| >>> dist.all_to_all(output, input) |
| >>> output |
| [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 |
| [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 |
| [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 |
| [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3 |
| |
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("all_to_all") |
| return |
| |
| opts = AllToAllOptions() |
| _check_tensor_list(output_tensor_list, "output_tensor_list") |
| _check_tensor_list(input_tensor_list, "input_tensor_list") |
| _ensure_all_tensors_same_dtype(output_tensor_list, input_tensor_list) |
| |
| input_tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in input_tensor_list |
| ] |
| output_tensor_list = [ |
| t if not t.is_complex() else torch.view_as_real(t) for t in output_tensor_list |
| ] |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.alltoall(output_tensor_list, input_tensor_list, opts) |
| else: |
| work = group.alltoall(output_tensor_list, input_tensor_list, opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def barrier(group=GroupMember.WORLD, async_op=False, device_ids=None): |
| |
| """ |
| Synchronizes all processes. |
| |
| This collective blocks processes until the whole group enters this function, |
| if async_op is False, or until wait() is called on the async work handle. |
| |
| Args: |
| group (ProcessGroup, optional): The process group to work on. If None, |
| the default process group will be used. |
| async_op (bool, optional): Whether this op should be an async op |
| device_ids ([int], optional): List of device/GPU ids. |
| Valid only for NCCL backend. |
| |
| Returns: |
| Async work handle, if async_op is set to True. |
| None, if not async_op or if not part of the group |
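| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # A minimal sketch: each rank blocks here until every rank in the |
| >>> # default process group has entered the barrier. |
| >>> import torch.distributed as dist |
| >>> dist.barrier() |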
| """ |
| if _rank_not_in_group(group): |
| _warn_not_in_group("barrier") |
| return |
| |
| opts = BarrierOptions() |
| if device_ids is not None: |
| if get_backend(group) != Backend.NCCL: |
| raise RuntimeError( |
| "Function argument device_ids not supported " |
| "for the selected backend {}".format(get_backend(group)) |
| ) |
| if isinstance(device_ids, list): |
| opts.device_ids = device_ids |
| else: |
| raise RuntimeError( |
| "Invalid function argument: " "device_ids type should be List[int]" |
| ) |
| |
| if group is None: |
| default_pg = _get_default_group() |
| work = default_pg.barrier(opts=opts) |
| else: |
| work = group.barrier(opts=opts) |
| |
| if async_op: |
| return work |
| else: |
| work.wait() |
| |
| |
| def monitored_barrier(group=GroupMember.WORLD, timeout=None, wait_all_ranks=False): |
| """ |
| Synchronizes all processes similar to ``torch.distributed.barrier``, but takes |
| a configurable timeout and is able to report ranks that did not pass this |
| barrier within that timeout. Specifically, non-zero ranks will block |
| until a send/recv is processed from rank 0. Rank 0 will block until all |
| send/recv calls from other ranks are processed, and will report failures |
| for ranks that failed to respond in time. Note that if one rank does not reach the |
| monitored_barrier (for example due to a hang), all other ranks would fail |
| in monitored_barrier. |
| |
| This collective will block all processes/ranks in the group, until the |
| whole group exits the function successfully, making it useful for debugging |
| and synchronizing. However, it can have a performance impact and should only |
| be used for debugging or scenarios that require full synchronization points |
| on the host-side. For debugging purposes, this barrier can be inserted |
| before the application's collective calls to check if any ranks are |
| desynchronized. |
| |
| .. note:: Note that this collective is only supported with the GLOO backend. |
| |
| Args: |
| group (ProcessGroup, optional): The process group to work on. If |
| ``None``, the default process group will be used. |
| timeout (datetime.timedelta, optional): Timeout for monitored_barrier. |
| If ``None``, the default process group timeout will be used. |
| wait_all_ranks (bool, optional): Whether to collect all failed ranks or |
| not. By default, this is ``False`` and ``monitored_barrier`` on rank 0 |
| will throw on the first failed rank it encounters in order to fail |
| fast. By setting ``wait_all_ranks=True`` ``monitored_barrier`` will |
| collect all failed ranks and throw an error containing information |
| about all failed ranks. |
| |
| Returns: |
| ``None``. |
| |
| Example:: |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> # Note: Process group initialization omitted on each rank. |
| >>> import torch.distributed as dist |
| >>> if dist.get_rank() != 1: |
| >>> dist.monitored_barrier() # Raises exception indicating that |
| >>> # rank 1 did not call into monitored_barrier. |
| >>> # Example with wait_all_ranks=True |
| >>> if dist.get_rank() == 0: |
| >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception |
| >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into |
| >>> # monitored_barrier. |
| """ |
| |
| # Need to call rank not in group before using the group, otherwise |
| # "Invalid process group" error is raised. |
| if _rank_not_in_group(group): |
| _warn_not_in_group("monitored_barrier") |
| return |
| |
| if get_backend(group) != Backend.GLOO: |
| raise RuntimeError("monitored_barrier is only implemented for GLOO backend.") |
| |
| if timeout is None: |
| timeout = default_pg_timeout |
| |
| group_to_use = _get_default_group() if group is None else group |
| return group_to_use.monitored_barrier(timeout, wait_all_ranks=wait_all_ranks) |
| |
| |
| def _create_process_group_wrapper( |
| wrapped_pg: ProcessGroup, |
| store_prefix: str, |
| store: Store, |
| rank: int, |
| world_size: int, |
| timeout: timedelta = default_pg_timeout, |
| ): |
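| # Note: this helper backs TORCH_DISTRIBUTED_DEBUG=DETAIL. The returned |
| # _ProcessGroupWrapper runs argument consistency checks for each collective |
| # on the gloo helper group before dispatching to the wrapped process group. |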
| # Create a separate prefix store for the helper process group. |
| prefix = f"{PG_WRAPPER_STORE_PREFIX}:{store_prefix}" |
| store = PrefixStore(prefix, store) |
| helper_pg = ProcessGroupGloo(store, rank, world_size, timeout=timeout) |
| # Wrap the underlying pg with ProcessGroupWrapper. |
| wrapped_pg = _ProcessGroupWrapper(wrapped_pg, helper_pg) |
| return wrapped_pg |
| |
| def new_group(ranks=None, timeout=default_pg_timeout, backend=None, pg_options=None): |
| """ |
| Creates a new distributed group. |
| |
| This function requires that all processes in the main group (i.e. all |
| processes that are part of the distributed job) enter this function, even |
| if they are not going to be members of the group. Additionally, groups |
| should be created in the same order in all processes. |
| |
| .. warning:: |
| Using multiple process groups with the ``NCCL`` backend concurrently |
| is not safe and the user should perform explicit synchronization in |
| their application to ensure only one process group is used at a time. |
| This means collectives from one process group should have completed |
| execution on the device (not just enqueued since CUDA execution is |
| async) before collectives from another process group are enqueued. |
| See `Using multiple NCCL communicators concurrently <https://docs.nvid |
| ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using |
| -multiple-nccl-communicators-concurrently>`_ for more details. |
| |
| Args: |
| ranks (list[int]): List of ranks of group members. If ``None``, will be |
| set to all ranks. Default is ``None``. |
| timeout (timedelta, optional): Timeout for operations executed against |
| the process group. Default value equals 30 minutes. |
| This is applicable for the ``gloo`` backend. For ``nccl``, this is |
| applicable only if the environment variable ``NCCL_BLOCKING_WAIT`` |
| or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When |
| ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the |
| process will block and wait for collectives to complete before |
| throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set, |
| this is the duration after which collectives will be aborted |
| asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT`` |
| will provide errors to the user which can be caught and handled, |
| but due to its blocking nature, it has a performance overhead. On |
| the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little |
| performance overhead, but crashes the process on errors. This is |
| done since CUDA execution is async and it is no longer safe to |
| continue executing user code since failed async NCCL operations |
| might result in subsequent CUDA operations running on corrupted |
| data. Only one of these two environment variables should be set. |
| backend (str or Backend, optional): The backend to use. Depending on |
| build-time configurations, valid values are ``gloo`` and ``nccl``. |
| By default uses the same backend as the global group. This field |
| should be given as a lowercase string (e.g., ``"gloo"``), which can |
| also be accessed via :class:`Backend` attributes (e.g., |
| ``Backend.GLOO``). If ``None`` is passed in, the backend |
| corresponding to the default process group will be used. Default is |
| ``None``. |
| pg_options (ProcessGroupOptions, optional): process group options |
| specifying what additional options need to be passed in during |
| the construction of specific process groups. For example, for the ``nccl`` |
| backend, ``is_high_priority_stream`` can be specified so that the |
| process group can pick up high priority cuda streams. |
| |
| Returns: |
| A handle of distributed group that can be given to collective calls. |
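| |
| Example (an illustrative sketch, assuming a default process group of 4 |
| ranks has already been initialized with the ``gloo`` backend):: |
| |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> import torch.distributed as dist |
| >>> # Every rank must call new_group, including ranks 2 and 3, which |
| >>> # will not be members of the new group. |
| >>> group = dist.new_group(ranks=[0, 1]) |
| >>> if dist.get_rank() in (0, 1): |
| >>> tensor = torch.ones(1) |
| >>> dist.all_reduce(tensor, group=group) |
| >>> # A per-group timeout can also be passed explicitly. |
| >>> from datetime import timedelta |
| >>> group = dist.new_group(ranks=[0, 1], timeout=timedelta(minutes=5)) |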
| """ |
| |
| global _pg_group_ranks |
| |
| default_pg = _get_default_group() |
| default_backend, default_store = _pg_map[default_pg] |
| global_rank = default_pg.rank() |
| global_world_size = default_pg.size() |
| |
| # Default to the same backend as the global process group |
| # if the backend is not specified. |
| if not backend: |
| backend = default_backend |
| |
| # checks the input ranks |
| if ranks is not None: |
| ranks = sorted(ranks) |
| group_world_size = len(ranks) |
| if group_world_size > global_world_size: |
| raise RuntimeError( |
| "the new group's world size should be less or " |
| "equal to the world size set by " |
| "init_process_group" |
| ) |
| # check ranks' sanity |
| for rank in ranks: |
| if rank < 0 or rank >= global_world_size: |
| raise RuntimeError( |
| "The new group's rank should be within the " |
| "the world_size set by init_process_group" |
| ) |
| if global_rank in ranks: |
| group_rank = ranks.index(global_rank) |
| else: |
| group_rank = None |
| else: |
| ranks = list(range(global_world_size)) |
| group_world_size = global_world_size |
| group_rank = global_rank |
| |
| backend = Backend(backend) |
| pg = _new_process_group_helper( |
| group_world_size, |
| group_rank, |
| ranks, |
| backend, |
| default_store, |
| pg_options=pg_options, |
| timeout=timeout, |
| ) |
| |
| # Create the global rank to group rank mapping |
| _pg_group_ranks[pg] = { |
| global_rank: group_rank for group_rank, global_rank in enumerate(ranks) |
| } |
| |
| # barrier at the end to ensure that once we return from this method, all |
| # process groups including global variables are updated correctly on all |
| # ranks. |
| if backend == Backend.MPI: |
| # MPI doesn't have store. |
| barrier() |
| else: |
| # Use a store-based barrier here since barrier() uses a bunch of |
| # default devices and messes up NCCL internal state. |
| _store_based_barrier(global_rank, default_store, timeout) |
| # Set sequence numbers for gloo and nccl process groups. |
| if pg != GroupMember.NON_GROUP_MEMBER and get_backend(pg) in [ |
| Backend.GLOO, |
| Backend.NCCL, |
| ]: |
| pg._set_sequence_number_for_group() |
| |
| return pg |
| |
| |
| def new_subgroups( |
| group_size=None, |
| group=None, |
| timeout=default_pg_timeout, |
| backend=None, |
| pg_options=None, |
| ): |
| """ |
| Creates GPU subgroups of equal size. By default, it creates intra-machine subgroups, |
| each of which contains all the ranks of a machine, based on the assumption |
| that each machine has the same number of CUDA devices. |
| |
| This is a convenience API that calls ``new_group`` to generate multiple subgroups. |
| It requires that all processes in the main group (i.e. all |
| processes that are part of the distributed job) enter this function, even |
| if they are not going to be members of the group. |
| |
| .. warning:: |
| This API only works when CUDA is available. |
| |
| .. warning:: |
| If ``group_size`` is passed in, the world size must be divisible by ``group_size``. |
| If no ``group_size`` is passed in, and not all the machines have the same number |
| of devices, the subgroup division will be different across nodes and can cause |
| unexpected behaviors. |
| |
| .. warning:: |
| Using multiple process groups with the ``NCCL`` backend concurrently |
| is not safe and the user should perform explicit synchronization in |
| their application to ensure only one process group is used at a time. |
| This means collectives from one process group should have completed |
| execution on the device (not just enqueued since CUDA execution is |
| async) before collectives from another process group are enqueued. |
| See `Using multiple NCCL communicators concurrently <https://docs.nvid |
| ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using |
| -multiple-nccl-communicators-concurrently>`_ for more details. |
| |
| Args: |
| group_size (int, optional): The size of each subgroup. If ``None``, |
| the default subgroup size is equal to the number of devices on each machine, |
| based on the assumption that each machine has exactly the same |
| number of devices. Default is ``None``. |
| timeout (timedelta, optional): Timeout for operations executed against |
| the process group. Default value equals 30 minutes. |
| This is applicable for the ``gloo`` backend. For ``nccl``, this is |
| applicable only if the environment variable ``NCCL_BLOCKING_WAIT`` |
| or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When |
| ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the |
| process will block and wait for collectives to complete before |
| throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set, |
| this is the duration after which collectives will be aborted |
| asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT`` |
| will provide errors to the user which can be caught and handled, |
| but due to its blocking nature, it has a performance overhead. On |
| the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little |
| performance overhead, but crashes the process on errors. This is |
| done since CUDA execution is async and it is no longer safe to |
| continue executing user code since failed async NCCL operations |
| might result in subsequent CUDA operations running on corrupted |
| data. Only one of these two environment variables should be set. |
| backend (str or Backend, optional): The backend to use. Depending on |
| build-time configurations, valid values are ``gloo`` and ``nccl``. |
| By default uses the same backend as the global group. This field |
| should be given as a lowercase string (e.g., ``"gloo"``), which can |
| also be accessed via :class:`Backend` attributes (e.g., |
| ``Backend.GLOO``). If ``None`` is passed in, the backend |
| corresponding to the default process group will be used. Default is |
| ``None``. |
| pg_options (ProcessGroupOptions, optional): process group options |
| specifying what additional options need to be passed in during |
| the construction of specific process groups. For example, for the ``nccl`` |
| backend, ``is_high_priority_stream`` can be specified so that the |
| process group can pick up high priority cuda streams. |
| |
| Returns: |
| The subgroup containing the current rank, and all the subgroups used for cleanup. |
| |
| Examples: |
| >>> # Create intra-machine subgroups. |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> cur_subgroup, subgroups = dist.new_subgroups() |
| >>> # Allreduce within the machine. |
| >>> rank = dist.get_rank() |
| >>> tensor = torch.ones(1, device=rank) * rank |
| >>> dist.all_reduce(tensor, group=cur_subgroup) |
| >>> tensor |
| tensor([28]) # Sum of ranks 0 through 7, assuming 8 CUDA devices per machine. |
| >>> # Cleanup. |
| >>> for subgroup in subgroups: |
| >>> dist.destroy_process_group(subgroup) |
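| >>> # Illustrative variant with an explicit subgroup size (assumes the |
| >>> # world size is divisible by 4): each subgroup spans 4 consecutive ranks. |
| >>> cur_subgroup, subgroups = dist.new_subgroups(group_size=4) |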
| """ |
| if not torch.cuda.is_available(): |
| raise ValueError("Subgroups can only be created when CUDA is available") |
| |
| if group_size is None: |
| group_size = torch.cuda.device_count() |
| world_size = get_world_size() |
| if world_size < group_size: |
| raise ValueError("The arg 'group_size' must not exceed the world size") |
| if world_size % group_size != 0: |
| raise ValueError("The world size must be divisible by 'group_size'") |
| |
| subgroups = [] |
| cur_subgroup = None |
| |
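| # Partition the global ranks into contiguous blocks of ``group_size`` ranks. |
| # Every rank participates in every new_group call below, even for the |
| # subgroups it does not belong to, since new_group is collective. |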
| for subgroup_id in range(world_size // group_size): |
| start_rank = subgroup_id * group_size |
| end_rank = start_rank + group_size |
| ranks_in_subgroup = list(range(start_rank, end_rank)) |
| subgroup = new_group( |
| ranks=ranks_in_subgroup, |
| timeout=timeout, |
| backend=backend, |
| pg_options=pg_options, |
| ) |
| subgroups.append(subgroup) |
| |
| rank = get_rank() |
| if rank in ranks_in_subgroup: |
| cur_subgroup = subgroup |
| logger.info( |
| "Rank {} is assigned to subgroup {}".format(rank, ranks_in_subgroup) |
| ) |
| |
| return cur_subgroup, subgroups |
| |
| |
| def new_subgroups_by_enumeration( |
| ranks_per_subgroup_list, |
| timeout=default_pg_timeout, |
| backend=None, |
| pg_options=None, |
| ): |
| """ |
| Creates GPU subgroups by dividing the global world, where the division is specified by |
| a nested list of ranks. The subgroups cannot overlap, and some ranks do not have to be |
| in any subgroup. |
| |
| This is a convenience API that calls ``new_group`` to generate multiple subgroups. |
| It requires that all processes in the main group (i.e. all |
| processes that are part of the distributed job) enter this function, even |
| if they are not going to be members of the group. |
| |
| .. warning:: |
| Using multiple process groups with the ``NCCL`` backend concurrently |
| is not safe and the user should perform explicit synchronization in |
| their application to ensure only one process group is used at a time. |
| This means collectives from one process group should have completed |
| execution on the device (not just enqueued since CUDA execution is |
| async) before collectives from another process group are enqueued. |
| See `Using multiple NCCL communicators concurrently <https://docs.nvid |
| ia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using |
| -multiple-nccl-communicators-concurrently>`_ for more details. |
| |
| Args: |
| ranks_per_subgroup_list (list[list[int]]): A nested list of ranks of |
| group members. |
| timeout (timedelta, optional): Timeout for operations executed against |
| the process group. Default value equals 30 minutes. |
| This is applicable for the ``gloo`` backend. For ``nccl``, this is |
| applicable only if the environment variable ``NCCL_BLOCKING_WAIT`` |
| or ``NCCL_ASYNC_ERROR_HANDLING`` is set to 1. When |
| ``NCCL_BLOCKING_WAIT`` is set, this is the duration for which the |
| process will block and wait for collectives to complete before |
| throwing an exception. When ``NCCL_ASYNC_ERROR_HANDLING`` is set, |
| this is the duration after which collectives will be aborted |
| asynchronously and the process will crash. ``NCCL_BLOCKING_WAIT`` |
| will provide errors to the user which can be caught and handled, |
| but due to its blocking nature, it has a performance overhead. On |
| the other hand, ``NCCL_ASYNC_ERROR_HANDLING`` has very little |
| performance overhead, but crashes the process on errors. This is |
| done since CUDA execution is async and it is no longer safe to |
| continue executing user code since failed async NCCL operations |
| might result in subsequent CUDA operations running on corrupted |
| data. Only one of these two environment variables should be set. |
| backend (str or Backend, optional): The backend to use. Depending on |
| build-time configurations, valid values are ``gloo`` and ``nccl``. |
| By default uses the same backend as the global group. This field |
| should be given as a lowercase string (e.g., ``"gloo"``), which can |
| also be accessed via :class:`Backend` attributes (e.g., |
| ``Backend.GLOO``). If ``None`` is passed in, the backend |
| corresponding to the default process group will be used. Default is |
| ``None``. |
| pg_options (ProcessGroupOptions, optional): process group options |
| specifying what additional options need to be passed in during |
| the construction of specific process groups. For example, for the ``nccl`` |
| backend, ``is_high_priority_stream`` can be specified so that the |
| process group can pick up high priority cuda streams. |
| |
| Returns: |
| The subgroup containing the current rank, and all the subgroups used for cleanup. |
| |
| Examples: |
| >>> # Create two subgroups, where each has 2 processes. |
| >>> # xdoctest: +SKIP("need process group init") |
| >>> cur_subgroup, subgroups = dist.new_subgroups_by_enumeration(ranks_per_subgroup_list=[[0, 2], [1, 3]]) |
| >>> rank = dist.get_rank() |
| >>> tensor = torch.ones(1, device=rank) * rank |
| >>> dist.all_reduce(tensor, group=cur_subgroup) |
| >>> tensor |
| tensor([2]) # Subgroup 0: ranks 0 and 2 |
| tensor([4]) # Subgroup 1: ranks 1 and 3 |
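| >>> # Cleanup, as in the ``new_subgroups`` example above. |
| >>> for subgroup in subgroups: |
| >>> dist.destroy_process_group(subgroup) |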
| """ |
| if not torch.cuda.is_available(): |
| raise ValueError("Subgroups can only be created when CUDA is available") |
| if ranks_per_subgroup_list is None or len(ranks_per_subgroup_list) == 0: |
| raise ValueError("The arg 'ranks_per_subgroup_list' cannot be empty") |
| |
| world_size = get_world_size() |
| |
| subgroups = [] |
| cur_subgroup = None |
| # Create a mapping from rank to subgroup to check if there is any subgroup overlap. |
| rank_to_ranks_dict = {} # type: ignore[var-annotated] |
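| # Every rank constructs every listed subgroup, even the ones it does not |
| # belong to, because new_group is collective over the whole world. |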
| for ranks in ranks_per_subgroup_list: |
| subgroup = new_group( |
| ranks=ranks, |
| timeout=timeout, |
| backend=backend, |
| pg_options=pg_options, |
| ) |
| subgroups.append(subgroup) |
| my_rank = get_rank() |
| for rank in ranks: |
| if rank in rank_to_ranks_dict: |
| raise ValueError( |
| "Rank {} has appeared in both subgroup {} and {}".format( |
| rank, rank_to_ranks_dict[rank], ranks |
| ) |
| ) |
| rank_to_ranks_dict[rank] = ranks |
| if my_rank == rank: |
| cur_subgroup = subgroup |
| logger.info("Rank {} is assigned to subgroup {}".format(rank, ranks)) |
| |
| return cur_subgroup, subgroups |