
Conversation

@jcrist (Member) commented Oct 14, 2020

The previous logic allowed too short a timeout per attempt for a connect & handshake on a highly loaded system (e.g. one oversubscribed with worker threads). We do three things here:

  • Move the connect retry loop out into its own separate task. The backoff-and-retry loop exists to cope with things like dynamic DNS (e.g. k8s), which should only affect the initial connect. The handshake can then happen under a separate timeout. This shortens the amount of work being done inside the capped timeout, which helps a bit.

  • Raise the initial minimum timeout. Originally it was 1.5 s with some jitter; we raise it to a minimum of 5 seconds or a fraction of the total timeout. Since a DNS race condition should only happen rarely, this shouldn't cause degradation in that case, but it gives a longer initial connect attempt in the common case.

  • Call comm.abort on handshake timeout in inproc comms, rather than letting the comm finalizer handle it (squashes an innocuous warning).

Fixes #4165.

Note that at some number of inproc workers things will still fail due to limits of the Python VM, so this isn't really a realistic use case (users should have multiple threads for a single inproc worker, not multiple inproc workers). However, I think the fix provided here is still useful for more realistic systems: before, the full connect process (connect & handshake) would get the following per-attempt timeouts by default: [1.5, 2.25, 3.375, 2.875], none of which may have been long enough to complete in a distributed setting. A rough sketch of the new scheme follows below.
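To make the intended flow concrete, here is a minimal sketch of the scheme described above. This is not the actual distributed implementation: `connector` is assumed to be a hypothetical object with async `connect`/`handshake` methods, the returned comm is assumed to have an `abort()` method, and the constants are illustrative.

```python
import asyncio
import random


async def connect_with_handshake(connector, address, timeout=10.0, min_attempt=5.0):
    """Illustrative sketch only; not the actual distributed code."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    async def retry_connect():
        # The backoff/retry loop covers only the raw connect; retries exist
        # to cope with transient failures such as dynamic DNS on k8s.
        backoff = 0.1
        while True:
            try:
                return await connector.connect(address)
            except OSError:
                if loop.time() + backoff >= deadline:
                    raise
                await asyncio.sleep(backoff * (1 + random.random()))
                backoff = min(backoff * 1.5, 1.0)

    # Give the connect phase at least `min_attempt` seconds (or a fraction of
    # the total timeout) rather than the old ~1.5 s first attempt.
    connect_timeout = min(max(min_attempt, timeout / 2), timeout)
    comm = await asyncio.wait_for(retry_connect(), connect_timeout)

    # The handshake runs under its own timeout (whatever time remains); on
    # failure, abort the comm explicitly rather than relying on its finalizer.
    try:
        await asyncio.wait_for(connector.handshake(comm), deadline - loop.time())
    except asyncio.TimeoutError:
        comm.abort()
        raise
    return comm
```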

@TomAugspurger (Member)

Great to see this, thanks Jim!

Do you have a sense for whether this fixes the related(?) comms issues people have been seeing? ( #4133, #4080)

@jcrist (Member, Author) commented Oct 14, 2020

I'm not sure, but it might (though I suspect that if it fixes #4133, it's because fewer connect failures avoid the underlying deadlock issue). #4080 might be helped by this, though. I didn't see a clean reproducible example for either; I'll ping on #4080 to see if they can try it on their code.

@KrishanBhasin (Contributor)

Hi @jcrist, thanks for raising this; I'll try it out and get back to you.

In the meantime, could you please help me understand what you mean by

users should have multiple threads for a single inproc worker, not multiple inproc workers

?

@jcrist (Member, Author) commented Oct 14, 2020

In the meantime, could you please help me understand what you mean by

This is specific to the bug a user reported in Prefect: they had created a cluster using inproc comms with multiple workers, each with a single thread. There's no benefit to doing this, and it will be slower due to added serialization and management costs. An inproc (threads-only) cluster should really only have one "worker" that has access to multiple threads.

However, this configuration revealed an issue in the connect timeout logic that was only found because the above configuration overloaded the Python VM, resulting in slow connections. I suspect the same bug will be hit by some more realistic distributed systems as well (possibly your bug).
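For reference, a threads-only setup along those lines might look like this (a sketch; the thread count is just an example):

```python
from distributed import Client, LocalCluster

# One inproc ("threads-only") worker with several threads, rather than
# several single-threaded inproc workers.
cluster = LocalCluster(processes=False, n_workers=1, threads_per_worker=8)
client = Client(cluster)
```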

@jcrist (Member, Author) commented Oct 14, 2020

Test failure seems unrelated (flaky test?)

@TomAugspurger (Member)

Yeah, flaky test (with tornado 5 & Windows). #4166 changes it to a skip.

@jcrist (Member, Author) commented Oct 14, 2020

This introduces a logic bug; do not merge. I'll fix it later today.

@mnarodovitch commented Oct 14, 2020

Gave it a try with export DASK_DISTRIBUTED__COMM__RETRY__COUNT=10 and

from dask import bag

b = bag.from_sequence([1] * 100_000, partition_size=1)
bb = b.map_partitions(lambda *_, **__: [b'1' * 2**20] * 5).persist()
bc = bb.repartition(1000).persist()

It doesn't fix #4080.

But it gives new errors:

235,000 x: Connection closed before handshake completed

1,600,000 x:

distributed.utils_comm - INFO - Retrying functools.partial(<function PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc at 0x7ffb9a89d9d8>, keys=("('lambda-e654bde3f6d43fa0f1f600b833b0855b', 53065)",)) after exception in attempt 2/10: Timed out trying to connect to 'tcp://10.3.156.229:8786' after 10 s: Timed out trying to connect to 'tcp://10.3.156.229:8786' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x7ffb9bee3748>: OSError: [Errno 99] Cannot assign requested address

[Screenshot: system dashboard of the scheduler]

async def _():
    comm = await connector.connect(
        loc, deserialize=deserialize, **connection_args
    )
    ...

with suppress(TimeoutError):

@mnarodovitch commented Oct 14, 2020

Why this? If the server socket is saturated/unresponsive, this may time out, hit the break, and then spam connection requests without any backoff. Or maybe I'm misunderstanding...

@jcrist (Member, Author) commented Oct 14, 2020

Yeah, that's part of the logic bug I mentioned. The other part of the bug has to do with failures/timeouts in the handshake not being properly handled (which would lead to your increase in handshake errors). I'm not sure what the cause of your other new errors is, though.

@mnarodovitch commented Oct 14, 2020

Not sure on the cause of the 2nd error either. My intuition is that the OS may have run out of ports. Happy to give it another try later. Thanks for looking into this :)

@jennakwon06

Hello @jcrist !

I am using Dask Yarn & Dask Distributed 2.23.

I am having trouble getting my client to connect to my scheduler in a 3000-worker cluster.
It's not a firewall issue, as I can connect to a 500-worker cluster with the same setup. I raised a Stack Overflow question here.

I wonder if this issue is related to what I am seeing. If so, is there anything I can do on my side right now to alleviate this issue?
Also, I wonder what precisely you mean by "a highly loaded system (e.g. one oversubscribed with worker threads)". How many worker threads is too many?

Thank you!

@jcrist (Member, Author) commented Oct 22, 2020

For a cluster that large, I'd expect the scheduler to get bogged down a bit and be slower to respond, which would result in the timeout issues you're seeing. Unfortunately the handshake timeouts (which are the likely culprit on a system that large) are hardcoded in the latest release, so you can't easily adjust them. You might try increasing the connect timeouts on your client machine to something like 100s (easiest way would be setting an environment variable DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=100s), but that would only affect the time to get an active connection, not the handshake, so I wouldn't be surprised if that fails. PR #4176 is farther along than this PR, and is probably the fix you'll want.
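If setting environment variables is awkward in your deployment, the same connect timeout can be set from Python before creating the Client (a sketch; 100s is just the value suggested above):

```python
import dask

# Equivalent to exporting DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=100s
# in the client's environment before it starts.
dask.config.set({"distributed.comm.timeouts.connect": "100s"})
```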

Also, I wonder what precisely you mean by "a highly loaded system (e.g. one oversubscribed with worker threads)". How many worker threads is too many?

The issue I was describing is very specific to one of our test setups, and isn't realistic for real-world workflows. It does show the same issue you're seeing though, which is why I was using it as a test case. A dask worker process is free to use as many threads as the application can make use of (for pandas heavy work I've found ~4 threads to be a good rule of thumb). Increasing or decreasing the number of threads won't affect the comms if the workers are running in separate processes (which is true for you and most users).

@jennakwon06 commented Oct 22, 2020

Hello @jcrist! Thanks for answering so quickly. Three questions after reading your reply.

I tried increasing client timeout to 100s, but that didn't work.
(1) Would the best course of action be to wait for a new release containing PR #4176 ?

I spent some time tweaking linux settings on the machine running scheduler process (fd limit, sync queue max length, etc).
(2) Does that help this issue at all?

My client and scheduler are on different machines.
(3) Is it important for dask.distributed Configuration settings to match between them?

@jcrist (Member, Author) commented Oct 22, 2020

Would the best course of action be to wait for a new release containing PR #4176?

I think so. There are a few other timeouts you could try (see #4118 (comment)), but I doubt they'd help.

You could also try downgrading to distributed 2.22.0, which shouldn't have this issue (you may still need to up the connect timeout, but there's no hardcoded handshake timeout in 2.22.0).

Does that help this issue at all?

Unlikely; the timeouts that are likely causing your issue are at the application level, and are affected more by the Python VM and dask code than by Linux settings.

Is it important for dask.distributed Configuration settings to match between them?

Usually I'd recommend applying config like this uniformly, but for your case (client not connecting, but rest of cluster up), the timeouts would be on the client machine. If you restart and retry with the other timeouts I mentioned above, it'd be best to apply those settings uniformly throughout the cluster.

@jennakwon06

@jcrist, I tried with the timeout applied across the cluster.

Strangely, my client doesn't have problems connecting to a 3000-worker cluster with 2 GB of worker memory, but it does have problems with 14 GB of worker memory.

I thought the number of workers was the culprit. Is this symptom consistent with what you would expect?

@jcrist (Member, Author) commented Oct 22, 2020

Hmmm, that's interesting, but it doesn't really make sense to me. Perhaps your resource manager (YARN) is packing the smaller workers/scheduler closer together network-wise, reducing networking overhead? Failures like this have so many confounding variables (how busy were the machines, how busy was the network, ...) that my guess is you just happened to get a successful connection and it has nothing to do with the size of the workers.

@jennakwon06 commented Oct 22, 2020

@jcrist Yeah, you are right. I tore down the cluster and retried, and it doesn't work on 2 GB workers either.

I'm trying 2.22.0, and I'll try with a new Dask Distributed release. When could I expect Dask Distributed 2.30.1? :)

@TomAugspurger (Member)

Can this be closed in favor of #4176?

@jcrist (Member, Author) commented Oct 30, 2020

Yeah that was my intention, forgot. Thanks.

@jcrist jcrist closed this Oct 30, 2020
@jcrist jcrist deleted the timeout-fix-distributed-connect branch October 30, 2020 16:01