ProcessGroupGloo CUDA Comm Synchronization Looks Wrong #62300

@mrshenli

Description

Currently, ProcessGroupGloo wraps the arguments of a collective comm into a work item and inserts that work item into a queue. A separate thread runs the runLoop function, which pops work items from the queue and launches the communications.

void ProcessGroupGloo::runLoop(int workerIndex) {
  std::unique_lock<std::mutex> lock(workMutex_);
  while (!stop_) {
    if (workQueue_.empty()) {
      workProduceCV_.wait(lock);
      continue;
    }
    auto work = std::move(workQueue_.front());
    workQueue_.pop_front();
    workInProgress_[workerIndex] = work;
    lock.unlock();
    // Notify after releasing the lock so that the waiter
    // does not immediately block.
    workConsumeCV_.notify_one();
    AsyncWork::execute(std::move(work));
    lock.lock();
    workInProgress_[workerIndex].reset();
  }
}

The work item first runs the collective comm, then synchronizes the stream, and then marks the future as completed.

void ProcessGroupGloo::AsyncWork::execute(c10::intrusive_ptr<AsyncWork> work) {
  try {
    work->run();
  } catch (...) {
    work->finishWorkGlooError(std::current_exception());
    return;
  }
  // FIXME: We need to call it here since Future completion requires all
  // the work to be synchronized to CUDA.
  work->synchronize();
  work->finishWorkGloo();
}

There might be a problem here: the background thread syncs to the current stream and can interfere with the computation ops. If no one calls wait on the AsyncWork, this creates unnecessary synchronizations.

void synchronize() override {
  // Synchronize with the copy back to CUDA tensors.
  for (const auto i : c10::irange(inputs.size())) {
    c10::Device device = inputs[i].device();
    events[i].block(
        c10::impl::VirtualGuardImpl(device.type()).getStream(device));
  }
}

A better solution might be to pass the devices to the Future ctor and to pass the storages to markCompleted, which would enable the CUDA Future logic.

c10::intrusive_ptr<c10::ivalue::Future> createFutureAsOutput(
    const std::vector<std::vector<at::Tensor>>& outputTensors) {
  if (outputTensors.size() > 1) {
    return c10::make_intrusive<c10::ivalue::Future>(
        c10::ListType::create(c10::ListType::create(c10::TensorType::get())));
  }
  return c10::make_intrusive<c10::ivalue::Future>(
      c10::ListType::create(c10::TensorType::get()));
}

cc @pietern @mrshenli @pritamdamania87 @zhaojuanmao @satgera @rohan-varma @gqchen @aazzolini @osalpekar @jiayisuse @agolynski @SciPioneer @H-Huang @mrzzd @cbalioglu @gcramer23


Labels: module: c10d (issues/PRs related to collective communications and process groups), oncall: distributed (distributed oncall triage queue)
