Conversation

@mrshenli mrshenli commented Jul 23, 2019

Stack from ghstack:

Features:

  • sync and async RPC for builtin operators
  • RpcAgent API
  • ProcessGroupAgent implementation

Goal:

This is the first PR for #23110, and there will be many follow-up ones, so let's focus on the overall API and code structure. Details like efficiency and error handling can be improved in future PRs.

  • have a minimum working and testable RPC implementation.
  • make sure the RpcAgent API is sufficient for the future ThriftAgent and TensorPipeAgent implementations (see the C++ sketch below)
    • For the TensorPipe implementation, it might allocate multiple underlying communication channels of different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only converts a BuiltinOp into a Message, which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object.
    • For ThriftAgent, as Thrift has its own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is pass the response Message object to the Future returned by send(...).
  • support blocking and non-blocking RequestCallback
    • blocking means the callback won't return before sending out the response
    • non-blocking can be achieved by enqueuing the (from, request, RpcAgent&) tuple and using a different thread to process it. That is why there is an RpcAgent& arg in the param list.
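Below is a minimal C++ sketch of the shapes described above. All names and signatures here are illustrative assumptions for this description, not the exact classes in this PR.

```cpp
#include <cstdint>
#include <functional>
#include <future>
#include <string>
#include <utility>
#include <vector>

#include <torch/torch.h>

// Illustrative sketch only; names and signatures are assumptions based on the
// description above, not the classes that land in this PR.
enum class MessageType { BUILTIN_OP, BUILTIN_RET };

class Message {
 public:
  Message(
      std::vector<char> payload,
      std::vector<torch::Tensor> tensors,
      MessageType type,
      int64_t id)
      : payload_(std::move(payload)),
        tensors_(std::move(tensors)),
        type_(type),
        id_(id) {}

  bool isRequest() const { return type_ == MessageType::BUILTIN_OP; }
  MessageType type() const { return type_; }
  int64_t id() const { return id_; }

 private:
  std::vector<char> payload_;           // serialized op name and non-tensor args
  std::vector<torch::Tensor> tensors_;  // tensor table, kept outside the byte vector
  MessageType type_;
  int64_t id_;                          // request/response matching id
};

class RpcAgent;  // forward declaration for the callback signature

// Blocking or non-blocking request handler: (from, request, RpcAgent&).
using RequestCallback =
    std::function<void(std::string from, Message request, RpcAgent& agent)>;

class RpcAgent {
 public:
  explicit RpcAgent(RequestCallback cb) : cb_(std::move(cb)) {}
  virtual ~RpcAgent() = default;

  // How a Message is serialized on the wire is left entirely to the agent
  // (ProcessGroupAgent, ThriftAgent, TensorPipeAgent, ...). The returned
  // future is fulfilled with the response Message.
  virtual std::future<Message> send(const std::string& to, Message request) = 0;

 protected:
  RequestCallback cb_;
};
```

A ProcessGroupAgent would implement send(...) on top of a ProcessGroup, while a ThriftAgent could drop the id during serialization, since Thrift matches requests and responses itself.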

Differential Revision: D15194693

We are not exporting this diff until we finalize distributed autograd design and publish the API review publicly.

https://fb.quip.com/FabTAZKVgQpf
@mrshenli mrshenli requested review from apaszke and pietern as code owners July 23, 2019 14:16
@pytorchbot pytorchbot added caffe2 module: build Build system issues oncall: distributed Add this issue/PR to distributed oncall triage queue module: internals Related to internal abstractions in c10 and ATen module: pybind Related to our Python bindings / interactions with other Python libraries labels Jul 23, 2019
mrshenli added a commit that referenced this pull request Jul 23, 2019
ghstack-source-id: 86988305
Pull Request resolved: #23228
@mrshenli mrshenli changed the title sync and async torch.distributed.rpc for builtin operators [WIP] sync and async torch.distributed.rpc for builtin operators Jul 23, 2019
mrshenli added a commit that referenced this pull request Jul 25, 2019
Pull Request resolved: #23228
ghstack-source-id: 87140239
@mrshenli
Contributor Author

@pytorchbot retest this please

@pytorchbot pytorchbot added the module: tests Issues related to tests (not the torch.testing module) label Jul 25, 2019
mrshenli added a commit that referenced this pull request Jul 25, 2019
Pull Request resolved: #23228
ghstack-source-id: 87205606
mrshenli added a commit that referenced this pull request Jul 26, 2019
Pull Request resolved: #23228
ghstack-source-id: 87224632
@mrshenli mrshenli changed the title [WIP] sync and async torch.distributed.rpc for builtin operators sync and async torch.distributed.rpc for builtin operators Jul 26, 2019
@mrshenli mrshenli requested review from ezyang, gchanan and soumith July 26, 2019 02:48
@xush6528
Contributor

xush6528 commented Aug 6, 2019

I think, for a disallowed operation, there should also be test cases that assert it raises as expected, rather than getting stuck in some unexpected state.

Also, document these constraints in docstrings.

Yeah, both sync_rpc and join_rpc might need more tests.

@mrshenli
Contributor Author

mrshenli commented Aug 6, 2019

rebase messed up files, will clean up

@mrshenli
Contributor Author

mrshenli commented Aug 6, 2019

@xush6528 I made changes to allow calling join_rpc multiple times (to be consistent with Thread.join())

@mrshenli
Contributor Author

mrshenli commented Aug 6, 2019

Hi @driazati, what's the plan for landing this PR and #23241? I saw pickler/unpickler APIs are changed in #23241. Do you mind if I merge this one first and then we address the conflicts together?

Message message = deserialize(ss);

if (message.isRequest()) {
  cb_(names_[srcRank], std::move(message), *this);
@xush6528 xush6528 (Contributor) Aug 6, 2019

@mrshenli

Here is a call sequence that blocks.

Let me turn it into a unit test.

[diagram: call sequence that blocks]

@mrshenli mrshenli (Contributor Author)

Yep, cb has to be non-blocking for UDFs. It will come in the next PR. For this one, as we only target builtin ops, blocking cb should suffice.
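For illustration, here is a minimal sketch of that non-blocking direction — enqueue the (from, request, RpcAgent&) work and process it on a separate thread, so the receive loop is never blocked on user work. The Request struct and class below are hypothetical, not code from this PR.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical sketch of a non-blocking RequestCallback: the receive loop only
// enqueues work; a dedicated worker thread runs the handler, so a handler that
// itself issues (and waits on) a nested RPC does not stall the recv loop.
struct Request {
  std::string from;
  std::string payload;  // stands in for the real Message
};

class NonBlockingCallback {
 public:
  explicit NonBlockingCallback(std::function<void(const Request&)> process)
      : process_(std::move(process)), worker_([this] { loop(); }) {}

  ~NonBlockingCallback() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_all();
    worker_.join();
  }

  // Called from the receive loop; returns immediately.
  void operator()(Request req) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(req));
    }
    cv_.notify_one();
  }

 private:
  void loop() {
    for (;;) {
      Request req;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
        if (queue_.empty()) {
          return;  // done_ was set and nothing is left to process
        }
        req = std::move(queue_.front());
        queue_.pop_front();
      }
      process_(req);  // may block (e.g. on a nested RPC) without stalling recv
    }
  }

  std::function<void(const Request&)> process_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Request> queue_;
  bool done_ = false;
  std::thread worker_;
};
```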

@mrshenli
Contributor Author

mrshenli commented Aug 6, 2019

The errors are the same for the three failed tests and are irrelevant (they also appear in other PRs, e.g., #23897). I am landing this PR.

19:57:38 CMake Error at third_party/fbgemm/third_party/asmjit/CMakeLists.txt:1 (cmake_minimum_required):
19:57:38   CMake 3.8 or higher is required.  You are running version 3.6.3

@zou3519 zou3519 deleted the gh/mrshenli/7/head branch August 6, 2019 23:06
@jianyuh
Member

jianyuh commented Aug 6, 2019

> Errors are the same for three failed tests, and are irrelevant (also appears in other PRs, e.g., #23897). I am landing this PR.
>
> 19:57:38 CMake Error at third_party/fbgemm/third_party/asmjit/CMakeLists.txt:1 (cmake_minimum_required):
> 19:57:38   CMake 3.8 or higher is required.  You are running version 3.6.3

Please ignore the error message for asmjit CMake version: this is due to the upgrade of FBGEMM. We have reverted the PR in FBGEMM.

@pritamdamania87 pritamdamania87 (Contributor) left a comment

Made a pass through the PR a little late; hopefully we can address some of these comments in a separate PR?

n = i + self.rank + 1
ret = dist.rpc('worker%d' % dstRank, torch.add,
               args=(torch.ones(n, n), torch.ones(n, n)))
self.assertEqual(ret, torch.ones(n, n) * 2)
Contributor

Wondering if we should test some errors and verify we handle them correctly, some examples:

  1. Invalid argument types.
  2. Invalid number of arguments.
  3. Invalid operator.

Contributor Author

Yep, @zhaojuanmao also mentioned that. This will come in followup PRs.

sys.exit(0)


def _wrap_with_rpc(func):
Contributor

Do we need to use a decorator like this to set up and tear down each test? Usually Python unit test frameworks use setUp and tearDown methods for this.

Contributor

Good point, we could write:

class RpcTest(MultiProcessTestCase):
    @classmethod
    def setUpClass(cls):
        super(RpcTest, cls).setUpClass()
        cls._run = _wrap_with_rpc(cls._run)

Contributor Author

Good point!

{message.id(), (int64_t) message.type()}, {torch::kInt64}
));

torch::save(tensors, os);
Contributor

Based on some profiling, @aazzolini mentioned torch::save and torch::load are pretty slow.

Contributor Author

Yep, I also learnt that from @aazzolini. I will try the new pickle API when #23241 is merged.

torch::load(tensors, is);

TORCH_CHECK(tensors.size() >= 2, "Failed to deserialize a message.");
auto miscTensor = std::move(tensors.back());
Contributor

nit: maybe call this metadataTensor.
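Putting the two quoted hunks together, here is a standalone sketch of the save/load round trip. The Message fields are assumptions for illustration; only the torch::save/torch::load calls and the trailing metadata tensor mirror the code above.

```cpp
#include <cstdint>
#include <sstream>
#include <utility>
#include <vector>

#include <torch/torch.h>

int main() {
  // Payload tensors for a hypothetical message, plus one trailing metadata
  // tensor holding {id, type} as int64 values (mirroring the quoted hunks).
  int64_t id = 7;
  int64_t type = 0;
  std::vector<torch::Tensor> tensors = {torch::ones({2, 2}), torch::zeros({3})};
  tensors.push_back(torch::tensor({id, type}, torch::kInt64));

  std::stringstream ss;
  torch::save(tensors, ss);  // serialize everything into one stream

  std::vector<torch::Tensor> loaded;
  torch::load(loaded, ss);   // deserialize on the receiving side

  TORCH_CHECK(loaded.size() >= 2, "Failed to deserialize a message.");
  auto metadataTensor = std::move(loaded.back());
  loaded.pop_back();
  int64_t loadedId = metadataTensor[0].item<int64_t>();
  int64_t loadedType = metadataTensor[1].item<int64_t>();
  TORCH_CHECK(loadedId == id && loadedType == type);
  return 0;
}
```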

TORCH_CHECK(dstRank != pg_->getRank(), "ProcessGroupAgent does not support "
"making RPC calls to self.")

auto requestId = nextId();
Contributor

nextId() is not thread safe.

Contributor Author

nextId_ is atomic now, is that sufficient?
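For reference, a minimal sketch of the atomic id counter being discussed (an assumed shape; the actual member in the PR may differ):

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch: an atomic counter makes nextId() safe to call from
// multiple threads without an explicit lock; fetch_add returns a unique,
// monotonically increasing id per call.
class IdGenerator {
 public:
  int64_t nextId() {
    return nextId_.fetch_add(1, std::memory_order_relaxed);
  }

 private:
  std::atomic<int64_t> nextId_{0};
};
```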


} // namespace

ScriptRet::ScriptRet(at::IValue&& value) : value_(value) {}
Contributor

Does an OP always return a single IValue?

Contributor Author

Yes, according to @zdevito's comment:

A return should just be a single value. This is how python and TorchScript already work.

// Return value of a builtin operator or a TorchScript function.
class TORCH_API ScriptRet final {
public:
explicit ScriptRet(at::IValue&& values);
Contributor

nit: value

namespace distributed {
namespace rpc {

void processRequestBlocking(
Contributor

Add some documentation to this function, and also add some documentation to this file explaining its purpose and which methods belong here.
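For example, a doc-comment sketch along these lines could be added. The signature below is an assumption inferred from the (from, request, RpcAgent&) convention in the PR description, not the actual declaration in this file.

```cpp
#include <string>

class Message;   // assumed, from the rpc message header
class RpcAgent;  // assumed, from the rpc agent header

// Processes one incoming RPC request synchronously: deserialize the builtin
// op call from `request`, run the operator, and send the response Message
// back to `from` via `agent`. "Blocking" means this function does not return
// until the response has been handed to the agent.
void processRequestBlocking(const std::string& from, Message&& request, RpcAgent& agent);
```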


return agent.send(
dstName, ScriptCall(op, std::move(stack)).toMessage());
} catch (std::runtime_error) {}
Contributor

add a LOG(DEBUG) when catching errors?

Contributor Author

@pritamdamania87 do you know which LOG I should use if I would like to make it work both internally and externally?


return names


def join_rpc():
Contributor

As I mentioned above, this makes sense for ProcessGroup-based RPCs, but not really for something like a Thrift implementation. We shouldn't have join and sync in the public APIs.

Contributor Author

I would assume Thrift also has a join or shutdown to make sure the local RpcAgent does not exit too early?
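A rough sketch of what such join semantics could look like on the agent side (a hedged illustration, not the ProcessGroupAgent code in this PR): track outstanding work and let join() wait for it to drain, so it is safe to call multiple times, like Thread.join().

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Hypothetical sketch of join() semantics for an RpcAgent-like class:
// callers must not exit before all outstanding RPC work has completed.
class PendingWorkTracker {
 public:
  void startWork() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++pending_;
  }

  void finishWork() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      --pending_;
    }
    cv_.notify_all();
  }

  // Safe to call multiple times (like Thread.join()): it simply waits until
  // there is no outstanding work.
  void join() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return pending_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  int64_t pending_ = 0;
};
```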

@mrshenli
Contributor Author

mrshenli commented Aug 7, 2019

@pritamdamania87 thanks for the comments, will definitely address them in a followup PR.

@ezyang
Contributor

ezyang commented Aug 7, 2019

@mrshenli I think this diff breaks OS X binary builds:

Aug 07 00:00:03 + python -c 'import torch'
Aug 07 00:00:04 dyld: lazy symbol binding failed: Symbol not found: __ZN5torch11distributed3rpc16python_functionsEv
Aug 07 00:00:04   Referenced from: /Users/distiller/project/miniconda/envs/wheel_py36/lib/python3.6/site-packages/torch/lib/libtorch_python.dylib
Aug 07 00:00:04   Expected in: flat namespace
Aug 07 00:00:04 
Aug 07 00:00:04 dyld: Symbol not found: __ZN5torch11distributed3rpc16python_functionsEv
Aug 07 00:00:04   Referenced from: /Users/distiller/project/miniconda/envs/wheel_py36/lib/python3.6/site-packages/torch/lib/libtorch_python.dylib
Aug 07 00:00:04   Expected in: flat namespace

https://circleci.com/gh/pytorch/pytorch/2385105?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link/console

facebook-github-bot pushed a commit that referenced this pull request Aug 8, 2019
Summary:
#23228 caused a build failure on OS X, because rpc.h is included as long as USE_DISTRIBUTED=1, but rpc/init.cpp (and others) is only included when NOT APPLE. So, python_functions defined in init.cpp cannot be found on macOS. This PR attempts to fix it by wrapping rpc.h with USE_C10D, which is only set when NOT APPLE.

I tried this fix locally and it works.
Pull Request resolved: #23998

Differential Revision: D16706087

Pulled By: mrshenli

fbshipit-source-id: d04fe6717a181a3198289cdef51439708c2e291d
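A minimal sketch of the kind of guard the fix describes. The USE_C10D macro and the existence of python_functions come from the summary and the linker error above; the file name and header path here are assumptions.

```cpp
// python_bindings.cpp (hypothetical file): only pull in the RPC bindings when
// the c10d backend is built, which is the case on non-Apple platforms.
#ifdef USE_C10D
#include <torch/csrc/distributed/rpc/rpc.h>  // assumed path for rpc.h
#endif

void initDistributedRpcBindings() {
#ifdef USE_C10D
  // python_functions() is defined in rpc/init.cpp, which is only compiled
  // when NOT APPLE; guarding the call avoids the unresolved-symbol error
  // seen in the OS X binary builds.
  torch::distributed::rpc::python_functions();
#endif
}
```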
xush6528 added a commit to xush6528/pytorch that referenced this pull request Aug 8, 2019
Summary:
# Issue

Implementation in pytorch#23228 has a recvLoop thread that blocks on running a requested RPC function.

This can't meet the requirement in pytorch#23110: the proposal implies a worker could send out nested RPCs.

"Nested RPC" means an RPC callee could send out another RPC before the first RPC returns; since the worker's recv thread would be busy waiting for the first RPC function to return, there is no thread left to receive the return value of the second RPC function.

A diagram showing this issue: pytorch#23228 (comment)

# Solution

- Add a test case to capture this requirement.

- Add debugging utilities that could be quite useful for debugging tricky RPC cases.

# Misc

- Add debugging utility in common_distributed for tracing RPC calls.

Differential Revision: D16682122

fbshipit-source-id: 062e77faa5e8467cb3cfe5b0a16333c2762768a9
xush6528 added a commit to xush6528/pytorch that referenced this pull request Aug 9, 2019
Summary:
Pull Request resolved: pytorch#24036

# Issue

Implementation in pytorch#23228 has a recvLoop thread that blocks on running a requested RPC function.

This can't meet the requirement in pytorch#23110, where the proposal implies a worker could send out nested RPCs.

"Nested RPC" means an RPC callee could send out another RPC before the first RPC returns; since the worker's recv thread would be busy waiting for the first RPC function to return, there is no available idle thread to receive the return value of the second RPC function.

A diagram showing this issue: pytorch#23228 (comment).

More extensive thinking from mrshenli is in pytorch#23569 (comment).

# Solution

- Add a test case to capture this requirement.

# Misc

- Add debugging utilities that could be quite useful for tracing RPC behaviors and debugging tricky RPC cases similar to this.

Differential Revision: D16682122

fbshipit-source-id: ffa78eb20af4e2cf9476998fa544ab940035cae9