Avoid unnecessary flattening in all-gather and reduce-scatter#33924
thorjohnsen wants to merge 13 commits into pytorch:master
Conversation
💊 Build failures summary and remediations
As of commit 1f674c8 (more details on the Dr. CI page):
🕵️ 5 new failures recognized by patterns
The following build failures do not appear to be due to upstream breakages:
cc @agolynski Does this PR have implications for all_gather_coalesced and all_gather_base/single?
This is a useful PR (at least in the current state of the code). As-is, this does not seem to play well with coalesced impls, as tensors are assumed to be of the same size, but I don't see a reason why this can't be extended to work with coalesced tensors as well. The eventual direction is to have
torch/lib/c10d/ProcessGroupNCCL.cpp (outdated)

    std::vector<at::Tensor>& other,
    size_t world_size) {
    size_t world_size,
    size_t rank) {
To simplify reasoning in upstream code, can you introduce a bool* inplace or bool* already_flat output parameter instead, and fill it in as you do below for 'inplace'?
Do you mean introducing a new flag in the Python API so that the "discovery" logic can be removed from ProcessGroupNCCL.cpp? In other words,
torch.distributed.all_gather(output_tensors, input_tensor)
Will always flatten output tensors, while
torch.distributed.all_gather(output_tensors, input_tensor, already_flat=True)
will always do an inplace reduction?
This can be an option as well, but I think the point of this change is to allow user code to run fast without necessarily changing it explicitly, so they'll get it 'for free'.
I meant having output bool here, e.g.
auto outputFlattened = flatten_for_scatter_gather(
outputTensors, inputTensors, size, rank, *output_is_flat
);
and then use it instead of
if (outputFlattened[i][j].storage().is_alias_of(
outputTensors[i][j].storage()) &&
outputTensors[i][j].storage_offset() ==
(outputTensors[i][0].storage_offset() +
outputTensors[i][j].numel() * j)) {
break;
}
Would it work?
It will work, but the logic that determines whether the list of tensors is a set of views into an already flattened tensor would then have to go into the code that implements each NCCL op, so we'd end up duplicating it, unless I put the reasoning code in a separate method.
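That "separate method" can be sketched in isolation. Below is a hedged, hypothetical Python model of the flatness check being discussed; `View`, `storage_id`, and `is_already_flat` are made-up stand-ins for the C++ `is_alias_of`/`storage_offset` reasoning, not real torch or c10d APIs:

```python
# Hypothetical model of the "are these tensors views into one flat buffer?"
# check, factored into a standalone helper as suggested above.
from dataclasses import dataclass


@dataclass
class View:
    storage_id: int  # identifies the underlying storage (stand-in for is_alias_of)
    offset: int      # element offset into that storage (stand-in for storage_offset())
    numel: int       # number of elements in the view


def is_already_flat(views):
    """True if the views tile a single storage contiguously, in rank order."""
    base = views[0]
    for j, v in enumerate(views):
        if v.storage_id != base.storage_id:
            return False  # different storage: must flatten-and-copy
        if v.offset != base.offset + j * base.numel:
            return False  # gap or overlap: must flatten-and-copy
    return True


# Views tiling storage 0 back-to-back are "already flat"...
flat = [View(0, j * 4, 4) for j in range(8)]
# ...while separately allocated outputs are not.
scattered = [View(j, 0, 4) for j in range(8)]
```

With a helper like this, each NCCL op implementation can call one shared predicate instead of duplicating the alias/offset checks.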
torch/lib/c10d/ProcessGroupNCCL.cpp (outdated)

    bool inplace = true;
    for (auto j = size_t{}; j < tensor_lists[i].size(); ++j) {
      auto t = tensor_lists[i][j];
      if (!t.storage().is_alias_of(other[i].storage()) ||
I might be confused about this. Why should outputs share storage with inputs? Don't we just want all outputs to share storage, or is this a special case of allocating storage where outputs are combined with inputs (I didn't know this was allowed by the NCCL engine!)?
If that's the case, this saves the memory of a separate input allocation, in addition to the output allocation!
However, for this specific case, maybe it's enough to check that all outputs are properly aligned?
Do you know what would happen if outputs are aligned and share storage with inputs, but the inputs aren't properly placed within the outputs? Would NCCL crash or produce garbage outputs?
That makes sense. In-place will definitely save memory in addition to avoiding the copy.
I was wondering if we can save copying in case we've allocated output tensors separately from inputs, but outputs are still contiguous (that is, already flat). Something like:
input = torch.tensor(size)
...
output = torch.tensor(size * world_size * num_devices)
pg.allgather(output, input)
Do you think this usecase is worth covering as well?
(Alternatively we can just ask users to copy 'input' at the appropriate location inside 'output' and change its storage accordingly.)
What do you think?
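For reference, the "outputs allocated separately but contiguous" setup described above can be constructed from the Python side roughly like this (a sketch using plain torch on CPU; this is illustration, not code from this PR):

```python
import torch

world_size = 4
n = 1024  # elements per rank (arbitrary for the demo)

# One flat buffer; the output "list of tensors" is just views into it.
flat_output = torch.empty(world_size * n)
outputs = list(flat_output.chunk(world_size))

# Each chunk aliases flat_output's storage at consecutive element offsets,
# which is exactly the layout the flatness check in this PR looks for.
for j, out in enumerate(outputs):
    assert out.storage_offset() == j * n
    assert out.data_ptr() == flat_output.data_ptr() + j * n * out.element_size()
```

Passing `outputs` built this way to all_gather is what lets the backend skip the un-flatten copies.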
Both cases should work. For all-gather, we can avoid copying to flatten and un-flatten if the outputs are views into an already flattened tensor. In addition, if the input is also a view into the same flattened tensor and input is properly aligned, we can do an in-place operation. The code in my PR already does this, but I don't verify that the input is correctly aligned. I will add code to verify proper alignment.
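The extra alignment check described above could look roughly like the following from the Python side (a hypothetical sketch; `input_is_aligned_for_inplace` is not a real API, and the actual check lives in C++):

```python
import torch


def input_is_aligned_for_inplace(flat_output, inp, rank):
    """True if `inp` is a view into `flat_output` sitting exactly at this
    rank's slot, i.e. element offset rank * inp.numel()."""
    # Recover each tensor's storage base pointer: data_ptr minus the view's
    # offset in bytes. Equal base pointers mean the tensors alias one storage.
    base_a = inp.data_ptr() - inp.storage_offset() * inp.element_size()
    base_b = flat_output.data_ptr() - flat_output.storage_offset() * flat_output.element_size()
    same_storage = base_a == base_b
    aligned = inp.storage_offset() == flat_output.storage_offset() + rank * inp.numel()
    return same_storage and aligned


world_size, n = 4, 8
flat = torch.empty(world_size * n)
good = flat.narrow(0, 2 * n, n)          # exactly rank 2's slot
bad = flat.narrow(0, 2 * n + 1, n - 1)   # same storage, but misaligned
```

Only when both conditions hold can the op safely run in place.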
Would it be possible to add a test that exercises the new code path?
Absolutely! I can add a test script to verify correctness and demonstrate the perf benefit.
@mrshenli mentioned all_gather_coalesce as a potential issue. Is this something I need to address in this PR?
I've discussed this with @mrshenli offline and he mentioned that this might become a breaking change if the following holds:
previous behavior: we make a copy, and the correct copy is distributed.
The suggestion is to make this explicit in the API, e.g. add an optional bool in_place = false to Python, and similarly for C++. You'll still get your performance gains by setting in_place = true, but we can avoid breaking clients. What do you think?
I think that is a sensible compromise. I will make the change.
I have added an inplace flag to torch.distributed.all_reduce and torch.distributed.all_gather. Please have a look and let me know if this is not what you had in mind. I am working on a couple of test scripts and will submit them when they are ready.
Looks good, thank you! |
@agolynski Is this even allowed today from an API point of view? The MPI backend simply enqueues the operation to the MPI thread and returns. It may produce wrong results if input tensors are modified before
IMHO, it is better to do the in-place check internally, as the user can easily make a mistake here. And the check itself would be very simple and inexpensive in most cases.
Hi @ddkalamk, the inplace flag simply signals that an in-place NCCL op is allowed. This only affects the NCCL backend. As you have pointed out, an in-place NCCL op can potentially fail if the user modifies input tensors while the op is in flight, hence this is an opt-in feature, i.e. the inplace flag defaults to False. The code still checks internally whether the arguments can facilitate an in-place NCCL op, and will only schedule one if inplace == True and the arguments support it.
Modifying inputs while the op is in flight is always disallowed, whether we are doing an in-place or out-of-place operation, since we don't get to know when the op has finished reading the input. Even the MPI backend would potentially fail if one modifies the input before the op is complete. So I'm not sure how this argument helps. On the other hand, what happens if input and output buffers overlap and the user doesn't specify it? I am not familiar with NCCL, but from the API description it appears that if input and output pointers are overlapping and correctly aligned, it would implicitly do an in-place operation. At least in the MPI case, though, one needs to specify it explicitly.
Sorry, I think I misunderstood this comment. @agolynski, do you mean that if tensors are placed for an in-place operation (i.e. overlapping), we make a copy of the input to make it explicitly an out-of-place operation? And to disable this behavior we need an extra argument? Is this behavior specific to NCCL or does it apply to other backends as well?
Force-pushed 11edb59 to b89fdc2.
…github.com/thorjohnsen/pytorch into avoid_unnecessary_flattening_for_allgather
How do I proceed with this PR? My last 3 commits only changed whitespace or wording inside comments; before that, all tests were passing except flake8. Flake8 is now passing, but some other tests are failing for reasons that seem unrelated to the code in this PR. For instance, caffe2_onnx_main_py3_6_clang7_ubuntu16_04_build fails with the following error message:
Apr 27 16:58:18 Unable to checkout '815e209e4f30910dfdc47aa2ad041e3a46d61b44' in submodule path 'third_party/fbgemm'
815e209e4f30910dfdc47aa2ad041e3a46d61b44 is not one of my commits. Do I simply wait until these issues are resolved? I need to give a progress report to management. Grateful for any help!
@thorjohnsen PR looks good to me. Could you rebase and resubmit? I'm happy to merge it in. @agolynski Let me know if you have any further comments on this PR.
    if (!tensor_lists[i][j].storage().is_alias_of(tensor_lists[i][0].storage()) ||
        tensor_lists[i][j].storage_offset() != (tensor_lists[i][0].storage_offset() +
        j * tensor_lists[i][0].numel())) {
      no_copy = false;
I wonder if it would be better to be explicit here and throw, or at least warn, if the user specified no_copy but we can't actually honor it?
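The suggestion above could look something like this sketch (`resolve_no_copy` and its arguments are hypothetical illustration, not the PR's actual code):

```python
import warnings


def resolve_no_copy(requested: bool, feasible: bool) -> bool:
    """If the caller asked for no_copy but the tensors don't permit it,
    warn (or raise) instead of silently falling back to flatten-and-copy."""
    if requested and not feasible:
        warnings.warn(
            "no_copy=True requested, but output tensors are not views into "
            "a flat buffer; falling back to the copy path."
        )
        return False
    return requested and feasible
```

An exception instead of a warning would make the contract even stricter, at the cost of breaking callers that only sometimes pass pre-flattened outputs.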
Hi @thorjohnsen! Are you planning on continuing work on this PR? I think we should be good to go after a rebase, and there are some important perf issues this can solve. Otherwise we're happy to take on this work from here!
Hi @osalpekar I'll try to get this rebased right now.
@thorjohnsen Thanks! Please let us know if you need any help.
@thorjohnsen Let us know if you can rebase by this Friday. Since merging this is a bit urgent, I'll have to take over the PR on Friday.
@osalpekar If you have the time, I would be grateful if you can take over and bring this in. I am under a tight deadline and am unable to look at this PR until next week.
@osalpekar I've taken a stab at rebasing here: https://github.com/slayton58/pytorch/tree/avoid_unnecessary_flattening but I've been unable to get tests to even run (they either abort or hang on my system, no output) -- it might help you some.
Taking it over
@thorjohnsen I cannot import your PR. Would you please help by signing the CLA? Or do you have time to rebase this PR? Thanks.
Someone else is working on implementing all-gather-base, which will just return a single flattened tensor. I think that implementation is cleaner for the use cases mentioned here and can replace the improvement in this PR. Thoughts? @agolynski
Hi @thorjohnsen! Thank you for your pull request and welcome to our community.
Action Required: In order to merge any pull request (code, docs, etc.), we require contributors to sign our Contributor License Agreement, and we don't seem to have one on file for you.
Process: In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA.
Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with
If you have received this in error or have any questions, please contact us at [email protected]. Thanks!
Looks like this PR hasn't been updated in a while, so we're going to go ahead and mark this as
### Description
- This PR renames `_all_gather_base` to `all_gather_into_tensor` so that it is clearer in meaning.
- The `all_gather_into_tensor` API differs from the `all_gather` API in the output it accepts -- a single, large tensor instead of a list of tensors.
- This PR also adds a deprecation warning to `_all_gather_base`.

### Issue
`_all_gather_base` was implemented in #33924 to avoid unnecessary flattening. There was a previous effort (#82639) to merge `_all_gather_base` with the existing `all_gather` API by detecting the parameter type passed in for the output. There are, however, two "blockers" that make the merge difficult:
(i) The merge leads to a backward compatibility break. We would need to change the parameter name `tensor_list` in `all_gather` to a general name `output` that can cover both a tensor and a tensor list.
(ii) Recently, the `all_gather` API added uneven tensor support, utilizing the tensor boundaries implied by the list. We are, however, not sure about adding such support to the `_all_gather_base` function, because that would require users to pass in additional tensor boundary information.
In view of the above, we decided to productize `_all_gather_base` as a separate function, but with a clearer name.

### Testing
Added tests:
- `test_all_gather_into_cat_tensor_cuda` -- output form as with `torch.cat`. For example:
```
>>> tensor_in
tensor([1, 2], device='cuda:0')       # Rank 0
tensor([3, 4], device='cuda:1')       # Rank 1
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
```
- `test_all_gather_into_stack_tensor_cuda` -- output form as with `torch.stack`. For example:
```
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0')     # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1')     # Rank 1
```
The output form is determined by the shape of the output tensor passed by the user; no flag is used.

Cc @rohan-varma @mrshenli @crcrpar @ptrblck @H-Huang
Pull Request resolved: #85686
Approved by: https://github.com/rohan-varma, https://github.com/crcrpar
This PR addresses an important special case, which comes up frequently as we work towards model-parallel training and a few other things. torch.distributed.all_gather takes two arguments, a list of output tensors and a single input tensor. The all_gather op collects inputs from all the ranks into the list of output tensors. Internally, the rank inputs are collected into a flattened tensor and then copied from there to the output tensors. Often, the output tensors are simply views into an already flattened tensor, in which case the un-flattening is unnecessary and we can do an in-place all-gather op instead. This saves memory and improves performance. This is particularly important when the all-gather is done within a single node, because all the transfers are done over high-speed NVLinks and the extra D2D copies really hurt performance. To demonstrate the difference this PR makes, I ran the test script (invoked with the command line below) on a DGX-1 with 8 x 32GB V100 cards, using NVIDIA's 19.11 devel image. Without this PR, it took 13.96 seconds (average of 5 runs) to complete 100 all-gathers of a flattened tensor with 1.07 billion floats (1024^3). With this PR, the same 100 all-gathers took 3.01 seconds, a 4.64x improvement.
The situation for reduce-scatter is fundamentally the same, but reversed. Instead of doing D2D copies after the op to un-flatten the output tensors, reduce-scatter has to do D2D copies before the op to flatten the input tensors. The performance overhead is the same as for all-gather. This PR fixes both all-gather and reduce-scatter.
This PR should have no side-effects. Everything should work exactly like before, but when the special case is detected, you will see a large bump in throughput.
Thanks to @alpha0422 for contributing the original code.
Command line:
python -m torch.distributed.launch --nnodes=1 --nproc_per_node=8 test_all_gather.py
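The test script itself is not included in the thread. A hedged reconstruction of what such a benchmark might look like is sketched below; the sizes, environment handling, and structure are assumptions, and it falls back to gloo/CPU with a tiny tensor so it can run anywhere (the thread's run gathered 1024^3 floats over NCCL on 8 GPUs):

```python
# test_all_gather.py (hypothetical reconstruction, not the original script)
import os
import time

import torch
import torch.distributed as dist


def main(iters=100, n=1024):
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    use_cuda = torch.cuda.is_available() and dist.is_nccl_available()
    backend = "nccl" if use_cuda else "gloo"
    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group(backend, rank=rank, world_size=world_size)

    device = torch.device(f"cuda:{rank}") if use_cuda else torch.device("cpu")
    # Flat output buffer exposed as a list of views: the no-copy fast path
    # this PR detects.
    flat = torch.empty(world_size * n, device=device)
    outputs = list(flat.chunk(world_size))
    inp = torch.full((n,), float(rank), device=device)

    start = time.time()
    for _ in range(iters):
        dist.all_gather(outputs, inp)
    if use_cuda:
        torch.cuda.synchronize()
    if rank == 0:
        print(f"{iters} all-gathers took {time.time() - start:.2f} s")
    dist.destroy_process_group()
    return flat


if __name__ == "__main__":
    main()
```

Timing this with and without the views-into-flat layout (e.g. by replacing `outputs` with separately allocated tensors) is one way to reproduce the speedup reported above.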