-
Notifications
You must be signed in to change notification settings - Fork 26.3k
[distributed] add support for rpc timeouts in process group agent #28392
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
timeouts Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… to support future timeouts" timeouts** timeouts Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… to support future timeouts" timeouts** timeouts Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… to support future timeouts" timeouts** timeouts Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
timeouts Pull Request resolved: #28392 Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. ghstack-source-id: 92337906 Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
test/rpc_test.py
Outdated
| @requires_process_group_agent("PROCESS_GROUP rpc backend specific test, skip") | ||
| def test_timeout_set(self): | ||
| rpc_timeout_seconds = rpc.get_rpc_timeout().seconds | ||
| self.assertEqual(rpc_timeout_seconds, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the default timeout for all RPCs to be 100 seconds for now, but I'm not sure what a good default is. If it's too small, users could get time outs when in fact the operation was going to complete successfully.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use 10s for now but also provide a way to specify infinite timeout. Some RPC frameworks use timeout=0 for this. We should document this as well.
test/rpc_test.py
Outdated
| timeout_function, | ||
| args=(3,) | ||
| ) | ||
| time.sleep(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not ideal to do in a test, but I'm not sure how to test that the timer works other than this.
|
Could we add timeout outside of future class? e.g, in process group agent. One reason is that we will use IValue::Future soon, which does not have timeout setting. |
We'd need to store some information indicating the (relative) time the future was started, so that we can check if the future has timed out. Would it be okay to have this as an additional structure in ProcessGroupAgent? |
|
The future itself should not have a timeout. ProcessGroupAgent should have a timeout on a request and mark the appropriate future with an "Operation timed out" exception. |
test/rpc_test.py
Outdated
| args=(3,) | ||
| ) | ||
| time.sleep(1) | ||
| self.assertTrue(fut.check_time_elapsed(timedelta(seconds=0))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should expose a method like check_time_elapsed to users.
test/rpc_test.py
Outdated
| self.assertEqual(fut.wait(), torch.ones(n, n) * 2) | ||
|
|
||
| @dist_init | ||
| def test_future_timer(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the test we should call a method that takes more time than the timeout and verify that future.wait() throws a timed out exception.
test/rpc_test.py
Outdated
| @requires_process_group_agent("PROCESS_GROUP rpc backend specific test, skip") | ||
| def test_timeout_set(self): | ||
| rpc_timeout_seconds = rpc.get_rpc_timeout().seconds | ||
| self.assertEqual(rpc_timeout_seconds, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: make 100 a constant in the rpc_api.py that you can use elsewhere.
torch/csrc/distributed/rpc/init.cpp
Outdated
| py::arg("process_group"), | ||
| py::arg("num_send_recv_threads") = 4) | ||
| py::arg("num_send_recv_threads") = 4, | ||
| py::arg("future_timeout") = std::chrono::seconds(100)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: use a constant.
| sendMutexes_(pg_->getSize()), | ||
| threadPool_(numSendRecvThreads) { | ||
| threadPool_(numSendRecvThreads), | ||
| futureTimeout_(futureTimeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be called rpc timeout and not future timeout
| return allWorkerInfo_[id]; | ||
| } | ||
|
|
||
| const std::chrono::seconds& ProcessGroupAgent::getRpcTimeout() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should store millisecond precision in this class and not at seconds precision.
| std::string workerName, | ||
| std::shared_ptr<c10d::ProcessGroup> pg, | ||
| int numSendRecvThreads = 4); | ||
| int numSendRecvThreads = 4, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not related to this PR, but make 4 a const too.
torch/distributed/rpc/__init__.py
Outdated
| self_rank=-1, | ||
| init_method=None, | ||
| num_send_recv_threads=4): | ||
| num_send_recv_threads=4, rpc_timeout=100): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rpc_timeout should be specified by the user in milliseconds and not seconds. Some users might want to set it to 100ms for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we enforce that the user facing API only accepts timedelta as the timeout parameter? This would make the API much cleaner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also probably rename it to rpc_timeout_ms
test/rpc_test.py
Outdated
| @requires_process_group_agent("PROCESS_GROUP rpc backend specific test, skip") | ||
| def test_timeout_set(self): | ||
| rpc_timeout_seconds = rpc.get_rpc_timeout().seconds | ||
| self.assertEqual(rpc_timeout_seconds, 100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use 10s for now but also provide a way to specify infinite timeout. Some RPC frameworks use timeout=0 for this. We should document this as well.
pietern
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the caller always responsible for timing out an operation?
| #pragma once | ||
|
|
||
| #include <torch/csrc/distributed/rpc/message.h> | ||
| #include <chrono> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Include stdlib before everything else.
| // TODO: make message_ an optional field, and get rid of UNKNOWN message type | ||
| Message message_; | ||
| // clock to measure time elapsed since the future was created. | ||
| std::chrono::steady_clock clock_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used.
| // TODO: consider using ivalue::Future. | ||
| struct TORCH_API FutureMessage final { | ||
| public: | ||
| FutureMessage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explicit
| if (completed_) { | ||
| return false; | ||
| } | ||
| const auto now = std::chrono::high_resolution_clock::now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use std::chrono::steady_clock every place in this diff from what I recall high_resolution_clock isn't guaranteed to be monotonic and may be more expensive than steady_clock.
…set timeout when initializing rpc" Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. In the next diff, we will add a separate thread to ProcessGroupAgent to clean up these futures if they have timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… when initializing rpc Pull Request resolved: #28392 Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. ghstack-source-id: 92601334 Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
…set timeout when initializing rpc" Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. In the next diff, we will add a separate thread to ProcessGroupAgent to clean up these futures if they have timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… when initializing rpc Pull Request resolved: #28392 Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. ghstack-source-id: 92603029 Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
…set timeout when initializing rpc" Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. In the next diff, we will add a separate thread to ProcessGroupAgent to clean up these futures if they have timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
… when initializing rpc Pull Request resolved: #28392 Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. A future diff will check for these timeouts and mark the future completed with an exception indicating that it has timed out. ghstack-source-id: 92607009 Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/)
…set timeout when initializing rpc" Per #25531, we want to clean up futures when we detect that there are failures/timeouts. As a first step, this diff adds timers to the future object, provides functionality to check if a future is timed out, and allows specification of the timeout when initializing rpc. In the next diff, we will add a separate thread to ProcessGroupAgent to clean up these futures if they have timed out. Differential Revision: [D18025163](https://our.internmc.facebook.com/intern/diff/D18025163/) [ghstack-poisoned]
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94086158 Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94097468 Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94109478 Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94167479 Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
…s group agent" Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. The existing `futureTimeouts_` map is reworked so that the key is now the time (in ms) that the RPC would timeout, instead of when the RPC began. The reason for this is that it allows us to order futures which have different timeouts, which is a feature that we'd want to support in the future. A helper function `getRPCEndTime` can be used to retrieve the key into this map. Since we'd like to be able to quickly stop checking for timed out RPCs after looking at the first entry in the `futureTimeouts_` map, we cannot key by beginning time (an RPC with a much smaller timeout may later be created) or by the timeout itself (since an RPC with a large timeout would not be at the beginning, even if it is about to time out). Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/) [ghstack-poisoned]
Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94175131 Differential Revision: [D18434215](https://our.internmc.facebook.com/intern/diff/D18434215/)
Summary: Pull Request resolved: #29601 Follow up from #28392. Adds a background thread to `ProcessGroupAgent` that polls for timed out RPCs at a pre-set interval, and marks them as completed with a timeout exception if they have timed out. Also deletes the futures from the corresponding maps `futures_` and `futureTimeouts`. Unit tests are added to ensure that timed out RPCs are appropriately cleaned up. Also adds a `shutdown` variable to process group agent to control the shutting down of this background thread, which can eventually be extended to use for controlling a clean shutdown of process group agent. ghstack-source-id: 94175131 Test Plan: Added unit tests Differential Revision: D18434215 fbshipit-source-id: c48abdb8759fe1447200ec66bb9d4b1c50ec4535
Stack from ghstack:
Per #25531, we want to clean up futures when we detect that there are
failures/timeouts. As a first step, this diff adds a timeout parameter to process group agent (in miliseconds) that can be set by the user in Python, and also keeps track of timeouts by maintaining a map from timeout --> requests as suggested by @pritamdamania87. The map must stay in sync with the existing
futures_map, so we write/delete to this map only when the futures_ map is written/deleted.Note that this diff does not implement the actual process of detecting/handling timed out RPCs. The next step will be to implement a background thread that polls this map periodically and marks futures that have timed out. After this we can also add relevant unit tests. There's still an open question about how we should handle futures that complete/return from the caller after the timeout.
Differential Revision: D18025163