Adding RRef as return value for builtin operators #25169
See #23110 for RRef design details. This commit only implements RRef as the return value for builtin operators, and an RRef only communicates between a user and the owner. More specifically, an RRef is first created on the `dist.remote` caller, which is a user of the RRef. The RRef user then sends a notification to the owner to report the fork, and the owner uses a shared_ptr to keep the RRef alive. When the user RRef is destructed on the caller, another notification is sent to the owner, and the owner can then drop its RRef as well.
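The lifetime protocol described above can be illustrated with a small Python toy model. This is only a sketch under stated assumptions: `ToyOwner`, `ToyUserRRef`, `on_remote_call`, `on_fork_delete`, and `release` are hypothetical names, messages are plain method calls rather than RPCs, and release is explicit rather than tied to a destructor. It is not the C++ implementation in this PR.

```python
from typing import Any, Callable, Dict, Set, Tuple


class ToyOwner:
    """Hypothetical stand-in for the owner side; not the PR's RRefContext."""

    def __init__(self) -> None:
        self.values: Dict[int, Any] = {}       # rref_id -> value kept alive
        self.forks: Dict[int, Set[int]] = {}   # rref_id -> live fork ids

    def on_remote_call(self, rref_id: int, fork_id: int,
                       fn: Callable[..., Any], args: Tuple[Any, ...]) -> None:
        # The user reported a fork: run the op and keep the result alive.
        self.values[rref_id] = fn(*args)
        self.forks.setdefault(rref_id, set()).add(fork_id)

    def on_fork_delete(self, rref_id: int, fork_id: int) -> None:
        # A user RRef went away: drop the value once no forks remain.
        live = self.forks.get(rref_id, set())
        live.discard(fork_id)
        if not live:
            self.forks.pop(rref_id, None)
            self.values.pop(rref_id, None)


class ToyUserRRef:
    """Hypothetical user-side handle created by the caller."""

    def __init__(self, owner: ToyOwner, rref_id: int, fork_id: int,
                 fn: Callable[..., Any], args: Tuple[Any, ...]) -> None:
        self.owner, self.rref_id, self.fork_id = owner, rref_id, fork_id
        owner.on_remote_call(rref_id, fork_id, fn, args)

    def to_here(self) -> Any:
        # Fetch the value from the owner.
        return self.owner.values[self.rref_id]

    def release(self) -> None:
        # Explicit release (instead of __del__) keeps the order deterministic.
        self.owner.on_fork_delete(self.rref_id, self.fork_id)
```

Creating a `ToyUserRRef` registers the fork on the owner, and calling `release()` on the last fork removes the owner's entry, which mirrors the two notifications in the description.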
pietern
left a comment
This is a first pass, need to do another one.
I noticed this is not clang-format'ed. Do you want to do that as part of this PR as well?
    auto payload_size = message.payload().size();
    auto value = jit::unpickle(payload, payload_size, nullptr, &message.tensors());
    auto value = jit::unpickle(
        payload, payload_size, nullptr, &message.tensors());
This will conflict with #25502.
pietern
left a comment
Another pass. Looking good!
    ScriptRRefFetch(id().toIValue()).toMessage()
    );
    auto srv = ScriptRRefValue::fromMessage(fm->wait());
    return srv.value();
Is it possible to enforce move semantics here? Not sure if this will trigger a copy.
Adding move here gives me "warning: moving a local object in a return statement prevents copy elision [-Wpessimizing-move]".
I modified the return type of srv.value() to const& to explicitly avoid additional copy.
It is possible that returning as value implies move here, given that srv is also local.
A ref will be invalid after return, since srv is local.
> A ref will be invalid after return, since srv is local.

Oh, I mean srv.value()'s return type is IValue&, but toHere()'s return type is still IValue, so there will only be one copy.
Copying IValues is actually pretty cheap since we just copy the underlying intrusive_ptr and not the entire data. The only overhead is incrementing the ref count for intrusive_ptr.
    // NB: cannot use make_shared here as the constructor of OwnerRRef is
    // private.
    auto rref = std::shared_ptr<OwnerRRef<T>>(
        new OwnerRRef<T>(getWorkerId(), rrefId));
Can use std::make_shared here.
OwnerRRef's constructor is private (this is intentional, so that all RRef construction goes through RRefContext). If we use make_shared, OwnerRRef needs to add the allocator as a friend, or we need to create a wrapper class that enables make_shared, no?
aazzolini
left a comment
This looks good, but it seems that the messaging mechanism will have to change in order to accommodate the remaining use cases, in particular allowing RRefs as arguments to built-ins. I believe we should commit this one soon and make the required changes on top.
    }

    GloballyUniqueId GloballyUniqueId::fromIValue(at::IValue&& ivalue) {
      auto ivalues = ivalue.toTuple()->elements();
What happens if the ivalue.toTuple() conversion fails? Does it throw an exception or return a nullptr?
It throws an exception if the IValue is not a tuple.
    namespace rpc {

    // Temporary solution of RRef operations.
    // TODO: Remove all these messages and use rpc + registered functions instead.
What registered functions? You mean JIT built-ins?
I think RRef should be built on top of RPC, not in RPC. It does not have to be a JIT function; we could have our own namespace for RRef. Similar to using torch.ops.aten.* for builtin ops, we could have torch.distributed.rpc.* for RPC-related functions. Then all these messages can be unified as a SCRIPT_CALL.
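To make the "registered functions" idea concrete, here is a rough Python sketch of a single call-style message dispatched through a name registry. Everything in it is an assumption for illustration: the `toy_rpc::` names, `register`, and `handle_script_call` are hypothetical and do not correspond to any existing torch API.

```python
from typing import Any, Callable, Dict, Tuple

# Hypothetical registry: qualified function name -> callable.
_REGISTRY: Dict[str, Callable[..., Any]] = {}
# Hypothetical owner-side storage used by the registered RRef helpers.
_VALUES: Dict[int, Any] = {}


def register(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that exposes a function under a qualified name."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _REGISTRY[name] = fn
        return fn
    return decorator


def handle_script_call(message: Tuple[str, Tuple[Any, ...]]) -> Any:
    """Single handler: every RRef operation is just a named call."""
    name, args = message
    if name not in _REGISTRY:
        raise ValueError(f"unknown function {name!r}")
    return _REGISTRY[name](*args)


@register("toy_rpc::rref_store")
def _rref_store(rref_id: int, value: Any) -> bool:
    _VALUES[rref_id] = value
    return True


@register("toy_rpc::rref_fetch")
def _rref_fetch(rref_id: int) -> Any:
    return _VALUES[rref_id]
```

With this shape, adding a new RRef operation means registering one more function rather than defining a new message type.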
    if (message.isRequest()) {
      auto response = cb_(std::move(message));
      send(work.from_, std::move(response));
      if (message.requiresResponse()) {
I believe we should always send the response, at least as an acknowledgment, and the client should not throw away the acknowledgement, at least for tracing purposes. (e.g. if a FORK_DELETE times out, we may want to re-send the request. etc).
Will address this in #25499
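The "always acknowledge, resend on timeout" suggestion could look roughly like the Python sketch below. It assumes a transport that returns a `concurrent.futures.Future` per message and that the receiver handles duplicate deliveries idempotently; `send_with_ack` and `make_flaky_transport` are hypothetical helpers, not part of this PR or #25499.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout
from typing import Callable, Dict, Tuple


def send_with_ack(send: Callable[[str], Future], message: str,
                  timeout_s: float = 0.05, max_attempts: int = 3) -> str:
    """Send `message` and wait for an acknowledgement, resending on timeout."""
    for _ in range(max_attempts):
        ack = send(message)
        try:
            return ack.result(timeout=timeout_s)
        except FutureTimeout:
            # No acknowledgement arrived in time: loop around and resend.
            continue
    raise RuntimeError(f"no ack for {message!r} after {max_attempts} attempts")


def make_flaky_transport(drop_first: int) -> Tuple[Callable[[str], Future], Dict[str, int]]:
    """Hypothetical transport that silently loses the first `drop_first` sends."""
    state = {"sent": 0}

    def send(message: str) -> Future:
        state["sent"] += 1
        fut: Future = Future()
        if state["sent"] > drop_first:
            fut.set_result(f"ACK:{message}")
        return fut  # the future of a dropped message never completes

    return send, state
```

Because a resend may arrive after the original was in fact processed, the receiver side of something like FORK_DELETE would need to tolerate duplicates.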
As of the latest commit in #25502, the conflicts no longer exist. Landing now.
    at::IValue retRRefId,
    at::IValue retForkId);

    at::IValue retRRefId();
Add docs for retRRefId and retForkId.
    static ScriptRemoteCall fromMessage(const Message& message);

    private:
    const at::IValue retRRefId_;
Add documentation about what these ids are.
    expected.append(torch.ones(n, n) * 2)

    for i in range(m):
        self.assertEqual(rrefs[i].to_here(), expected[i])
We need some more tests here:
- Verify all RRefs are cleaned up properly from RRefContext in the end.
- Validate state of RRefContext on each node after we've passed a bunch of RRef's around. Also, validate forks are created appropriately.
- Add test to verify owner of RRef is populated correctly.
- Take down the owner of an rref and verify to_here() throws an appropriate exception.
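As a rough idea of what the cleanup and fork-tracking checks might assert, here is a self-contained Python sketch against a toy registry. `ToyRRefContext`, `add_fork`, `del_fork`, and `run_toy_suite` are hypothetical; a real test would inspect the actual RRefContext on each worker, which this sketch does not attempt.

```python
import unittest


class ToyRRefContext:
    """Hypothetical per-worker registry; not the RRefContext from this PR."""

    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.owners = {}  # rref_id -> value held on this worker
        self.forks = {}   # rref_id -> set of live fork ids

    def add_fork(self, rref_id, fork_id, value=None):
        # Keep the first value stored for this rref_id; record the new fork.
        self.owners.setdefault(rref_id, value)
        self.forks.setdefault(rref_id, set()).add(fork_id)

    def del_fork(self, rref_id, fork_id):
        live = self.forks[rref_id]
        live.remove(fork_id)
        if not live:
            # Last fork gone: nothing may be left behind in the context.
            del self.forks[rref_id]
            del self.owners[rref_id]


class ToyRRefLifetimeTest(unittest.TestCase):
    def test_forks_are_tracked(self):
        ctx = ToyRRefContext(worker_id=0)
        ctx.add_fork(rref_id=1, fork_id=10, value="payload")
        ctx.add_fork(rref_id=1, fork_id=11)
        self.assertEqual(ctx.forks[1], {10, 11})
        self.assertEqual(ctx.owners[1], "payload")

    def test_context_is_empty_after_all_forks_deleted(self):
        ctx = ToyRRefContext(worker_id=0)
        for fork_id in (10, 11):
            ctx.add_fork(rref_id=1, fork_id=fork_id, value="payload")
        for fork_id in (10, 11):
            ctx.del_fork(rref_id=1, fork_id=fork_id)
        self.assertEqual(ctx.owners, {})
        self.assertEqual(ctx.forks, {})


def run_toy_suite():
    """Run the toy tests and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ToyRRefLifetimeTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```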
> Take down the owner of an rref and verify to_here() throws an appropriate exception.

@pritamdamania87 How do we take down the owner of an RRef? Are we going to expose test-only APIs for this?
we can create a rref which is not referred in owner?
@gqchen could you please elaborate on "create a rref which is not referred in owner"?
@aazzolini Thanks for reviewing the PR, although I don't think this PR was in any state to be landed. I got a chance to review it recently and have a lot of concerns as you can see inline. At a high level, the PR needed:
All of these issues should have been addressed in this PR and not in a PR on top of this.

@pritamdamania87 you're right, my accept was hasty. @mrshenli, definitely address request from other reviewers before moving on to #25499.
    // TODO: make this asynchronous
    // src is only alive within this block, use reference to avoid copy
    auto& stack = src.stackRef();
    src.op()->getOperation()(stack);
Do we need to handle exceptions here? If the operator throws, the OwnerRRef should save the exception, and it should be rethrown when a client tries to access the RRef value.
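One way to picture "save the exception and rethrow on access" is the Python sketch below, which leans on `concurrent.futures.Future` to hold either a result or an exception. `ToyOwnerRRef`, `run`, and `get_value` are hypothetical names, and this is only an illustration of the suggested behavior, not how OwnerRRef is implemented.

```python
from concurrent.futures import Future
from typing import Any, Callable, Optional


class ToyOwnerRRef:
    """Hypothetical owner-side slot holding either a value or an exception."""

    def __init__(self) -> None:
        self._future: Future = Future()

    def run(self, op: Callable[..., Any], *args: Any) -> None:
        # Execute the operator; capture a failure instead of propagating it.
        try:
            self._future.set_result(op(*args))
        except Exception as exc:
            self._future.set_exception(exc)

    def get_value(self, timeout: Optional[float] = None) -> Any:
        # Returns the value, or re-raises the stored exception at access time.
        return self._future.result(timeout=timeout)
```

The failure then surfaces where the client reads the value rather than on the thread that executed the operator.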
    //
    // In order to preserve correctness of reference counting, each RRefForkData
    // **MUST** be deserialized into a RRef. This means that if RRefForkData is to
    // be transferred across the network, we need the guarantee that the message
how can we guarantee that? why do we need this guarantee?
    RRef::RRef(worker_id_t ownerId, const RRefId& rrefId)
        : ownerId_(ownerId), rrefId_(rrefId) {}

    worker_id_t RRef::owner() const {
should we make all those get functions inline?
    if (ownerId == getWorkerId()) {
      return getOrCreateOwnerRRef<T>(rrefId);
    } else {
      return createUserRRef(ownerId, rrefId, forkId);
should we separate out getOrCreateOwnerRRef() and getOrCreateUserRRef()? Do we need getOrCreateUserRRef?
We don't need getOrCreateUserRRef, because we will always create a new UserRRef on every use.
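The asymmetry between the two paths can be sketched in Python as follows: owner lookups are idempotent per RRef id, while every user creation yields a fresh fork. `ToyContext`, its dict-based records, and the `(worker_id, counter)` fork id are assumptions made for illustration only.

```python
import itertools
from typing import Any, Dict, Tuple


class ToyContext:
    """Hypothetical per-worker context contrasting owner and user creation."""

    def __init__(self, worker_id: int) -> None:
        self.worker_id = worker_id
        self._owners: Dict[int, Dict[str, Any]] = {}
        self._fork_counter = itertools.count()

    def get_or_create_owner(self, rref_id: int) -> Dict[str, Any]:
        # Idempotent: the same rref_id always maps to the same owner record.
        return self._owners.setdefault(rref_id, {"rref_id": rref_id, "value": None})

    def create_user(self, owner_id: int, rref_id: int) -> Dict[str, Any]:
        # Never cached: each use gets a new fork id on this worker.
        if owner_id == self.worker_id:
            raise ValueError("Cannot create UserRRef on owner.")
        fork_id: Tuple[int, int] = (self.worker_id, next(self._fork_counter))
        return {"owner": owner_id, "rref_id": rref_id, "fork_id": fork_id}
```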
gqchen
left a comment
A few more issues. I agree with Pritam that we need to get this PR wrapped up before moving on to add new features.
    "RRefForkData must be trivially copyable");

    // TODO: make RRef an IValue, and edit createStackForSchema accordingly
    class RRef {
We need a description about the meaning of UserRRef, OwnerRRef, RRefId, ForkId (like what you explained in the chat thread). Those are essential to understand the rest of the code, and they are not obvious.
    worker_id_t owner() const;
    const RRefId& id() const;
    IValue fork() const;
In what scenario would we want to call this API on an OwnerRRef? Should we consider moving this method to UserRRef?
The owner may forward its RRef to a user, right? And that's when the fork occurs on owner.
    }

    std::shared_ptr<UserRRef> createUserRRef(worker_id_t ownerId) {
      TORCH_CHECK(ownerId != getWorkerId(), "Cannot create UserRRef on owner.");
Should we let users create a UserRRef to a local tensor? This seems to be a very natural behavior.
Here is an example where creating a local UserRRef is very helpful:
    def foo(a: RRef[Tensor]):
        pass

    x = torch.ones(2, 2)
    # Keyword argument moved last so the call is valid Python.
    remote(foo, RRef(x), on="worker1")
    const std::shared_ptr<RpcAgent> agent_;
    std::mutex mutex_;
    // Keep OwnerRRefs alive until there is no living UserRRefs.
    std::unordered_map<RRefId, std::shared_ptr<RRef>, RRefId::Hash> owners_;
Can we be explicit here? Use std::shared_ptr<OwnerRRef> instead of std::shared_ptr<RRef>.
    std::shared_ptr<OwnerRRef<IValue>> rref =
        RRefContext::getInstance()->getOrCreateOwnerRRef<IValue>(
            RRefId::fromIValue(srf.value()));
    auto response = ScriptRRefFetchRet(rref->getValue()).toMessage();
If the value is not available, this will block forever. We should add a timeout here.
Yes, will add timeout and error-handling in separate PR. Let me create issues for it.
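For reference, a bounded wait on a not-yet-available value can be sketched in Python with a condition variable. `ToyValueSlot` and its methods are hypothetical and only illustrate the timeout behavior being requested; the follow-up PR may well take a different shape.

```python
import threading
from typing import Any


class ToyValueSlot:
    """Hypothetical value holder whose getter gives up after a timeout."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._ready = False
        self._value: Any = None

    def set_value(self, value: Any) -> None:
        with self._cond:
            self._value, self._ready = value, True
            self._cond.notify_all()

    def get_value(self, timeout_s: float) -> Any:
        with self._cond:
            # wait_for returns False if the predicate is still false at timeout.
            if not self._cond.wait_for(lambda: self._ready, timeout=timeout_s):
                raise TimeoutError(f"value not available after {timeout_s}s")
            return self._value
```

A caller that hits the `TimeoutError` can then report the failure to the requester instead of blocking the handler thread indefinitely.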
Stack from ghstack:
See #23110 for RRef design details. This commit only implements RRef as the return value for builtin operators, and an RRef only communicates between a user and the owner. More specifically, an RRef is first created on the `dist.remote` caller, which is a user of the RRef. The RRef user then sends a notification to the owner to report the fork, and the owner uses a shared_ptr to keep the RRef alive. When the user RRef is destructed on the caller, another notification is sent to the owner, and the owner can then drop its RRef as well.
Differential Revision: D17048343