Add sparse tensor allreduce #19146
Conversation
Differential Revision: D14889547 Differential Version: 78974033
Differential Revision: D14889547 Differential Version: 79084183
} else {
  // We will need to coalesce first, which means new tensors will
  // be allocated on the streams we just allocated, and there
  // is no need to record them separately.
If the op is not synchronized, tensors can be freed even before coalesce happens. Isn't that still a problem?
It won't be freed because the op holds on to the input tensors. The coalesce on this new stream will happen after any operations against the inputs because of events[i].block() that is run before this block.
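For illustration, a minimal Python-level sketch of the ordering being described here; the stream, event, and tensor names are invented for this example, and the real synchronization lives in the C++ code quoted above:

```python
import torch

# Sketch only: mirrors the argument above at the Python API level.
if torch.cuda.is_available():
    main = torch.cuda.current_stream()
    side = torch.cuda.Stream()  # stands in for "the streams we just allocated"

    i = torch.tensor([[0, 1, 1]], device="cuda")
    v = torch.randn(3, 4, device="cuda")
    t = torch.sparse_coo_tensor(i, v, (4, 4))  # uncoalesced input

    ev = torch.cuda.Event()
    ev.record(main)      # like events[i].record() on the producer stream
    side.wait_event(ev)  # like events[i].block(): side waits for prior work on t
    with torch.cuda.stream(side):
        t = t.coalesce()  # the new indices/values tensors are allocated on `side`
```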
//
// - [0:4]: sparse dims
// - [4:8]: dense dims
// - [8]: nnz
what is nnz? number of non-zero elements?
Yes.
//
// The layout of this memory is as follows:
//
// - [0:4]: sparse dims
When merging this PR, let's create an issue to generalize it.
Sure. The reason I picked 4 here is that I never saw anything bigger and we need to have a constant maximum number that is shared between processes, or else require another collective before we can exchange these details.
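For illustration, a hedged Python sketch of one plausible reading of this fixed-size metadata layout, where each 4-slot range holds the sizes of up to 4 sparse/dense dimensions; the helper name is hypothetical and the exact encoding in the C++ code may differ:

```python
import torch

MAX_DIMS = 4  # the constant maximum shared between processes, as discussed above

def pack_sparse_metadata(t: torch.Tensor) -> torch.Tensor:
    # Hypothetical helper:
    #   [0:4]  sizes of the sparse dims (zero-padded)
    #   [4:8]  sizes of the dense dims (zero-padded)
    #   [8]    nnz
    assert t.is_sparse and t.sparse_dim() <= MAX_DIMS and t.dense_dim() <= MAX_DIMS
    meta = torch.zeros(2 * MAX_DIMS + 1, dtype=torch.long)
    sizes = t.size()
    meta[0:t.sparse_dim()] = torch.tensor(sizes[:t.sparse_dim()])
    meta[MAX_DIMS:MAX_DIMS + t.dense_dim()] = torch.tensor(sizes[t.sparse_dim():])
    meta[2 * MAX_DIMS] = t._nnz()
    return meta

# A (4, 4) COO tensor with 1 sparse dim, 1 dense dim, and nnz == 3.
t = torch.sparse_coo_tensor(torch.tensor([[0, 1, 3]]), torch.randn(3, 4), (4, 4))
print(pack_sparse_metadata(t))  # tensor([4, 0, 0, 0, 4, 0, 0, 0, 3])
```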
// Gather all indices and all values.
auto indices = allgather_indices(input, metadata);
auto values = allgather_values(input, metadata);
should these two be done in one allgather?
This should be possible in theory, since we have all data. But the data types may be different (the indices are always long and the values may be anything). To combine them in a single call we'd need to cast both of them to char buffers since it would run on the byte level.
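A hedged sketch of the two separate collectives at the torch.distributed level; the helper name and the pad-to-max-nnz strategy are illustrative, and it assumes an initialized process group plus a prior metadata exchange that provides per-rank nnz:

```python
import torch
import torch.distributed as dist

def allgather_indices_and_values(t, nnz_per_rank):
    # Hypothetical sketch: assumes dist.init_process_group() has been called and
    # that `t` is a coalesced sparse COO tensor. all_gather needs equal-sized
    # tensors, so every rank pads to the maximum nnz and trims afterwards.
    world_size = dist.get_world_size()
    max_nnz = max(nnz_per_rank)

    # Indices are always int64; values follow the input dtype, hence two calls.
    idx = torch.zeros(t.sparse_dim(), max_nnz, dtype=torch.long)
    idx[:, :t._nnz()] = t._indices()
    val = torch.zeros((max_nnz,) + tuple(t.shape[t.sparse_dim():]), dtype=t.dtype)
    val[:t._nnz()] = t._values()

    idx_out = [torch.empty_like(idx) for _ in range(world_size)]
    val_out = [torch.empty_like(val) for _ in range(world_size)]
    dist.all_gather(idx_out, idx)
    dist.all_gather(val_out, val)

    # Trim the padding using the per-rank nnz from the metadata exchange.
    idx_out = [o[:, :n] for o, n in zip(idx_out, nnz_per_rank)]
    val_out = [o[:n] for o, n in zip(val_out, nnz_per_rank)]
    return idx_out, val_out
```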
// Copy back to input tensors.
outputs.reserve(inputs.size());
for (size_t i = 0; i < inputs.size(); i++) {
  outputs.push_back(output.clone());
Is this only for the all_reduce_multigpu API? If yes, should we throw an error instead if inputs.size() > 1? Because the docs say:
The function operates in-place and requires that each tensor to be a GPU tensor on different GPUs.
We should update and deprecate all_reduce_multigpu instead, IMO. We already have the multi-input collectives (for some at least), so I added this one here for parity with those. If we end up with a replicated DDP module with an embedding bag, the reducer will end up calling this with multiple sparse inputs. This functionality needs to live somewhere, and I'd rather have it here than move it back into the reducer, where we'd compute a sum across replicas before passing it to the process group. Doing so would introduce an asymmetry with ProcessGroupNCCL, where multi-input allreduce is valid (and will defer summing them to NCCL).
torch/lib/c10d/ProcessGroupGloo.cpp
/*non_blocking=*/true,
/*copy=*/true);

outputs.push_back(
If we are not writing back to the input tensors for the sparse use case, we should edit the Python docs accordingly. BTW, we can still write to the input tensors using tensor.set_()?
Differential Revision: D14889547 Differential Version: 79475049
Differential Revision: D14889547 Differential Version: 80027945
Differential Revision: D14889547 Differential Version: 85199091
Differential Revision: D14889547 Differential Version: 85201710
@mrshenli The variable-to-tensor unboxing is no longer needed now that there has been some progress with the tensor/variable merge. The update also removes some verbose copying code (separate copy calls for the indices and values) and replaces those with a straightforward copy.
This pull request has been merged in aee6a41.
Summary: Pull Request resolved: #22036

Implemented only on ProcessGroupGloo, as an allgather of metadata (sparse_dim, dense_dim, and nnz), followed by an allgather of indices, followed by an allgather of values. Once these operations have finished, all ranks locally compute a reduction over these sparse tensors. Works for both CPU and CUDA tensors.

This surfaced a problem with the existing assumption of only modifying tensors that are passed at the call site, because for sparse tensors we don't know the dimensions of the output tensors before we run the collective. To deal with this unknown, this commit adds a `result` function to the `c10d::ProcessGroup::Work` class that returns a vector of tensors.

It's a bit odd to have to retrieve the result through this function only for operations on sparse tensors. To make this work irrespective of tensor layout, we can create a follow-up commit to make all in-place operations make their results accessible through this function as well. This doesn't break any existing contracts but does have the potential to add interface ambiguity.

This is a resubmission of #19146.

Reviewed By: mrshenli

Differential Revision: D15926384

fbshipit-source-id: b6ee5d81606bfa8ed63c3d63a9e307613491e0ae
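For illustration, a hedged sketch of how the `result` accessor could be used from Python; it assumes an initialized "gloo" process group and a sparse input, and the exact Python surface for `result()` may differ by version:

```python
import torch
import torch.distributed as dist

# Sketch only: because the shape of a reduced sparse tensor isn't known before
# the collective runs, the output is fetched from the work object rather than
# read back from the input in place.
def sparse_allreduce_sum(t: torch.Tensor) -> torch.Tensor:
    work = dist.all_reduce(t, op=dist.ReduceOp.SUM, async_op=True)
    work.wait()
    return work.result()[0]
```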
@pietern when is this expected to be merged, and what will be the rc version?
@pietern thanks, I tried it on the gloo backend and it works. I have a few questions:
Hi @lironmo,
Thanks.
Stack:
:white_circle: #19443 Support sparse gradients in DistributedDataParallel 💛
:black_circle: #19146 Add sparse tensor allreduce 💛
Implemented only on ProcessGroupGloo, as an allgather of metadata
(sparse_dim, dense_dim, and nnz), followed by an allgather of indices,
followed by an allgather of values. Once these operations have
finished, all ranks locally compute a reduction over these sparse
tensors. Works for both CPU and CUDA tensors.
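For illustration, a hedged Python sketch of the local reduction step described above; the helper name and argument names are made up, and the real implementation lives in ProcessGroupGloo's C++ code:

```python
import torch

# Hypothetical helper: once every rank holds all indices and values, the sum is
# "concatenate along nnz, then coalesce", since coalesce() adds up values that
# share the same index.
def local_sparse_sum(indices_per_rank, values_per_rank, size):
    all_indices = torch.cat(indices_per_rank, dim=1)  # (sparse_dim, total_nnz)
    all_values = torch.cat(values_per_rank, dim=0)    # (total_nnz, *dense_dims)
    return torch.sparse_coo_tensor(all_indices, all_values, size).coalesce()
```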
This surfaced a problem with the existing assumption of only modifying
tensors that are passed at the call site, because for sparse tensors
we don't know the dimensions of the output tensors before we run the
collective. To deal with this unknown, this commit adds a
`result` function to the `c10d::ProcessGroup::Work` class that returns a
vector of tensors.
It's a bit odd to have to retrieve the result through this function
only for operations on sparse tensors. To make this work irrespective
of tensor layout, we can create a follow-up commit to make all in-place
operations make their results accessible through this function
as well. This doesn't break any existing contracts but does have the
potential to add interface ambiguity.
Differential Revision: D14889547