Add All-to-all comms support to distributed module and MPI backend #32361
Conversation
cc @agolynski
input,
group=group.WORLD,
async_op=False):
nit: fix indent
Fixed
Could you add tests for this?
Force-pushed 249c4ba to bec9c50
Could you please point me to a similar test for other primitives? Sorry, I have the least experience writing formal tests :(
@ddkalamk similar tests live in
Thank you so much for contributing! I added some minor comments inline.
group=group.WORLD,
async_op=False):
"""
Each process splits input tensor equally and then scatters the split list to all processes in a group.
What if the tensor cannot be equally split across world_size? E.g., input is [0, 1, 2, 3] and world_size = 3. Will it error out before running the allToAll op, or do different ranks need to provide different-sized output tensors?
At present, it expects to split both input and output exactly, but the code doesn't check for it. However, ideally we want to support the case where the input and/or output does not split equally.
I propose adding optional `input_split_sizes` and `output_split_sizes` arguments to provide a list of per-rank integer split sizes when the tensors are not split equally (something similar to what `torch.split()` uses). Another question is, if the input/output tensors are multi-dimensional, do we split the whole tensor or just dim 0, and what would the split sizes map to? My recommendation is to assume dim 0 implicitly, since splitting along other dims would cause non-contiguous splits and require copies anyway; when the split is not along dim 0, it is better to use an explicit list of tensors. In either case, the sum of the split sizes should be less than or equal to the total tensor size or the dim 0 size, depending on which behavior we pick.
Finally, I don't know if there is a use case for it or not, but if we want to match the MPI_Alltoallv semantics, we may also add an optional list of offsets for the input and output tensors from where each split starts.
Let me know your thoughts and I will update the PR accordingly.
PS: A similar discussion applies to all_gather_base, scatter_base and gather_base.
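For concreteness, a rough sketch of how such split-size arguments could be used. This is purely illustrative: it assumes a 2-rank group, splits along dim 0, and borrows the `all_to_all_single` name this thread later settles on; the default process group must already be initialized.

import torch
import torch.distributed as dist

def uneven_all_to_all_sketch(rank):
    # Hypothetical split tables for 2 ranks: rank 0 keeps 1 row and sends 3 rows
    # to rank 1; rank 1 sends 2 rows to each peer. Sizes are rows along dim 0.
    input_split_sizes = [[1, 3], [2, 2]][rank]
    output_split_sizes = [[1, 2], [3, 2]][rank]
    inp = torch.arange(sum(input_split_sizes) * 4, dtype=torch.float32).view(-1, 4)
    out = inp.new_empty((sum(output_split_sizes), 4))
    dist.all_to_all_single(out, inp,
                           output_split_sizes=output_split_sizes,
                           input_split_sizes=input_split_sizes)
    return out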
I have one more question regarding APIs:
virtual std::shared_ptr<ProcessGroup::Work> allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) = 0;
virtual std::shared_ptr<ProcessGroup::Work> allgather_base(
at::Tensor& outputBuffer,
at::Tensor& inputBuffer,
const AllgatherOptions& opts = AllgatherOptions()) = 0;
These are the abstract definitions of `ProcessGroup::allgather` and `ProcessGroup::allgather_base`. I see that before `allgather_base` was added, each `ProcessGroup` method took either `vector<Tensor>` or `vector<vector<Tensor>>` as arguments, but `allgather_base` simply takes single tensors. Is this part of a planned API cleanup, or is `allgather_base` eventually expected to take a `vector<Tensor>` as argument?
Hey @agolynski, could you please comment on the plans for `allgather` and `allgather_base`?
Hi @mrshenli @agolynski,
I have updated the PR with my proposal to support uneven splits. Please take a look. I have also added more examples to the API documentation.
cc @dmudiger @srinivas212
Looks good.
@@ -14,6 +14,7 @@
GatherOptions,
ReduceOptions,
ReduceScatterOptions,
AllToAllOptions,
This import list was following alphabetical order.
Will fix this in next update.
[ 3 7 11 15]  # Rank 3

Essentially, it is similar to the following operation:
scatter_list = list(input.chunk(world_size))
Will this show as a code block in the doc? We might need to add a `>>>` prefix here, e.g.:
pytorch/torch/distributed/rpc/api.py
Lines 181 to 204 in 02f09a1
Example::
Make sure that ``MASTER_ADDRESS`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,
>>> export MASTER_ADDRESS=localhost
>>> export MASTER_port=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
Could you please build the doc locally and post a screenshot of the API doc?
You can serve the page locally using this script.
I will update the example and doc as we finalize the API semantics and arguments. Will add the `>>>` prefix at that time.
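For reference, one possible shape for that docstring example once the `>>>` prefix is added (illustrative only, reusing the 4-rank example shown in the diff above; `rank`, `torch`, and `dist` are assumed to be set up by the surrounding doc text):

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([ 0,  4,  8, 12])     # Rank 0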
Force-pushed bec9c50 to 3ba07ec
💊 CircleCI build failures summary (Dr. CI): As of commit 80cfb5c, 💚 looks good so far! There are no CircleCI failures yet. 💚 This comment was automatically generated by Dr. CI and has been revised 27 times.
@mrshenli @agolynski
output_split_sizes=[],
input_split_sizes=[],
group=group.WORLD,
async_op=False):
Hello @ddkalamk, really nice work! A couple of suggestions on the API and implementation.
- Looks like there are two "flavors" of AlltoAll in this PR: a version that logically splits a single tensor into N partitions on each rank, and one that already takes a pre-split list of tensors. I wonder if it would make sense to also have two APIs, or to only support the "list-of-tensors" form. That way it's clear when the splits should be specified. One thought I had for signatures was `def alltoall_list(inputs: List[Tensor]) -> List[Tensor]` and `def alltoall_single(input: Tensor, dimension: int, splits: List[int]) -> Tensor`. The latter would logically partition `input` into `nproc` partitions along `dimension` using `splits`. Optionally, if `splits` is None or `[]`, you could follow a partitioning strategy similar to ScaLAPACK's one-dimensional block distribution (https://www.netlib.org/scalapack/slug/node75.html#figbcol); a small sketch of such a default partitioning follows after this list.
- Ideally, all output metadata should be computable just from the inputs. The downside is that computing output displacements from input displacements requires an additional alltoall or allgather, but typically with alltoall (at least in my experience) you need to do this step anyway, and most of the time goes into communicating the actual arrays rather than the initial displacement data. Additionally, this makes using the API much less error-prone.
- I saw that MPI now supports non-blocking versions of alltoall/alltoallv. Have you considered supporting these as well?
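As mentioned in the first point, here is a minimal sketch of such a default 1-D block partitioning when `splits` is not provided (an assumption of how it could be computed, not code from this PR):

def default_splits(n, nproc):
    # First ranks get ceil(n / nproc) elements, the last rank gets whatever remains.
    block = (n + nproc - 1) // nproc
    splits, remaining = [], n
    for _ in range(nproc):
        take = min(block, remaining)
        splits.append(take)
        remaining -= take
    return splits

print(default_splits(10, 4))  # [3, 3, 3, 1]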
Thanks, Sudarshan!
2.> I think that's a convenient option. My guess is that if we orchestrate learning from a single source, then we can avoid this round of communication for uneven splits as well. I presume for even splits it would be similar to other collective communication, where we don't require an extra round of communication to ensure sanity of the inputs.
Let's maybe offer this as an option in a followup PR if necessary? (This PR might get too complicated.)
Thanks Sudarshan and Alexander for your feedback. Here are my thoughts.
1.> The current distributed API only supports the `XX_list()` versions, e.g. `scatter`, `gather` or `allgather`. All of these take a list of tensors and none has an option to take a single tensor and split it internally. The disadvantage of the list form, however, is that we add the overhead of copying the tensors to a contiguous buffer inside the MPI backend; to avoid that I added the single-tensor version. I believe the newly added `ProcessGroup::allgather_base()` has a similar motivation, although there is no corresponding change to the Python API. Therefore, I just overloaded the current API for both semantics. I think it is more of an API naming decision; if you prefer to name them `alltoall_list` and `alltoall_single`, I will make that change.
2.> Computing output splits is a one-time job and is typically done in the init phase. Doing it inside the API would add significant performance overhead. I guess that's the reason MPI_Alltoall allows specifying it explicitly.
3.> The current MPI backend uses a dedicated thread to drive communication, so from the PyTorch application's point of view, communication is still asynchronous. Using async MPI collective calls would require changes to the core MPI backend implementation and is, I think, out of scope for this PR.
Thanks, here's my two cents but I'll defer to the experts like @agolynski and @srinivas212
- On the API, having one flavor of alltoall that follows the convention of existing collectives and works with a list of tensors might be sufficient for the first version? Slicing tensors can be cheap if they're all just views of the same larger tensor.
- On computing recv offsets inside the API: this adds the overhead of a single alltoall of scalars, which is usually tiny compared to the subsequent alltoall or alltoallv that exchanges the actual data (a tiny sketch of this size-exchange step follows below). On the positive side, this is less error-prone for the user. My two cents would be towards a simpler API for now, adding additional knobs later if needed for performance. But I'll defer to the experts on this.
- Sounds good, thanks for explaining.
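A tiny sketch of that size-exchange step. This is hypothetical: `input_split_sizes`, `inp`, and `row_width` are assumed to be defined by the caller, and the default process group to be initialized.

import torch
import torch.distributed as dist

# Assumed to exist: input_split_sizes (list of ints, rows sent to each peer),
# inp (2-D contiguous tensor to scatter), row_width (number of columns).
send_sizes = torch.tensor(input_split_sizes, dtype=torch.int64)
recv_sizes = torch.empty_like(send_sizes)
dist.all_to_all_single(recv_sizes, send_sizes)   # exchange scalar counts first
out = inp.new_empty((int(recv_sizes.sum()), row_width))
dist.all_to_all_single(out, inp,
                       output_split_sizes=recv_sizes.tolist(),
                       input_split_sizes=input_split_sizes)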
- Agree with you that, with the current convention of existing collectives, only the version with a list of tensors fits well. However, I added the single-tensor version looking at the newly added `allgather_base` API. The discussion about using a single tensor instead of a list of tensors applies to almost all collectives, and they all need a common mechanism to support uneven splits.
- About computing receive offsets: if we focus on the list-of-tensors version of the API, we need this information before making the `alltoall` call, in order to create the list of correctly sized output tensors (see the sketch below). This is very similar to the existing `gather` or `allgather` APIs, which take a list of output tensors. The single-tensor version is just an extension of this conventional list-based version where we pass the size information as a list of `split_sizes`.
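A small sketch of that constraint for the list flavor (the sizes here are hypothetical, shown from one rank's point of view in a 2-rank group, and must be known or exchanged before the call):

import torch
import torch.distributed as dist

# One input tensor per peer: this rank sends 1 row to rank 0 and 3 rows to rank 1.
in_splits = [torch.randn(1, 4), torch.randn(3, 4)]
# Rows this rank expects from each peer; needed up front to allocate the outputs.
recv_rows = [1, 2]
out_splits = [torch.empty(r, 4) for r in recv_rows]
dist.all_to_all(out_splits, in_splits)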
@drdarshan re. 2> I think there's another issue as well. It might be a bit tricky to implement the case you refer to because the background thread is simply executing one collective call at a time. We would need to queue up two collective calls and have the background thread feed the output of the first one into the second. On the flip side, it also does not make sense to do this from the main thread because it would block the main thread, exposing the communication overheads.
Hello @ddkalamk, thanks and you're right. Let's keep the API consistent with the one for `distributed.all_gather`. Maybe down the line, we can revise the collectives together (or provide utility functions) to compute output shapes automatically.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<at::Tensor>& outputTensors,
std::vector<at::Tensor>& inputTensors,
std::vector<int>& outputSplitSizes,
std::vector<int>& inputSplitSizes,
Any reason why these are not `const std::vector<T>&`?
I just followed how `ProcessGroup::allgather` is written; no other APIs seem to have `const` before `std::vector<T>&`:
virtual std::shared_ptr<ProcessGroup::Work> allgather(
std::vector<std::vector<at::Tensor>>& outputTensors,
std::vector<at::Tensor>& inputTensors,
const AllgatherOptions& opts = AllgatherOptions()) = 0;
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
if(outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
// We can use alltoall
if ((outputTensors[0].numel() != inputTensors[0].numel()) ||
Is it OK to assume you will not have empty lists passed down to this layer?
Here the list will always have exactly one tensor. If there is nothing to send (or receive), we can pass an empty tensor with zero size.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
checkSingleTensorHelper(inputTensors[0]);
checkSingleTensorHelper(outputTensors[0]);
if(outputTensors[0].size(0) % size_ != 0) {
throw std::runtime_error("Tensor's dim 0 does not divide equally across group size");
`std::runtime_error` is used for a lot of error conditions - is there a more specific one for non-conforming arguments?
Sorry, I didn't find anything better :(
@agolynski - do you have any thoughts here? If it is conventional to throw `std::runtime_error` and they surface idiomatically on the Python side, we can leave it as-is.
Let's maybe change such errors to std::invalid_argument?
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
auto dstdata = (entry->dst)[0];
c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
I'm guessing that you're assuming that both input and output tensors are non-strided? I.e., they are stored contiguously in row-major order on both sending and receiving ends?
Typically, you don't have to make this assumption. You can create a single `MPI_Type_vector` per rank that encodes the stride information and let MPI handle the "flattening" of data internally. For now, just a check to make sure the data is indeed in contiguous row-major order is sufficient, so you don't get garbled outputs silently.
@drdarshan the issue with letting MPI handle flattening of data internally is performance. We have a single MPI background thread and most MPI implementations don't optimize MPI_Type_vector and other derived data types. But like you already mentioned, the extra check might be helpful to avoid silent errors and will require checking the entire input and output tensor.
Agree with @srinivas212 on the performance aspect. The necessary check is already there in the `checkSingleTensor` call. I am following the same strategy as used in the other API implementation code.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<int> recv_lengths(size_);
std::vector<int> send_offsets(size_);
std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
Is it idiomatic to use lambda functions this way? You can just hoist this as a utility method in this class.
Ideally it should be a utility method, as the functionality is useful for other APIs as well if we decide to extend them with `XX_base` versions and add support for uneven splits. But for now it is required only in the alltoall implementation, so I localized it as a lambda function.
My concern is that it pollutes the code somewhat, arguably making it less readable. How about a static helper function just outside ProcessGroupMPI::alltoall_base, so we don't need to do that move later?
Also, you don't need to capture `[this]` for this function?
Cleaner is to make these utility methods. Will move them near `checkSingleTensor`.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<int> recv_lengths(size_);
std::vector<int> send_offsets(size_);
std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
`split_sizes` will be passed by value. Is this intentional?
No, will change it to pass by reference.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
if(split_sizes.size() == 0) {
int split_size = tensor.numel() / size_;
This can overflow for large tensors, and MPI will complain about negative splits and crash the program. [Unfortunately, it looks like the standard still specifies displacements as 32-bit ints for backwards compatibility.] To be 100% safe, I suggest running a sanity-check collective right before calling MPI to make sure all arguments are within range. You can also use something like the SafeInt library to avoid writing a lot of these checks explicitly (https://github.com/dcleblanc/SafeInt/blob/master/SafeInt.hpp).
Yes, agree. Will change `std::vector<int>&` to `std::vector<int64_t>&` in the arguments and add a check for overflow.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
auto computeLengthsAndOffsets = [this](std::vector<at::Tensor> &tensors, std::vector<int> &lengths, std::vector<int> &offsets) {
int offset = 0;
for(int i = 0; i < size_; i++) {
lengths[i] = tensors[i].numel();
Similar to the comment above, this is doing an unchecked cast from `int64_t` to `int`, which can overflow for tensors > 2GB.
As mentioned above, will fix it
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
if (split_sizes.size() == 0) {
if (tensor.size(0) % group_size != 0) {
throw std::runtime_error(
"Tensor's dim 0 does not divide equally across group size");
What about an invalid-argument error here (and in similar cases below)?
Should I just change it in the alltoall implementation? All other places just use `std::runtime_error`, e.g. `checkSingleTensorHelper`, so I thought it would be inconsistent. Let me know if I should change it.
// Checking the input tensor's validity
void checkSingleTensorHelper(const at::Tensor& tensor) {
if (!tensor.is_contiguous()) {
throw std::runtime_error("input tensor has to be contiguous");
}
if (tensor.is_sparse()) {
throw std::runtime_error("input tensor has to be dense");
}
if (tensor.is_cuda() && !cudaAwareMpiCheck()) {
throw std::runtime_error(
"CUDA tensor detected and the MPI used doesn't "
"have CUDA-aware MPI support");
}
}
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<int>& lengths,
std::vector<int>& offsets,
nit: since these are output params, do you mind making them last and using `std::vector<int>* lengths`? E.g. https://google.github.io/styleguide/cppguide.html#Output_Parameters
Again, it's not a hard rule, but I think it improves readability for generations to come :)
Done!
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
std::vector<int64_t> send_lengthsL(send_lengths.begin(), send_lengths.end());
std::vector<int64_t> recv_lengthsL(recv_lengths.begin(), recv_lengths.end());
at::Tensor srcFlatData = at::empty({src_len}, srcdata[0].options());
at::Tensor dstFlatData = at::empty({dst_len}, dstdata[0].options());
auto srcFlatDataSplits = srcFlatData.split_with_sizes(c10::IntArrayRef(send_lengthsL), 0);
for (int i = 0; i < size_; i++) {
srcFlatDataSplits[i].copy_(srcdata[i].view({-1}));
}
let's use primitives that other process groups are already using, e.g. flattenDenseTensors and newLikeFlat?
Sorry, but those primitives won't work here as they need equal sized tensors and we may have unequal sized tensors in the list here.
You are right, apologies! I think flattenDenseTensors should still work for your use case (there is no limitation on sizes there)? newLikeFlat won't work, unfortunately...
For now I am keeping it as is, but I plan to optimize it later to check whether the list of tensors is derived from a flat tensor. Will change this at that time.
@@ -1465,6 +1466,128 @@ def reduce_scatter(output,
    work.wait()


def all_to_all(output,
Do you mind splitting this API into two: one for a single tensor with output_split_sizes and one for a list of tensors?
This would be good if we want to do type-safety checking in Python, e.g. with mypy, and the other APIs in this file more or less follow this convention (inputs/outputs are either single tensors, lists of tensors, or lists of lists of tensors).
To be consistent with the other APIs, I would prefer to keep the `all_to_all` name for the list variant. What name would you suggest for the single-tensor variant? `all_to_all_base`, `all_to_all_single`, or something else?
maybe all_to_all and all_to_all_single?
Hi @mrshenli and @agolynski,
Is there an easy way to detect whether a list of tensors was created from one big tensor using the `split` or `chunk` operation? If yes, we can potentially eliminate the need for `all_to_all_single`. I see that they share the same underlying storage, but I don't know if we can use any specific tensor property to infer that. The need for `all_to_all_single` comes from the extra copies involved in creating a contiguous tensor inside the MPI backend. If we can somehow avoid that, we can eliminate the need for the single-tensor-based version.
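One possible check, sketched under the assumption that views produced by `chunk`/`split` on a contiguous tensor report the same storage data pointer (this is only an illustration, not what the PR does):

import torch

def shares_one_storage(tensors):
    # Compare the underlying storage pointer of every view against the first one.
    base = tensors[0].storage().data_ptr()
    return all(t.storage().data_ptr() == base for t in tensors)

big = torch.arange(12.)
print(shares_one_storage(list(big.chunk(4))))       # True
print(shares_one_storage([big, torch.ones(3)]))     # False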
We could do it, but I think it's good for consistency to have a single-tensor all-to-all API.
If you are interested in detecting such tensors, you can take a look at #33924.
Thanks @agolynski. I had almost reinvented the same thing, though I was trying to generalize it further to support uneven sizes and, if possible, a different storage base, using the offsets parameter available in most of the vector versions of the MPI APIs (e.g. `MPI_Allgatherv`, `MPI_Alltoallv`). But I wasn't sure about the right way to check; this PR confirms that.
maybe all_to_all and all_to_all_single?
Done!
Hello @ddkalamk - no other major concerns from me. Thank you!
output_split_sizes=[],
input_split_sizes=[],
Lint failures on these two lines are real:
{
path: 'torch/distributed/distributed_c10d.py',
start_line: 1471,
end_line: 1471,
start_column: 35,
end_column: 35,
annotation_level: 'failure',
message: '[B006] Do not use mutable data structures for argument defaults. They are created during function definition time. All calls to the function reuse this one instance of that data structure, persisting changes between them.'
},
{
path: 'torch/distributed/distributed_c10d.py',
start_line: 1472,
end_line: 1472,
start_column: 34,
end_column: 34,
annotation_level: 'failure',
message: '[B006] Do not use mutable data structures for argument defaults. They are created during function definition time. All calls to the function reuse this one instance of that data structure, persisting changes between them.'
}
Will fix it to use `None` instead of `[]` as default value.
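The usual fix for the B006 warning, sketched with the names from this PR (the `group`/`async_op` defaults shown here are placeholders, not necessarily the module's actual defaults):

def all_to_all_single(output, input,
                      output_split_sizes=None,
                      input_split_sizes=None,
                      group=None,
                      async_op=False):
    # Use None as the default and normalize inside, so no mutable default is shared
    # across calls.
    output_split_sizes = [] if output_split_sizes is None else output_split_sizes
    input_split_sizes = [] if input_split_sizes is None else input_split_sizes
    ...  # rest of the implementation unchanged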
Fixed!
@mrsalehi @agolynski @drdarshan
Hi @mrsalehi @agolynski, could you please take a look to make progress on this PR? Thanks.
@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
Hey @ddkalamk
Sorry about the delay, and thanks a lot for the big effort. I left some comments inline, and most of them are nits.
Requesting changes mostly for two reasons:
- We will need test coverage before we can land this.
- Let's first mark this feature as experimental, so that we still have the chance to make API changes later.
Thank you!!
""" | ||
Each process splits input tensor and then scatters the split list | ||
to all processes in a group. Then concatenate the received tensors from all | ||
the processes in the group and return single output tensor. |
Shall we add a warning here to leave us wiggle room for future API changes?
.. warning::
`all_to_all_single` is experimental and subject to change.
""" | ||
Each process scatters list of input tensors to all processes in a group and | ||
return gathered list of tensors in output list. | ||
|
Ditto. Shall we add a warning here to leave us wiggle room for future API changes?
.. warning::
`all_to_all` is experimental and subject to change.
int64_t computeLengthsAndOffsets(
const std::vector<int64_t>& split_sizes,
const at::Tensor& tensor,
std::vector<int>* lengths,
What is the reason for passing in pointers instead of references?
Changed as per this comment from @agolynski, #32361 (comment):
"nit: since these are output params, do you mind making them last and using `std::vector<int>* lengths`? E.g. https://google.github.io/styleguide/cppguide.html#Output_Parameters. Again, it's not a hard rule, but I think it improves readability for generations to come :)"
Hi @mrshenli, thanks for the review.
Hi @mrshenli, looking at the Scatter/Gather tests, I have pushed a few tests for all_to_all. Please take a look and let me know if those look good. Hoping to see this merged to master soon. Thanks,
Force-pushed eebbd8b to 935179b
@mrshenli Rebased to the latest master and squashed to a single commit. Let me know if anything is still blocking. -Dhiraj
Force-pushed bd2cb08 to 42211ba
LGTM! Will stamp when all tests pass.
@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
srcdata.data_ptr(),
curious, does clang-format say we should do 6-space indent here?
clang-format says this should be 4 spaces.
Extremely sorry @mrshenli. I am so dumb that I don't know how to use clang-format. Hopefully, I have done it right.
Please let me know if these are the correct steps to apply clang-format:
- Stage the changes using `git add`
- Run `git-clang-format`
- Stage the new changes again
Last time I searched the web and added a few commands to my vimrc file, but it looks like I didn't do it right. Sorry. Is there a similar way to format Python code as well that I should use?
Passing along some internal lint warnings on this diff. Could you please apply clang-format to the changes?
torch/csrc/distributed/c10d/init.cpp
Outdated
std::vector<int64_t> outputSplitSizes,
std::vector<int64_t> inputSplitSizes) {
return pg.alltoall_base(
output, input, outputSplitSizes, inputSplitSizes, ::c10d::AllToAllOptions());
- output, input, outputSplitSizes, inputSplitSizes, ::c10d::AllToAllOptions());
+ output,
+ input,
+ outputSplitSizes,
+ inputSplitSizes,
+ ::c10d::AllToAllOptions());
torch/csrc/distributed/c10d/init.cpp
Outdated
std::vector<at::Tensor>& output,
std::vector<at::Tensor>& input) {
return pg.alltoall(
output, input, ::c10d::AllToAllOptions());
- return pg.alltoall(
- output, input, ::c10d::AllToAllOptions());
+ return pg.alltoall(output, input, ::c10d::AllToAllOptions());
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
srcdata.data_ptr(),
clang-format says this should be 4 spaces.
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoallv(
srcdata.data_ptr(),
ditto
torch/lib/c10d/ProcessGroupMPI.cpp
Outdated
mpiDatatype.at(dstdata[0].scalar_type()),
pgComm_));

auto dstFlatDataSplits = dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);
- auto dstFlatDataSplits = dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);
+ auto dstFlatDataSplits =
+ dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);
Force-pushed 42211ba to 80cfb5c
Awesome! All tests pass, let's land!
@srinivas212 we need to import the latest version to trigger internal tests.
@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.
Thank you everybody for your review comments and feedback on adding this feature!
As described in #32345, this is a prototype implementation that adds an alltoall communication primitive to the torch.distributed module and the ProcessGroup abstract interface. It also implements alltoall in the ProcessGroupMPI backend.
@mnaumovfb @JianpingChen066 @dmudiger @srinivas212 @Jianhui-Li @mshiryaev @ftian1
cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @xush6528 @osalpekar