-
Notifications
You must be signed in to change notification settings - Fork 26.3k
[pytorch][rpc] add exception handling in enqueueSend #26336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Pull Request resolved: #26336 Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. ghstack-source-id: 90226784 Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/)
| } | ||
| message.setId(requestId); | ||
| } else { | ||
| future->markCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my understanding of the issue, we want to pass the future into enqueueSend and mark it with an exception message type if we catch an exception in the lambda. But the future API only lets us calls markCompleted once, so we'd have to remove this.
| } | ||
| future->markCompleted(); | ||
| } catch (...) { | ||
| future->markCompleted(Message({}, {}, MessageType::EXCEPTION)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hides the underlying cause. Always make it visible. This mutes programming errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @pritamdamania87 , is this what you meant by catching the exception and marking the future with the exception? If so, is there a way to make sure that the exception information isn't lost? I was thinking we could put the exception information into the message's payload for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do something like this:
} catch (std::exception& e) {
future->markCompleted(createException(work.message_, e));
} catch (...) {
future->markCompleted(Message({"Unknown exception occurred"});
}
You can probably refactor createException from request_callback.cpp and re-use it here.
pritamdamania87
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a unit test for this? One idea is we can call enqueueSend with an invalid message and verify an appropriate exception is thrown.
| for (auto& pendingSend : pendingSends) { | ||
| pendingSend->wait(); | ||
| } | ||
| future->markCompleted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't right, we should mark the future completed once we receive the response. We should mark the future completed here only if we don't require a response.
| } | ||
| future->markCompleted(); | ||
| } catch (...) { | ||
| future->markCompleted(Message({}, {}, MessageType::EXCEPTION)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do something like this:
} catch (std::exception& e) {
future->markCompleted(createException(work.message_, e));
} catch (...) {
future->markCompleted(Message({"Unknown exception occurred"});
}
You can probably refactor createException from request_callback.cpp and re-use it here.
|
@mrshenli Would be nice if you could take a look at this PR too. |
| void ProcessGroupAgent::enqueueSend(SendWork work) { | ||
| void ProcessGroupAgent::enqueueSend( | ||
| SendWork work, | ||
| std::shared_ptr<FutureMessage> future) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This future can be retrieved using futures_[work.message_.id()], do we still need to pass in the future as an arg?
| enqueueSend( | ||
| SendWork(workerIds_[dst], Message({}, {}, MessageType::SHUTDOWN))); | ||
| SendWork(workerIds_[dst], Message({}, {}, MessageType::SHUTDOWN)), | ||
| future); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh I see, this future is not added into the futures_ map. Shall add this to the map as well and use a similar signature for enqueueSend and enqueueRecv ?
|
I am wondering if it is possible to come up with some tests for this. @pietern would it work if we manually force the receiver process to exit after creating the process group. Will the sender then timeout on the send? |
|
That's possible, yeah. You can make the receiver take 1 message and then exit. The next send operation will not timeout but fail immediately with a "connection reset by peer" error, most likely. Otherwise, you could even synthesize some |
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Pull Request resolved: #26336 Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. ghstack-source-id: 91237144 Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/)
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
Previously, we were swallowing exceptions in ProcessGroupAgent::enqueueSend. Per #25516, This diff passes in a future into enqueueSend, and marks that future with a message that has type Exception if an exception was caught. Differential Revision: [D17416185](https://our.internmc.facebook.com/intern/diff/D17416185/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
…nd report them." Resubmit of #26336, since that PR was a few months old and many things have changed in PG agent. Adds support to catch exceptions in ProcessGroupAgent::enqueueSend and report them in the future by marking the future as completed with an exception indicating the error. An example of when this could happen is if the receiving side aborts when the sender is sending the message, previously, we would hang until the timeout is hit, and the original exception would be lost. Also adds a destructor that tries to do a non-graceful `shutdown()`, which is needed, otherwise, the tests would complain about the threads not being terminated properly. This destructor is now possible since we have implemented a shutdown that does not rely on sending messages to the other workers telling them to shutdown. A slight modification is made to `dist_utils.dist_init`: the `rpc.shutdown()` call at the end should be called with `graceful=clean_shutdown`, so that we can force a non-graceful shutdown in tests. This is used in the added unit test and does not affect the other callsites. Differential Revision: [D18901981](https://our.internmc.facebook.com/intern/diff/D18901981/) [ghstack-poisoned]
Stack from ghstack:
in
ProcessGroupAgent::enqueueSend, exceptions on the caller side can result in the RPC blocking forever, since the future is never marked as finished. This diff adds exception handling intoenqueueSendso that the future is marked as completed with an exception, preventing this. See #25516, for more context.Note: exceptions on the callee side (such as calling a function with an invalid # of arguments) are already handled in the callback function that is executed on the callee side.
Test plan: Run the following script:
Previously, this code would hang until the timeout passed into
init_process_groupwas hit, and we'd see an error such asRuntimeError: [../third_party/gloo/gloo/transport/tcp/unbound_buffer.cc:119] Timed out waiting 1800000ms for send operation to complete. (note that this is not the original exception, which was lost)With this patch, the exception is caught and passed up:
The added unit test simulates this behavior.
Differential Revision: D17416185