@mrocklin
Member

Also check for invalid arguments

cc @pentschev

I ran into this while trying to get Xarray tests back up and running.

@pentschev
Member

In general, this looks like a good improvement towards #4952, but I think in the future we may still try to improve this further to allow passing other exceptions to the user as well.

@mrocklin
Member Author

I'm still working on this. It got a bit deeper than I expected. I'll take another crack at this tomorrow.

@pentschev
Member

What is still left that you want to tackle?

mrocklin added 6 commits June 19, 2019 11:06
1.  We now allow non-array meta.  This ends up being common in temporary
    arrays for gufuncs and reductions
2.  Users can now specify explicit meta= in blockwise
3.  We no longer expect metas to support the astype method
@pentschev
Member

I'd like to do another review once you're done, could you please ping me then @mrocklin ?

@mrocklin
Member Author

> I'd like to do another review once you're done, could you please ping me then @mrocklin ?

This got a bit more complex than it was previously. Review would be welcome.

mrocklin mentioned this pull request Jun 20, 2019

if (isinstance(chunks, str) or
        isinstance(chunks, tuple) and
        chunks and
Member

Isn't isinstance(chunks, str) or isinstance(chunks, tuple) and chunks redundant? If chunks is either str or tuple, then it can't be None or False.

Member

I think this line can be removed for code clarity.

Member Author

But it can be (), which is falsey :)

Member

Hmm, I didn't know about that, thanks for clarifying!
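
The point about `()` can be checked directly. The following is a minimal sketch rather than the code from this PR: `looks_like_chunk_spec` is a hypothetical helper that mirrors only the first clauses of the condition quoted above (the remaining clauses are cut off in the diff context). Because `and` binds more tightly than `or`, the truthiness test applies only to the tuple branch.

```python
def looks_like_chunk_spec(chunks):
    # Hypothetical helper mirroring
    #   isinstance(chunks, str) or isinstance(chunks, tuple) and chunks
    # `and` binds tighter than `or`, so this parses as
    #   str_check or (tuple_check and chunks)
    return bool(isinstance(chunks, str) or
                isinstance(chunks, tuple) and chunks)


# A non-empty tuple passes. An empty tuple is still a tuple, but it is
# falsey, so the extra `and chunks` clause is what filters it out.
print(looks_like_chunk_spec((2, 2)))   # True
print(looks_like_chunk_spec(()))       # False
print(looks_like_chunk_spec("auto"))   # True
```

So the `chunks` clause is not redundant: without it, `()` would be accepted by the tuple check.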

  inds = tuple(range(x.ndim))
  # The dtype of `tmp` doesn't actually matter, and may be incorrect.
- tmp = blockwise(chunk, inds, x, inds, axis=axis, keepdims=True, dtype=x.dtype)
+ tmp = blockwise(chunk, inds, x, inds, axis=axis, keepdims=True, dtype=dtype or float)
Member

Why can't x.dtype be used anymore?

Member Author

Sometimes we construct Dask arrays that don't have array-like metadata. This tends to happen in reductions a lot where we pass through tuples or dicts rather than arrays. In these cases, calling x.dtype will err.

Member

Wouldn't it be better then to have getattr(x, 'dtype', dtype) instead?

Member Author

It doesn't really matter here. The dtype of tmp isn't very relevant. We ignore it in the following lines. We just need to avoid calling x.dtype, which can now sometimes err.

Member

Alright, thanks for clarifying.
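
For reference, the spellings discussed in this thread behave differently when the input has no usable `dtype`. This is an illustrative sketch only: `TupleBackedBlock` is a made-up stand-in for an input with non-array metadata, not a Dask class, and the real failure inside Dask may surface differently.

```python
class TupleBackedBlock:
    """Made-up stand-in for an input whose metadata is not array-like."""
    # Deliberately defines no `dtype` attribute.


x = TupleBackedBlock()
dtype = None  # the caller did not pass an explicit dtype

# Direct attribute access fails on such an object.
try:
    x.dtype
    direct_access_failed = False
except AttributeError:
    direct_access_failed = True

# Suggested alternative: fall back to the caller's dtype (here None).
via_getattr = getattr(x, 'dtype', dtype)

# What the diff does instead: never touch x.dtype, use a placeholder.
via_placeholder = dtype or float

print(direct_access_failed, via_getattr, via_placeholder)
```

Since the dtype of `tmp` is ignored afterwards, either fallback would do; the placeholder version simply never looks at `x.dtype`.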

@shoyer
Member

shoyer commented Jun 20, 2019

Testing this branch with pydata/xarray#3027 still shows one test failure:

Details
___________________________________________ TestDataset.test_dask_is_lazy ____________________________________________

values = dask.array<xarray-var1, shape=(8, 9), dtype=float64, chunksize=(8, 9)>, axis = None, skipna = None
kwargs = {}, func = <function nanmean at 0x110e686a8>, nanname = 'nanmean'
msg = 'mean is not yet implemented on dask arrays'

    def f(values, axis=None, skipna=None, **kwargs):
        if kwargs.pop('out', None) is not None:
            raise TypeError('`out` is not valid for {}'.format(name))

        values = asarray(values)

        if coerce_strings and values.dtype.kind in 'SU':
            values = values.astype(object)

        func = None
        if skipna or (skipna is None and values.dtype.kind in 'cfO'):
            nanname = 'nan' + name
            func = getattr(nanops, nanname)
        else:
            func = _dask_or_eager_func(name)

        try:
>           return func(values, axis=axis, **kwargs)

xarray/core/duck_array_ops.py:246:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a = dask.array<xarray-var1, shape=(8, 9), dtype=float64, chunksize=(8, 9)>, axis = None, dtype = None, out = None

    def nanmean(a, axis=None, dtype=None, out=None):
        if a.dtype.kind == 'O':
            return _nanmean_ddof_object(0, a, axis=axis, dtype=dtype)

        if isinstance(a, dask_array_type):
>           return dask_array.nanmean(a, axis=axis, dtype=dtype)

xarray/core/nanops.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a = dask.array<xarray-var1, shape=(8, 9), dtype=float64, chunksize=(8, 9)>, axis = None, dtype = None
keepdims = False, split_every = None, out = None

    def nanmean(a, axis=None, dtype=None, keepdims=False, split_every=None,
                out=None):
        if dtype is not None:
            dt = dtype
        else:
            dt = getattr(np.mean(np.empty(shape=(1,), dtype=a.dtype)), 'dtype', object)
        return reduction(a, partial(mean_chunk, sum=chunk.nansum, numel=nannumel),
                         mean_agg, axis=axis, keepdims=keepdims, dtype=dt,
                         split_every=split_every, out=out,
                         concatenate=False,
>                        combine=partial(mean_combine, sum=chunk.nansum, numel=nannumel))

../dask/dask/array/reductions.py:459:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = dask.array<xarray-var1, shape=(8, 9), dtype=float64, chunksize=(8, 9)>
chunk = functools.partial(<function mean_chunk at 0x1011f9158>, sum=<function nansum at 0x102be02f0>, numel=<function nannumel at 0x1011f90d0>, dtype=dtype('float64'))
aggregate = functools.partial(<function mean_agg at 0x1011f9268>, dtype=dtype('float64')), axis = (0, 1)
keepdims = False, dtype = dtype('float64'), split_every = None
combine = functools.partial(<function mean_combine at 0x1011f91e0>, sum=<function nansum at 0x102be02f0>, numel=<function nannumel at 0x1011f90d0>)
name = None, out = None, concatenate = False, output_size = 1, meta = None

    def reduction(x, chunk, aggregate, axis=None, keepdims=False, dtype=None,
                  split_every=None, combine=None, name=None, out=None,
                  concatenate=True, output_size=1, meta=None):
        """ General version of reductions

        Parameters
        ----------
        x: Array
            Data being reduced along one or more axes
        chunk: callable(x_chunk, axis, keepdims)
            First function to be executed when resolving the dask graph.
            This function is applied in parallel to all original chunks of x.
            See below for function parameters.
        combine: callable(x_chunk, axis, keepdims), optional
            Function used for intermediate recursive aggregation (see
            split_every below). If omitted, it defaults to aggregate.
            If the reduction can be performed in less than 3 steps, it will not
            be invoked at all.
        aggregate: callable(x_chunk, axis, keepdims)
            Last function to be executed when resolving the dask graph,
            producing the final output. It is always invoked, even when the reduced
            Array counts a single chunk along the reduced axes.
        axis: int or sequence of ints, optional
            Axis or axes to aggregate upon. If omitted, aggregate along all axes.
        keepdims: boolean, optional
            Whether the reduction function should preserve the reduced axes,
            leaving them at size ``output_size``, or remove them.
        dtype: np.dtype, optional
            Force output dtype. Defaults to x.dtype if omitted.
        split_every: int >= 2 or dict(axis: int), optional
            Determines the depth of the recursive aggregation. If set to or more
            than the number of input chunks, the aggregation will be performed in
            two steps, one ``chunk`` function per input chunk and a single
            ``aggregate`` function at the end. If set to less than that, an
            intermediate ``combine`` function will be used, so that any one
            ``combine`` or ``aggregate`` function has no more than ``split_every``
            inputs. The depth of the aggregation graph will be
            :math:`log_{split_every}(input chunks along reduced axes)`. Setting to
            a low value can reduce cache size and network transfers, at the cost of
            more CPU and a larger dask graph.

            Omit to let dask heuristically decide a good default. A default can
            also be set globally with the ``split_every`` key in
            :mod:`dask.config`.
        name: str, optional
            Prefix of the keys of the intermediate and output nodes. If omitted it
            defaults to the function names.
        out: Array, optional
            Another dask array whose contents will be replaced. Omit to create a
            new one. Note that, unlike in numpy, this setting gives no performance
            benefits whatsoever, but can still be useful  if one needs to preserve
            the references to a previously existing Array.
        concatenate: bool, optional
            If True (the default), the outputs of the ``chunk``/``combine``
            functions are concatenated into a single np.array before being passed
            to the ``combine``/``aggregate`` functions. If False, the input of
            ``combine`` and ``aggregate`` will be either a list of the raw outputs
            of the previous step or a single output, and the function will have to
            concatenate it itself. It can be useful to set this to False if the
            chunk and/or combine steps do not produce np.arrays.
        output_size: int >= 1, optional
            Size of the output of the ``aggregate`` function along the reduced
            axes. Ignored if keepdims is False.

        Returns
        -------
        dask array

        **Function Parameters**

        x_chunk: numpy.ndarray
            Individual input chunk. For ``chunk`` functions, it is one of the
            original chunks of x. For ``combine`` and ``aggregate`` functions, it's
            the concatenation of the outputs produced by the previous ``chunk`` or
            ``combine`` functions. If concatenate=False, it's a list of the raw
            outputs from the previous functions.
        axis: tuple
            Normalized list of axes to reduce upon, e.g. ``(0, )``
            Scalar, negative, and None axes have been normalized away.
            Note that some numpy reduction functions cannot reduce along multiple
            axes at once and strictly require an int in input. Such functions have
            to be wrapped to cope.
        keepdims: bool
            Whether the reduction function should preserve the reduced axes or
            remove them.
        """
        if axis is None:
            axis = tuple(range(x.ndim))
        if isinstance(axis, Integral):
            axis = (axis,)
        axis = validate_axis(axis, x.ndim)

        if dtype is None:
            raise ValueError("Must specify dtype")
        if 'dtype' in getargspec(chunk).args:
            chunk = partial(chunk, dtype=dtype)
        if 'dtype' in getargspec(aggregate).args:
            aggregate = partial(aggregate, dtype=dtype)

        # Map chunk across all blocks
        inds = tuple(range(x.ndim))
        # The dtype of `tmp` doesn't actually matter, and may be incorrect.
        tmp = blockwise(chunk, inds, x, inds, axis=axis, keepdims=True, dtype=dtype or float)
        tmp._chunks = tuple((output_size, ) * len(c) if i in axis else c
                            for i, c in enumerate(tmp.chunks))

        if meta is None and hasattr(x, '_meta'):
            try:
                reduced_meta = compute_meta(chunk, x.dtype, x._meta, axis=axis,
                                            keepdims=True, meta=True)
            except TypeError:
                reduced_meta = compute_meta(chunk, x.dtype, x._meta, axis=axis,
                                            keepdims=True)
            except ValueError:
                pass
        else:
            reduced_meta = None

        result = _tree_reduce(tmp, aggregate, axis, keepdims, dtype, split_every,
                              combine, name=name, concatenate=concatenate,
>                             reduced_meta=reduced_meta)

../dask/dask/array/reductions.py:163:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = dask.array<mean_chunk, shape=(1, 1), dtype=float64, chunksize=(1, 1)>
aggregate = functools.partial(<function mean_agg at 0x1011f9268>, dtype=dtype('float64')), axis = (0, 1)
keepdims = False, dtype = dtype('float64'), split_every = {0: 2, 1: 2}
combine = functools.partial(<function mean_combine at 0x1011f91e0>, sum=<function nansum at 0x102be02f0>, numel=<function nannumel at 0x1011f90d0>)
name = None, concatenate = False
reduced_meta = ImplicitToExplicitIndexingAdapter(array=MemoryCachedArray(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=I...
         1.93375201,  1.11587938,  0.82373153, -1.35300754]])), key=BasicIndexer((slice(0, 0, 1), slice(0, 0, 1)))))))

    def _tree_reduce(x, aggregate, axis, keepdims, dtype, split_every=None,
                     combine=None, name=None, concatenate=True, reduced_meta=None):
        """ Perform the tree reduction step of a reduction.

        Lower level, users should use ``reduction`` or ``arg_reduction`` directly.
        """
        # Normalize split_every
        split_every = split_every or config.get('split_every', 4)
        if isinstance(split_every, dict):
            split_every = dict((k, split_every.get(k, 2)) for k in axis)
        elif isinstance(split_every, Integral):
            n = builtins.max(int(split_every ** (1 / (len(axis) or 1))), 2)
            split_every = dict.fromkeys(axis, n)
        else:
            raise ValueError("split_every must be a int or a dict")

        # Reduce across intermediates
        depth = 1
        for i, n in enumerate(x.numblocks):
            if i in split_every and split_every[i] != 1:
                depth = int(builtins.max(depth, ceil(log(n, split_every[i]))))
        func = partial(combine or aggregate, axis=axis, keepdims=True)
        if concatenate:
            func = compose(func, partial(_concatenate2, axes=axis))
        for i in range(depth - 1):
            x = partial_reduce(func, x, split_every, True, dtype=dtype,
                               name=(name or funcname(combine or aggregate)) + '-partial',
                               reduced_meta=reduced_meta)
        func = partial(aggregate, axis=axis, keepdims=keepdims)
        if concatenate:
            func = compose(func, partial(_concatenate2, axes=axis))
        return partial_reduce(func, x, split_every, keepdims=keepdims, dtype=dtype,
                              name=(name or funcname(aggregate)) + '-aggregate',
>                             reduced_meta=reduced_meta)

../dask/dask/array/reductions.py:205:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

func = functools.partial(<function mean_agg at 0x1011f9268>, dtype=dtype('float64'), axis=(0, 1), keepdims=False)
x = dask.array<mean_chunk, shape=(1, 1), dtype=float64, chunksize=(1, 1)>, split_every = {0: 2, 1: 2}
keepdims = False, dtype = dtype('float64'), name = 'mean_agg-aggregate-e8bf85c183b943d8c3f90251685a79b6'
reduced_meta = ImplicitToExplicitIndexingAdapter(array=MemoryCachedArray(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=I...
         1.93375201,  1.11587938,  0.82373153, -1.35300754]])), key=BasicIndexer((slice(0, 0, 1), slice(0, 0, 1)))))))

    def partial_reduce(func, x, split_every, keepdims=False, dtype=None, name=None,
                       reduced_meta=None):
        """ Partial reduction across multiple axes.

        Parameters
        ----------
        func : function
        x : Array
        split_every : dict
            Maximum reduction block sizes in each dimension.

        Examples
        --------
        Reduce across axis 0 and 2, merging a maximum of 1 block in the 0th
        dimension, and 3 blocks in the 2nd dimension:

        >>> partial_reduce(np.min, x, {0: 1, 2: 3})    # doctest: +SKIP
        """
        name = (name or funcname(func)) + '-' + tokenize(func, x, split_every,
                                                         keepdims, dtype)
        parts = [list(partition_all(split_every.get(i, 1), range(n))) for (i, n)
                 in enumerate(x.numblocks)]
        keys = product(*map(range, map(len, parts)))
        out_chunks = [tuple(1 for p in partition_all(split_every[i], c)) if i
                      in split_every else c for (i, c) in enumerate(x.chunks)]
        if not keepdims:
            out_axis = [i for i in range(x.ndim) if i not in split_every]
            getter = lambda k: get(out_axis, k)
            keys = map(getter, keys)
            out_chunks = list(getter(out_chunks))
        dsk = {}
        for k, p in zip(keys, product(*parts)):
            decided = dict((i, j[0]) for (i, j) in enumerate(p) if len(j) == 1)
            dummy = dict(i for i in enumerate(p) if i[0] not in decided)
            g = lol_tuples((x.name,), range(x.ndim), decided, dummy)
            dsk[(name,) + k] = (func, g)
        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[x])

        meta = x._meta
        if reduced_meta is not None:
            try:
>               meta = func(reduced_meta, meta=True)

../dask/dask/array/reductions.py:249:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pairs = ImplicitToExplicitIndexingAdapter(array=MemoryCachedArray(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=I...
         1.93375201,  1.11587938,  0.82373153, -1.35300754]])), key=BasicIndexer((slice(0, 0, 1), slice(0, 0, 1)))))))
dtype = dtype('float64'), axis = (0, 1), meta = True, kwargs = {'keepdims': False}
ns = ImplicitToExplicitIndexingAdapter(array=MemoryCachedArray(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=I...
         1.93375201,  1.11587938,  0.82373153, -1.35300754]])), key=BasicIndexer((slice(0, 0, 1), slice(0, 0, 1)))))))

    def mean_agg(pairs, dtype='f8', axis=None, meta=False, **kwargs):
        ns = deepmap(lambda pair: pair['n'], pairs) if not meta else pairs
>       n = _concatenate2(ns, axes=axis).sum(axis=axis, dtype=dtype, **kwargs)
E       AttributeError: 'ImplicitToExplicitIndexingAdapter' object has no attribute 'sum'

../dask/dask/array/reductions.py:427: AttributeError

During handling of the above exception, another exception occurred:

self = <xarray.tests.test_dataset.TestDataset object at 0x1c190d9198>

    @requires_dask
    def test_dask_is_lazy(self):
        store = InaccessibleVariableDataStore()
        create_test_data().dump_to_store(store)
        ds = open_dataset(store).chunk()

        with pytest.raises(UnexpectedDataAccess):
            ds.load()
        with pytest.raises(UnexpectedDataAccess):
            ds['var1'].values

        # these should not raise UnexpectedDataAccess:
        ds.var1.data
        ds.isel(time=10)
        ds.isel(time=slice(10), dim1=[0]).isel(dim1=0, dim2=-1)
        ds.transpose()
>       ds.mean()

xarray/tests/test_dataset.py:814:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
xarray/core/common.py:65: in wrapped_func
    **kwargs)
xarray/core/dataset.py:3203: in reduce
    **kwargs)
xarray/core/variable.py:1377: in reduce
    data = func(input_data, **kwargs)
xarray/core/duck_array_ops.py:359: in mean
    return _mean(array, axis=axis, skipna=skipna, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

values = dask.array<xarray-var1, shape=(8, 9), dtype=float64, chunksize=(8, 9)>, axis = None, skipna = None
kwargs = {}, func = <function nanmean at 0x110e686a8>, nanname = 'nanmean'
msg = 'mean is not yet implemented on dask arrays'

    def f(values, axis=None, skipna=None, **kwargs):
        if kwargs.pop('out', None) is not None:
            raise TypeError('`out` is not valid for {}'.format(name))

        values = asarray(values)

        if coerce_strings and values.dtype.kind in 'SU':
            values = values.astype(object)

        func = None
        if skipna or (skipna is None and values.dtype.kind in 'cfO'):
            nanname = 'nan' + name
            func = getattr(nanops, nanname)
        else:
            func = _dask_or_eager_func(name)

        try:
            return func(values, axis=axis, **kwargs)
        except AttributeError:
            if isinstance(values, dask_array_type):
                try:  # dask/dask#3133 dask sometimes needs dtype argument
                    # if func does not accept dtype, then raises TypeError
                    return func(values, axis=axis, dtype=values.dtype,
                                **kwargs)
                except (AttributeError, TypeError):
                    msg = '%s is not yet implemented on dask arrays' % name
            else:
                msg = ('%s is not available with skipna=False with the '
                       'installed version of numpy; upgrade to numpy 1.12 '
                       'or newer to use skipna=True or skipna=None' % name)
>           raise NotImplementedError(msg)
E           NotImplementedError: mean is not yet implemented on dask arrays

xarray/core/duck_array_ops.py:259: NotImplementedError

meta = np.empty((0,) * ndim, dtype=dtype or x.dtype)

if np.isscalar(meta):
    meta = np.array(meta)
Member

Member Author

I believe that I have resolved this in 83ff1d4

Member

Yes, I can confirm. Thanks for that!

meta = meta.astype(_dtype)

if np.isscalar(meta):
    meta = np.array(meta)
Member

I imagine this may cause problems for libraries such as CuPy, just like the entry in meta_from_array, but there's no test at the moment to confirm my suspicion.

Member Author

I think I've resolved this elsewhere, but I'm not entirely certain.

Member

Ok, let's leave it for now, we will address it if we find a failing case in the future.

@pentschev
Member

@shoyer I just quickly tried to reproduce the error from test_dask_is_lazy using xarray master, but I can't, either with Dask master or this branch. Did you get that error from CI?

@shoyer
Member

shoyer commented Jun 20, 2019

@pentschev the test failure only appears on the xarray branch from pydata/xarray#3027 (with the dask branch from this PR)

It would probably make sense to try to only use NumPy functions + indexing/dtype/shape on meta objects (rather than any additional methods). Those are all that are currently required for dask's from_array.
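
A small sketch of the distinction, using NumPy only. `MinimalArray` is a hypothetical wrapper (not an Xarray or Dask class) that exposes just dtype, shape, indexing and `__array__`. Calling the `.sum` method fails, while the `np.sum` function still works because it converts the object through `__array__`.

```python
import numpy as np


class MinimalArray:
    """Hypothetical wrapper exposing only dtype, shape, indexing, __array__."""

    def __init__(self, data):
        self._data = np.asarray(data)
        self.dtype = self._data.dtype
        self.shape = self._data.shape
        self.ndim = self._data.ndim

    def __getitem__(self, key):
        return self._data[key]

    def __array__(self, dtype=None, copy=None):
        # Hand NumPy the underlying data when it asks for an array.
        return np.asarray(self._data, dtype=dtype)


wrapped = MinimalArray([[1.0, 2.0], [3.0, 4.0]])

# The method-style call relies on an attribute this wrapper lacks.
try:
    wrapped.sum()
except AttributeError as exc:
    print("method call failed:", exc)

# The function-style call goes through __array__ instead.
total = np.sum(wrapped)
print(total)
```

Note that the conversion reads the wrapped data, which matters for lazily loaded backends.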

@mrocklin
Member Author

> It would probably make sense to try to only use NumPy functions + indexing/dtype/shape on meta objects (rather than any additional methods). Those are all that are currently required for dask's from_array.

Right, so replacing the .sum method with np.sum is easy enough. That does seem to trigger an access into the InaccessibleArray type though. This raises a broader question of "when should Dask array reject a meta value, and use a numpy array instead?"

Having Xarray indexer objects as metadata in dask arrays probably doesn't make sense. Meta should represent the type of our blocks once they're computed rather than the type of data that we started with. I just tested this with h5py in ad55df3 and it works there because we slice the object down and that produces a numpy array. The lazy slicing of Xarray indexer objects fouls things up here.

We can make Dask array operations more robust to this, which is good regardless, but it might also make sense for Xarray to explicitly populate a meta= field when it calls da.from_array (or whatever other methods it may use). Thoughts?
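
To illustrate why the slicing behaviour matters, here is a rough model. It assumes a default meta is derived by taking a zero-length slice of the input, as "we slice the object down" suggests; the exact mechanism inside Dask is not shown in this thread. Both wrapper classes are hypothetical and are not the h5py or Xarray classes.

```python
import numpy as np


class EagerBackend:
    """Hypothetical h5py-like wrapper: indexing returns a NumPy array."""

    def __init__(self, data):
        self._data = np.asarray(data)
        self.ndim = self._data.ndim

    def __getitem__(self, key):
        return self._data[key]


class LazyBackend(EagerBackend):
    """Hypothetical lazy wrapper: indexing returns another wrapper."""

    def __getitem__(self, key):
        return LazyBackend(self._data[key])


def empty_slice(x):
    # Assumed model of deriving a meta: a zero-length slice on every axis.
    return x[(slice(0, 0),) * x.ndim]


data = np.arange(6.0).reshape(2, 3)
print(type(empty_slice(EagerBackend(data))).__name__)  # ndarray
print(type(empty_slice(LazyBackend(data))).__name__)   # LazyBackend
```

In the eager case the result is already the block type; in the lazy case the wrapper itself would end up as the meta, which is the situation the traceback above shows.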

@pentschev
Member

> This raises a broader question of "when should Dask array reject a meta value, and use a numpy array instead?"

I would like to say: never. But that's probably unrealistic. At the very least it would be nice to increase _meta testing to help us prevent regressions. Testing with Sparse is great, but currently we can't test everything with it, so I'm relying on testing with CuPy, which implements a larger part of NumPy's API. I think finding a way to solve #4538 may help us in testing things better.

> We can make Dask array operations more robust to this, which is good regardless, but it might also make sense for Xarray to explicitly populate a meta= field when it calls da.from_array (or whatever other methods it may use). Thoughts?

If that's an option on the xarray side, I think it would be a nice -- and probably not so dirty -- way of solving it. I don't really have a better idea at the moment.

@shoyer
Member

shoyer commented Jun 21, 2019

> We can make Dask array operations more robust to this, which is good regardless, but it might also make sense for Xarray to explicitly populate a meta= field when it calls da.from_array (or whatever other methods it may use). Thoughts?

> If that's an option on the xarray side, I think it would be a nice -- and probably not so dirty -- way of solving it. I don't really have a better idea at the moment.

Yes, this sounds like a good option. Our "lazy backend array" classes are somewhat unusual -- most classes like this create NumPy arrays when indexed.

It seems pretty ugly to need to set _meta directly, so maybe we could add a meta keyword argument to from_array? Maybe even supporting a short-cut form like meta=np.ndarray, so I don't need to get really in the weeds of dask's data model by writing something like np.empty((0,)*x.ndim, x.dtype)?

Even if you're using backend arrays where the default meta would work (e.g., h5py), setting meta explicitly is probably often a good idea. Indexing is often but not always cheap (e.g., for data accessed over a network), and I doubt indexing with empty slices has been optimized for most array backends (even though in theory this could be done).
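
The explicit meta mentioned above is a zero-size array that carries only the dimensionality, dtype and array type of the blocks. A minimal NumPy-only sketch; `explicit_meta` is a hypothetical helper name, not part of Dask's API.

```python
import numpy as np


def explicit_meta(ndim, dtype):
    # Hypothetical helper: build a zero-size array with the given number
    # of dimensions and dtype, without touching any real data.
    return np.empty((0,) * ndim, dtype=dtype)


meta = explicit_meta(2, "float64")
print(meta.shape, meta.dtype, meta.size)  # (0, 0) float64 0
```

Building this object never indexes the backend, so it avoids the cost of an empty slice over a slow or remote data source.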

@mrocklin
Member Author

I've added da.from_array(..., meta=) in 8fe2128

@pentschev
Member

How are you so fast @mrocklin? I was writing an answer and you were ready with a commit for that.

@pentschev
Member

> Maybe even supporting a short-cut form like meta=np.ndarray, so I don't need to get really in the weeds of dask's data model by writing something like np.empty((0,)*x.ndim, x.dtype)?

I would recommend using dask.array.utils.meta_from_array(x) here.

@mrocklin
Member Author

> Maybe even supporting a short-cut form like meta=np.ndarray, so I don't need to get really in the weeds of dask's data model by writing something like np.empty((0,)*x.ndim, x.dtype)?

Done in 99a83d3

I would like to merge this soonish if that's possible. @shoyer are we ok on the Xarray side?

@shoyer
Member

shoyer commented Jun 22, 2019 via email

@mrocklin
Member Author

In [1]: import dask.array as da

In [2]: import numpy as np

In [3]: da.from_array(np.ones(5), meta=np.ndarray)
Out[3]: dask.array<array, shape=(5,), dtype=float64, chunksize=(5,)>

In [4]: _._meta
Out[4]: array([], dtype=float64)

mrocklin merged commit 243cb10 into dask:master Jun 23, 2019
mrocklin deleted the compute-meta branch June 23, 2019 06:54
@mrocklin
Member Author

This is in. Thanks @shoyer and @pentschev for review. @shoyer please speak up if there are further Xarray issues.
