What happened:
A long-running calculation (query 2 of the TPCx-BB benchmark with dask/RAPIDS) over UCX always crashes in the distributed communication layer. I am using 162 workers. The error reproducibly shows up as
```
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /gpfs/alpine/world-shared/bif128/rapids-env/lib/python3.7/site-packages/distributed-2.25.0+6.g73fa9bd-py3.7.egg/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/gpfs/alpine/world-shared/bif128/rapids-env/lib/python3.7/site-packages/distributed-2.25.0+6.g73fa9bd-py3.7.egg/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py", line 405, in wait_for
    await waiter
concurrent.futures._base.CancelledError
```
in the log files, followed by many pages of other errors.
What you expected to happen:
Successful completion of the calculation.
Minimal Complete Verifiable Example:
N/A
Anything else we need to know?:
After investigating, it turns out that distributed sets some hard-coded, ad-hoc timeouts in the code. When these expire, asyncio raises a CancelledError in the awaited coroutine, which in turn takes down the UCX endpoint and, through a chain reaction, the entire cluster including the scheduler.
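To illustrate the mechanism, here is a minimal, self-contained toy (not distributed internals): when asyncio.wait_for hits its timeout, the awaited coroutine is cancelled and observes CancelledError at its await point, matching the traceback above.

```python
import asyncio

async def slow_read():
    # Stand-in for comm.read() in the handshake.
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # This is where the UCX endpoint gets torn down in practice.
        print("read cancelled -> endpoint torn down")
        raise

async def main():
    try:
        # Mirrors the hardcoded 1-second handshake timeout.
        await asyncio.wait_for(slow_read(), timeout=1)
    except asyncio.TimeoutError:
        print("handshake timed out after 1 s")

asyncio.run(main())
```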
Example 1 (this actually looks like a bug):
distributed/distributed/comm/core.py, line 318 in ecaf140, in context:

```python
with suppress(TimeoutError):
    comm = await asyncio.wait_for(
        _(), timeout=min(deadline - time(), retry_timeout_backoff)
    )
```
Here, the timeout is taken as the minimum of the remaining time and a randomly generated ad-hoc backoff between 1.4 and 1.6 s (retry_timeout_backoff). I assume the maximum is what was intended here, i.e.

```python
_(), timeout=max(deadline - time(), retry_timeout_backoff)
```

The current implementation caps every wait at less than two seconds, instead of honoring the time configured in distributed.comm.timeouts.connect.
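To make the effect concrete, here is a toy calculation (illustrative values only; it assumes distributed.comm.timeouts.connect is set to 10 s):

```python
import random
from time import time

deadline = time() + 10                            # remaining budget: ~10 s
retry_timeout_backoff = random.uniform(1.4, 1.6)  # the 1.4-1.6 s backoff

# Current code: each attempt waits at most ~1.6 s, regardless of the budget.
print(min(deadline - time(), retry_timeout_backoff))  # ~1.4-1.6

# Proposed change: wait up to the full remaining budget.
print(max(deadline - time(), retry_timeout_backoff))  # ~10.0
```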
Example 2:
In the communication handshake, here and here, the timeouts are hardcoded to one second (one of them is the asyncio.wait_for(comm.read(), 1) call visible in the traceback above). This causes the same problem, and they should be made configurable. To work around my issue, I multiplied these timeouts by a factor of 100, or set them to deadline - time(), respectively.
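A minimal sketch of what a configurable handshake timeout could look like; the function name do_handshake is illustrative, not the actual distributed API, while dask.config.get, the distributed.comm.timeouts.connect key, and distributed.utils.parse_timedelta all exist:

```python
import asyncio

import dask.config
from distributed.utils import parse_timedelta


async def do_handshake(comm):
    # Derive the timeout from the existing config key instead of
    # hardcoding 1 second; parse_timedelta turns e.g. "10s" into 10.0.
    timeout = parse_timedelta(
        dask.config.get("distributed.comm.timeouts.connect"),
        default="seconds",
    )
    return await asyncio.wait_for(comm.read(), timeout)
```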
Environment:
- Dask version: 2.25.0
- Python version: 3.7.0
- Operating System: Linux
- Install method (conda, pip, source): source