Description
In exploring some issues with @pentschev we found ourselves looking at tokenize. Typically, dask operates with the assumption that the hashing function is deterministic even when applied to data like numpy arrays:
In [1]: from dask.base import tokenize
In [2]: import numpy as np
In [3]: tokenize(np.array([1, 2, 3]))
Out[3]: '6df7d63420af1411f533b4050768e4a1'
In [4]: tokenize(np.array([1, 2, 3]))
Out[4]: '6df7d63420af1411f533b4050768e4a1'
In the above, np.array([1, 2, 3]) is always tokenized to the same value. However, this is not true for GPU objects such as cupy or cudf:
In [5]: import cupy as cp
In [6]: tokenize(cp.array([1, 2, 3]))
Out[6]: '87831aff129fae6d9d2e36fe05430d70'
In [7]: tokenize(cp.array([1, 2, 3]))
Out[7]: '36ddd35ef2ac1a5c281f3b60ca7250a5'
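One plausible explanation, stated here as an assumption rather than something confirmed above: if tokenize has no way to inspect an object's contents, it may fall back to a random identifier, which would differ on every call. The sketch below uses only the standard library to contrast the two behaviours. The names content_token and random_token are hypothetical and are not part of dask.

```python
import hashlib
import uuid


def content_token(data: bytes, dtype: str, shape: tuple) -> str:
    """Hypothetical helper: derive a token from an array's bytes and metadata."""
    h = hashlib.sha256()
    h.update(dtype.encode())         # include dtype so int32 and float32 differ
    h.update(repr(shape).encode())   # include shape so (3,) and (1, 3) differ
    h.update(data)                   # the raw buffer contents
    return h.hexdigest()


def random_token() -> str:
    """Hypothetical stand-in for a fallback that cannot inspect the object."""
    return uuid.uuid4().hex


# Three little-endian int32 values: 1, 2, 3
payload = bytes([1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0])

same_a = content_token(payload, "int32", (3,))
same_b = content_token(payload, "int32", (3,))
rand_a = random_token()
rand_b = random_token()

print(same_a == same_b)  # content-based tokens agree for identical data
print(rand_a == rand_b)  # random tokens do not
```

With content-based tokens, two arrays holding the same values map to the same key. With random tokens, every call yields a new key, which matches the CuPy output above.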
I don't think there are any safety issues here -- dask and GPUs have been playing nicely for some time now. However, without deterministic hashing, is data more likely to be duplicated or transferred unnecessarily? Scattering suggests that it is:
In [9]: f = client.scatter(np.array([1, 2, 3]), broadcast=True)
In [10]: f
Out[10]: <Future: finished, type: numpy.ndarray, key: ndarray-6df7d63420af1411f533b4050768e4a1>
In [11]: client.who_has()
Out[11]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329')}
In [12]: f2 = client.scatter(np.array([1, 2, 3]), broadcast=True)
In [13]: client.who_has()
Out[13]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329')}
In [14]: f3 = client.scatter(np.array([1, 2, 3]), broadcast=True)
In [15]: client.who_has()
Out[15]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329')}
Dask recognizes that the key (based on the token) already exists on the worker and does not move data from the client to the workers. However, with CuPy arrays we do see this duplication:
In [16]: f4 = client.scatter(cp.array([1, 2, 3]), broadcast=True)
In [17]: client.who_has()
Out[17]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329'),
'ndarray-9e308d79b5c73000133e434a5cc306f8': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329')}
In [18]: f5 = client.scatter(cp.array([1, 2, 3]), broadcast=True)
In [19]: client.who_has()
Out[19]:
{'ndarray-6df7d63420af1411f533b4050768e4a1': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329'),
'ndarray-9e308d79b5c73000133e434a5cc306f8': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329'),
'ndarray-5af7ad88c12452437b68b7dcdc7e0c23': ('tcp://127.0.0.1:39983',
'tcp://127.0.0.1:41329')}
Would implementing a GPU hashing scheme be helpful here?
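As a rough sketch of what such a scheme could look like, dask lets you register a custom normalizer through normalize_token.register. The example below does not use a real GPU: FakeDeviceArray is a hypothetical stand-in for something like cupy.ndarray, and its get() method mimics copying device memory to a NumPy array. The choice to hash a host copy with SHA-256 is my assumption for illustration, not a proposal for the final design.

```python
import hashlib

import numpy as np
from dask.base import normalize_token, tokenize


class FakeDeviceArray:
    """Hypothetical stand-in for a GPU array such as cupy.ndarray."""

    def __init__(self, values):
        self._host = np.asarray(values)

    def get(self):
        # A real device array would copy from GPU memory to the host here;
        # this stand-in simply returns the NumPy array it already holds.
        return self._host


@normalize_token.register(FakeDeviceArray)
def normalize_fake_device_array(x):
    # Build the token from the contents and metadata, not object identity.
    host = np.ascontiguousarray(x.get())
    digest = hashlib.sha256(host.tobytes()).hexdigest()
    return (type(x).__name__, str(host.dtype), host.shape, digest)


a = tokenize(FakeDeviceArray([1, 2, 3]))
b = tokenize(FakeDeviceArray([1, 2, 3]))
c = tokenize(FakeDeviceArray([1, 2, 4]))

print(a == b)  # equal contents give equal tokens
print(a == c)  # different contents give different tokens
```

Note that hashing a host copy costs a device-to-host transfer for every tokenize call, so it may only pay off when it avoids a larger transfer to the workers. Hashing on the device itself would avoid that copy, but is not shown here.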