-
Notifications
You must be signed in to change notification settings - Fork 26.3k
Add abort API in gloo ProcessGroup Send/Recv Work #29928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. ghstack-source-id: 94004848 Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
xush6528
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Need @pietern 's look too.
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
Pull Request resolved: #29928 Original author: @[100003498529358:Shihao Xu] - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. ghstack-source-id: 94039092 ghstack-source-id: 94039092 Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/)
pritamdamania87
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be missing something, but it seems like the way the locks are setup, the abort method can't interrupt wait?
torch/lib/c10d/ProcessGroup.hpp
Outdated
|
|
||
| // Waits until request completes. Blocking operation. | ||
| // Throws if the work completed with an exception. | ||
| // Returns false if the work is aboorted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: aborted.
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
| } | ||
|
|
||
| void ProcessGroupGloo::SendWork::abort() { | ||
| std::unique_lock<std::mutex> lock(mutex_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a thread is blocked on buffer_->waitSend() in wait(), there is no way to abort since you need to hold the lock here? I'm wondering if this resolves the problem we're seeing, since I think the abort() call would just block forever and can't really interrupt waitSend.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we would like to remove this lock, if
- no other thread can assign to buffer_
- abortWaitSend is thread safe to buffer_
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Synced with @xush6528 about this, we should be able to remove the locks here, unbound_buffer from gloo should be thread safe. We think the tests in this diff due to the way the threads were scheduled
torch/lib/c10d/ProcessGroupGloo.cpp
Outdated
| } | ||
|
|
||
| void ProcessGroupGloo::RecvWork::abort() { | ||
| std::unique_lock<std::mutex> lock(mutex_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same issue with lock as waitSend
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
| } | ||
|
|
||
| void ProcessGroupGloo::RecvWork::abort() { | ||
| buffer_->abortWaitRecv(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not have to acquire a lock here since this function does not do any writes to class variables, and buffer_ in gloo should be thread safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean we can also avoid holding the lock in RecvWork::wait and SendWork::wait when we call buffer_->waitRecv and buffer_->waitSend? We could just acquire the lock when we set completed_ and exception_?
cc @pietern
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense to me unless I'm missing something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The semantics for the work is that there is only a single logical user/waiter. The lock is there only to synchronize a background thread that executes the work and marks it as completed, not multiple waiters. For the send/recv work, that background thread is the actual Gloo I/O thread that marks a send/recv as completed, so for this case we could get rid of the locks entirely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Filed #30164 so we don't forget about this.
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
…gent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
Original author: @xush6528 - Add abort to `c10d::ProcessGroup::Work`. - Change the return type of `c10d::ProcessGroup::Work::wait()` to boolean to indicate if the work is aborted after waiting. - Add unit test for the correctness of abort. Differential Revision: [D5685727](https://our.internmc.facebook.com/intern/diff/D5685727/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…agent" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `localShutdown` method aborts this thread and joins the other threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid statem. A destructor is also added which simply calls `localShutdown`, and we ensure that this function is not called multiple times. Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…up agent and refactor join" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `shutdown` method aborts this and joins the all the threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid state. The new way of shutting down is: ``` rpc.wait_all_workers() rpc.shutdown() ``` Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
…actor join" After changes in the previous PR (#29928) and the gloo code (pytorch/gloo#232), we can now interrupt the listener thread in process group agent, which was blocked on `wait`ing for some work to be received. The `shutdown` method aborts this and joins the all the threads. A condition variable is also added to ensure that we don't abort the thread while `recvWork` is in an invalid state. The new way of shutting down is: ``` rpc.wait_all_workers() rpc.shutdown() ``` Differential Revision: [D5578006](https://our.internmc.facebook.com/intern/diff/D5578006/) [ghstack-poisoned]
Stack from ghstack:
Original author: @xush6528
c10d::ProcessGroup::Work.c10d::ProcessGroup::Work::wait()to boolean to indicate if the work is aborted after waiting.Differential Revision: D5685727