Currently, ProcessGroupGloo wraps the arguments of a collective communication into a work item and inserts that item into a queue. A separate thread runs the `runLoop` function, which pops work items from the queue and launches the communications.
pytorch/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
Lines 748 to 770 in 1f1d01d
```cpp
void ProcessGroupGloo::runLoop(int workerIndex) {
  std::unique_lock<std::mutex> lock(workMutex_);

  while (!stop_) {
    if (workQueue_.empty()) {
      workProduceCV_.wait(lock);
      continue;
    }

    auto work = std::move(workQueue_.front());
    workQueue_.pop_front();
    workInProgress_[workerIndex] = work;
    lock.unlock();

    // Notify after releasing the lock so that the waiter
    // does not immediately block.
    workConsumeCV_.notify_one();

    AsyncWork::execute(std::move(work));
    lock.lock();
    workInProgress_[workerIndex].reset();
  }
}
```
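For context, the producer side of this queue is the collective entry points, which wrap their arguments into an `AsyncWork` and enqueue it. A minimal sketch of that enqueue path, reusing the member names from the snippet above (the exact helper in ProcessGroupGloo may differ):

```cpp
// Producer-side sketch: hand a newly constructed AsyncWork to the worker
// threads. Hypothetical helper; workMutex_, workQueue_ and
// workProduceCV_ are the members used by runLoop above.
void ProcessGroupGloo::enqueue(c10::intrusive_ptr<AsyncWork> work) {
  std::unique_lock<std::mutex> lock(workMutex_);
  workQueue_.push_back(std::move(work));
  lock.unlock();

  // Wake one worker blocked in workProduceCV_.wait(lock) inside runLoop.
  workProduceCV_.notify_one();
}
```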
The work item first runs the collective communication, then synchronizes the stream, and finally marks the future as completed.
pytorch/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
Lines 407 to 419 in 1f1d01d
```cpp
void ProcessGroupGloo::AsyncWork::execute(c10::intrusive_ptr<AsyncWork> work) {
  try {
    work->run();
  } catch (...) {
    work->finishWorkGlooError(std::current_exception());
    return;
  }

  // FIXME: We need to call it here since Future completion requires all
  // the work to be synchronized to CUDA.
  work->synchronize();
  work->finishWorkGloo();
}
```
There might be a problem here: the background thread synchronizes with the current stream, which can interfere with the computation ops. Moreover, if no one ever calls wait() on the AsyncWork, this creates unnecessary synchronizations.
pytorch/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
Lines 1308 to 1314 in 1f1d01d
```cpp
void synchronize() override {
  // Synchronize with the copy back to CUDA tensors.
  for (const auto i : c10::irange(inputs.size())) {
    c10::Device device = inputs[i].device();
    events[i].block(
        c10::impl::VirtualGuardImpl(device.type()).getStream(device));
  }
}
```
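To spell out the hazard: `c10::Event::block(stream)` enqueues a wait into the given stream, and `getStream(device)` returns whatever stream is current on the *calling* thread. Since synchronize() runs on the Gloo worker thread, the stream it blocks is typically the default stream that the main thread also computes on. A minimal sketch of these semantics (hypothetical helper, same c10 APIs as in the snippet above):

```cpp
#include <c10/core/Device.h>
#include <c10/core/Event.h>
#include <c10/core/Stream.h>
#include <c10/core/impl/VirtualGuardImpl.h>

// After this call, every kernel later launched on the current stream of
// `device` waits for `event` to fire, whether or not the user ever
// calls work->wait() on the corresponding AsyncWork.
void blockCurrentStreamOn(c10::Event& event, c10::Device device) {
  c10::impl::VirtualGuardImpl impl(device.type());
  // Current stream of this thread; on the worker thread this is usually
  // the default stream shared with the main (compute) thread.
  c10::Stream current = impl.getStream(device);
  event.block(current);
}
```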
A better solution might be to pass the devices to the Future constructor and pass the storages to markCompleted, which enables the CUDA-aware Future logic.
pytorch/torch/csrc/distributed/c10d/ProcessGroupGloo.cpp
Lines 439 to 447 in 1f1d01d
```cpp
c10::intrusive_ptr<c10::ivalue::Future> createFutureAsOutput(
    const std::vector<std::vector<at::Tensor>>& outputTensors) {
  if (outputTensors.size() > 1) {
    return c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
  }
  return c10::make_intrusive<c10::ivalue::Future>(
      c10::ListType::create(c10::TensorType::get()));
}
```
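A rough sketch of that direction, based on the public `c10::ivalue::Future` API (a device list in the constructor, storages passed to `markCompleted`); the device collection and the completion helper here are illustrative assumptions, not a finished patch:

```cpp
#include <ATen/ATen.h>
#include <ATen/core/ivalue.h>

c10::intrusive_ptr<c10::ivalue::Future> createFutureAsOutput(
    const std::vector<std::vector<at::Tensor>>& outputTensors) {
  // Tell the Future which devices participate so that its device-aware
  // completion logic (recording events, syncing consumer streams) kicks in.
  std::vector<c10::Device> devices;
  for (const auto& tensors : outputTensors) {
    for (const auto& tensor : tensors) {
      if (!tensor.device().is_cpu()) {
        devices.push_back(tensor.device());
      }
    }
  }
  auto type = outputTensors.size() > 1
      ? c10::ListType::create(c10::ListType::create(c10::TensorType::get()))
      : c10::ListType::create(c10::TensorType::get());
  return c10::make_intrusive<c10::ivalue::Future>(
      std::move(type), std::move(devices));
}

// Hypothetical completion helper: instead of blocking the current stream
// in synchronize(), hand the output storages to the Future and let it
// record and synchronize events only for consumers that use the result.
void markFutureCompleted(
    const c10::intrusive_ptr<c10::ivalue::Future>& future,
    const std::vector<at::Tensor>& outputs) {
  std::vector<c10::weak_intrusive_ptr<c10::StorageImpl>> storages;
  storages.reserve(outputs.size());
  for (const auto& tensor : outputs) {
    storages.emplace_back(tensor.storage().getWeakStorageImpl());
  }
  future->markCompleted(c10::IValue(outputs), std::move(storages));
}
```

With this shape, a caller that never uses the result never forces a stream sync, while a caller that does gets synchronized through the Future's own wait/then machinery.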
cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @osalpekar @jiayisuse @agolynski @SciPioneer @H-Huang @mrzzd @cbalioglu @gcramer23