Skip to content

Multiplying and converting units does not work with Dask distributed #5347

@bouweandela

Description

@bouweandela

🐛 Bug Report

How To Reproduce

Steps to reproduce the behaviour:

  1. Start a Dask distributed cluster
from distributed import Client
client = Client()
  2. Run the following code
import iris.cube
import dask.array as da

cube = iris.cube.Cube(da.arange(1), units='m')
print(cube.units)
cube.units *= 's-1'
cube.convert_units('m s-1')
print(cube.units)
cube.data

This results in the following output and stack trace:

m
m s-1

2023-06-13 14:45:46,860 - distributed.worker - WARNING - Compute Failed
Key:       ('arange-convert-04958e25515e3c121fef2da38b7c71b6', 0)
Function:  execute_task
args:      ((subgraph_callable-2202f709-07a7-4d1b-a05c-91e49aa96208, (functools.partial(<function arange at 0x7f5c4bcca340>, like=None), 0, 1, 1, 1, dtype('float64'))))
kwargs:    {}
Exception: 'ValueError("Unable to convert from \'Unit(\'unknown\')\' to \'Unit(\'m s-1\')\'.")'

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[3], line 10
      8 cube.convert_units('m s-1')
      9 print(cube.units)
---> 10 cube.data

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/iris/cube.py:2462, in Cube.data(self)
   2429 @property
   2430 def data(self):
   2431     """
   2432     The :class:`numpy.ndarray` representing the multi-dimensional data of
   2433     the cube.
   (...)
   2460 
   2461     """
-> 2462     return self._data_manager.data

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/iris/_data_manager.py:206, in DataManager.data(self)
    203 if self.has_lazy_data():
    204     try:
    205         # Realise the lazy data.
--> 206         result = as_concrete_data(self._lazy_array)
    207         # Assign the realised result.
    208         self._real_array = result

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/iris/_lazy_data.py:279, in as_concrete_data(data)
    262 """
    263 Return the actual content of a lazy array, as a numpy array.
    264 If the input data is a NumPy `ndarray` or masked array, return it
   (...)
    276 
    277 """
    278 if is_lazy_data(data):
--> 279     (data,) = _co_realise_lazy_arrays([data])
    281 return data

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/iris/_lazy_data.py:242, in _co_realise_lazy_arrays(arrays)
    227 def _co_realise_lazy_arrays(arrays):
    228     """
    229     Compute multiple lazy arrays and return a list of real values.
    230 
   (...)
    240 
    241     """
--> 242     computed_arrays = da.compute(*arrays)
    243     results = []
    244     for lazy_in, real_out in zip(arrays, computed_arrays):
    245         # Ensure we always have arrays.
    246         # Note : in some cases dask (and numpy) will return a scalar
    247         # numpy.int/numpy.float object rather than an ndarray.
    248         # Recorded in https://github.com/dask/dask/issues/2111.

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/dask/base.py:595, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    592     keys.append(x.__dask_keys__())
    593     postcomputes.append(x.__dask_postcompute__())
--> 595 results = schedule(dsk, keys, **kwargs)
    596 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/client.py:3243, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3241         should_rejoin = False
   3242 try:
-> 3243     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3244 finally:
   3245     for f in futures.values():

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/client.py:2368, in Client.gather(self, futures, errors, direct, asynchronous)
   2366 except ValueError:
   2367     local_worker = None
-> 2368 return self.sync(
   2369     self._gather,
   2370     futures,
   2371     errors=errors,
   2372     direct=direct,
   2373     local_worker=local_worker,
   2374     asynchronous=asynchronous,
   2375 )

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/utils.py:351, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    349     return future
    350 else:
--> 351     return sync(
    352         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    353     )

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/utils.py:418, in sync(loop, func, callback_timeout, *args, **kwargs)
    416 if error:
    417     typ, exc, tb = error
--> 418     raise exc.with_traceback(tb)
    419 else:
    420     return result

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/utils.py:391, in sync.<locals>.f()
    389         future = wait_for(future, callback_timeout)
    390     future = asyncio.ensure_future(future)
--> 391     result = yield future
    392 except Exception:
    393     error = sys.exc_info()

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/tornado/gen.py:767, in Runner.run(self)
    765 try:
    766     try:
--> 767         value = future.result()
    768     except Exception as e:
    769         # Save the exception for later. It's important that
    770         # gen.throw() not be called inside this try/except block
    771         # because that makes sys.exc_info behave unexpectedly.
    772         exc: Optional[Exception] = e

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/distributed/client.py:2231, in Client._gather(self, futures, errors, direct, local_worker)
   2229         exc = CancelledError(key)
   2230     else:
-> 2231         raise exception.with_traceback(traceback)
   2232     raise exc
   2233 if errors == "skip":

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/dask/optimization.py:992, in __call__()
    990 if not len(args) == len(self.inkeys):
    991     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 992 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/dask/core.py:151, in get()
    149 for key in toposort(dsk):
    150     task = dsk[key]
--> 151     result = _execute_task(task, cache)
    152     cache[key] = result
    153 result = _execute_task(out, cache)

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/dask/core.py:121, in _execute_task()
    117     func, args = arg[0], arg[1:]
    118     # Note: Don't assign the subtask results to a variable. numpy detects
    119     # temporaries by their reference count and can execute certain
    120     # operations in-place.
--> 121     return func(*(_execute_task(a, cache) for a in args))
    122 elif not ishashable(arg):
    123     return arg

File ~/mambaforge/envs/esmvalcore-v2.9.0rc1/lib/python3.11/site-packages/cf_units/__init__.py:1918, in convert()
   1916     return result
   1917 else:
-> 1918     raise ValueError(
   1919         "Unable to convert from '%r' to '%r'." % (self, other)
   1920     )

ValueError: Unable to convert from 'Unit('unknown')' to 'Unit('m s-1')'.

Expected behaviour

I would expect the code above to realize the cube data without throwing an exception. This works fine if I skip step 1), i.e. do not start a Dask distributed cluster.

Environment

  • OS & Version: 23.04
  • Iris Version: 3.6.0

Metadata

Metadata

Type

No type

Projects

Status

✅ Done

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions