GPU Tokenization #6718

@quasiben

While exploring some issues with @pentschev, we found ourselves looking at tokenize. Dask typically assumes that tokenization is deterministic, even when applied to data such as NumPy arrays:

In [1]: from dask.base import tokenize

In [2]: import numpy as np

In [3]: tokenize(np.array([1, 2, 3]))
Out[3]: '6df7d63420af1411f533b4050768e4a1'

In [4]: tokenize(np.array([1, 2, 3]))
Out[4]: '6df7d63420af1411f533b4050768e4a1'

In the above, np.array([1, 2, 3]) is always tokenized to the same value. However, this is not true for GPU objects such as CuPy arrays or cuDF DataFrames:

In [5]: import cupy as cp

In [6]: tokenize(cp.array([1, 2, 3]))
Out[6]: '87831aff129fae6d9d2e36fe05430d70'

In [7]: tokenize(cp.array([1, 2, 3]))
Out[7]: '36ddd35ef2ac1a5c281f3b60ca7250a5'
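
The changing tokens are consistent with a tokenizer that hashes the contents of types it knows how to normalize and falls back to a random identifier for everything else. The snippet below is only a toy model of that idea, not dask's implementation: toy_tokenize, _normalizers, and DeviceArray are hypothetical names made up for illustration, and the random fallback is an assumption about the behaviour seen above.

```python
import hashlib
import uuid

# Hypothetical registry: maps a type to a function that returns
# deterministic bytes describing the object's contents.
_normalizers = {
    bytes: lambda b: b,
    str: lambda s: s.encode("utf-8"),
    int: lambda i: str(i).encode("ascii"),
    tuple: lambda t: repr(t).encode("utf-8"),
}


def toy_tokenize(obj):
    """Return a content hash for known types, a random token otherwise."""
    normalizer = _normalizers.get(type(obj))
    if normalizer is None:
        # Unknown type: the contents cannot be inspected, so the only
        # safe answer is a token that never collides with another one.
        return uuid.uuid4().hex
    return hashlib.sha256(normalizer(obj)).hexdigest()


class DeviceArray:
    """Stand-in for a GPU array type that has no registered normalizer."""

    def __init__(self, values):
        self.values = tuple(values)


# Known type: same contents, same token.
same_a = toy_tokenize((1, 2, 3))
same_b = toy_tokenize((1, 2, 3))

# Unknown type: same contents, different token on every call.
rand_a = toy_tokenize(DeviceArray([1, 2, 3]))
rand_b = toy_tokenize(DeviceArray([1, 2, 3]))
```

In this model the fix is to add an entry to the registry for the device array type, which is what a GPU-aware normalizer would amount to.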

I don't think there are any safety issues here; Dask and GPUs have been playing nicely for some time now. However, without deterministic hashing, is data more likely to be duplicated or transferred unnecessarily? Scattering suggests that it is:

In [9]: f = client.scatter(np.array([1, 2, 3]), broadcast=True)

In [10]: f
Out[10]: <Future: finished, type: numpy.ndarray, key: ndarray-6df7d63420af1411f533b4050768e4a1>

In [11]: client.who_has()
Out[11]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329')}

In [12]: f2 = client.scatter(np.array([1, 2, 3]), broadcast=True)

In [13]: client.who_has()
Out[13]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329')}

In [14]: f3 = client.scatter(np.array([1, 2, 3]), broadcast=True)

In [15]: client.who_has()
Out[15]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329')}

Dask recognizes that the key (based on the token) already exists on the workers and does not move the data from the client again. With CuPy arrays, however, every scatter produces a new key and the data is duplicated:

In [16]: f4 = client.scatter(cp.array([1, 2, 3]), broadcast=True)

In [17]: client.who_has()
Out[17]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329'),
 'ndarray-9e308d79b5c73000133e434a5cc306f8': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329')}

In [18]: f5 = client.scatter(cp.array([1, 2, 3]), broadcast=True)

In [19]: client.who_has()
Out[19]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329'),
 'ndarray-9e308d79b5c73000133e434a5cc306f8': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329'),
 'ndarray-5af7ad88c12452437b68b7dcdc7e0c23': ('tcp://127.0.0.1:39983',
  'tcp://127.0.0.1:41329')}

Would implementing a GPU hashing scheme be helpful here?
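
As a starting point for discussion, here is a minimal sketch of a content-based token for device arrays. It is not a true GPU hashing scheme: it copies the data to the host and hashes it there, so it pays a device-to-host transfer on every call. content_token is a hypothetical helper, the check for a .get() method is a duck-typing assumption modelled on cupy.ndarray.get(), and the registration at the bottom is skipped when dask is not installed.

```python
import hashlib

import numpy as np


def content_token(arr):
    """Hash the dtype, shape, and raw bytes of an array-like object."""
    # cupy.ndarray exposes .get() to copy device memory into a NumPy
    # array; anything without .get() is converted with np.asarray.
    host = arr.get() if hasattr(arr, "get") else np.asarray(arr)
    host = np.ascontiguousarray(host)
    h = hashlib.sha256()
    h.update(str(host.dtype).encode("utf-8"))
    h.update(str(host.shape).encode("utf-8"))
    h.update(host.tobytes())
    return h.hexdigest()


try:
    from dask.base import normalize_token
except ImportError:  # dask is optional for this sketch
    normalize_token = None

if normalize_token is not None:

    @normalize_token.register_lazy("cupy")
    def _register_cupy():
        # Runs only when dask first needs a normalizer for a cupy object,
        # so cupy is never imported on machines without a GPU.
        import cupy

        @normalize_token.register(cupy.ndarray)
        def _normalize_cupy(x):
            return content_token(x)
```

With something like this registered, two CuPy arrays holding the same values should tokenize identically, at the price of a host copy per call. Hashing on the device would avoid that copy and is the part that still needs a design.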

cc @jakirkham @madsbk @rjzamora
