Fix distributed comm connect timeout logic #4167
Conversation
The previous logic allotted too short a timeout per attempt for a connect & handshake on a highly loaded system (e.g. one oversubscribed with worker threads). We do three things here:

- Move the connect retry loop out into its own separate task (sketched below). The loop with backoff was there to cope with things like dynamic DNS (e.g. k8s), which should only affect the initial connect. The handshake can then happen under a separate timeout. This shortens the amount of work being done within the capped timeout, which helps a bit.
- Raise the initial minimum timeout to something larger. Originally it was 1.5 s with some jitter; we raise it to a minimum of 5 seconds or a fraction of the total timeout. Since a DNS race condition should only happen rarely, this shouldn't cause degradation in that case, but it gives a longer initial connect attempt in the common case.
- Call `comm.abort` on handshake timeout in `inproc` comms, rather than letting the comm finalizer handle it (squashes an innocuous warning).

Fixes #4165.

Note that at some number of inproc workers things will still fail due to limits of the Python VM, so this isn't really a real use case (users should have multiple threads for a single inproc worker, not multiple inproc workers). However, I think the fix provided here is still useful for some more realistic systems, as before this change the full connect process (connect & handshake) would get the following timeout attempts by default: `[1.5, 2.25, 3.375, 2.875]`, none of which may have been long enough to complete in a distributed setting.
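To make the first bullet concrete, here is a minimal sketch of the reworked flow using plain asyncio primitives. The names `connect_with_retry`, `_attempt_until_connected`, and `do_handshake` are illustrative placeholders, not the actual API in `distributed/comm/core.py`, and the backoff constants are made up:

```python
import asyncio
import random


async def connect_with_retry(connector, loc, do_handshake, timeout,
                             handshake_timeout, **connection_args):
    """Sketch: retry the raw connect in its own task, then handshake
    under a separate timeout."""

    async def _attempt_until_connected():
        # The backoff loop only matters for the initial connect (e.g. to ride
        # out dynamic-DNS races on k8s), so it no longer shares its budget
        # with the handshake.
        backoff = 0.01
        while True:
            try:
                return await connector.connect(loc, **connection_args)
            except OSError:
                await asyncio.sleep(backoff * (1 + random.random()))
                backoff = min(backoff * 1.5, 1.0)

    # The whole retry loop runs as one task under the user-facing connect
    # timeout...
    comm = await asyncio.wait_for(_attempt_until_connected(), timeout=timeout)

    # ...while the handshake gets its own, separate timeout.
    try:
        await asyncio.wait_for(do_handshake(comm), timeout=handshake_timeout)
    except asyncio.TimeoutError:
        comm.abort()  # for inproc comms, don't leave this to the finalizer
        raise
    return comm
```

For scale, the old attempt sequence quoted above is consistent with the default 10 s connect timeout: per-attempt caps starting at 1.5 s and growing by 1.5x give 1.5 + 2.25 + 3.375 = 7.125 s, leaving 2.875 s for a final attempt, i.e. the `[1.5, 2.25, 3.375, 2.875]` sequence.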
Hi @jcrist, thanks for raising this. I'll try it out and get back to you. In the meantime, could you please help me understand what you mean by
?

This is specific to the bug a user reported in prefect - they had created a cluster using
However, this configuration revealed an issue in the connect timeout logic that was only found because the above configuration overloaded the Python VM, resulting in slow connections. I suspect the same bug will be hit by some more realistic distributed systems as well (possibly your bug).
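For illustration only - the user's actual cluster snippet isn't preserved above - an oversubscribed in-process setup of the kind being described might look roughly like this (all values made up):

```python
from dask.distributed import Client, LocalCluster

# Hypothetical reconstruction: many in-process ("inproc") workers sharing one
# Python VM, which oversubscribes the process and makes connects and
# handshakes slow.
cluster = LocalCluster(processes=False, n_workers=16, threads_per_worker=1)
client = Client(cluster)
```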
Test failure seems unrelated (flaky test?)
Yeah, flaky test (with tornado 5 & Windows). #4166 is changing it to a skip.
This introduces a logic bug, do not merge, will fix later today.
Gave it a try with:

```python
import dask.bag as bag  # assuming dask.bag imported under this name

b = bag.from_sequence([1] * 100_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b"1" * 2**20] * 5).persist()
bc = bb.repartition(1000).persist()
```

Doesn't fix #4080. But it gives new errors:
The review comment below refers to these (non-contiguous) lines from the diff:

```python
async def _():
    comm = await connector.connect(
        loc, deserialize=deserialize, **connection_args
...
with suppress(TimeoutError):
```
Why that? If the server socket is saturated/unresponsive, this may time out, hit the `break`, and spam connection requests without any backoff? Maybe I'm understanding it wrong...
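To make the concern concrete, here is a minimal hypothetical sketch of the questioned pattern - not the PR's actual code; `attempt_connect` stands in for the underlying connect call:

```python
import asyncio
from contextlib import suppress


async def connect_loop(attempt_connect, total_timeout=10.0, per_attempt=5.0):
    # Hypothetical sketch of the reviewed control flow.
    deadline = asyncio.get_running_loop().time() + total_timeout
    comm = None
    while asyncio.get_running_loop().time() < deadline:
        with suppress(asyncio.TimeoutError):
            comm = await asyncio.wait_for(attempt_connect(), timeout=per_attempt)
            break  # only reached when the attempt succeeds
        # When an attempt times out, the loop immediately starts another one;
        # there is no sleep or backoff between attempts, which is the
        # "spam connection requests" worry raised above.
    if comm is None:
        raise OSError("could not connect before the deadline")
    return comm
```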
Yeah, that's part of the logic bug I mentioned. The other part of the bug has to do with failures/timeouts in the handshake not being properly handled (which would lead to your increase in handshake errors). I'm not sure what the cause of your other new errors is, though.
Not sure on the cause of the 2nd error either. My intuition is that the OS may have run out of ports. Happy to give it another try later. Thanks for looking into this :))
Hello @jcrist! I am using Dask-Yarn & Dask Distributed 2.23. I am having trouble getting my client to connect to my scheduler in a 3000-worker cluster. I wonder if this issue is related to what I am seeing. If so, is there anything I can do on my side right now to alleviate this issue? Thank you!
For a cluster that large, I'd expect the scheduler to get bogged down a bit and be slower to respond, which would result in the timeout issues you're seeing. Unfortunately the handshake timeouts (which are the likely culprit on a system that large) are hardcoded in the latest release, so you can't easily adjust them. You might try increasing the connect timeouts on your client machine to something like

The issue I was describing is very specific to one of our test setups, and isn't realistic for real-world workflows. It does show the same issue you're seeing though, which is why I was using it as a test case. A dask worker process is free to use as many threads as the application can make use of (for pandas-heavy work I've found ~4 threads to be a good rule of thumb). Increasing or decreasing the number of threads won't affect the comms if the workers are running in separate processes (which is true for you and most users).
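For reference, the connect timeout is exposed through dask's configuration; a minimal client-side sketch (the `60s` value is only a placeholder and the scheduler address is made up):

```python
import dask
from dask.distributed import Client

# Raise the comm connect timeout before creating the Client; pick a value
# larger than the default for a heavily loaded scheduler.
dask.config.set({"distributed.comm.timeouts.connect": "60s"})

client = Client("tcp://scheduler-address:8786")  # placeholder address
```

The same key can also be set cluster-wide in a `distributed.yaml` config file or via a `DASK_`-prefixed environment variable, which matches the later advice in this thread to apply such settings uniformly.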
Hello @jcrist! Thanks for answering so quickly. Three questions after reading your reply:
1. I tried increasing the client timeout to 100s, but that didn't work.
2. I spent some time tweaking Linux settings on the machine running the scheduler process (fd limit, sync queue max length, etc.).
3. My client and scheduler are on different machines.
1. I think so. There are a few other timeouts you could try (see #4118 (comment)), but I doubt they'd help. You could also try downgrading to distributed 2.22.0, which shouldn't have this issue (you may still need to up the connect timeout, but there's no hardcoded handshake timeout in 2.22.0).
2. Unlikely - the timeouts that are likely causing your issue are at the application level and are more affected by the Python VM and dask code than by Linux settings.
3. Usually I'd recommend applying config like this uniformly, but for your case (client not connecting, but the rest of the cluster up), the timeouts would be on the client machine. If you restart and retry with the other timeouts I mentioned above, it'd be best to apply those settings uniformly throughout the cluster.
@jcrist, I tried with the timeout across the cluster. Strangely, my client doesn't have problems connecting to a 3000-worker cluster with 2GB worker memory, but has problems with 14GB worker memory. I thought the number of workers was the culprit. Is this symptom consistent with what you would expect?
Hmmm, that's interesting, but doesn't really make sense to me. Perhaps your resource manager (YARN) is packing the smaller workers/scheduler closer together network-wise, reducing networking overhead? Failures like this have so many confounding variables (how busy were the machines, how busy was the network, ...) that my guess is you just happened to get a successful connection and it has nothing to do with the size of the workers.
@jcrist Yeah, you are right. I tore down the cluster and re-tried, and it doesn't work on 2GB workers. I'm trying 2.22.0, and I'll try with a new Dask Distributed release - when could I expect Dask Distributed 2.30.1? :)
Can this be closed in favor of #4176?
Yeah, that was my intention; I forgot. Thanks.
