[torchelastic][c10d] Fix store prefix race in rendezvous #135957
Conversation
🔗 Helpful Links
🧪 See artifacts and rendered test results at hud.pytorch.org/pr/135957
Note: Links to docs will display an error until the docs builds have been completed.
❌ 1 New Failure, 4 Unrelated Failures as of commit 4eb521f with merge base a0c76ea.
NEW FAILURE - The following job has failed:
FLAKY - The following jobs failed but were likely due to flakiness present on trunk:
This comment was automatically generated by Dr. CI and updates every 15 minutes.
Sending out the PR for RFC first; the unit test is on the way.
1. We want to take option 3 as discussed in #135712: every time we retry, we create a new TCPStore, so that we don't need to append an attempt prefix and can avoid an eventual TCPStore sync failure.
2. Upon checking the code, we create a new TCPStore in c10d_rendezvous_backend.py before we even hit the logic inside dynamic_rendezvous.py. Since we use `multi_tenant=True`, TCPStore.cpp does not create a new server when this flag is set:

```cpp
if (opts.multiTenant) {
  std::lock_guard<std::mutex> guard{cache_mutex_};

  // If the caller is okay with a multi-tenant store, first check if we
  // already have a TCPServer running on the specified port.
  if (opts.port > 0) {
    auto pos = cachedServers_.find(opts.port);
    if (pos != cachedServers_.end()) {
      server = pos->second.lock();
      if (server != nullptr) {
        return server;
      }

      // Looks like the TCPStore has been disposed, make sure that we
      // release the control block.
      cachedServers_.erase(pos);
    }
  }

  server = startCore();

  cachedServers_.emplace(server->port(), server);
}
```

So if we are sure the TCPStore server is destroyed every time we retry, we are already recreating a TCPStore server; otherwise, we need more changes to the elastic code.
3. This change will force broadcasting a new master address and port every time dynamic_rendezvous's `next_rendezvous` is called.

Sending a PR first to collect feedback.

cc XilunWu H-Huang awgu kwen2501 wanchaol fegin wz337 wconstab d4l3k c-p-i-o
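To illustrate the caching path above from the Python side, here is a minimal sketch (not part of the PR; it only assumes the public `torch.distributed.TCPStore` constructor and a local machine):

```python
# Minimal sketch: with multi_tenant=True, a second "master" TCPStore created
# in the same process on the same port reuses the cached server from
# cachedServers_ instead of failing with "address already in use".
from datetime import timedelta
from torch.distributed import TCPStore

store_a = TCPStore(
    "localhost", 0, is_master=True,
    timeout=timedelta(seconds=30), multi_tenant=True,
)
# store_a.port is the OS-assigned port; creating another master on that port
# hits the cachedServers_ lookup shown above and shares the live server.
store_b = TCPStore(
    "localhost", store_a.port, is_master=True,
    timeout=timedelta(seconds=30), multi_tenant=True,
)
store_a.set("key", "value")
print(store_b.get("key"))  # b"value" -- both handles talk to one server
```

This is why simply retrying on the same port does not yield a fresh server: unless the old server is actually destroyed, the cache hands back the existing one.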
d4l3k left a comment:
looking good -- can we add some tests for this?
Looks like there are some existing test cases already; we just need to update them.
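As a rough sketch of the kind of update those tests could get (hypothetical code, not the PR's actual tests; `make_handler` is an invented stand-in for however the test constructs a rendezvous handler):

```python
# Hypothetical test sketch: a retry should come back with a newly created
# TCPStore server, i.e. a different master port, rather than the same store
# reused under an attempt-count key prefix.
def test_retry_creates_new_store(make_handler):
    handler = make_handler()
    first = handler.next_rendezvous()
    second = handler.next_rendezvous()  # simulate a retry
    assert (
        first.bootstrap_store_info.master_port
        != second.bootstrap_store_info.master_port
    )
```

The `bootstrap_store_info` / `master_port` names follow the rendezvous handler API this PR touches; treat the rest as pseudocode.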
Update on "[torchelastic][c10d] Fix store prefix race in rendezvous"

1. We want to take option 3 as discussed in #135712: every time we retry, we create a new TCPStore server first, so that we don't need to append the attempt count as a prefix and can avoid an eventual TCPStore sync failure. (This is only for the TCPStore-sharing-enabled case.)
2. We reuse the port from the newly created TCPStore server: we first bind it to port 0 so that the system allocates a free port for us.
3. The port is then broadcast for dynamic_rendezvous.

One more question: what do we do about the store created by `_create_tcp_store` in torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py? Are we OK with creating a duplicate TCPStore server?

cc XilunWu H-Huang awgu kwen2501 wanchaol fegin wz337 wconstab d4l3k c-p-i-o
1. We want to take option 3 as discussed in #135712: every time we retry, we create a new TCPStore server first, so that we don't need to append the attempt count as a prefix and can avoid an eventual TCPStore sync failure. (This is only for the TCPStore-sharing-enabled case.)
2. We start a new server bound to an ephemeral port (i.e. 0) so it gets assigned a free port. We then pass that port downstream (to the trainer or c10d). This way the TCPStore is managed by the elastic agent, instead of racing to bind a specific port in the trainer.
3. The port is then broadcast for dynamic_rendezvous.

One more question: what do we do about the store created by `_create_tcp_store` in torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py? Are we OK with creating a duplicate TCPStore server?

cc XilunWu H-Huang awgu kwen2501 wanchaol fegin wz337 wconstab d4l3k c-p-i-o
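To make point 2 concrete, here is a minimal sketch of the ephemeral-port trick (illustrative only; `start_fresh_master_store` is an invented name, not a helper from this PR):

```python
# Sketch: bind a new TCPStore server to port 0 on every retry. The OS picks
# a free port, which sidesteps both the per-attempt key prefix and the race
# of re-binding a fixed port before the old server has fully shut down.
from datetime import timedelta
from torch.distributed import TCPStore

def start_fresh_master_store() -> TCPStore:
    return TCPStore(
        "localhost", 0, is_master=True,
        timeout=timedelta(seconds=30),
    )

store = start_fresh_master_store()
master_port = store.port  # OS-assigned port, to be broadcast to the peers
print(f"new TCPStore server listening on port {master_port}")
```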
@fduwjj has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
@pytorchbot merge (Initiating merge automatically since Phabricator Diff has merged)
Merge failed
Reason: This PR has internal changes and must be landed via Phabricator! Please try reimporting/re-exporting the PR!
Details for Dev Infra team: Raised by workflow job
[torchelastic][c10d] Fix store prefix race in rendezvous (pytorch#135957)

Pull Request resolved: pytorch#135957
Approved by: https://github.com/d4l3k, https://github.com/c-p-i-o
Revert "[torchelastic][c10d] Fix store prefix race in rendezvous (pytorch#135957)"

This reverts commit 5033a1c.

Reverted pytorch#135957 on behalf of https://github.com/facebook-github-bot due to Diff reverted internally ([comment](pytorch#135957 (comment)))
@pytorchbot merge -f 'landed internally'
Merge started
Your change will be merged immediately since you used the force (-f) flag, bypassing any CI checks (ETA: 1-5 minutes).
Learn more about merging in the wiki. Questions? Feedback? Please reach out to the PyTorch DevX Team.
Merge failed
Reason: This PR has internal changes and must be landed via Phabricator! Please try reimporting/re-exporting the PR!
Details for Dev Infra team: Raised by workflow job
@pytorchbot merge -f 'landed internally. one more try'
Merge started
Your change will be merged immediately since you used the force (-f) flag, bypassing any CI checks (ETA: 1-5 minutes).
Learn more about merging in the wiki. Questions? Feedback? Please reach out to the PyTorch DevX Team.
Merge failed
Reason: This PR has internal changes and must be landed via Phabricator! Please try reimporting/re-exporting the PR!
Details for Dev Infra team: Raised by workflow job
Please note: this change was reverted internally. I suggest we close this PR and open a new one if we want to land this change.
Recreated the PR in #136768.
Stack from ghstack (oldest at bottom):
One more question: what do we do about the store created by `_create_tcp_store` in torch/distributed/elastic/rendezvous/c10d_rendezvous_backend.py? Are we OK with creating a duplicate TCPStore server?
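For reference, a tiny sketch of the "duplicate server" scenario this question is about (illustrative only; variable names are invented): the backend's store and the per-retry store would simply be two independent servers on two different ports:

```python
# Illustrative only: two independent TCPStore servers in one process.
# backend_store stands in for the one _create_tcp_store makes for the c10d
# rendezvous backend; retry_store stands in for the per-retry server. They
# coexist on distinct ports, at the cost of an extra listener per process.
from datetime import timedelta
from torch.distributed import TCPStore

backend_store = TCPStore("localhost", 0, is_master=True,
                         timeout=timedelta(seconds=30))
retry_store = TCPStore("localhost", 0, is_master=True,
                       timeout=timedelta(seconds=30))
assert backend_store.port != retry_store.port  # two servers, two ports
```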
cc @XilunWu @H-Huang @awgu @kwen2501 @wanchaol @fegin @wz337 @wconstab @d4l3k @c-p-i-o
Differential Revision: D63396829