| #include "caffe2/core/operator.h" |
| #include "caffe2/operators/no_default_engine_op.h" |
| |
| namespace caffe2 { |
| |
| OPERATOR_SCHEMA(CreateCommonWorld) |
| .NumInputs(0, 1) |
| .NumOutputs(1) |
| .SetDoc(R"DOC( |
| Creates a common world for communication operators. |
| )DOC") |
| .Input(0, "kv_handler", "Key/value handler for rendezvous (optional).") |
| .Output(0, "comm_world", "A common world for collective operations.") |
| .Arg("size", "(int) size of the common world.") |
| .Arg("rank", "(int) rank of this node in the common world."); |
| |
| OPERATOR_SCHEMA(CloneCommonWorld) |
| .NumInputs(1) |
| .NumOutputs(1) |
| .SetDoc(R"DOC( |
| Clones existing common world. |
| )DOC") |
| .Input(0, "existing_comm_world", "Existing common world to clone.") |
| .Output(0, "comm_world", "A common world for collective operations."); |
| |
| OPERATOR_SCHEMA(DestroyCommonWorld) |
| .NumInputs(1) |
| .NumOutputs(1) |
| .EnforceInplace({{0, 0}}) |
| .SetDoc("Closes all connections managed by a common world.") |
| .Input(0, "common_world", "The common world to be destroyed."); |
| |
| OPERATOR_SCHEMA(Broadcast) |
| .NumInputsOutputs([](int in, int out) { |
| return in >= 2 && out == (in - 1); |
| }) |
| .EnforceInplace([](int in, int out) { return (in - 1) == out; }) |
| .InputsCanCrossDevices() |
| .IdenticalTypeAndShapeOfInput(0) |
| .SetDoc(R"DOC( |
| Does a broadcast operation from the root node to every other node. The tensor |
| on each node should have been pre-created with the same shape and data type. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be broadcasted.") |
| .Output(0, "X", "In-place as input 1.") |
| .Arg("root", "(int, default 0) the root to run broadcast from."); |
| |
| OPERATOR_SCHEMA(Reduce) |
| .NumInputs(2) |
| .NumOutputs(1) |
| .InputsCanCrossDevices() |
| .IdenticalTypeAndShapeOfInput(0) |
| .SetDoc(R"DOC( |
| Does a reduce operation from every node to the root node. Currently only |
| Sum is supported. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be reduced.") |
| .Output(0, "Y", "The reduced result on root, not set for other nodes.") |
| .Arg("root", "(int, default 0) the root to run reduce into."); |
| |
| OPERATOR_SCHEMA(Allreduce) |
| .NumInputsOutputs([](int in, int out) { |
| return in >= 2 && out == (in - 1); |
| }) |
| .EnforceInplace([](int in, int out) { return (in - 1) == out; }) |
| .IdenticalTypeAndShapeOfInput(0) |
| .InputsCanCrossDevices() |
| .SetDoc(R"DOC( |
| Does an allreduce operation among the nodes. Currently only Sum is supported. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be allreduced.") |
| .Output(0, "Y", "The allreduced tensor, same on all nodes."); |
| |
| OPERATOR_SCHEMA(ReduceScatter) |
| .NumInputsOutputs([](int in, int out) { |
| return in >= 2 && out == (in - 1); |
| }) |
| .EnforceInplace([](int in, int out) { return (in - 1) == out; }) |
| .IdenticalTypeAndShapeOfInput(0) |
| .InputsCanCrossDevices() |
| .SetDoc(R"DOC( |
| Does reduce-scatter operation among the nodes. Currently only Sum is supported. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be reduce-scattered.") |
| .Output(0, "Y", "The reduced tensor, scattered on all nodes."); |
| |
| OPERATOR_SCHEMA(Allgather) |
| .NumInputs(2, INT_MAX) |
| .NumOutputs(1) |
| .InputsCanCrossDevices() |
| .SetDoc(R"DOC( |
| Does an allgather operation among the nodes. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be allgathered.") |
| .Output(0, "Y", "The allgathered tensor, same on all nodes."); |
| |
| OPERATOR_SCHEMA(Barrier) |
| .NumInputs(1) |
| .SetDoc(R"DOC( |
| Does a barrier operation among the nodes. |
| )DOC") |
| .Input(0, "comm_world", "The common world."); |
| |
| OPERATOR_SCHEMA(SendTensor) |
| .NumInputs({2, 4}) |
| .NumOutputs(0) |
| .SetDoc(R"DOC( |
| Sends the tensor to another node. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input(1, "X", "A tensor to be allgathered.") |
| .Input( |
| 2, |
| "dst", |
| "An int CPUtensor of size 1 specifying the rank. If " |
| "given, this overrides the 'to' argument of the op.") |
| .Input( |
| 3, |
| "tag", |
| "An int CPUtensor of size 1 specifying the tag to " |
| "send the tensor with. This overrides the 'tag' " |
| "argument of the op.") |
| .Arg("dst", "The rank to send the tensor to.") |
| .Arg("tag", "(int) a tag to send the tensor with.") |
| .Arg( |
| "raw_buffer", |
| "(bool) if set, only send the content and assume that the receiver " |
| "has already known the tensor's shape and information."); |
| |
| OPERATOR_SCHEMA(ReceiveTensor) |
| .NumInputs({2, 4}) |
| .NumOutputs(3) |
| .EnforceInplace({{1, 0}}) |
| .AllowInplace({{2, 1}, {3, 2}}) |
| .SetDoc(R"DOC( |
| Receives the tensor from another node. |
| )DOC") |
| .Input(0, "comm_world", "The common world.") |
| .Input( |
| 1, |
| "Y", |
| "In-place output. If raw_buffer is specified, " |
| "Y should have pre-allocated data and type..") |
| .Input( |
| 2, |
| "src", |
| "An int CPUtensor of size 1 specifying the rank. If " |
| "given, this overrides the 'from' argument of the op.") |
| .Input( |
| 3, |
| "tag", |
| "An int CPUtensor of size 1 specifying the tag to " |
| "send the tensor with. This overrides the 'tag' " |
| "argument of the op.") |
| .Output(0, "Y", "The received tensor.") |
| .Output( |
| 1, |
| "src", |
| "The sender that sent the message as a CPUTensor " |
| "of size 1 and of type int.") |
| .Output( |
| 2, |
| "tag", |
| "The tag that the message is sent with as a CPUTensor " |
| "of size 1 and of type int.") |
| .Arg("src", "(int) he rank to receive the tensor from.") |
| .Arg("tag", "(int) a tag to receive the tensor with.") |
| .Arg( |
| "raw_buffer", |
| "(bool) if set, only send the content and assume that the receiver " |
| "has already known the tensor's shape and information."); |
| |
| SHOULD_NOT_DO_GRADIENT(CreateCommonWorld); |
| SHOULD_NOT_DO_GRADIENT(CloneCommonWorld); |
| SHOULD_NOT_DO_GRADIENT(DestroyCommonWorld); |
| SHOULD_NOT_DO_GRADIENT(Broadcast); |
| SHOULD_NOT_DO_GRADIENT(Reduce); |
| SHOULD_NOT_DO_GRADIENT(Allgather); |
| SHOULD_NOT_DO_GRADIENT(Allreduce); |
| SHOULD_NOT_DO_GRADIENT(ReduceScatter); |
| SHOULD_NOT_DO_GRADIENT(Barrier); |
| SHOULD_NOT_DO_GRADIENT(SendTensor); |
| SHOULD_NOT_DO_GRADIENT(ReceiveTensor); |
| |
| // Communication operators do not have default engines. |
| REGISTER_CPU_OPERATOR(CreateCommonWorld, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(CloneCommonWorld, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(DestroyCommonWorld, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(Broadcast, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(Reduce, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(Allgather, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(Allreduce, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(ReduceScatter, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(Barrier, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(SendTensor, NoDefaultEngineOp<CPUContext>); |
| REGISTER_CPU_OPERATOR(ReceiveTensor, NoDefaultEngineOp<CPUContext>); |
| |
| } // namespace caffe2 |