
Conversation

Contributor

@ddkalamk ddkalamk commented Jan 17, 2020

As described in #32345, this is a prototype implementation that adds an alltoall communication primitive to the torch.distributed module and the ProcessGroup abstract interface. It also implements alltoall in the ProcessGroupMPI backend.

@mnaumovfb @JianpingChen066 @dmudiger @srinivas212 @Jianhui-Li @mshiryaev @ftian1

cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @xush6528 @osalpekar
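(For readers new to the collective: a tiny pure-Python illustration of the data movement an all-to-all performs: rank i's j-th chunk ends up as rank j's i-th chunk. No distributed runtime is involved; the values are made up.)

    # All-to-all semantics, simulated locally for 4 ranks.
    world_size = 4
    # inputs[i] is rank i's data, pre-split into world_size chunks
    inputs = [[f"r{i}c{j}" for j in range(world_size)] for i in range(world_size)]
    # After the collective, rank j holds the j-th chunk from every rank:
    outputs = [[inputs[i][j] for i in range(world_size)] for j in range(world_size)]
    assert outputs[3] == ["r0c3", "r1c3", "r2c3", "r3c3"]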

@mrshenli
Contributor

cc @agolynski

@mrshenli mrshenli added the oncall: distributed label Jan 17, 2020
Comment on lines 1470 to 1472
input,
group=group.WORLD,
async_op=False):
Contributor

nit: fix indent

Contributor Author

Fixed

@agolynski
Contributor

Could you add tests for this?

@ddkalamk
Contributor Author

> Could you add tests for this?

Could you please point me to a similar test for another primitive? Sorry, I don't have much experience writing formal tests :(

@mrshenli
Contributor

@ddkalamk similar tests live in test_c10d.py and test_distributed.py. Under the current setup, the ProcessGroup.* methods are tested in test_c10d.py and the torch.distributed.* APIs are tested in test_distributed.py.
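(Purely for illustration, a minimal standalone check of the even-split, single-tensor variant that this PR later names all_to_all_single; it runs under an MPI launcher rather than through the test_distributed.py harness, so the real tests should follow the existing patterns in those files instead.)

    # Sketch only: launch with `mpirun -n <N> python check_alltoall.py`.
    import torch
    import torch.distributed as dist

    dist.init_process_group(backend="mpi")
    rank, world = dist.get_rank(), dist.get_world_size()

    # Rank r contributes the values [r*world, ..., r*world + world - 1].
    inp = torch.arange(world, dtype=torch.float32) + rank * world
    out = torch.empty(world, dtype=torch.float32)
    dist.all_to_all_single(out, inp)

    # Rank r should receive the r-th element from every rank.
    expected = torch.arange(world, dtype=torch.float32) * world + rank
    assert torch.equal(out, expected)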

Contributor

@mrshenli mrshenli left a comment

Thank you so much for contributing! I added some minor comments inline.

group=group.WORLD,
async_op=False):
"""
Each process splits input tensor equally and then scatters the split list to all processes in a group.
Contributor

What if the tensor cannot be equally split across world_size? E.g., input is [0, 1, 2, 3] and world_size = 3. Will it error out before running the allToAll op, or do different ranks need to provide different-size output tensors?

Contributor Author

At present, it expects both input and output to split exactly, but the code doesn't check for it. Ideally, however, we want to support the case where the input and/or output does not split equally.
I propose adding optional input_split_sizes and output_split_sizes arguments that provide a list of per-rank integer split sizes for when the tensors are not split equally (similar to what torch.split() uses). Another question is: if the input/output tensors are multi-dimensional, do we split the whole tensor or just dim 0, and what would the split sizes map to? My recommendation is to assume dim 0 implicitly, since splitting other dims would cause non-contiguous splits and require copies anyway; it is better to use an explicit list of tensors when the split is not along dim 0. In either case, the sum of the split sizes should be less than or equal to the total tensor size or the dim 0 size, depending on which behavior we pick.
Finally, I don't know whether there is a use case for it, but if we want to match the MPI_Alltoallv semantics, we could also add an optional list of offsets into the input and output tensors marking where each split starts.

Let me know your thoughts and I will update the PR accordingly.

PS: A similar discussion applies to all_gather_base, scatter_base, and gather_base.
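(For illustration, this is the torch.split()-style mapping proposed above; the sizes and world_size here are invented:)

>>> import torch
>>> world_size = 3
>>> input = torch.arange(8)            # 8 elements do not divide evenly across 3 ranks
>>> input_split_sizes = [3, 3, 2]      # proposed per-rank send sizes along dim 0
>>> list(input.split(input_split_sizes, dim=0))
[tensor([0, 1, 2]), tensor([3, 4, 5]), tensor([6, 7])]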

Contributor Author

I have one more question regarding APIs:

  virtual std::shared_ptr<ProcessGroup::Work> allgather(
      std::vector<std::vector<at::Tensor>>& outputTensors,
      std::vector<at::Tensor>& inputTensors,
      const AllgatherOptions& opts = AllgatherOptions()) = 0;

  virtual std::shared_ptr<ProcessGroup::Work> allgather_base(
      at::Tensor& outputBuffer,
      at::Tensor& inputBuffer,
      const AllgatherOptions& opts = AllgatherOptions()) = 0;

These are the abstract definitions of ProcessGroup::allgather and ProcessGroup::allgather_base. I see that before allgather_base was added, every ProcessGroup method took either a vector<Tensor> or a vector<vector<Tensor>> as arguments, but allgather_base simply takes single tensors. Is this part of a planned API cleanup, or is allgather_base expected to take a vector<Tensor> as an argument?

Contributor

Hey @agolynski, could you please comment on the plans for allgather and allgather_base?

Contributor Author

Hi @mrshenli @agolynski,
I have updated the PR with my proposal to support uneven splits. Please take a look. I have also added more examples to the API documentation.
cc @dmudiger @srinivas212

Contributor

Looks good.

@@ -14,6 +14,7 @@
GatherOptions,
ReduceOptions,
ReduceScatterOptions,
AllToAllOptions,
Contributor

This import list was following alphabetical order.

Contributor Author

Will fix this in next update.

[ 3 7 11 15] # Rank 3

Essentially, it is similar to following operation:
scatter_list = list(input.chunk(world_size))
Contributor

Will this show as a code block in the doc? We might need to add a >>> prefix here, e.g.:

Example::
Make sure that ``MASTER_ADDRESS`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,
>>> export MASTER_ADDRESS=localhost
>>> export MASTER_PORT=5678
Then run the following code in two different processes:
>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()

Could you please build the doc locally and post a screenshot of the API doc?

You can serve the page locally using this script.

Contributor Author

I will update the example and docs once we finalize the API semantics and arguments, and will add the >>> prefix at that time.
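(For reference, a sketch of how such a >>>-prefixed block might read for the 4-rank example already in the docstring, shown from rank 3's perspective and using the all_to_all_single name settled on later in this thread; it assumes torch, dist, and rank are set up as in the other examples and is not the final doc text.)

>>> # 4 ranks, even split along dim 0
>>> input = torch.arange(4) + rank * 4
>>> input
tensor([12, 13, 14, 15])   # on rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([ 3,  7, 11, 15])   # on rank 3, matching the table above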

@yf225 yf225 added the triaged label Jan 29, 2020
@dr-ci

dr-ci bot commented Feb 12, 2020

💊 CircleCI build failures summary and remediations

As of commit 80cfb5c (more details on the Dr. CI page):


💚 💚 Looks good so far! There are no CircleCI failures yet. 💚 💚


This comment was automatically generated by Dr. CI.

@ddkalamk
Contributor Author

@mrshenli @agolynski
Can you please review and provide your comments?

output_split_sizes=[],
input_split_sizes=[],
group=group.WORLD,
async_op=False):
Contributor

Hello @ddkalamk, really nice work! Couple of suggestions on the API and implementations.

  1. Looks like there are two "flavors" of AlltoAll in this PR: a version that logically splits a single tensor into 'N' partitions on each rank and one that already takes a pre-split list of tensors. I wonder if it would make sense to also have two APIs or only support the "list-of-tensors" form. That way, it's clear when the splits should be specified. One thought I had for signatures was:
    • def alltoall_list(inputs: List[Tensor]) -> List[Tensor] and
    • def alltoall_single(input: Tensor, dimension: int, splits:List[int]) -> Tensor. This will logically partition input into nproc partitions along dimension using splits. Optionally if splits is None or [], you can follow a partitioning strategy similar to ScaLAPACK's one-dimensional block distribution (https://www.netlib.org/scalapack/slug/node75.html#figbcol)
  2. Ideally, all output metadata should be computable just from the inputs. The downside is that computing output displacements from input displacements requires an additional alltoall or allgather, but typically with alltoall (at least in my experience) you need to do this step anyway, and most of the time goes into communicating the actual arrays rather than the initial displacement data. Additionally, this makes using the API much less error-prone.
  3. I saw that MPI now supports non-blocking versions of alltoall/alltoallv. Have you considered supporting these as well?

Contributor

Thanks, Sudarshan!

2.> I think that's a convenient option. My guess is that if we orchestrate learning from a single source, then we can avoid this round of communication for uneven splits as well. I presume that for even splits it would be similar to other collective communication, where we don't require an extra round of communication to ensure the sanity of the inputs.

Let's maybe offer this as an option as a followup PR if necessary? (this PR might get too complicated)

Contributor Author

Thanks Sudarshan and Alexander for your feedback. Here are my thoughts.
1.> The current distributed API only supports the XX_list() versions, e.g. scatter, gather, or allgather. All of these take a list of tensors and none has an option to take a single tensor and split it internally. The disadvantage of the list form is that we add the overhead of copying the tensors into a contiguous buffer inside the MPI backend. To avoid that, I added the single-tensor version; I believe the newly added ProcessGroup::allgather_base() has a similar motivation, although there is no corresponding change in the Python API. Therefore, I just overloaded the current API for both semantics. I think it is mostly an API naming decision. If you prefer to name them alltoall_list and alltoall_single, I will make that change.

2.> Computing the output splits is a one-time job and is typically done in the init phase (a sketch of this init-time exchange follows below). Doing it inside the API would add significant performance overhead; I guess that's the reason MPI_Alltoall lets you specify it explicitly.

3.> The current MPI backend uses a dedicated thread to drive communication, so from the PyTorch application's point of view communication is already asynchronous. Using async MPI collective calls would require changes to the core MPI backend implementation, which I think is out of scope for this PR.
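(For concreteness, the init-time exchange mentioned in 2.> is usually just a tiny all-to-all of the per-rank send counts. A sketch, assuming the process group is initialized, input_split_sizes is this rank's list of send sizes of length world_size, and using the all_to_all_single name settled on later in this thread:)

>>> send_counts = torch.tensor(input_split_sizes, dtype=torch.int64)
>>> recv_counts = torch.empty(world_size, dtype=torch.int64)
>>> dist.all_to_all_single(recv_counts, send_counts)   # one int per peer rank
>>> output_split_sizes = recv_counts.tolist()          # reused for all later data calls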

Contributor

Thanks, here's my two cents but I'll defer to the experts like @agolynski and @srinivas212

  1. On the API, having one flavor of alltoall that follows the convention of existing collectives and works with a list of tensors might be sufficient for the first version? Slicing tensors can be cheap if they're all just views of the same larger tensor.
  2. On computing recv offsets inside the API: this adds the overhead of a single alltoall of scalars, which is usually tiny compared to the subsequent alltoall or alltoallv that exchanges the actual data. On the positive side, it is less error-prone for the user. My two cents would be towards a simpler API for now, adding extra knobs later if needed for performance. But I'll defer to the experts on this.
  3. Sounds good, thanks for explaining.

Contributor Author

  1. I agree that with the current convention of the existing collectives, only the version with a list of tensors fits well. However, I added the single-tensor version after looking at the newly added allgather_base API. The discussion about using a single tensor instead of a list of tensors applies to almost all collectives, and they would all need a common mechanism to support uneven splits.
  2. About computing receive offsets: if we focus on the list-of-tensors version of the API, we need this information before making the alltoall call in order to create a list of correctly sized output tensors. This is very similar to the existing gather or allgather APIs, which take a list of output tensors. The single-tensor version is just an extension of this conventional list-based version, where the size information is passed as a list of split_sizes.


@drdarshan re. 2> I think there's another issue as well. It might be a bit tricky to implement the case you refer to, because the background thread simply executes one collective call at a time. We would need to queue up two collective calls and have the output of the first one feed into the second. On the flip side, it also does not make sense to do this from the main thread, because that would block the main thread and expose the communication overheads.

Contributor

Hello @ddkalamk, thanks and you're right. Let's keep the API consistent with the one for distributed.all_gather. Maybe down the line, we can revise the collectives together (or provide utility functions) to compute output shapes automatically.

std::vector<at::Tensor>& outputTensors,
std::vector<at::Tensor>& inputTensors,
std::vector<int>& outputSplitSizes,
std::vector<int>& inputSplitSizes,
Contributor

Any reason why these are not const std::vector<T>&?

Contributor Author

I just followed how ProcessGroup::allgather is written; none of the other APIs seem to have const before std::vector<T>&:

  virtual std::shared_ptr<ProcessGroup::Work> allgather(
      std::vector<std::vector<at::Tensor>>& outputTensors,
      std::vector<at::Tensor>& inputTensors,
      const AllgatherOptions& opts = AllgatherOptions()) = 0;


if(outputSplitSizes.size() == 0 && inputSplitSizes.size() == 0) {
// We can use alltoall
if ((outputTensors[0].numel() != inputTensors[0].numel()) ||
Contributor

Is it OK to assume you will not have empty lists passed down to this layer?

Contributor Author

Here the list will always have exactly one tensor. If there is nothing to send (or receive), we can pass an empty tensor with zero size.

checkSingleTensorHelper(inputTensors[0]);
checkSingleTensorHelper(outputTensors[0]);
if(outputTensors[0].size(0) % size_ != 0) {
throw std::runtime_error("Tensor's dim 0 does not divide equally across group size");
Contributor

std::runtime_error is used for a lot of error conditions - is there a more specific one for non-conforming arguments?

Contributor Author

Sorry, I didn't find anything better :(

Contributor

@agolynski - do you have any thoughts here? If it is conventional to throw std::runtime_error and they surface idiomatically on the Python side, we can leave it as-is.

Contributor

Let's maybe change such errors to std::invalid_argument?

auto dstdata = (entry->dst)[0];
c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
Contributor

I'm guessing that you're assuming that both input and output tensors are non-strided? I.e., they are stored contiguously in row-major order on both sending and receiving ends?

Typically, you don't have to make this assumption. You can create a single MPI_Type_vector per rank that encodes the stride information and let MPI handle "flattening" of the data internally. For now, a check to make sure the data is indeed in contiguous row-major order is sufficient, just so you don't get garbled outputs silently.


@drdarshan the issue with letting MPI handle flattening of data internally is performance. We have a single MPI background thread and most MPI implementations don't optimize MPI_Type_vector and other derived data types. But like you already mentioned, the extra check might be helpful to avoid silent errors and will require checking the entire input and output tensor.

Contributor Author

Agree with @srinivas212 on the performance aspect. The necessary check is already there in the checkSingleTensor call; I am following the same strategy as the other API implementations.
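(Aside: the kind of input that check rejects is produced by any transposed or otherwise strided view, for example:)

>>> import torch
>>> t = torch.arange(6).reshape(2, 3).t()   # a transposed, strided view
>>> t.is_contiguous()
False
>>> t.contiguous().is_contiguous()          # explicit copy into row-major layout
True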

std::vector<int> recv_lengths(size_);
std::vector<int> send_offsets(size_);
std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
Contributor

Is it idiomatic to use lambda functions this way? You can just hoist this as a utility method in this class.

Contributor Author

Ideally it should be a utility method, since the functionality would be useful for other APIs as well if we decide to extend them with XX_base versions and add support for uneven splits. But for now it is required only in the alltoall implementation, so I localized it as a lambda function.

Contributor

@agolynski agolynski Mar 4, 2020

My concern is that it pollutes the code somewhat, arguably making it less readable. How about a static helper function just outside ProcessGroupMPI::alltoall_base, so we don't need to do that move later?

Also, you don't need to capture [this] for this function, do you?

Contributor Author

It is cleaner to make these utility methods. Will move them near checkSingleTensor.

std::vector<int> recv_lengths(size_);
std::vector<int> send_offsets(size_);
std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
Contributor

split_sizes will be passed by value. Is this intentional?

Contributor Author

No, will change it to pass by reference.

std::vector<int> recv_offsets(size_);
auto computeLengthsAndOffsets = [this](std::vector<int> split_sizes, at::Tensor &tensor, std::vector<int> &lengths, std::vector<int> &offsets) {
if(split_sizes.size() == 0) {
int split_size = tensor.numel() / size_;
Contributor

This can overflow for large tensors and MPI will complain about negative splits and crash the program. [unfortunately looks like the standard still specifies displacements to be 32-bit ints for backwards compatibility]. To be 100% safe, I suggest running a sanity check collective right before calling MPI to make sure all arguments are within range. You can also use something like the SafeInt library to avoid writing a lot of these checks explicitly (https://github.com/dcleblanc/SafeInt/blob/master/SafeInt.hpp)

Contributor Author

Yes, agreed. Will change std::vector<int>& to std::vector<int64_t>& in the arguments and add a check for overflow.
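(A Python sketch of the length/offset bookkeeping and the 32-bit range check being discussed; it treats the tensor as a flat element count for simplicity, whereas the C++ helper in the PR also scales dim-0 split sizes by the row size, so this only illustrates the arithmetic.)

    INT32_MAX = 2**31 - 1  # MPI counts/displacements are 32-bit ints

    def compute_lengths_and_offsets(split_sizes, numel, group_size):
        if not split_sizes:                       # empty list means an even split
            assert numel % group_size == 0
            split_sizes = [numel // group_size] * group_size
        lengths, offsets, offset = [], [], 0
        for s in split_sizes:
            if s > INT32_MAX or offset > INT32_MAX:
                raise ValueError("count/displacement exceeds 32-bit int range")
            lengths.append(s)
            offsets.append(offset)
            offset += s
        return lengths, offsets

    assert compute_lengths_and_offsets([5, 4, 3], 12, 3) == ([5, 4, 3], [0, 5, 9])
    assert compute_lengths_and_offsets([], 12, 3) == ([4, 4, 4], [0, 4, 8])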

auto computeLengthsAndOffsets = [this](std::vector<at::Tensor> &tensors, std::vector<int> &lengths, std::vector<int> &offsets) {
int offset = 0;
for(int i = 0; i < size_; i++) {
lengths[i] = tensors[i].numel();
Contributor

Similar to the comment above, this is doing an unchecked cast from int64_t to int which can overflow for tensors > 2GB.

Contributor Author

As mentioned above, will fix it

if (split_sizes.size() == 0) {
if (tensor.size(0) % group_size != 0) {
throw std::runtime_error(
"Tensor's dim 0 does not divide equally across group size");
Contributor

@agolynski agolynski Mar 6, 2020

What about an invalid argument error here (and in the similar cases below)?

Contributor Author

Should I just change it in the alltoall implementation? All the other places use std::runtime_error, e.g. checkSingleTensorHelper, so I thought it would be inconsistent. Let me know if I should change it.

// Checking the input tensor's validity
void checkSingleTensorHelper(const at::Tensor& tensor) {
  if (!tensor.is_contiguous()) {
    throw std::runtime_error("input tensor has to be contiguous");
  }
  if (tensor.is_sparse()) {
    throw std::runtime_error("input tensor has to be dense");
  }
  if (tensor.is_cuda() && !cudaAwareMpiCheck()) {
    throw std::runtime_error(
        "CUDA tensor detected and the MPI used doesn't "
        "have CUDA-aware MPI support");
  }
}

Comment on lines 118 to 119
std::vector<int>& lengths,
std::vector<int>& offsets,
Contributor

nit: since these are output params, do you mind making them last and having
std::vector* lengths
e.g.
https://google.github.io/styleguide/cppguide.html#Output_Parameters

Again, it's not a hard rule, but I think it improves readability for generations to come :)

Contributor Author

Done!

Comment on lines 749 to 756
std::vector<int64_t> send_lengthsL(send_lengths.begin(), send_lengths.end());
std::vector<int64_t> recv_lengthsL(recv_lengths.begin(), recv_lengths.end());
at::Tensor srcFlatData = at::empty({src_len}, srcdata[0].options());
at::Tensor dstFlatData = at::empty({dst_len}, dstdata[0].options());
auto srcFlatDataSplits = srcFlatData.split_with_sizes(c10::IntArrayRef(send_lengthsL), 0);
for (int i = 0; i < size_; i++) {
srcFlatDataSplits[i].copy_(srcdata[i].view({-1}));
}
Contributor

let's use primitives that other process groups are already using, e.g. flattenDenseTensors and newLikeFlat?

Contributor Author

Sorry, but those primitives won't work here, as they need equal-sized tensors and we may have unequal-sized tensors in the list.

Contributor

You are right, apologies! I think flattenDenseTensors should still work for your use case (there is no limitation on sizes there)? newLikeFlat won't work, unfortunately...

Contributor Author

For now I am keeping it as is, but I plan to optimize it later by checking whether the list of tensors is derived from a flat tensor. Will change this at that time.

@@ -1465,6 +1466,128 @@ def reduce_scatter(output,
work.wait()


def all_to_all(output,
Contributor

Do you mind splitting this API into two: one for a single tensor with output_split_sizes and one for a list of tensors?
This would be good if we want to do type-safety checking in Python, e.g. with mypy, and the other APIs in this file more or less follow this convention (inputs/outputs are either single tensors, lists of tensors, or lists of lists of tensors).

Contributor Author

To be consistent with the other APIs, I would prefer to keep the all_to_all name for the list variant. What name would you suggest for the single-tensor variant? all_to_all_base, all_to_all_single, or something else?

Contributor

maybe all_to_all and all_to_all_single?

Contributor Author

Hi @mrshenli and @agolynski

Is there an easy way to detect whether a list of tensors was created from one big tensor using a split or chunk operation? If yes, we could potentially eliminate the need for all_to_all_single. I see that such tensors share the same underlying storage, but I don't know if we can use any specific tensor property to infer that. The need for all_to_all_single comes from the extra copies involved in creating a contiguous tensor inside the MPI backend; if we can somehow avoid those, we can eliminate the need for the single-tensor version.

Contributor

We could do that, but I think it's good for consistency to have a single-tensor all-to-all API.

If you are interested in detecting such tensors, you can take a look at #33924.

Contributor Author

Thanks @agolynski. I had almost reinvented the same thing, though I was trying to generalize it further to support uneven sizes and, if possible, a different storage base, using the offsets parameter available in most of the vector versions of the MPI APIs (e.g. MPI_Allgatherv, MPI_Alltoallv). But I wasn't sure about the right way to check; that PR confirms it.
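(A heuristic in the spirit of what is described above and of the approach in the linked PR, though not the code from either: check whether the tensors are contiguous, back-to-back views of a single storage.)

    import torch

    def is_flat_view_list(tensors):
        base = tensors[0]
        expected_offset = base.storage_offset()
        for t in tensors:
            if (not t.is_contiguous()
                    or t.storage().data_ptr() != base.storage().data_ptr()
                    or t.storage_offset() != expected_offset):
                return False
            expected_offset += t.numel()
        return True

    big = torch.arange(10)
    assert is_flat_view_list(list(big.split([3, 3, 4])))               # views of one buffer
    assert not is_flat_view_list([torch.arange(3), torch.arange(3)])   # separate buffers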

Contributor Author

> maybe all_to_all and all_to_all_single?

Done!
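(With the names settled, a sketch of both flavors side by side: all_to_all takes lists of per-peer tensors, while all_to_all_single takes one flat tensor each way with an optional split-size list. Values are illustrative and the script assumes an MPI launch.)

    import torch
    import torch.distributed as dist

    dist.init_process_group(backend="mpi")
    rank, world = dist.get_rank(), dist.get_world_size()

    # all_to_all: explicit per-destination inputs, pre-allocated per-source outputs.
    in_list = [torch.full((2,), rank * world + dst, dtype=torch.float32)
               for dst in range(world)]
    out_list = [torch.empty(2) for _ in range(world)]
    dist.all_to_all(out_list, in_list)

    # all_to_all_single: one flat tensor each way, even split along dim 0.
    inp = torch.cat(in_list)
    out = torch.empty_like(inp)
    dist.all_to_all_single(out, inp)
    assert torch.equal(out, torch.cat(out_list))   # both flavors agree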

@drdarshan
Contributor

> @mrshenli @agolynski @drdarshan
> Thanks for your time and discussion. Updated code to reflect changes from the discussion. Please take a look. Except tests, I tried to address all other comments/suggestions.

Hello @ddkalamk - no other major concerns from me. Thank you!

Comment on lines 1471 to 1472
output_split_sizes=[],
input_split_sizes=[],
Contributor

Lint failures on these two lines are real:

  {
    path: 'torch/distributed/distributed_c10d.py',
    start_line: 1471,
    end_line: 1471,
    start_column: 35,
    end_column: 35,
    annotation_level: 'failure',
    message: '[B006] Do not use mutable data structures for argument defaults.  They are created during function definition time. All calls to the function reuse this one instance of that data structure, persisting changes between them.'
  },
  {
    path: 'torch/distributed/distributed_c10d.py',
    start_line: 1472,
    end_line: 1472,
    start_column: 34,
    end_column: 34,
    annotation_level: 'failure',
    message: '[B006] Do not use mutable data structures for argument defaults.  They are created during function definition time. All calls to the function reuse this one instance of that data structure, persisting changes between them.'
  }

Contributor Author

Will fix it to use None instead of [] as the default value.
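(For reference, the usual fix for B006 is the None-default pattern; a sketch of the pattern only, not necessarily the exact final signature:)

    def all_to_all_single(output, input,
                          output_split_sizes=None,
                          input_split_sizes=None):
        # Normalize the mutable defaults inside the function body.
        output_split_sizes = [] if output_split_sizes is None else output_split_sizes
        input_split_sizes = [] if input_split_sizes is None else input_split_sizes
        # ... hand the sizes off to the process group as before ...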

Contributor Author

Fixed!

@ddkalamk
Contributor Author

@mrsalehi @agolynski @drdarshan
Sorry for the delay; I got busy with something more urgent. I pushed a new commit with the discussed changes and updated the documentation accordingly. Please take a look and let me know if this looks good.

@ddkalamk
Contributor Author

Hi @mrsalehi @agolynski,

Could you please take a look to make progress on this PR?

Thanks.

Contributor

@facebook-github-bot facebook-github-bot left a comment

@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

Contributor

@mrshenli mrshenli left a comment

Hey @ddkalamk

Sorry about the delay, and thanks a lot for the big effort. I left some comments inline, and most of them are nits.

Requesting changes mostly for two reasons:

  1. We will need test coverage before we can land this.
  2. Let's first mark this feature as experimental, so that we still have the chance to make API changes later.

Thank you!!

"""
Each process splits input tensor and then scatters the split list
to all processes in a group. Then concatenate the received tensors from all
the processes in the group and return single output tensor.
Contributor

Shall we add a warning here to leave us wiggle room for future API changes?

.. warning::
  `all_to_all_single` is experimental and subject to change.

"""
Each process scatters list of input tensors to all processes in a group and
return gathered list of tensors in output list.

Contributor

ditto. Shall we add a warning here to leave us wiggle room for future API changes?

.. warning::
  `all_to_all` is experimental and subject to change.

int64_t computeLengthsAndOffsets(
const std::vector<int64_t>& split_sizes,
const at::Tensor& tensor,
std::vector<int>* lengths,
Contributor

What is the reason for passing in pointers instead of references?

Contributor Author

Changed as per comment from @agolynski #32361 (comment)

> nit: since these are output params, do you mind making them last and having
> std::vector* lengths
> e.g.
> https://google.github.io/styleguide/cppguide.html#Output_Parameters
>
> again: it's not a hard rule, but I think it improved readability for generations to come :)

@ddkalamk
Contributor Author

> Hey @ddkalamk
>
> Sorry about the delay, and thanks a lot for the big effort. I left some comments inline, and most of them are nits.
>
> Requesting changes mostly for two reasons:
>
>   1. We will need test coverage before we can land this.
>   2. Let's first mark this feature as experimental, so that we still have the chance to make API changes later.
>
> Thank you!!

Hi @mrshenli, thanks for the review.
Pushed another commit with the suggested changes, please check. We are working on adding test coverage, so we will have it soon.

@ddkalamk
Contributor Author

Hi @mrshenli

Looking at the Scatter/Gather tests, I have pushed a few tests for all_to_all; please take a look and let me know if those look good. Hoping to see this merged to master soon.

Thanks,
Dhiraj

@ddkalamk
Contributor Author

@mrshenli Rebased onto the latest master and squashed to a single commit. Let me know if anything is still blocking.

-Dhiraj

@ddkalamk ddkalamk force-pushed the alltoall_support branch 2 times, most recently from bd2cb08 to 42211ba on March 30, 2020 18:33
Contributor

@mrshenli mrshenli left a comment

LGTM! Will stamp when all tests pass.

Contributor

@facebook-github-bot facebook-github-bot left a comment

@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
srcdata.data_ptr(),
Contributor

@mrshenli mrshenli Mar 30, 2020

curious, does clang-format say we should do 6-space indent here?

Contributor

clang-format says this should be 4 spaces.

Contributor Author

Extremely sorry @mrshenli, I don't really know how to use clang-format. Hopefully, I have done it right this time.
Please let me know if these are the correct steps to apply clang-format:

  • Stage the changes using git add
  • Run git-clang-format
  • Stage the new changes again

Last time I searched the web and added a few commands to my vimrc file, but it looks like I didn't do it right. Sorry. Is there a similar way to format Python code as well that I should use?

Contributor

@mrshenli mrshenli left a comment

Passing along some internal lint warnings on this diff. Could you please apply clang-format to the changes?

std::vector<int64_t> outputSplitSizes,
std::vector<int64_t> inputSplitSizes) {
return pg.alltoall_base(
output, input, outputSplitSizes, inputSplitSizes, ::c10d::AllToAllOptions());
Contributor

-                    output, input, outputSplitSizes, inputSplitSizes, ::c10d::AllToAllOptions());
+                    output,
+                    input,
+                    outputSplitSizes,
+                    inputSplitSizes,
+                    ::c10d::AllToAllOptions());

std::vector<at::Tensor>& output,
std::vector<at::Tensor>& input) {
return pg.alltoall(
output, input, ::c10d::AllToAllOptions());
Contributor

@mrshenli mrshenli Mar 30, 2020

-                return pg.alltoall(
-                    output, input, ::c10d::AllToAllOptions());
+                return pg.alltoall(output, input, ::c10d::AllToAllOptions());

c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoall(
srcdata.data_ptr(),
Contributor

clang-format says this should be 4 spaces.

c10::DeviceGuard guard(srcdata.device());
std::unique_lock<std::mutex> globalLock(pgGlobalMutex_);
MPI_CHECK(MPI_Alltoallv(
srcdata.data_ptr(),
Contributor

ditto

mpiDatatype.at(dstdata[0].scalar_type()),
pgComm_));

auto dstFlatDataSplits = dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);
Contributor

-        auto dstFlatDataSplits = dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);
+        auto dstFlatDataSplits =
+            dstFlatData.split_with_sizes(c10::IntArrayRef(recv_lengthsL), 0);

Contributor

@mrshenli mrshenli left a comment

Awesome! All tests pass, let's land!

@srinivas212 we need to import the latest version to trigger internal tests.

Contributor

@facebook-github-bot facebook-github-bot left a comment

@srinivas212 has imported this pull request. If you are a Facebook employee, you can view this diff on Phabricator.

@ddkalamk
Contributor Author

ddkalamk commented Apr 1, 2020

Thank you everybody for your review comments and feedback for adding this feature!

@ddkalamk ddkalamk deleted the alltoall_support branch April 2, 2020 03:25