
Hardcoded timeouts lead to complete teardown of communication layer #4118

@jglaser

Description

What happened:

A long-running calculation (query 2 of the TPCx-BB benchmark with dask/RAPIDS) over UCX always crashes in the distributed communication layer. I am using 162 workers. The error reproducibly shows up as

Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /gpfs/alpine/world-shared/bif128/rapids-env/lib/python3.7/site-packages/distributed-2.25.0+6.g73fa9bd-py3.7.egg/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/gpfs/alpine/world-shared/bif128/rapids-env/lib/python3.7/site-packages/distributed-2.25.0+6.g73fa9bd-py3.7.egg/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/autofs/nccs-svm1_sw/summit/.swci/0-core/opt/spack/20180914/linux-rhel7-ppc64le/gcc-4.8.5/python-3.7.0-ei3mpdncii74xsn55t5kxpuc46i3oezn/lib/python3.7/asyncio/tasks.py", line 405, in wait_for
    await waiter
concurrent.futures._base.CancelledError

in the log files, followed by many pages of other errors.

What you expected to happen:

Successful completion of the calculation.

Minimal Complete Verifiable Example:

N/A

Anything else we need to know?:

After investigating, it turns out that distributed hard-codes several ad-hoc timeouts. When one of these expires, asyncio raises a CancelledError in the awaited coroutine, which takes down the UCX endpoint and, by a chain reaction, the entire cluster including the scheduler.
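For context, here is a minimal standalone sketch of the mechanism (this is not distributed code; fake_handshake is a made-up stand-in for comm.read()): when asyncio.wait_for times out, it cancels the awaited coroutine, so inside the comm layer the failure surfaces as a CancelledError before the caller sees a timeout:

    import asyncio

    async def fake_handshake(delay):
        # Stand-in for comm.read(); the delay simulates a slow peer.
        await asyncio.sleep(delay)
        return "handshake-ok"

    async def main():
        try:
            # Hard-coded 1 s timeout, as in the handshake code paths below.
            # If the peer needs longer, wait_for cancels fake_handshake()
            # (a CancelledError inside the coroutine) and raises
            # asyncio.TimeoutError here.
            print(await asyncio.wait_for(fake_handshake(2.0), timeout=1))
        except asyncio.TimeoutError:
            print("handshake timed out -> endpoint torn down")

    asyncio.run(main())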

Example 1 (this actually looks like a bug):

_(), timeout=min(deadline - time(), retry_timeout_backoff)

Context:

                with suppress(TimeoutError):
                    comm = await asyncio.wait_for(
                        _(), timeout=min(deadline - time(), retry_timeout_backoff)
                    )

Here, the timeout is taken as the minimum of the remaining time until the deadline and a randomly generated backoff between 1.4 and 1.6 s (retry_timeout_backoff). I assume the maximum is what was intended here, i.e.

                        _(), timeout=max(deadline - time(), retry_timeout_backoff)

The current implementation caps every wait at less than two seconds, instead of respecting the time configured in distributed.comm.timeouts.connect.
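To make the effect concrete, here is a small standalone sketch comparing the effective wait with min() versus max(); the configured connect timeout value is made up, and only the 1.4-1.6 s backoff range comes from the code above:

    import random
    from time import time

    # Hypothetical configured connect timeout (distributed.comm.timeouts.connect).
    connect_timeout = 30.0
    deadline = time() + connect_timeout

    # The ad-hoc backoff drawn between 1.4 and 1.6 s, as in the snippet above.
    retry_timeout_backoff = random.uniform(1.4, 1.6)

    # With min(), a single attempt can never wait longer than ~1.6 s,
    # even though ~30 s remain until the deadline.
    effective_with_min = min(deadline - time(), retry_timeout_backoff)

    # With max(), the attempt may use the full remaining time.
    effective_with_max = max(deadline - time(), retry_timeout_backoff)

    print(f"min(): {effective_with_min:.2f} s, max(): {effective_with_max:.2f} s")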

Example 2

In the communication handshake, here and here, the timeouts are hard-coded to one second. This causes the same problem, and they should be made configurable. To work around the issue, I multiplied these timeouts by a factor of 100, or set them to deadline - time(), respectively.
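One possible shape for such a fix, as a sketch only (read_handshake is a hypothetical helper, not the actual distributed API, and helper locations may differ by version): take the handshake timeout from the existing distributed.comm.timeouts.connect setting, bounded by the caller's deadline, instead of hard-coding 1 s:

    import asyncio
    from time import time

    import dask.config
    from dask.utils import parse_timedelta  # location of this helper may vary by version

    async def read_handshake(comm, deadline=None):
        # Use the configured connect timeout instead of a hard-coded 1 s.
        timeout = parse_timedelta(
            dask.config.get("distributed.comm.timeouts.connect"), default="seconds"
        )
        if deadline is not None:
            # Never wait past the caller's overall connect deadline.
            timeout = min(timeout, max(deadline - time(), 0))
        return await asyncio.wait_for(comm.read(), timeout=timeout)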

Environment:

  • Dask version: 2.25.0
  • Python version: 3.7.0
  • Operating System: Linux
  • Install method (conda, pip, source): source
