By switching to numpy_groupies we are vectorizing our groupby reductions. I think we can do the same for groupby's binary ops.
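(For context, a minimal sketch of the kind of vectorized reduction numpy_groupies enables; the integer group codes and toy array here are illustrative, not part of the example that follows.)

```python
import numpy as np
import numpy_groupies as npg

values = np.random.random(8)
group_idx = np.array([0, 0, 1, 1, 2, 2, 3, 3])  # integer group code per element

# One vectorized call computes every group's mean at once,
# instead of looping over the groups in Python.
group_means = npg.aggregate(group_idx, values, func="mean")
```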
Here's an example array:

```python
import numpy as np
import xarray as xr

%load_ext memory_profiler

N = 4 * 2000
da = xr.DataArray(
    np.random.random((N, N)),
    dims=("x", "y"),
    coords={
        "labels": (
            "x",
            np.repeat(["a", "b", "c", "d", "e", "f", "g", "h"], repeats=N // 8),
        )
    },
)
```

Consider this "anomaly" calculation, with the anomaly defined relative to the group mean:
```python
def anom_current(da):
    grouped = da.groupby("labels")
    mean = grouped.mean()
    anom = grouped - mean
    return anom
```

With this approach, we loop over each group and apply the binary operation:
xarray/xarray/core/computation.py, lines 502 to 525 in a1635d3:

```python
iterators = []
for arg in args:
    if isinstance(arg, GroupBy):
        iterator = (value for _, value in arg)
    elif hasattr(arg, "dims") and grouped_dim in arg.dims:
        if isinstance(arg, Variable):
            raise ValueError(
                "groupby operations cannot be performed with "
                "xarray.Variable objects that share a dimension with "
                "the grouped dimension"
            )
        iterator = _iter_over_selections(arg, grouped_dim, unique_values)
    else:
        iterator = itertools.repeat(arg)
    iterators.append(iterator)

applied = (func(*zipped_args) for zipped_args in zip(*iterators))
applied_example, applied = peek_at(applied)
combine = first_groupby._combine
if isinstance(applied_example, tuple):
    combined = tuple(combine(output) for output in zip(*applied))
else:
    combined = combine(applied)
return combined
```
This saves some memory, but becomes slow for a large number of groups.
We could instead do:

```python
def anom_vectorized(da):
    mean = da.groupby("labels").mean()
    mean_expanded = mean.sel(labels=da.labels)
    anom = da - mean_expanded
    return anom
```
Now we are faster, but construct an extra array as big as the original array (I think this is an OK tradeoff).
```python
%timeit anom_current(da)
# 1.4 s ± 20.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit anom_vectorized(da)
# 937 ms ± 5.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```
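The memory side of that tradeoff can be checked the same way with memory_profiler's `%memit` magic (loaded above); actual numbers will vary by machine, so none are reproduced here:

```python
%memit anom_current(da)
%memit anom_vectorized(da)
```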
(I haven't experimented with dask yet, so the following is just a theory).
I think the real benefit comes with dask. With the current loop-over-groups approach, depending on where the groups fall relative to chunk boundaries, we could end up splitting existing chunks into a lot of tiny chunks. With the vectorized approach we can do better.
Ideally we would reindex the "mean" dask array with a numpy array of repeated integers such that the chunking of mean_expanded exactly matches the chunking of da along the grouped dimension.
In practice, dask.array.take doesn't allow specifying "output chunks", so we'd end up chunking mean_expanded based on dask's automatic heuristics and then rechunking again for the binary operation.
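To make that concrete, here is a rough sketch of the take-then-rechunk workaround described above. The helper function, its arguments, and the use of np.unique to build the integer indexer are illustrative assumptions (not existing xarray machinery), and it assumes da and mean are dask-backed and that groupby's sorted group order matches np.unique's sorted output:

```python
import dask.array as dsk  # "da" is already taken by the DataArray in this thread
import numpy as np


def expand_group_means(da, mean, dim="x", label_name="labels"):
    # Integer position of each element's group along mean's "labels" axis.
    # groupby sorts groups by label and np.unique also sorts, so the
    # inverse indices line up with mean's ordering (an assumption here).
    _, inverse = np.unique(da[label_name].values, return_inverse=True)

    axis = mean.get_axis_num(label_name)
    # dask.array.take picks the output chunks automatically ...
    expanded = dsk.take(mean.data, inverse, axis=axis)
    # ... so we rechunk afterwards to match da's chunking along `dim`,
    # which is exactly the extra step it would be nice to avoid.
    expanded = expanded.rechunk({axis: da.chunks[da.get_axis_num(dim)]})
    return expanded
```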
Thoughts?
cc @rabernat