-
Notifications
You must be signed in to change notification settings - Fork 339
Add abortWaitSend/abortWaitRecv to unbound_buffer #228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: e38d7b308e501fbeea5ff804bd9fe9f5b207c08a
8d3fb83 to
b5c9dcc
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: 4f904b8a208a1afe41bb81c4bbdb22fb10bda320
b5c9dcc to
c341929
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: d71d5d46b1eb9294bd35ac6b7d5622efd1d71901
c341929 to
8a7e42d
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
gloo/transport/tcp/unbound_buffer.cc
Outdated
| } | ||
| } | ||
|
|
||
| auto recvCompleted = recvCompletions_ > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't understand this state, recvCompleted. Mind adding comments to explain it a bit?
Shouldn't a user call waitRecv(..) for N times, where N == the recvCompletions_ count?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I'll add some comments.
The goal here is to return true if the receive completed (i.e. we got data from the remote side) and false if it was aborted. One way to know that we got data from the remote side is if there was at least one recvCompletion, so I used this to determine the return value. We could potentially also use !abortWaitRecv, but we could run into an issue where the receive completes successfully, then a thread sets abortWaitRecv before we return out of waitRecv, in which case returning !abortWaitRecv would not be correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this happen?
Before it returns out of waitRecv, doesn't it lock the mutex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, the lock would be reacquired once the thread is awakened. This should be changed to
if (abortWaitSend_): return false
else, decrement recvCompletions as below and return true
| buf->recv(ranks, slot); | ||
| recvCompleted = buf->waitRecv(&srcRank); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite understand the concepts from gloo.
What's the purpose of calling buf->recv(..) first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From what I understand, buf->recv will call the recv function from a particular pair (basically an encapsulation of 2 gloo processes that can communicate), which notifies the sender that there is a receive pending, but does not block. Then, waitRecv blocks until Pair::read indicates that data has been received.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, receive operation needs a signal first.
And it's similar code to ProcessGroupGloo::recv and ProcessGroupGloo::recvAnysource.
For normal recv, it's necessary. Otherwise, RecvWork will wait for nothing and block forever.
Since you are aborting anyway, I think these 2 lines could be removed to make the test subject highlighted.
std::vector<int> ranks = {1};
buf->recv(ranks, slot);like your AbortSend test below, you don't call buf->send(...) before wait.
pietern
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this, @rohan-varma.
gloo/transport/tcp/unbound_buffer.cc
Outdated
| } | ||
| } | ||
|
|
||
| auto recvCompleted = recvCompletions_ > 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it was aborted, it should return here and not try and decrement recvCompletions_. This counter is a tally that counts the number of recv's that completed without an accompanying waitRecv call. If it is non-zero when waitRecv is called, waitRecv can return immediately without blocking (as evidenced by the condition if (recvCompletions_ == 0)).
gloo/transport/tcp/unbound_buffer.cc
Outdated
| if (rank != nullptr) { | ||
| *rank = recvRank_; | ||
| } | ||
| return recvCompleted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have acknowledged this completion, we must always return true.
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: 588d85eb2ed4c6a0270912bbd3b11298b41f3e37
8a7e42d to
48480d7
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
xush6528
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good now
| auto buf = context->createUnboundBuffer(&tmp, sizeof(tmp)); | ||
|
|
||
| std::thread waitRecvThread([&]() { | ||
| std::vector<int> ranks = {1}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not used.
pietern
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Besides the unused variable comments from @xush6528, we also need to determine what to do for subsequent calls. Right now, all future calls to waitSend and waitRecv will return false if abort has been called once. But the intent is more likely to just interrupt a single call, so we should toggle the abort flag back to false if it was true once.
That is a good catch. We should flip the flag like you mentioned |
| } | ||
|
|
||
| if (abortWaitSend_) { | ||
| return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So flip it back here.
if (abortWaitSend_) {
abortWaitSend_ = false;
return false;
}Also need a test case for testing an "abort" call only cancles one "wait" call?
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: e0a47edfac536492163fa4baae4d911dcaaaeac4
48480d7 to
3507b19
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: 83a7dc66734d06f517771dcb622c6c5e40b809ff
3507b19 to
75241f9
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
xush6528
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Latest version looks good.
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: fa3ba49e582e97929441fccb31f54ee137b64b39
7649c04 to
105d763
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: b8f3bba514c556d6fdf512ec488a26e5c179b2e9
105d763 to
cc60340
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: 22f586d913783f589922ce131a6ef1359b310561
cc60340 to
ca10ed8
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Differential Revision: D18210908 fbshipit-source-id: d5c1c6a2776469fc943b88ea592aa48dcf1738a6
ca10ed8 to
7d44293
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
Summary: Pull Request resolved: pytorch#228 Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added. Reviewed By: pietern Differential Revision: D18210908 fbshipit-source-id: 0ed05d1abff2c656299542637cb18b918c9a9893
7d44293 to
8fa59ac
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
facebook-github-bot
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rohan-varma is landing this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
Summary: Closes pytorch#227. Adds 2 methods: `abortWaitSend`, `abortWaitRecv` so that we can interrupt a blocking `waitRecv` or `waitSend`, per pietern 's suggestion. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully. The methods set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. The implementation is both for the tcp and uv transport methods. Unit tests are also added. Pull Request resolved: pytorch#228 Test Plan: Added relevant unit tests in `send_recv_test.cc` Reviewed By: pietern Differential Revision: D18210908 Pulled By: pietern fbshipit-source-id: 013206a77edd7122c9ff6a120f3fbf41d5e75a56
8fa59ac to
62a2660
Compare
|
This pull request was exported from Phabricator. Differential Revision: D18210908 |
|
@rohan-varma merged this pull request in d3554fb. |
|
@rohan-varma What was the issue in the libuv code? |
In |
|
Redo in #232. |
Summary: Closes #227.
Adds 2 methods:
abortWaitSend,abortWaitRecvso that we can interrupt a blockingwaitRecvorwaitSend, per @pietern 's suggestion. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.The methods set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. The implementation is both for the tcp and uv transport methods. Unit tests are also added.
Differential Revision: D18210908