-
Notifications
You must be signed in to change notification settings - Fork 26.3k
Implement backend-agnostic rpc._wait_all_workers() utility (#30692) #30693
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…0692) Summary: Pull Request resolved: pytorch#30692 We need a backend-agnostic mechanism to do barrier-like operation before locally destroy RRef context and shutdown RPC Agent. ghstack-source-id: 94872632 Test Plan: # Unit tests ``` buck test mode/dev-nosan //caffe2/test:rpc_fork -- test_wait_all_workers buck-out/gen/caffe2/test/rpc_fork\#binary.par -r test_wait_all_workers$ ``` ``` buck test mode/dev-nosan //caffe2/test:rpc_fork_thrift -- test_wait_all_workers buck-out/gen/caffe2/test/rpc_fork_thrift\#binary.par -r test_wait_all_workers$ ``` Differential Revision: D18643137 fbshipit-source-id: 3a4ffbe948f249d328e79d8e6603a941a7b72ff5
|
This pull request was exported from Phabricator. Differential Revision: D18643137 |
mrshenli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test failures are real:
Dec 03 23:45:14 ======================================================================
Dec 03 23:45:14 FAIL [0.871s]: test_nested_rref_stress (__main__.RpcTestWithSpawn)
Dec 03 23:45:14 ----------------------------------------------------------------------
Dec 03 23:45:14 Traceback (most recent call last):
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 130, in wrapper
Dec 03 23:45:14 self._join_processes(fn)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 211, in _join_processes
Dec 03 23:45:14 self._check_return_codes(elapsed_time)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 231, in _check_return_codes
Dec 03 23:45:14 self.assertEqual(p.exitcode, first_process.exitcode)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_utils.py", line 839, in assertEqual
Dec 03 23:45:14 super(TestCase, self).assertLessEqual(abs(x - y), prec, message)
Dec 03 23:45:14 AssertionError: 1 not less than or equal to 1e-05 :
Dec 03 23:45:14
Dec 03 23:45:14 ======================================================================
Dec 03 23:45:14 FAIL [0.702s]: test_rref_forward_chain (__main__.RpcTestWithSpawn)
Dec 03 23:45:14 ----------------------------------------------------------------------
Dec 03 23:45:14 Traceback (most recent call last):
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 130, in wrapper
Dec 03 23:45:14 self._join_processes(fn)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 211, in _join_processes
Dec 03 23:45:14 self._check_return_codes(elapsed_time)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 231, in _check_return_codes
Dec 03 23:45:14 self.assertEqual(p.exitcode, first_process.exitcode)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_utils.py", line 839, in assertEqual
Dec 03 23:45:14 super(TestCase, self).assertLessEqual(abs(x - y), prec, message)
Dec 03 23:45:14 AssertionError: 1 not less than or equal to 1e-05 :
Dec 03 23:45:14
Dec 03 23:45:14 ======================================================================
Dec 03 23:45:14 FAIL [0.676s]: test_shutdown (__main__.RpcTestWithSpawn)
Dec 03 23:45:14 ----------------------------------------------------------------------
Dec 03 23:45:14 Traceback (most recent call last):
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 130, in wrapper
Dec 03 23:45:14 self._join_processes(fn)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 211, in _join_processes
Dec 03 23:45:14 self._check_return_codes(elapsed_time)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_distributed.py", line 235, in _check_return_codes
Dec 03 23:45:14 self.assertEqual(first_process.exitcode, 0)
Dec 03 23:45:14 File "/var/lib/jenkins/workspace/test/common_utils.py", line 839, in assertEqual
Dec 03 23:45:14 super(TestCase, self).assertLessEqual(abs(x - y), prec, message)
Dec 03 23:45:14 AssertionError: 1 not less than or equal to 1e-05 :
| if _agent is None: | ||
| return | ||
|
|
||
| assert _ALL_WORKER_NAMES is not None, ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why using names instead of IDs for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it because we would like to have better error message in _on_master_follower_report_done ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, just for ease of debugging. It's not an API on the critical path like send/recv. So I didn't put squeezing performance on my mind. We could always optimize it and use get_worker_info(id).name to print debugging messages.
| # worker1/2 calls this immediately and has some works after it. | ||
| # worker3 calls this immediately and has no more work. | ||
| rpc.api._wait_all_workers() | ||
| rpc.shutdown(graceful=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this test will actually call shutdown twice with graceful=False. This will be a little complicated to convey to users: allowing calling shutdown arbitrary times as long as graceful=False but only once if graceful=True. Is this intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the call to shutdown is redundant, I wanted to call _wait_all_workers() explicitly to make sure it is truly tested.
Not intentional for shutdown. I think it's the conclusion that we want to first make wait_all_workers only support being called for once.
I can make it be ing supported to be called multiple times. But that will introduce a state, like context id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's the conclusion that we want to first make wait_all_workers only support being called for once.
Yes, and I think that's good enough for now. But, we do allow calling shutdown multiple times, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change essentially makes shutdown not support calling multiple times. An exception would be thrown on the second call.
| _wait_all_workers() | ||
| rpc.shutdown() | ||
| rpc.shutdown(graceful=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This changes the test behavior. IIUC, this was to test calling _wait_all_workers() and then call shutdown(graceful=True). cc @rohan-varma
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same reason, _wait_all_workers does not support being called twice.
|
Moved to #30710 due to consistent exporting failure. |
We need a backend-agnostic mechanism to do barrier-like operation before locally destroy RRef context and shutdown RPC Agent.
Differential Revision: D18643137