Conversation

@mnarodovitch commented Sep 27, 2020

#4080

Did more digging with the reproducer and identified the failure mode that is causing connection timeouts and failed tasks on our 50-worker cluster.

import dask.bag as bag  # import added here for completeness

b = bag.from_sequence([1] * 100_000, partition_size=1)                # 100,000 single-element partitions
bb = b.map_partitions(lambda *_, **__: [b'1' * 2**20] * 5).persist()  # ~5 MiB of bytes per partition
bc = bb.repartition(1000).persist()                                   # collapse down to 1,000 partitions
  1. For the repartition step, the workers hit the scheduler with 100,000 who_has requests within a very short period of time.
  2. These requests go through the ConnectionPool of Worker.scheduler, which has a default connection limit of 512 per worker. Across the cluster this results in roughly 25,000 concurrent connection attempts and saturates the scheduler's listener socket (see the back-of-the-envelope calculation below).
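A back-of-the-envelope calculation for item 2, assuming every worker saturates its own connection pool at the same time (the 50-worker and 512-connection figures come from the setup above):

workers = 50
per_worker_pool_limit = 512                            # ConnectionPool default limit
concurrent_attempts = workers * per_worker_pool_limit
print(concurrent_attempts)                             # 25600, i.e. roughly 25,000 attempts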

Logs would typically output:

  Tornado connection attempt  ("Connect start: <SCHEDULER>"):  170,000 events  (worker side)
  Tornado connection success  ("Connect done"):                 16,000 events  (worker side)
  Tornado connection accept   ("On connection"):                13,000 events  (scheduler side)
  Dask handshake success      ("handshake:"):                    6,000 events  (worker and scheduler side)

Workers would typically fail tasks or crash due to connection timeouts.

Setting a limit of 8 concurrent connections to the scheduler per worker resolved the issue.

I doubt that this is a good solution and do not intend to do further work on this PR. Nevertheless, I believe this 'fix' helps to shed light on the root cause of the issue.
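As a rough illustration of what "a limit of 8 concurrent connections to the scheduler per worker" means in practice, here is a minimal asyncio sketch; the helper name limited_scheduler_request is hypothetical and this is not the actual code in this PR:

import asyncio

SCHEDULER_CONNECTION_LIMIT = 8   # the per-worker cap that resolved the issue here
_scheduler_semaphore = asyncio.Semaphore(SCHEDULER_CONNECTION_LIMIT)

async def limited_scheduler_request(send_request):
    # Allow at most 8 scheduler-bound requests in flight per worker process;
    # everything else waits instead of opening yet another connection.
    async with _scheduler_semaphore:
        return await send_request()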

Michael Narodovitch added 4 commits September 27, 2020 15:05
@mnarodovitch changed the title Log Connection timed out... Sep 27, 2020
@mnarodovitch changed the title Connection timed out... Fix connection timed out... Sep 28, 2020
@mrocklin (Member)

Thank you for submitting your thoughts here @michaelnarodovitch. This is helpful.

I'm curious why there are 100,000 who_has calls. Looking through the Worker implementation it looks like we call who_has only when there is something unexpected, and even then we're careful to exponentially back off on these calls.

Regardless, it seems like we might want to ease back on the 512 connection limit on workers generally, given that they might all be talking to the scheduler simultaneously.
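For reference, a minimal sketch of what tuning that limit looks like on a bare ConnectionPool (the value 64 is an arbitrary example, and the Worker wires its pool up internally rather than like this):

from distributed.core import ConnectionPool

# Standalone pool with a lower cap on concurrently open comms; the default is 512.
pool = ConnectionPool(limit=64)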

@mnarodovitch (Author) commented Sep 29, 2020

Thanks again for looking into that :) Probably I was not entirely correct with the explanation. What I see clear evidence for in the logs are 51,988 stack traces

File "/usr/local/lib/python3.6/site-packages/distributed/worker.py", line 2161, in query_who_has
response = await retry_operation(self.scheduler.who_has, keys=deps)
await self.query_who_has(dep)

thrown from gather_dep

else:
# Exponential backoff to avoid hammering scheduler/worker
self.repetitively_busy += 1
await asyncio.sleep(0.100 * 1.5 ** self.repetitively_busy)
# See if anyone new has the data
await self.query_who_has(dep)
self.ensure_communicating()

The way I understand it, the workers query the scheduler to get the keys for the repartition step.

Maybe it would be a good idea to refine the ConnectionPool so that a semaphore limits connections per host, i.e. at most 8 concurrent connections to any one host (see the sketch below)?
Or process the who_has queries over a BatchedComm?
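A rough sketch of the per-host idea (illustrative only, not distributed's actual ConnectionPool; PerHostLimiter and do_connect are made-up names):

import asyncio
from collections import defaultdict

PER_HOST_LIMIT = 8

class PerHostLimiter:
    def __init__(self, limit=PER_HOST_LIMIT):
        # One semaphore per remote address instead of a single global limit.
        self._semaphores = defaultdict(lambda: asyncio.Semaphore(limit))

    async def connect(self, address, do_connect):
        # `do_connect` is whatever coroutine actually opens the comm.
        async with self._semaphores[address]:
            return await do_connect(address)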

Do you see any other types of requests which might stress the scheduler in a similar way and may require special care?

Base automatically changed from master to main March 8, 2021 19:04