[rpc] poll for timed out futures in process group agent #29601
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs at a pre-set interval and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to the process group agent to control the shutdown of this background thread; this can eventually be extended to control a clean shutdown of the process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Adds a wrapper around the existing `createException` function that allows passing an error string instead of a regular C++ exception. This allows us to mark futures with exceptions for errors that aren't necessarily C++ exceptions. This function is used by #29601 and #26336. Differential Revision: [D18439216](https://our.internmc.facebook.com/intern/diff/D18439216/) [ghstack-poisoned]
    nextId_(0),
    sendMutexes_(pg_->getSize()),
    threadPool_(numSendRecvThreads) {
      shutdown_.store(false);
Add to the member initializer list.
    void ProcessGroupAgent::pollTimedOutRPCs() {
      while (!shutdown_.load()) {
        std::this_thread::sleep_for(std::chrono::seconds(5));
You can determine the time to wait by looking at the first entry in the ordered futures map. Then you either wait that long, or until you're woken up by a condition variable if there are no futures to begin with.
@pietern, for this suggestion, should we add a new condition variable that wakes up this thread whenever there is a new future created? There is currently a futureCV but this notifies when a future has been received and marked as completed.
Yes, that sounds good. In order to guarantee correctness we have to do this. There are all sorts of edge cases that we'd fail to cover otherwise. We only need to wait on this one if there are no futures to begin with, though. If there are, we can use the timeout of the first one. This assumes that it never happens that you first call fut1 with timeout=5 and right after that call fut2 with timeout=1 -- it assumes the absolute timestamp of timeouts will only increase. If we were to allow for this case, the logic becomes more complex and contentious, because you'd have to wake up every time a future is created, which seems wasteful to me.
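For illustration, the wait logic discussed in this thread could look roughly like the sketch below. This is a minimal sketch under stated assumptions, not the code in this PR: `DeadlineMap`, `kNoDeadline`, `nextSleep` and `waitForWork` are hypothetical names, and the map is assumed to be ordered by absolute deadline. The caller is expected to loop and re-check the map after every wake-up, since a wake-up can come from a notification, the deadline, or spuriously.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <mutex>

using Clock = std::chrono::steady_clock;
using Millis = std::chrono::milliseconds;
// Absolute deadline -> request id; hypothetical stand-in for the agent's map.
using DeadlineMap = std::multimap<Clock::time_point, int64_t>;

// Sentinel meaning "nothing pending, sleep until notified" (assumed name).
const Millis kNoDeadline = Millis::max();

// How long the poller may sleep before the earliest deadline needs attention.
Millis nextSleep(const DeadlineMap& deadlines, Clock::time_point now) {
  if (deadlines.empty()) {
    return kNoDeadline;
  }
  const auto earliest = deadlines.begin()->first;
  if (earliest <= now) {
    return Millis(0); // Already overdue: process it right away.
  }
  return std::chrono::duration_cast<Millis>(earliest - now);
}

// One wait step of the polling loop. The caller must hold `lock`, and
// whoever inserts into `deadlines` is expected to notify `cv` afterwards.
void waitForWork(
    std::unique_lock<std::mutex>& lock,
    std::condition_variable& cv,
    const DeadlineMap& deadlines) {
  assert(lock.owns_lock());
  const Millis sleep = nextSleep(deadlines, Clock::now());
  if (sleep == kNoDeadline) {
    cv.wait(lock); // No futures at all: only a notification can wake us.
  } else {
    cv.wait_for(lock, sleep); // Wake at the first deadline or on notify.
  }
}
```

Only the first map entry is inspected, which is why the ordering of the map matters: if a later insert can have an earlier deadline, the inserting thread has to notify so the poller recomputes its sleep time.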
I reworked the PR a bit to ensure that if a new future with a smaller TTL is created, it is appropriately inserted into the beginning of the map, and also implemented @jjlilley's suggestion to notify on the
      return timedOutFutures;
    }
    const std::chrono::milliseconds ProcessGroupAgent::getRPCRemainingTime(
I feel like these functions don't really belong in PG agent since they are rpc/future specific. As a follow up we should figure out where these sorts of functions should go.
right, this could even be in private namespace {}, but this is fine too for now
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) at which the RPC would time out, instead of when the RPC began. This allows us to order futures that have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed-out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to the process group agent to control the shutdown of this background thread; this can eventually be extended to control a clean shutdown of the process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
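To make the keying argument concrete, here is a small sketch of a map ordered by end time. It is an illustration only: `TimeoutMap`, `endTime` and `popTimedOut` are hypothetical names, and storing a vector of request ids per end time is an assumption made for the example rather than something taken from this PR.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <vector>

using Millis = std::chrono::milliseconds;

// End time (ms since some epoch) -> ids of the requests expiring then.
using TimeoutMap = std::map<Millis, std::vector<int64_t>>;

// Key under which a request is filed: the moment it would time out.
Millis endTime(Millis start, Millis timeout) {
  assert(timeout >= Millis(0));
  return start + timeout;
}

// Removes and returns every request whose end time is at or before `now`.
// Because the map is ordered by end time, the scan can stop at the first
// entry that has not expired yet.
std::vector<int64_t> popTimedOut(TimeoutMap& timeouts, Millis now) {
  std::vector<int64_t> timedOut;
  auto it = timeouts.begin();
  while (it != timeouts.end() && it->first <= now) {
    timedOut.insert(timedOut.end(), it->second.begin(), it->second.end());
    it = timeouts.erase(it);
  }
  return timedOut;
}
```

With this layout, a request started at t=0 with a 5000 ms timeout sorts after a request started at t=1000 with a 1000 ms timeout, which is the ordering neither start time nor timeout alone would give.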
pritamdamania87
left a comment
Looks good to me, @jjlilley could you take another look as well? Thanks!
jjlilley
left a comment
LGTM as well
Windows tests are broken, I think since this PR: #29747. Will submit a fix.
This pull request has been merged in 8351350.
    startTime_(startTime),
    dstRank_(dstRank),
    timeout_(timeout) {}
    FutureInfo() {}
Beware: this allows for instances with garbage data. I'd either add member initializers here, or remove the default constructor entirely (if you're worried about inserting them into maps, use std::move or emplace).
Fixed in #30197
    }
    auto futureInfo = FutureInfo(future, futureStartTime, to.id_, timeout);
    futures_[requestId] = futureInfo;
    auto rpcEndTime = getRPCEndTime(futureInfo);
@rohan-varma Can we store the rpcEndTime in FutureInfo to save a second call to getRPCEndTime on receiving?
I don't see futureInfo.startTime_ used elsewhere except for calculating rpcEndTime.
I am changing this in #30342.
Yes, this would be a good optimization.
    // Do not hold the lock while marking futures completed, as markCompleted()
    // could invoke callbacks.
    lock.unlock();
    for (const auto& timedOutFuture : timedOutFutures) {
Why not call futureCV_.notify_all(); after unlocking?
I think the futures_ and futureTimeouts_ maps have already shrunk after const auto timedOutFutures = processTimedOutFutures();, so we can unblock ProcessGroupAgent::join() immediately.
But since we plan to remove the member function ::join(), I think we will remove this CV as well.
Doing this in #30355.
I believe we don't hold the lock when notifying? It is unlocked on line 516.
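As a side note on why the lock is released before the futures are marked completed, the pattern can be sketched as below. This is a generic illustration under assumed names (`PendingCallbacks` and `drain` are hypothetical and not part of the agent): work is collected while holding the mutex, and the callbacks run only after it has been released, so a callback that needs the same mutex does not deadlock.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical container guarded by a mutex; not part of ProcessGroupAgent.
struct PendingCallbacks {
  std::mutex mutex;
  std::vector<std::function<void()>> callbacks;
};

// Runs every queued callback and returns how many were run.
std::size_t drain(PendingCallbacks& pending) {
  std::vector<std::function<void()>> toRun;
  {
    // Hold the lock only long enough to take ownership of the queued work.
    std::lock_guard<std::mutex> guard(pending.mutex);
    toRun.swap(pending.callbacks);
  }
  // The lock is released here, so a callback may lock pending.mutex itself.
  for (auto& callback : toRun) {
    callback();
  }
  return toRun.size();
}
```

Running the callbacks inside the locked scope would block forever (or be undefined behavior with `std::mutex`) as soon as one of them tried to take the same lock.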
    }
    if (sleepTime == INFINITE_TIMEOUT) {
      futureTimeoutCV_.wait(lock);
Should we add a continue; here?
When we are woken up by a ::send(..) call, the end times should not have arrived yet, so there is no point in scanning the 2 maps.
Doing this in #30355.
    }
    const std::vector<ProcessGroupAgent::FutureInfo> ProcessGroupAgent::
        processTimedOutFutures() {
Notice that this function mutates the 2 maps that need the mutex's protection.
I guess it would be better if this function took a lock, moved from the same lock used by futureTimeoutCV_.wait_for(lock, sleepTime);
And at the end of the function:
lock.unlock();
futureCV_.notify_all();
Doing this in #30355.
Addresses @pietern's comment on #29601. This default constructor was added because `std::map`'s `operator[]` requires a default constructor. However, instead of using `operator[]`, we can use `emplace` and remove the constructor, to ensure that the `FutureInfo` struct doesn't get constructed with garbage values. Differential Revision: [D18627675](https://our.internmc.facebook.com/intern/diff/D18627675/) [ghstack-poisoned]
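The difference between `operator[]` and `emplace` can be shown with a small example. This is a sketch with a hypothetical, trimmed-down `Info` struct standing in for `FutureInfo`, and `track` is an invented helper, not a function from the agent.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <type_traits>

// Hypothetical stand-in for FutureInfo: every member must be supplied, so
// no instance can exist with uninitialized fields.
struct Info {
  Info(int dstRank, int64_t timeoutMs)
      : dstRank_(dstRank), timeoutMs_(timeoutMs) {}
  int dstRank_;
  int64_t timeoutMs_;
};

static_assert(
    !std::is_default_constructible<Info>::value,
    "Info intentionally has no default constructor");

// Inserts a new entry; returns false if `id` was already tracked.
bool track(
    std::map<int64_t, Info>& infos,
    int64_t id,
    int dstRank,
    int64_t timeoutMs) {
  // `infos[id] = Info(dstRank, timeoutMs);` would not compile here, because
  // operator[] has to default-construct the mapped value first.
  return infos.emplace(id, Info(dstRank, timeoutMs)).second;
}
```

Note that `emplace` leaves an existing entry untouched, whereas `operator[]` followed by assignment would overwrite it; lookups on such a map go through `find` or `at` instead of `operator[]`.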