@rohan-varma rohan-varma commented Nov 11, 2019

Stack from ghstack:

Follow-up to #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs and marks them as completed with a timeout exception. It also removes the corresponding futures from the `futures_` and `futureTimeouts_` maps. Unit tests are added to verify that timed-out RPCs are cleaned up.

The existing `futureTimeouts_` map is reworked so that its key is the time (in ms) at which the RPC will time out, rather than the time at which the RPC began. This lets us order futures that have different timeouts, a feature we want to support in the future. The helper function `getRPCEndTime` returns the key into this map.

We want to stop checking for timed-out RPCs as soon as the first entry in the `futureTimeouts_` map turns out not to have expired. That rules out keying by start time (an RPC created later may have a much smaller timeout and so expire first) and keying by the timeout itself (an RPC with a large timeout would not sort first, even if it is about to time out).
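To make the ordering argument concrete, here is a minimal sketch, not the code in this PR. `TimeoutMap`, `endTime`, and `popExpired` are hypothetical names, and times are plain millisecond counts. Because the map is keyed by expiry time and `std::map` iterates in ascending key order, the scan can stop at the first entry that has not expired.

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using Millis = std::chrono::milliseconds;

// Hypothetical stand-in for futureTimeouts_: expiry time -> request ids.
using TimeoutMap = std::map<Millis, std::vector<int64_t>>;

// Expiry time of an RPC that started at `start` with the given timeout.
inline Millis endTime(Millis start, Millis timeout) {
  return start + timeout;
}

// Removes and returns the ids of all RPCs whose expiry time is <= now.
// Keys are visited in ascending order, so the loop stops at the first
// entry that has not expired yet.
inline std::vector<int64_t> popExpired(TimeoutMap& timeouts, Millis now) {
  std::vector<int64_t> expired;
  auto it = timeouts.begin();
  while (it != timeouts.end() && it->first <= now) {
    expired.insert(expired.end(), it->second.begin(), it->second.end());
    it = timeouts.erase(it);
  }
  return expired;
}
```

With start-time keys, an RPC that starts later but has a short timeout would sort after a long-running one, so an early exit on the first entry would miss it.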

Also adds a `shutdown` variable to the process group agent to control shutdown of this background thread. It can eventually be extended to control a clean shutdown of the process group agent itself.

Differential Revision: D18434215

Follow-up to #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs at a pre-set interval and marks them as completed with a timeout exception. It also removes the corresponding futures from the `futures_` and `futureTimeouts_` maps. Unit tests are added to verify that timed-out RPCs are cleaned up.

Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent.

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)

[ghstack-poisoned]
rohan-varma added a commit that referenced this pull request Nov 11, 2019
Adds a wrapper around the existing `createException` function that
allows passing an error string instead of a regular C++ exception. This
allows us to create exceptions for errors that aren't necessarily C++
exceptions. This function is used by #29601 and #26336.

Differential Revision: [D18439216](https://our.internmc.facebook.com/intern/diff/D18439216/)

[ghstack-poisoned]
nextId_(0),
sendMutexes_(pg_->getSize()),
threadPool_(numSendRecvThreads) {
shutdown_.store(false);
Contributor

Add to the member initializer list.
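A minimal sketch of what this suggestion could look like, using a hypothetical, trimmed-down `Agent` class rather than the real `ProcessGroupAgent`: the atomic flag is initialized in the member initializer list instead of being stored to in the constructor body.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for ProcessGroupAgent; only the members needed to
// illustrate the initializer-list suggestion are included.
class Agent {
 public:
  // shutdown_ is initialized here rather than via shutdown_.store(false)
  // in the constructor body.
  Agent() : nextId_(0), shutdown_(false) {}

  int64_t nextId() { return nextId_++; }
  void shutdown() { shutdown_.store(true); }
  bool isShutdown() const { return shutdown_.load(); }

 private:
  std::atomic<int64_t> nextId_;
  std::atomic<bool> shutdown_;
};
```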


void ProcessGroupAgent::pollTimedOutRPCs() {
while (!shutdown_.load()) {
std::this_thread::sleep_for(std::chrono::seconds(5));
Contributor

You can determine the time to wait by looking at the first entry in the ordered futures map. Then you either wait that long, or until you're woken up by a condition variable if there are no futures to begin with.

Contributor Author

@pietern, for this suggestion, should we add a new condition variable that wakes up this thread whenever there is a new future created? There is currently a futureCV but this notifies when a future has been received and marked as completed.

Contributor

Yes, that sounds good. In order to guarantee correctness we have to do this. There are all sorts of edge cases that we'd fail to cover otherwise. We only need to wait on this one if there are no futures to begin with, though. If there are, we can use the timeout of the first one. This assumes that it never happens that you first call fut1 with timeout=5 and right after that call fut2 with timeout=1 -- it assumes the absolute timestamp of timeouts will only increase. If we were to allow for this case, the logic becomes more complex and contentious, because you'd have to wake up every time a future is created, which seems wasteful to me.
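The waiting strategy discussed in this thread can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `TimeoutWatcher` and its members are hypothetical names, and the sketch notifies the condition variable only when a newly added entry becomes the earliest one, which is one way to handle a later future with an earlier expiry without waking up on every new future.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical sketch; not the ProcessGroupAgent code.
class TimeoutWatcher {
 public:
  // Registers a request that expires at endTime. The watcher is notified
  // only when the new entry becomes the earliest one in the map.
  void add(int64_t id, Clock::time_point endTime) {
    bool isEarliest = false;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      isEarliest = timeouts_.empty() || endTime < timeouts_.begin()->first;
      timeouts_[endTime].push_back(id);
    }
    if (isEarliest) {
      cv_.notify_one();
    }
  }

  void shutdown() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      shutdown_ = true;
    }
    cv_.notify_one();
  }

  // Blocks until at least one request has expired, then removes and returns
  // the expired ids. Returns an empty vector once shutdown() was called.
  std::vector<int64_t> waitForExpired() {
    std::unique_lock<std::mutex> lock(mutex_);
    std::vector<int64_t> expired;
    while (!shutdown_ && expired.empty()) {
      if (timeouts_.empty()) {
        cv_.wait(lock);  // Nothing to time out: sleep until notified.
        continue;
      }
      // Copy the key: the map may change while the lock is released.
      const Clock::time_point earliest = timeouts_.begin()->first;
      if (earliest > Clock::now()) {
        cv_.wait_until(lock, earliest);  // Sleep until the first expiry.
        continue;
      }
      const auto now = Clock::now();
      auto it = timeouts_.begin();
      while (it != timeouts_.end() && it->first <= now) {
        expired.insert(expired.end(), it->second.begin(), it->second.end());
        it = timeouts_.erase(it);
      }
    }
    return expired;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::map<Clock::time_point, std::vector<int64_t>> timeouts_;
  bool shutdown_ = false;
};
```

Re-checking the map after every wake-up, rather than assuming why the thread woke, also covers spurious wake-ups.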

facebook-github-bot pushed a commit that referenced this pull request Nov 13, 2019
Summary:
Pull Request resolved: #29605

Adds a wrapper around the existing `createException` function that
allows passing an error string instead of a regular C++ exception. This
allows us to create exceptions for errors that aren't necessarily C++
exceptions. This function is used by #29601 and #26336.
ghstack-source-id: 93819039

Test Plan: Unit tests pass

Differential Revision: D18439216

fbshipit-source-id: 70b6a2e4f107304e322cdd2630847ad0071bc0c1
@rohan-varma
Contributor Author

I reworked the PR a bit to ensure that if a new future with a smaller TTL is created, it is appropriately inserted into the beginning of the map, and also implemented @jjlilley's suggestion to notify on the futureTimeoutCV if such an event occurs. The main change is that the futureTimeouts_ map is now keyed by the expiry time of the RPC, instead of the beginning time (see PR for explanation).

return timedOutFutures;
}

const std::chrono::milliseconds ProcessGroupAgent::getRPCRemainingTime(
Contributor Author

I feel like these functions don't really belong in the PG agent, since they are RPC/future specific. As a follow-up, we should figure out where this sort of function should go.

right, this could even be in private namespace {}, but this is fine too for now

Contributor

@pritamdamania87 pritamdamania87 left a comment

Looks good to me, @jjlilley could you take another look as well? Thanks!

@jjlilley jjlilley left a comment

LGTM as well

@rohan-varma
Contributor Author

rohan-varma commented Nov 19, 2019

The Windows tests are broken, I think since this PR: #29747. I will submit a fix.

@facebook-github-bot
Contributor

This pull request has been merged in 8351350.

startTime_(startTime),
dstRank_(dstRank),
timeout_(timeout) {}
FutureInfo() {}
Contributor

Beware: this allows for instances with garbage data. I'd either add member initializers here, or remove the default constructor entirely (if you're worried about inserting them into maps, use std::move or emplace).
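A small sketch of the `emplace` alternative, assuming a simplified `FutureInfo` (the future pointer is omitted) and a `std::map` keyed by request id. The struct layout and the `track` helper are hypothetical. With no default constructor, `operator[]` no longer compiles for this map, so an entry can only be created from a fully initialized value.

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <utility>

// Simplified, hypothetical version of FutureInfo with no default constructor,
// so every instance is created with meaningful values.
struct FutureInfo {
  FutureInfo(std::chrono::milliseconds startTime,
             int dstRank,
             std::chrono::milliseconds timeout)
      : startTime_(startTime), dstRank_(dstRank), timeout_(timeout) {}

  std::chrono::milliseconds startTime_;
  int dstRank_;
  std::chrono::milliseconds timeout_;
};

// Inserts via emplace instead of operator[], which would require FutureInfo
// to be default-constructible. Returns false if the id is already tracked.
inline bool track(std::map<int64_t, FutureInfo>& futures,
                  int64_t requestId,
                  FutureInfo info) {
  return futures.emplace(requestId, std::move(info)).second;
}
```

Lookups then go through `find` or `at`, neither of which creates a missing entry.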

Contributor Author

Fixed in #30197

}
auto futureInfo = FutureInfo(future, futureStartTime, to.id_, timeout);
futures_[requestId] = futureInfo;
auto rpcEndTime = getRPCEndTime(futureInfo);
Contributor

@rohan-varma Can we store the rpcEndTime into FutureInfo to save a second call to getRPCEndTime on receiving?

I don't see futureInfo.startTime_ used elsewhere except for calculating rpcEndTime.

Contributor

@xush6528 xush6528 Nov 22, 2019

I am changing this in #30342.

Contributor Author

Yes, this would be a good optimization.
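One possible shape of that optimization, as a sketch only: the end time is computed once in the constructor and stored, so it does not have to be recomputed when the response arrives. The field name `endTime_` and this trimmed-down struct are assumptions, not the change made in #30342.

```cpp
#include <chrono>

using Millis = std::chrono::milliseconds;

// Hypothetical, trimmed-down FutureInfo that stores the expiry time
// instead of the start time.
struct FutureInfo {
  FutureInfo(Millis startTime, int dstRank, Millis timeout)
      : dstRank_(dstRank),
        timeout_(timeout),
        endTime_(startTime + timeout) {}

  int dstRank_;
  Millis timeout_;
  Millis endTime_;  // Key into the timeout map; computed exactly once.
};
```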

// Do not hold the lock while marking futures completed, as markCompleted()
// could invoke callbacks.
lock.unlock();
for (const auto& timedOutFuture : timedOutFutures) {
Contributor

@xush6528 xush6528 Nov 22, 2019

Why not futureCV_.notify_all(); after unlocking?
I think the sizes of the futures_ map and the futureTimeouts_ map have already been reduced after const auto timedOutFutures = processTimedOutFutures();, so we could unblock ProcessGroupAgent::join() immediately.

But considering that we will remove the member function ::join(), I think we will remove this CV as well.

Contributor

Doing this in #30355.

Contributor Author

I believe we don't hold the lock when notifying? It is unlocked on line 516.
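The reason for releasing the lock before completing futures can be shown with a small sketch. `Tracker` is a hypothetical class, and plain `std::function` callbacks stand in for futures: work is collected under the mutex and run only after the mutex is released, so a callback that calls back into the tracker does not try to lock a mutex its own thread already holds.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch; callbacks stand in for futures being marked completed.
class Tracker {
 public:
  void add(std::function<void()> onTimeout) {
    std::lock_guard<std::mutex> guard(mutex_);
    pending_.push_back(std::move(onTimeout));
  }

  std::size_t pendingCount() {
    std::lock_guard<std::mutex> guard(mutex_);
    return pending_.size();
  }

  // Moves the pending callbacks out while holding the lock, then runs them
  // with the lock released, since a callback may call back into the Tracker.
  void expireAll() {
    std::vector<std::function<void()>> toRun;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      toRun.swap(pending_);
    }
    for (auto& callback : toRun) {
      callback();
    }
  }

 private:
  std::mutex mutex_;
  std::vector<std::function<void()>> pending_;
};
```

If `expireAll` ran the callbacks while still holding `mutex_`, a callback that called `pendingCount()` would attempt to lock a non-recursive mutex twice on the same thread, which is undefined behavior and typically a deadlock.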

}

if (sleepTime == INFINITE_TIMEOUT) {
futureTimeoutCV_.wait(lock);
Contributor

Should we add a continue; here?

When the thread is woken up by a ::send(..) call, the end times should not have arrived yet, so is there any point in scanning the two maps?

Contributor

Doing this in #30355.

}

const std::vector<ProcessGroupAgent::FutureInfo> ProcessGroupAgent::
processTimedOutFutures() {
Contributor

Notice this function is mutating the 2 maps that need the mutex's protection.

I guess it would be better if this function takes a lock, which is moved from the same lock used by futureTimeoutCV_.wait_for(lock, sleepTime);

And at the end of the function,

lock.unlock();
futureCV_.notify_all();

Contributor

Doing this in #30355.

rohan-varma added a commit that referenced this pull request Dec 3, 2019
Addresses @pietern's comment on #29601. This default constructor was added because `std::map`'s `operator[]` requires a default constructor. However, instead of using `operator[]`, we can
use `emplace` and remove the constructor, to ensure that the `FutureInfo` struct
doesn't get constructed with garbage values.

Differential Revision: [D18627675](https://our.internmc.facebook.com/intern/diff/D18627675/)

[ghstack-poisoned]
Labels

Merged module: rpc Related to RPC, distributed autograd, RRef, and distributed optimizer
