Skip to content

Conversation

@rohan-varma
Copy link
Contributor

@rohan-varma rohan-varma commented Oct 30, 2019

Summary: Closes #227.
Adds 2 methods: abortWaitSend, abortWaitRecv so that we can interrupt a blocking waitRecv or waitSend, per @pietern 's suggestion. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The methods set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. The implementation is both for the tcp and uv transport methods. Unit tests are also added.

Differential Revision: D18210908

@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 30, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: e38d7b308e501fbeea5ff804bd9fe9f5b207c08a
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 30, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: 4f904b8a208a1afe41bb81c4bbdb22fb10bda320
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 30, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: d71d5d46b1eb9294bd35ac6b7d5622efd1d71901
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

}
}

auto recvCompleted = recvCompletions_ > 0;
Copy link

@xush6528 xush6528 Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't understand this state, recvCompleted. Mind adding comments to explain it a bit?

Shouldn't a user call waitRecv(..) for N times, where N == the recvCompletions_ count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll add some comments.

The goal here is to return true if the receive completed (i.e. we got data from the remote side) and false if it was aborted. One way to know that we got data from the remote side is if there was at least one recvCompletion, so I used this to determine the return value. We could potentially also use !abortWaitRecv, but we could run into an issue where the receive completes successfully, then a thread sets abortWaitRecv before we return out of waitRecv, in which case returning !abortWaitRecv would not be correct

Copy link

@xush6528 xush6528 Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this happen?
Before it returns out of waitRecv, doesn't it lock the mutex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, the lock would be reacquired once the thread is awakened. This should be changed to

if (abortWaitSend_): return false
else, decrement recvCompletions as below and return true

Comment on lines +252 to +261
buf->recv(ranks, slot);
recvCompleted = buf->waitRecv(&srcRank);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite understand the concepts from gloo.

What's the purpose of calling buf->recv(..) first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand, buf->recv will call the recv function from a particular pair (basically an encapsulation of 2 gloo processes that can communicate), which notifies the sender that there is a receive pending, but does not block. Then, waitRecv blocks until Pair::read indicates that data has been received.

Copy link

@xush6528 xush6528 Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, receive operation needs a signal first.
And it's similar code to ProcessGroupGloo::recv and ProcessGroupGloo::recvAnysource.

For normal recv, it's necessary. Otherwise, RecvWork will wait for nothing and block forever.

Since you are aborting anyway, I think these 2 lines could be removed to make the test subject highlighted.

std::vector<int> ranks = {1};

buf->recv(ranks, slot);

like your AbortSend test below, you don't call buf->send(...) before wait.

Copy link
Contributor

@pietern pietern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this, @rohan-varma.

}
}

auto recvCompleted = recvCompletions_ > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it was aborted, it should return here and not try and decrement recvCompletions_. This counter is a tally that counts the number of recv's that completed without an accompanying waitRecv call. If it is non-zero when waitRecv is called, waitRecv can return immediately without blocking (as evidenced by the condition if (recvCompletions_ == 0)).

if (rank != nullptr) {
*rank = recvRank_;
}
return recvCompleted;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have acknowledged this completion, we must always return true.

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 30, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: 588d85eb2ed4c6a0270912bbd3b11298b41f3e37
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

@rohan-varma
Copy link
Contributor Author

@pietern @xush6528 Updated with the suggestions.

Copy link

@xush6528 xush6528 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good now

auto buf = context->createUnboundBuffer(&tmp, sizeof(tmp));

std::thread waitRecvThread([&]() {
std::vector<int> ranks = {1};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used.

Copy link
Contributor

@pietern pietern left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides the unused variable comments from @xush6528, we also need to determine what to do for subsequent calls. Right now, all future calls to waitSend and waitRecv will return false if abort has been called once. But the intent is more likely to just interrupt a single call, so we should toggle the abort flag back to false if it was true once.

@rohan-varma
Copy link
Contributor Author

Right now, all future calls to waitSend and waitRecv will return false if abort has been called once. But the intent is more likely to just interrupt a single call, so we should toggle the abort flag back to false if it was true once.

That is a good catch. We should flip the flag like you mentioned

}

if (abortWaitSend_) {
return false;
Copy link

@xush6528 xush6528 Oct 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So flip it back here.

if (abortWaitSend_) {
    abortWaitSend_ = false;
    return false;
}

Also need a test case for testing an "abort" call only cancles one "wait" call?

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 31, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: e0a47edfac536492163fa4baae4d911dcaaaeac4
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Oct 31, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: 83a7dc66734d06f517771dcb622c6c5e40b809ff
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

Copy link

@xush6528 xush6528 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest version looks good.

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Nov 1, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: fa3ba49e582e97929441fccb31f54ee137b64b39
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Nov 1, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: b8f3bba514c556d6fdf512ec488a26e5c179b2e9
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Nov 1, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: 22f586d913783f589922ce131a6ef1359b310561
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Nov 1, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Differential Revision: D18210908

fbshipit-source-id: d5c1c6a2776469fc943b88ea592aa48dcf1738a6
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

rohan-varma pushed a commit to rohan-varma/gloo that referenced this pull request Nov 4, 2019
Summary:
Pull Request resolved: pytorch#228

Adds abortWaitSend, abortWaitRecv so that we can interrupt a blocking `waitRecv` or `waitSend`. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The APIs set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. Unit tests are also added.

Reviewed By: pietern

Differential Revision: D18210908

fbshipit-source-id: 0ed05d1abff2c656299542637cb18b918c9a9893
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

Copy link

@facebook-github-bot facebook-github-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rohan-varma is landing this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

Summary:
Closes pytorch#227.
Adds 2 methods: `abortWaitSend`, `abortWaitRecv` so that we can interrupt a blocking `waitRecv` or `waitSend`, per pietern 's suggestion. This is useful if a gloo process is waiting for a message in one thread, but another thread decides to shutdown and wants to stop it gracefully.

The methods set a boolean variable and notify a waiting condition variable. The condition variable predicate is changed to check this condition. The implementation is both for the tcp and uv transport methods. Unit tests are also added.
Pull Request resolved: pytorch#228

Test Plan: Added relevant unit tests in `send_recv_test.cc`

Reviewed By: pietern

Differential Revision: D18210908

Pulled By: pietern

fbshipit-source-id: 013206a77edd7122c9ff6a120f3fbf41d5e75a56
@facebook-github-bot
Copy link

This pull request was exported from Phabricator. Differential Revision: D18210908

@facebook-github-bot
Copy link

@rohan-varma merged this pull request in d3554fb.

@pietern
Copy link
Contributor

pietern commented Nov 5, 2019

@rohan-varma What was the issue in the libuv code?

@rohan-varma
Copy link
Contributor Author

rohan-varma commented Nov 5, 2019

@rohan-varma What was the issue in the libuv code?

In uv/unbound_buffer.cc, std::lock_guard<std::mutex> guard(m_); should be std::lock_guard<std::mutex> guard(mutex_). This results in the uv code not building (sadly wasn't caught by any of the build systems )

@xush6528
Copy link

xush6528 commented Nov 5, 2019

Redo in #232.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Interrupt waitSend/waitRecv

4 participants