@rohan-varma rohan-varma commented Oct 21, 2019

Stack from ghstack:

Per #25531, we want to clean up futures when we detect failures or timeouts. As a first step, this diff adds a timeout parameter (in milliseconds) to the process group agent that the user can set from Python. It also keeps track of timeouts by maintaining a map from timeout --> requests, as suggested by @pritamdamania87. The map must stay in sync with the existing futures_ map, so we write to or delete from this map only when the futures_ map is written to or deleted from.

Note that this diff does not implement the actual process of detecting/handling timed out RPCs. The next step will be to implement a background thread that polls this map periodically and marks futures that have timed out. After this we can also add relevant unit tests. There's still an open question about how we should handle futures that complete/return from the caller after the timeout.
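
A rough Python sketch of the bookkeeping described above, to make the "two maps in sync" invariant concrete. `PendingRequests` and its methods are hypothetical stand-ins, not the actual `ProcessGroupAgent` code (which is C++), and keying the second map by start time is an assumption for illustration.

```python
import threading
from collections import defaultdict
from concurrent.futures import Future


class PendingRequests:
    """Hypothetical stand-in for the agent's bookkeeping (illustration only)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._futures = {}                        # request id -> (future, start time in ms)
        self._future_timeouts = defaultdict(set)  # start time in ms -> request ids

    def add(self, request_id, start_ms):
        fut = Future()
        # Both maps are written under the same lock so they cannot diverge.
        with self._lock:
            self._futures[request_id] = (fut, start_ms)
            self._future_timeouts[start_ms].add(request_id)
        return fut

    def remove(self, request_id):
        # Both maps are cleaned up together when a request finishes.
        with self._lock:
            fut, start_ms = self._futures.pop(request_id)
            ids = self._future_timeouts[start_ms]
            ids.discard(request_id)
            if not ids:
                del self._future_timeouts[start_ms]
        return fut

    def in_sync(self):
        # True when both maps describe exactly the same set of requests.
        with self._lock:
            tracked = set()
            for ids in self._future_timeouts.values():
                tracked |= ids
            return tracked == set(self._futures)
```

Every write path touches both maps under one lock, so a reader holding that lock never sees a request in one map but not the other.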

Differential Revision: D18025163

timeouts

Per #25531, we want to clean up futures when we detect that there are
failures/timeouts. As a first step, this diff adds timers to the future object,
provides functionality to check if a future is timed out, and allows
specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out.

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)

[ghstack-poisoned]
@rohan-varma rohan-varma changed the title [distributed] add timers to future and timeout to rpc to support future timeouts [WIP][distributed] add timers to future and timeout to rpc to support future timeouts Oct 21, 2019
@rohan-varma rohan-varma requested a review from apaszke as a code owner October 22, 2019 00:07
rohan-varma added a commit that referenced this pull request Oct 22, 2019
timeouts

Pull Request resolved: #28392

Per #25531, we want to clean up futures when we detect that there are
failures/timeouts. As a first step, this diff adds timers to the future object,
provides functionality to check if a future is timed out, and allows
specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out.
ghstack-source-id: 92337906

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
@rohan-varma rohan-varma changed the title [WIP][distributed] add timers to future and timeout to rpc to support future timeouts [distributed] add timeout for RPC futures, and ability to set timeout when initializing rpc Oct 22, 2019
test/rpc_test.py Outdated
@requires_process_group_agent("PROCESS_GROUP rpc backend specific test, skip")
def test_timeout_set(self):
    rpc_timeout_seconds = rpc.get_rpc_timeout().seconds
    self.assertEqual(rpc_timeout_seconds, 100)
Contributor Author

I have set the default timeout for all RPCs to 100 seconds for now, but I'm not sure what a good default is. If it's too small, users could get timeouts when in fact the operation was going to complete successfully.

Contributor

Let's use 10s for now but also provide a way to specify infinite timeout. Some RPC frameworks use timeout=0 for this. We should document this as well.
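
A small sketch of how the suggestion above could look in Python: a 10-second default plus a sentinel of zero meaning "never time out". `DEFAULT_RPC_TIMEOUT`, `INFINITE_TIMEOUT`, and `deadline_for` are hypothetical names for illustration, not part of the RPC API in this PR.

```python
from datetime import timedelta

DEFAULT_RPC_TIMEOUT = timedelta(seconds=10)  # default proposed in the review
INFINITE_TIMEOUT = timedelta(0)              # assumption: 0 means "no timeout"


def deadline_for(start_s, timeout=DEFAULT_RPC_TIMEOUT):
    """Return the deadline in seconds, or None when the RPC never times out."""
    if timeout < timedelta(0):
        raise ValueError("timeout must be non-negative")
    if timeout == INFINITE_TIMEOUT:
        return None
    return start_s + timeout.total_seconds()
```

Returning `None` for the infinite case keeps such requests out of any deadline-ordered structure, so a timeout checker never has to special-case them.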

test/rpc_test.py Outdated
timeout_function,
args=(3,)
)
time.sleep(1)
Contributor Author

This is not ideal to do in a test, but I'm not sure how else to test that the timer works.

@zhaojuanmao
Contributor

Could we add the timeout outside of the future class, e.g., in the process group agent? One reason is that we will use IValue::Future soon, which does not have a timeout setting.

@rohan-varma
Contributor Author

> Could we add timeout outside of future class? e.g, in process group agent. One reason is that we will use IValue::Future soon, which does not have timeout setting.

We'd need to store some information indicating the (relative) time the future was started, so that we can check if the future has timed out. Would it be okay to have this as an additional structure in ProcessGroupAgent?

@pritamdamania87
Contributor

The future itself should not have a timeout. ProcessGroupAgent should have a timeout on a request and mark the appropriate future with an "Operation timed out" exception.
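
To illustrate the division of responsibility proposed above, here is a minimal Python sketch in which the owner of the request, not the future, decides that a timeout occurred and completes the future with an error. It uses the standard library's `concurrent.futures.Future` as a stand-in for `FutureMessage`; `mark_timed_out` is a hypothetical helper.

```python
from concurrent.futures import Future


def mark_timed_out(future):
    """Complete `future` with a timeout error unless it already finished.

    Returns True if the future was marked. In a real agent, this check and
    the completion would need to happen under the agent's lock (assumption).
    """
    if future.done():
        return False
    future.set_exception(RuntimeError("Operation timed out"))
    return True
```

The future stays a plain result holder; anyone waiting on it simply sees the exception when calling `result()`.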

test/rpc_test.py Outdated
args=(3,)
)
time.sleep(1)
self.assertTrue(fut.check_time_elapsed(timedelta(seconds=0)))
Contributor

I don't think we should expose a method like check_time_elapsed to users.

test/rpc_test.py Outdated
self.assertEqual(fut.wait(), torch.ones(n, n) * 2)

@dist_init
def test_future_timer(self):
Contributor

In the test we should call a method that takes more time than the timeout and verify that future.wait() throws a timed out exception.
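
The shape of such a test might look like the sketch below. It is only an analogy: it uses the standard library's thread pool and the `timeout` argument of `Future.result()` instead of `torch.distributed.rpc`, and `slow_function` is a hypothetical helper. The point is the structure: run something that outlasts the timeout and assert that waiting raises.

```python
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def slow_function(seconds):
    # Deliberately takes longer than the timeout used in the test.
    time.sleep(seconds)
    return seconds


class TimeoutTest(unittest.TestCase):
    def test_wait_raises_on_timeout(self):
        with ThreadPoolExecutor(max_workers=1) as pool:
            fut = pool.submit(slow_function, 0.3)
            # Waiting with a shorter timeout must raise instead of returning.
            with self.assertRaises(FutureTimeoutError):
                fut.result(timeout=0.05)
```

Unlike the `time.sleep(1)` approach in the diff, the assertion here is on the user-visible behavior (the wait fails), not on an internal timer.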

test/rpc_test.py Outdated
@requires_process_group_agent("PROCESS_GROUP rpc backend specific test, skip")
def test_timeout_set(self):
    rpc_timeout_seconds = rpc.get_rpc_timeout().seconds
    self.assertEqual(rpc_timeout_seconds, 100)
Contributor

nit: make 100 a constant in the rpc_api.py that you can use elsewhere.

  py::arg("process_group"),
- py::arg("num_send_recv_threads") = 4)
+ py::arg("num_send_recv_threads") = 4,
+ py::arg("future_timeout") = std::chrono::seconds(100))
Contributor

nit: use a constant.

  sendMutexes_(pg_->getSize()),
- threadPool_(numSendRecvThreads) {
+ threadPool_(numSendRecvThreads),
+ futureTimeout_(futureTimeout) {
Contributor

This should be called the RPC timeout, not the future timeout.

return allWorkerInfo_[id];
}

const std::chrono::seconds& ProcessGroupAgent::getRpcTimeout() {
Contributor

We should store the timeout at millisecond precision in this class, not at second precision.

  std::string workerName,
  std::shared_ptr<c10d::ProcessGroup> pg,
- int numSendRecvThreads = 4);
+ int numSendRecvThreads = 4,
Contributor

nit: not related to this PR, but make 4 a const too.

  self_rank=-1,
  init_method=None,
- num_send_recv_threads=4):
+ num_send_recv_threads=4, rpc_timeout=100):
Contributor

rpc_timeout should be specified by the user in milliseconds and not seconds. Some users might want to set it to 100ms for example.

Contributor

Can we enforce that the user facing API only accepts timedelta as the timeout parameter? This would make the API much cleaner.

Contributor

Also probably rename it to rpc_timeout_ms
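
One way to reconcile the three suggestions in this thread (millisecond granularity, `timedelta`-only input, an `_ms` name for the stored value) is sketched below. `rpc_timeout_to_ms` is a hypothetical helper, not an existing function in the RPC module.

```python
from datetime import timedelta


def rpc_timeout_to_ms(rpc_timeout):
    """Validate a user-supplied timeout and return it as whole milliseconds."""
    if not isinstance(rpc_timeout, timedelta):
        raise TypeError(
            "rpc_timeout must be a datetime.timedelta, got "
            + type(rpc_timeout).__name__
        )
    # Floor division of two timedeltas yields an int.
    return rpc_timeout // timedelta(milliseconds=1)
```

Accepting only `timedelta` removes the seconds-versus-milliseconds ambiguity at the call site, while the value passed down to the agent can still be a plain integer such as `rpc_timeout_ms`.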

@pietern pietern left a comment

Is the caller always responsible for timing out an operation?

#pragma once

#include <torch/csrc/distributed/rpc/message.h>
#include <chrono>
Contributor

Include stdlib before everything else.

// TODO: make message_ an optional field, and get rid of UNKNOWN message type
Message message_;
// clock to measure time elapsed since the future was created.
std::chrono::steady_clock clock_;
Contributor

Not used.

// TODO: consider using ivalue::Future.
struct TORCH_API FutureMessage final {
public:
FutureMessage();
Contributor

explicit

if (completed_) {
return false;
}
const auto now = std::chrono::high_resolution_clock::now();
@satgera satgera Oct 24, 2019

Please use std::chrono::steady_clock everywhere in this diff. From what I recall, high_resolution_clock isn't guaranteed to be monotonic and may be more expensive than steady_clock.
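
The same concern exists on the Python side: `time.monotonic()` cannot go backwards, whereas wall-clock time can jump when the system clock is adjusted. A minimal sketch of an elapsed-time check built on a monotonic clock follows; `RequestTimer` is a hypothetical class for illustration, and the injectable `clock` parameter exists only to make it testable.

```python
import time


class RequestTimer:
    """Tracks whether a request has exceeded its timeout (illustration only)."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self._clock = clock
        self._start = clock()  # monotonic start, immune to wall-clock jumps
        self._timeout_s = timeout_s

    def timed_out(self):
        return self._clock() - self._start >= self._timeout_s
```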

…set timeout when initializing rpc"


Per #25531, we want to clean up futures when we detect that there are
failures/timeouts. As a first step, this diff adds timers to the future object,
provides functionality to check if a future is timed out, and allows
specification of the timeout when initializing rpc. In the next diff, we will add a separate thread to ProcessGroupAgent to clean up these futures if they have timed out.

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)

[ghstack-poisoned]
rohan-varma added a commit that referenced this pull request Oct 25, 2019
… when initializing rpc

Pull Request resolved: #28392

ghstack-source-id: 92601334

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
rohan-varma added a commit that referenced this pull request Oct 25, 2019
… when initializing rpc

Pull Request resolved: #28392

ghstack-source-id: 92603029

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
rohan-varma added a commit that referenced this pull request Oct 25, 2019
… when initializing rpc

Pull Request resolved: #28392

ghstack-source-id: 92607009

Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
rohan-varma added a commit that referenced this pull request Nov 17, 2019
…s group agent"


Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up.

Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent.
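
A compact Python sketch of the pattern this description outlines: a background thread that wakes at a fixed interval, fails any request whose deadline has passed, and exits when a shutdown flag is set. `TimeoutPoller` and its members are hypothetical; the actual change lives in the C++ `ProcessGroupAgent`.

```python
import threading
import time
from concurrent.futures import Future


class TimeoutPoller:
    """Hypothetical illustration of a timeout-polling thread with shutdown."""

    def __init__(self, poll_interval_s=0.01):
        self._lock = threading.Lock()
        self._pending = {}  # request id -> (future, monotonic deadline)
        self._shutdown = threading.Event()
        self._poll_interval_s = poll_interval_s
        self._thread = threading.Thread(target=self._poll, daemon=True)
        self._thread.start()

    def add(self, request_id, timeout_s):
        fut = Future()
        with self._lock:
            self._pending[request_id] = (fut, time.monotonic() + timeout_s)
        return fut

    def _poll(self):
        # Event.wait returns True once shutdown is requested, ending the loop.
        while not self._shutdown.wait(self._poll_interval_s):
            now = time.monotonic()
            with self._lock:
                expired = [rid for rid, (_, deadline) in self._pending.items()
                           if deadline <= now]
                timed_out = [self._pending.pop(rid)[0] for rid in expired]
            # Complete futures outside the lock so callbacks cannot deadlock.
            for fut in timed_out:
                if not fut.done():
                    fut.set_exception(RuntimeError("RPC timed out"))

    def shutdown(self):
        self._shutdown.set()
        self._thread.join()
```

Using an event for the shutdown flag lets the thread sleep between polls yet still react promptly when `shutdown()` is called.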

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)

[ghstack-poisoned]
rohan-varma added a commit that referenced this pull request Nov 17, 2019
…s group agent"


Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up.

The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) at which the RPC would time out, instead of when the RPC began. This lets us order futures that have different timeouts, a feature we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map.

Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent.

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)

[ghstack-poisoned]
rohan-varma added a commit that referenced this pull request Nov 17, 2019
Pull Request resolved: #29601

Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs at a pre-set interval and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up.

Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent.
ghstack-source-id: 94086158

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
rohan-varma added a commit that referenced this pull request Nov 18, 2019
…s group agent"


Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up.

The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) at which the RPC would time out, instead of when the RPC began. This lets us order futures that have different timeouts, a feature we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map.

Since we'd like to be able to stop checking for timed-out RPCs as soon as the first entry in the `futureTimeouts_` map has not expired, we cannot key by beginning time (an RPC with a much smaller timeout may be created later) or by the timeout itself (an RPC with a large timeout would not be at the beginning, even if it is about to time out).
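
The early-stop argument can be sketched in Python as follows. This swaps the ordered map for a min-heap from `heapq`, which gives the same "earliest deadline first" property; `rpc_end_time_ms` and `pop_expired` are hypothetical names, loosely mirroring the `getRPCEndTime` helper mentioned above.

```python
import heapq


def rpc_end_time_ms(start_ms, timeout_ms):
    # Key for the ordered structure: the absolute time the RPC expires.
    return start_ms + timeout_ms


def pop_expired(deadline_heap, now_ms):
    """Remove and return ids of all requests whose end time has passed.

    `deadline_heap` is a heap of (end_time_ms, request_id) tuples. Because
    the smallest end time is always first, the scan stops at the first entry
    that has not expired yet.
    """
    expired = []
    while deadline_heap and deadline_heap[0][0] <= now_ms:
        expired.append(heapq.heappop(deadline_heap)[1])
    return expired
```

For example, a request started at 0 ms with a 10000 ms timeout and one started at 500 ms with a 100 ms timeout are ordered by end times 10000 and 600, so the later-started request is correctly checked first. Keying by start time or by timeout alone would order them the other way around.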

Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent.

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)

[ghstack-poisoned]
rohan-varma added a commit that referenced this pull request Nov 18, 2019
Pull Request resolved: #29601

ghstack-source-id: 94097468

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
rohan-varma added a commit that referenced this pull request Nov 18, 2019
Pull Request resolved: #29601

Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up.

Also adds a `shutdown` variable to the process group agent to control the shutdown of this background thread; this can eventually be extended to control a clean shutdown of the process group agent itself.
ghstack-source-id: 94109478

Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
facebook-github-bot pushed a commit that referenced this pull request Nov 19, 2019
Summary:
Pull Request resolved: #29601

Follow-up to #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed-out RPCs at a pre-set interval and marks them as completed with a timeout exception. Also deletes the futures from the corresponding maps, `futures_` and `futureTimeouts_`. Unit tests are added to ensure that timed-out RPCs are appropriately cleaned up.

Also adds a `shutdown` variable to the process group agent to control the shutdown of this background thread; this can eventually be extended to control a clean shutdown of the process group agent itself.
ghstack-source-id: 94175131

Test Plan: Added unit tests

Differential Revision: D18434215

fbshipit-source-id: c48abdb8759fe1447200ec66bb9d4b1c50ec4535