sync and async torch.distributed.rpc for builtin operators #23228
Conversation
Stack from [ghstack](https://github.com/ezyang/ghstack):

* **#23228 sync and async torch.distributed.rpc for builtin operators**

Features:

* sync and async RPC for builtin operators
* RpcAgent API
* ProcessGroupAgent implementation

Goal:

This is the first PR for #23110, and there will be many followup ones, so let's focus on the overall API and code structure. Details like efficiency and error handling can be improved in future PRs.

* have a minimum working and testable RPC implementation
* make sure the RpcAgent API is sufficient for the future ThriftAgent and TensorPipeAgent implementations
  * For the TensorPipe implementation, the agent might allocate multiple underlying communication channels of different types, and might also use streaming serialization/deserialization for large tensors. To support this, the current implementation only converts a BuiltinOp into a Message containing a byte vector and a tensor table; it is up to the RpcAgent implementation to decide how to serialize a Message object.
  * For the ThriftAgent, Thrift has its own request/response matching solution, so Message.id is no longer necessary and can be dropped during serialization. All the agent needs to do is pass the response Message object to the Future returned by send(...).
* support blocking and non-blocking RequestCallback
  * blocking means the callback won't return before sending out the response
  * non-blocking can be achieved by enqueueing the `(from, request, RpcAgent&)` tuple and using a different thread to process it; that is why there is an `RpcAgent&` arg in the param list

We are not exporting this diff until we finalize the distributed autograd design and publish the API review publicly: https://fb.quip.com/FabTAZKVgQpf

Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
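As a quick illustration of the user-facing call this enables, here is a sketch mirroring the test code quoted later in the review. The worker name and the process group / RPC agent setup are assumed to have been initialized elsewhere; this is not part of the PR description.

```python
# Sketch of the synchronous builtin-op RPC this PR adds, mirroring the test
# code quoted in the review below. Assumes the process group and RPC agent
# for this worker and for 'worker1' have already been initialized elsewhere.
import torch
import torch.distributed as dist

n = 3
# Run torch.add remotely on 'worker1' and block until the result is returned.
ret = dist.rpc('worker1', torch.add,
               args=(torch.ones(n, n), torch.ones(n, n)))
assert torch.equal(ret, torch.ones(n, n) * 2)
```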
@pytorchbot retest this please
…rators" Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: * have a minimum working and testable RPC implementation for #23110 * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
…rators" Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: * have a minimum working and testable RPC implementation for #23110 * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: * have a minimum working and testable RPC implementation * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. We are not exporting this diff until we finalize distributed autograd design and publish the API review publicly. https://fb.quip.com/FabTAZKVgQpf Pull Request resolved: #23228 ghstack-source-id: 87205606 Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
…rators" Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: * have a minimum working and testable RPC implementation for #23110 * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
Features: * sync and async RPC for builtin operators * RpcAgent API * ProcessGroupAgent implementation Goal: * have a minimum working and testable RPC implementation * make sure the RpcAgent API is sufficient for future ThriftAgent and TensorPipeAgent implementation * For tensor pipe implementation, it might allocate multiple underlying communication channels with different types, and might also use streaming serialization/deserialization for large tensors. To support this requirement, the current implementation only convert a BuiltinOp into a Message which contains a byte vector and a tensor table. It is up to the RpcAgent implementation to determine how it would like to serialize a Message object. * For ThriftAgent, as Thrift has it own request/response matching solution, the Message.id is no longer necessary. Hence the id can be dropped during serialization. All it needs to do is to pass the response Message object to the Future returned by send(...). * support blocking and non-blocking RequestCallback * blocking means the callback won't return before sending out the response * non-blocking can be achieved by enqueue the `(from, request, RpcAgent&)` tuple and use a different thread to process them. That is why there is an `RpcAgent&` arg in the param list. We are not exporting this diff until we finalize distributed autograd design and publish the API review publicly. https://fb.quip.com/FabTAZKVgQpf Pull Request resolved: #23228 ghstack-source-id: 87224632 Differential Revision: [D15194693](https://our.internmc.facebook.com/intern/diff/D15194693/)
I think, for a disallowed operation, there should also be test cases that assert it raises as expected, rather than getting stuck in some unexpected state. Also, document these constraints in the docstrings. Yeah, both sync_rpc and join_rpc might need more tests.
Rebase messed up files, will clean up.
@xush6528 I made changes to allow calling
Message message = deserialize(ss);

if (message.isRequest()) {
  cb_(names_[srcRank], std::move(message), *this);
Yep, cb has to be non-blocking for UDFs. It will come in the next PR. For this one, as we only target builtin ops, blocking cb should suffice.
The errors are the same for the three failed tests and are irrelevant (they also appear in other PRs, e.g., #23897). I am landing this PR.

Please ignore the error message about the asmjit CMake version: it is due to the upgrade of FBGEMM. We have reverted that PR in FBGEMM.
pritamdamania87 left a comment

Made a pass through the PR a little late; hopefully we can address some of these comments in a separate PR?
n = i + self.rank + 1
ret = dist.rpc('worker%d' % dstRank, torch.add,
               args=(torch.ones(n, n), torch.ones(n, n)))
self.assertEqual(ret, torch.ones(n, n) * 2)
Wondering if we should test some errors and verify we handle them correctly. Some examples:
- Invalid argument types.
- Invalid number of arguments.
- Invalid operator.
Yep, @zhaojuanmao also mentioned that. This will come in followup PRs.
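For concreteness, a rough sketch of what such error-path tests might look like, written as methods on the RpcTest class shown in this review. The exact exception types raised by `dist.rpc` for bad inputs are an assumption here, not something this PR specifies.

```python
# Hypothetical error-path tests in the style of the existing RpcTest cases.
# The exact exception types raised by dist.rpc are an assumption.
def test_invalid_inputs(self):
    dstRank = (self.rank + 1) % self.world_size
    with self.assertRaises(Exception):
        # Invalid argument types: strings instead of tensors.
        dist.rpc('worker%d' % dstRank, torch.add, args=('x', 'y'))
    with self.assertRaises(Exception):
        # Invalid number of arguments for torch.add.
        dist.rpc('worker%d' % dstRank, torch.add, args=(torch.ones(2, 2),))
    with self.assertRaises(Exception):
        # Not a builtin operator at all.
        dist.rpc('worker%d' % dstRank, 'no_such_op', args=())
```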
sys.exit(0)


def _wrap_with_rpc(func):
Good point, we could write:

class RpcTest(MultiProcessTestCase):
    @classmethod
    def setUpClass(cls):
        super(MultiProcessTestCase, cls).setUpClass()
        cls._run = _wrap_with_rpc(cls._run)

Good point!
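For readers without the full diff, a hypothetical sketch of what a wrapper like `_wrap_with_rpc` could do around the per-process entry point. The initialization step is intentionally left abstract, and `join_rpc` is the shutdown call discussed elsewhere in this review; none of this is quoted from the actual diff.

```python
from functools import wraps
import torch.distributed as dist

# Hypothetical sketch only: decorate the per-process entry point so the RPC
# agent is set up before the wrapped function runs and joined afterwards.
def _wrap_with_rpc(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # ... set up the process group and RPC agent here (API not shown) ...
        try:
            return func(*args, **kwargs)
        finally:
            dist.join_rpc()  # shut the local RpcAgent down before the process exits
    return wrapper
```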
    {message.id(), (int64_t) message.type()}, {torch::kInt64}
));

torch::save(tensors, os);
Based on some profiling, @aazzolini mentioned torch::save and torch::load are pretty slow.
Yep, I also learnt that from @aazzolini. I will try the new pickle API when #23241 is merged
torch::load(tensors, is);

TORCH_CHECK(tensors.size() >= 2, "Failed to deserialize a message.");
auto miscTensor = std::move(tensors.back());
nit: maybe call this metadataTensor.
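For intuition only, here is a Python analogue of what the C++ snippets above do: pack the message id and type into a small int64 "metadata" tensor, append it to the tensor table, and round-trip everything through torch.save/torch.load. This is an illustration, not the PR's actual code path.

```python
import io
import torch

# Illustrative analogue of the Message serialization discussed above:
# payload tensors plus one small metadata tensor (id and message type),
# written with torch.save and read back with torch.load.
def serialize(tensors, msg_id, msg_type):
    buf = io.BytesIO()
    meta = torch.tensor([msg_id, msg_type], dtype=torch.int64)
    torch.save(tensors + [meta], buf)
    return buf.getvalue()

def deserialize(data):
    tensors = torch.load(io.BytesIO(data))
    meta = tensors.pop()  # the last tensor carries the metadata
    return tensors, int(meta[0]), int(meta[1])
```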
TORCH_CHECK(dstRank != pg_->getRank(), "ProcessGroupAgent does not support "
    "making RPC calls to self.")

auto requestId = nextId();
nextId() is not thread safe.
nextId_ is atomic now, is that sufficient?
} // namespace

ScriptRet::ScriptRet(at::IValue&& value) : value_(value) {}
Does an OP always return a single IValue?
Yes, according to @zdevito's comment:
A return should just be a single value. This is how python and TorchScript already work.
// Return value of a builtin operator or a TorchScript function.
class TORCH_API ScriptRet final {
 public:
  explicit ScriptRet(at::IValue&& values);
nit: value
namespace distributed {
namespace rpc {

void processRequestBlocking(
Add some documentation to this function, and also to this file, explaining the purpose of the file and which methods belong here.
  return agent.send(
      dstName, ScriptCall(op, std::move(stack)).toMessage());
} catch (std::runtime_error) {}
add a LOG(DEBUG) when catching errors?
@pritamdamania87 do you know which LOG I should use if I would like to make it work both internally and externally?
@mrshenli It looks like we do use glog in our code: https://github.com/pytorch/pytorch/blob/master/torch/csrc/utils/throughput_benchmark-inl.h#L73
    return names


def join_rpc():
As I mentioned above, this makes sense for ProcessGroup-based RPC, but not really for something like a Thrift implementation. We shouldn't have join and sync in the public APIs.
I would assume Thrift also has a join or shutdown to make sure the local RpcAgent does not exit too early?
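To make the discussion concrete, a minimal usage sketch of the calls in question. The initialization step is deliberately omitted, and the comments reflect my reading of the intended semantics rather than documented behavior.

```python
# Minimal sketch of how sync_rpc/join_rpc from this PR would be used in a
# script. Setup of the process group / RPC agent is omitted, and the
# described semantics are my reading of the discussion, not documentation.
import torch
import torch.distributed as dist

# ... initialize the process group and the RPC agent for this worker ...

ret = dist.rpc('worker1', torch.add,
               args=(torch.ones(2, 2), torch.ones(2, 2)))

dist.sync_rpc()  # wait for outstanding RPC work involving this worker
dist.join_rpc()  # shut down the local RpcAgent; call once, at the very end
```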
@pritamdamania87 thanks for the comments, will definitely address them in a followup PR.

@mrshenli I think this diff breaks OS X binary builds:
Summary: #23228 caused a build failure on OS X, because rpc.h is included as long as USE_DISTRIBUTED=1, but rpc/init.cpp (and others) is only included when NOT APPLE. So it cannot find python_functions defined in init.cpp on macOS. This PR attempts to fix it by wrapping rpc.h with USE_C10D, which is only set when NOT APPLE. I tried this fix locally and it works. Pull Request resolved: #23998 Differential Revision: D16706087 Pulled By: mrshenli fbshipit-source-id: d04fe6717a181a3198289cdef51439708c2e291d
Summary: Pull Request resolved: pytorch#24036

# Issue

The implementation in pytorch#23228 has a recvLoop thread that blocks on running a requested RPC function. This cannot meet the requirement in pytorch#23110, where the proposal implies a worker could send out nested RPCs. "Nested RPC" means an RPC callee could send out another RPC before the first RPC returns; since the worker's recv thread would be busy waiting for the first RPC function to return, there is no idle thread available to receive the return value of the second RPC function. A diagram showing this case: pytorch#23228 (comment). More extensive thinking by mrshenli in pytorch#23569 (comment).

# Solution

Add a test case to capture this requirement.

# Misc

Add debugging utilities to common_distributed for tracing RPC behaviors and debugging tricky RPC cases like this one.

Differential Revision: D16682122

fbshipit-source-id: ffa78eb20af4e2cf9476998fa544ab940035cae9

Stack from ghstack:
Features:
Goal:
This is the first PR for #23110, and there will be many followup ones. So let's focus on the overall API and code structure. Details like efficiency and error handling can be improved in future PRs.
(from, request, RpcAgent&)tuple and use a different thread to process them. That is why there is anRpcAgent&arg in the param list.Differential Revision: D15194693