:orphan:

.. _remote-reference-protocol:

Remote Reference Protocol
=========================

This note describes the design details of the Remote Reference protocol and
walks through message flows in different scenarios. Make sure you're familiar
with the :ref:`distributed-rpc-framework` before proceeding.

Background
^^^^^^^^^^

RRef stands for Remote REFerence. It is a reference to an object located on a
local or remote worker, and it transparently handles reference counting under
the hood. Conceptually, it can be considered a distributed shared pointer.
Applications can create an RRef by calling
:meth:`~torch.distributed.rpc.remote`. Each RRef is owned by the callee worker
of the :meth:`~torch.distributed.rpc.remote` call (i.e., the owner) and can be
used by multiple users. The owner stores the real data and keeps track of the
global reference count. Every RRef can be uniquely identified by a global
``RRefId``, which is assigned at the time of creation on the caller of the
:meth:`~torch.distributed.rpc.remote` call.

On the owner worker, there is only one ``OwnerRRef`` instance, which contains
the real data, while on user workers, there can be as many ``UserRRef``
instances as necessary, and a ``UserRRef`` does not hold the data. All usage on
the owner will retrieve the unique ``OwnerRRef`` instance using the globally
unique ``RRefId``. A ``UserRRef`` will be created when it is used as an
argument or return value in an :meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async` or
:meth:`~torch.distributed.rpc.remote` invocation, and the owner will be
notified accordingly to update the reference count. An ``OwnerRRef`` and its
data will be deleted when there are no ``UserRRef`` instances globally and no
references to the ``OwnerRRef`` on the owner either.
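The ownership and reference-counting story above can be sketched as a toy,
single-process model. The ``OwnerRRefModel``/``UserRRefModel`` classes below
are purely illustrative stand-ins for ``OwnerRRef``/``UserRRef``, not the real
implementation:

```python
class OwnerRRefModel:
    """Toy stand-in for OwnerRRef: holds the real data plus a global count
    of live user references, as tracked by the owner."""
    def __init__(self, rref_id, data):
        self.rref_id = rref_id      # globally unique RRefId
        self.data = data            # only the owner holds the data
        self.user_count = 0

    def deletable(self):
        # The data may only go away when no UserRRefs remain anywhere.
        return self.user_count == 0


class UserRRefModel:
    """Toy stand-in for UserRRef: a handle that holds no data itself."""
    def __init__(self, owner):
        self.owner = owner
        owner.user_count += 1       # creation is reported to the owner

    def delete(self):
        self.owner.user_count -= 1  # deletion is reported to the owner


owner = OwnerRRefModel(rref_id=100, data="real data")
u1 = UserRRefModel(owner)
u2 = UserRRefModel(owner)
u1.delete()
print(owner.deletable())  # -> False, u2 is still alive
u2.delete()
print(owner.deletable())  # -> True
```

The real protocol has to establish this global count with asynchronous
messages rather than direct method calls, which is what the rest of this note
is about.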


Assumptions
^^^^^^^^^^^

The RRef protocol is designed with the following assumptions.

- **Transient Network Failures**: The RRef design handles transient network
  failures by retrying messages. It cannot handle node crashes or permanent
  network partitions. When those incidents occur, the application should take
  down all workers, revert to the previous checkpoint, and resume training.
- **Non-idempotent UDFs**: We assume the user-defined functions (UDFs) provided
  to :meth:`~torch.distributed.rpc.rpc_sync`,
  :meth:`~torch.distributed.rpc.rpc_async` or
  :meth:`~torch.distributed.rpc.remote` are not idempotent and therefore
  cannot be retried. However, internal RRef control messages are idempotent and
  retried upon message failure.
- **Out of Order Message Delivery**: We do not assume any message delivery
  order between a pair of nodes, because both sender and receiver are using
  multiple threads. There is no guarantee on which message will be processed
  first.
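To see why non-idempotent UDFs rule out retries, consider a hypothetical UDF
(``bump`` is invented for this illustration, not part of the RPC API) that
mutates shared state; blindly replaying it after a suspected message loss would
silently double-count, which is why only internal control messages are retried:

```python
counter = {"value": 0}

def bump(amount):
    # A non-idempotent UDF: every execution mutates observable state.
    counter["value"] += amount
    return counter["value"]

bump(1)                  # the original delivery
bump(1)                  # a blind retry of the same call
print(counter["value"])  # -> 2, although the caller intended a single +1
```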


RRef Lifetime
^^^^^^^^^^^^^

The goal of the protocol is to delete an ``OwnerRRef`` at an appropriate time.
The right time to delete an ``OwnerRRef`` is when there are no living
``UserRRef`` instances and user code is not holding references to the
``OwnerRRef`` either. The tricky part is to determine if there are any living
``UserRRef`` instances.

Design Reasoning
----------------

A user can get a ``UserRRef`` in three situations:

1) Receiving a ``UserRRef`` from the owner.
2) Receiving a ``UserRRef`` from another user.
3) Creating a new ``UserRRef`` owned by another worker.


Case 1 is the simplest: the owner passes its RRef to a user by calling
:meth:`~torch.distributed.rpc.rpc_sync`,
:meth:`~torch.distributed.rpc.rpc_async`, or
:meth:`~torch.distributed.rpc.remote` with its RRef as an argument. In this
case a new ``UserRRef`` will be created on the user. As the owner is the
caller, it can easily update its local reference count on the ``OwnerRRef``.

The only requirement is that any ``UserRRef`` must notify the owner upon
destruction. Hence, we need the first guarantee:

**G1. The owner will be notified when any UserRRef is deleted.**

As messages might be delayed or arrive out of order, we need one more guarantee
to make sure the delete message is not processed too soon. If A sends a message
to B that involves an RRef, we call the RRef on A the parent RRef and the RRef
on B the child RRef.

**G2. The parent RRef will NOT be deleted until the child RRef is confirmed by
the owner.**

In cases 2 and 3, it is possible that the owner has only partial or no
knowledge at all about the RRef fork graph. For example, an RRef could be
constructed on a user, and before the owner receives any RPC call, the creator
user might have already shared the RRef with other users, and those users could
further share the RRef. One invariant is that the fork graph of any RRef is
always a tree, because forking an RRef always creates a new ``UserRRef``
instance on the callee (except if the callee is the owner), and hence every
RRef has a single parent.

The owner's view on any ``UserRRef`` in the tree has three stages:

.. code::

    1) unknown -> 2) known -> 3) deleted.

The owner's view of the entire tree keeps changing. The owner deletes its
``OwnerRRef`` instance when it believes there are no living ``UserRRef``
instances, i.e., at the time the ``OwnerRRef`` is deleted, every ``UserRRef``
instance could be either indeed deleted or still unknown. The dangerous case is
when some forks are unknown and others are deleted.

**G2** trivially guarantees that no parent ``UserRRef`` can be deleted before
the owner knows all of its children ``UserRRef`` instances. However, it is
possible that a child ``UserRRef`` is deleted before the owner knows its parent
``UserRRef``.

Consider the following example, where the ``OwnerRRef`` forks to A, then A
forks to Y, and Y forks to Z:

.. code::

    OwnerRRef -> A -> Y -> Z

If all of Z's messages, including the delete message, are processed by the
owner before Y's messages, the owner will learn of Z's deletion before knowing
that Y exists. Nevertheless, this does not cause any problem, because at least
one of Y's ancestors (A) will be alive and will prevent the owner from deleting
the ``OwnerRRef``. More specifically, if the owner does not know Y, A cannot be
deleted due to **G2**, and the owner knows A since it is A's parent.
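The reasoning in this example can be sketched with a toy model of the owner's
bookkeeping (``OwnerView`` and its fields are illustrative only, not the real
data structures): even when Z's delete message overtakes all of Y's messages,
the still-known fork A blocks deletion of the ``OwnerRRef``:

```python
class OwnerView:
    """Toy model of the owner's view of one RRef's fork tree."""
    def __init__(self):
        self.known = set()     # forks the owner has heard about
        self.deleted = set()   # forks whose delete message has arrived

    def can_delete_owner_rref(self):
        # Deleting is safe only when every known fork is also deleted.
        return bool(self.known) and self.known <= self.deleted


view = OwnerView()
view.known.add("A")      # the owner forked directly to A, so it knows A
view.known.add("Z")      # Z's messages arrive before Y's...
view.deleted.add("Z")    # ...including Z's delete message
# Y is still unknown, but the live fork A keeps the OwnerRRef alive.
print(view.can_delete_owner_rref())  # -> False
```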


Things get a little trickier if the RRef is created on a user:

.. code::

    OwnerRRef
        ^
        |
        A -> Y -> Z

If Z calls :meth:`~torch.distributed.rpc.RRef.to_here` on the ``UserRRef``, the
owner at least knows A when Z is deleted, because otherwise,
:meth:`~torch.distributed.rpc.RRef.to_here` wouldn't finish. If Z does not call
:meth:`~torch.distributed.rpc.RRef.to_here`, it is possible that the owner
receives all messages from Z before any message from A and Y. In this case, as
the real data of the ``OwnerRRef`` has not been created yet, there is nothing
to be deleted either. It is the same as if Z did not exist at all. Hence, it is
still OK.

Implementation
--------------

**G1** is implemented by sending out a delete message in the ``UserRRef``
destructor. To provide **G2**, the parent ``UserRRef`` is put into a context
whenever it is forked, indexed by the new ``ForkId``. The parent ``UserRRef``
is only removed from the context when it receives an acknowledgement message
(ACK) from the child, and the child will only send out the ACK when it is
confirmed by the owner.
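A minimal sketch of this bookkeeping on the parent side (class and method names
are illustrative, not the real implementation): the delete message is deferred
until the local reference is gone *and* every pending child fork has been
confirmed:

```python
class ParentUserRRef:
    """Toy model of a parent UserRRef enforcing G1 and G2."""
    def __init__(self):
        self.in_scope = True            # still referenced by user code
        self.pending_children = set()   # ForkIds awaiting owner confirmation
        self.delete_sent = False

    def fork(self, fork_id):
        # Called when this RRef is shared in an rpc_sync/rpc_async/remote call.
        self.pending_children.add(fork_id)

    def child_confirmed(self, fork_id):
        # The child sent its ACK after being confirmed by the owner.
        self.pending_children.discard(fork_id)
        self._maybe_send_delete()

    def drop_local_ref(self):
        self.in_scope = False
        self._maybe_send_delete()

    def _maybe_send_delete(self):
        # G1: notify the owner on destruction; G2: but never before all
        # children are confirmed, so the delete cannot overtake a fork.
        if not self.in_scope and not self.pending_children:
            self.delete_sent = True


parent = ParentUserRRef()
parent.fork("fork-2")          # shared with another worker
parent.drop_local_ref()        # local variable goes out of scope
print(parent.delete_sent)      # -> False: the child is not yet confirmed
parent.child_confirmed("fork-2")
print(parent.delete_sent)      # -> True: safe to notify the owner now
```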


Protocol Scenarios
^^^^^^^^^^^^^^^^^^

Let's now discuss how the above designs translate to the protocol in four
scenarios.

User Shares RRef with Owner as Return Value
--------------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rref.to_here()

In this case, the ``UserRRef`` is created on the user worker A, then it is
passed to the owner worker B together with the remote message, and then B
creates the ``OwnerRRef``. The method :meth:`~torch.distributed.rpc.remote`
returns immediately, meaning that the ``UserRRef`` can be forked/used before
the owner knows about it.

When the owner receives the :meth:`~torch.distributed.rpc.remote` call, it
creates the ``OwnerRRef`` and returns an ACK acknowledging ``{100, 1}``
(``RRefId``, ``ForkId``). Only after receiving this ACK can A delete its
``UserRRef``. This involves both **G1** and **G2**. **G1** is obvious. For
**G2**, the ``OwnerRRef`` is a child of the ``UserRRef``, and the ``UserRRef``
is not deleted until it receives the ACK from the owner.

.. image:: https://user-images.githubusercontent.com/16999635/69164772-98181300-0abe-11ea-93a7-9ad9f757cd94.png
    :alt: user_to_owner_ret.png
    :width: 500 px

The diagram above shows the message flow, where solid arrows carry user
functions and dashed arrows are built-in messages. Note that the first two
messages from A to B (:meth:`~torch.distributed.rpc.remote` and
:meth:`~torch.distributed.rpc.RRef.to_here`) may arrive at B in any order, but
the final delete message will only be sent out when:

- B acknowledges ``UserRRef {100, 1}`` (**G2**), and
- Python GC agrees to delete the local ``UserRRef`` instance. This occurs when
  the RRef is no longer in scope and is eligible for garbage collection.


User Shares RRef with Owner as Argument
----------------------------------------

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker B
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('B', func, args=(rref, ))

In this case, after creating the ``UserRRef`` on A, A uses it as an argument in
a follow-up RPC call to B. A will keep ``UserRRef {100, 1}`` alive until it
receives the acknowledgement from B (**G2**; note this is not the return value
of the RPC call). This is necessary because A should not send out the delete
message until all previous messages have been received; otherwise, the
``OwnerRRef`` could be deleted before use, as we do not guarantee message
delivery order. This is done by creating a child ``ForkId`` of the RRef and
holding it in a map until the owner confirms the child ``ForkId``. The figure
below shows the message flow.

.. image:: https://user-images.githubusercontent.com/16999635/69164845-b67e0e80-0abe-11ea-93fa-d24674e75a2b.png
    :alt: user_to_owner_arg.png
    :width: 500 px

Note that the ``UserRRef`` could be deleted on B before ``func`` finishes or
even starts. However, this is OK, because by the time B sends out the ACK for
the child ``ForkId``, it has already acquired the ``OwnerRRef`` instance, which
prevents it from being deleted too soon.


Owner Shares RRef with User
----------------------------

Owner to user is the simplest case, where the owner can update the reference
count locally and does not need any additional control messages to notify
others. Regarding **G2**, it is as if the parent receives the ACK from the
owner immediately, as the parent is the owner.

.. code::

    import torch
    import torch.distributed.rpc as rpc
    from torch.distributed.rpc import RRef

    # on worker B and worker C
    def func(rref):
        pass

    # on worker B, creating a local RRef
    rref = RRef("data")
    # say the rref has RRefId 100
    rpc.rpc_async('C', func, args=(rref, ))

.. image:: https://user-images.githubusercontent.com/16999635/69164921-c990de80-0abe-11ea-9250-d32ad00cf4ae.png
    :alt: owner_to_user.png
    :width: 500 px

The figure above shows the message flow. Note that when the ``OwnerRRef`` exits
scope after the ``rpc_async`` call, it will not be deleted, because internally
a map keeps it alive as long as there are any known forks, in this case
``UserRRef {100, 1}`` (**G2**).


User Shares RRef with User
---------------------------

This is the most complicated case, where the caller user (parent ``UserRRef``),
the callee user (child ``UserRRef``), and the owner all need to get involved.

.. code::

    import torch
    import torch.distributed.rpc as rpc

    # on worker A and worker C
    def func(rref):
        pass

    # on worker A
    rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
    # say the rref has RRefId 100 and ForkId 1
    rpc.rpc_async('C', func, args=(rref, ))

.. image:: https://user-images.githubusercontent.com/16999635/69164971-d6adcd80-0abe-11ea-971d-6b7af131f0fd.png
    :alt: user_to_user.png
    :width: 500 px

When C receives the child ``UserRRef`` from A, it sends out a fork request to
the owner B. Later, when B confirms the ``UserRRef`` on C, C will perform two
actions in parallel: 1) send out the child ACK to A, and 2) run the
user-provided function. During this time, the parent (A) will hold its
``UserRRef {100, 1}`` alive to satisfy **G2**.