Conversation

@TomAugspurger (Member) commented Oct 27, 2020

This adds an option to `da.from_array` to control how the array object
itself is included in the task graph. By default, there's no change:
there will be a single key whose value is the array object, and the tasks
producing each chunk of the array refer to the array by its key.

With `inline_array=True`, each chunk task in the task graph will instead have
the array object itself included in its values.

The same keyword is added to `from_zarr`, which uses `from_array` internally.

I think that this closes #6668. I don't think that we would automatically
switch the default to `True`, which would certainly close it.
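
For illustration (this sketch is not part of the PR itself), here is roughly how the keyword changes the graph for a plain NumPy-backed array; the exact key counts depend on the chunking and dask version:

import numpy as np
import dask.array as da

arr = np.arange(16).reshape(4, 4)

# Default: one extra key holds the array object, and each chunk task
# refers to it by that key.
x = da.from_array(arr, chunks=2, inline_array=False)

# Inlined: the array object is embedded directly in every chunk task,
# so there is no standalone key for the array itself.
y = da.from_array(arr, chunks=2, inline_array=True)

print(len(x.__dask_graph__()))  # expected 5: four chunk tasks plus the array key
print(len(y.__dask_graph__()))  # expected 4: just the chunk tasks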

@TomAugspurger (Member, Author):

cc @shoyer, if this is what you had in mind for the nicer API. Functionally, I think it's equivalent to the custom inline optimization; it just happens at graph construction time.

import dask
import dask.array

# Create four identical arrays on disk.
x = dask.array.zeros((12500, 10000), chunks=('10MB', -1))
dask.array.to_zarr(x, 'saved_x1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_x2.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y2.zarr', overwrite=True)

# Read them back with the array object inlined into each chunk task.
x1 = dask.array.from_zarr('saved_x1.zarr', inline_array=True)
y1 = dask.array.from_zarr('saved_y1.zarr', inline_array=True)
x2 = dask.array.from_zarr('saved_x2.zarr', inline_array=True)
y2 = dask.array.from_zarr('saved_y2.zarr', inline_array=True)


def evaluate(x1, y1, x2, y2):
    u = dask.array.stack([x1, y1])
    v = dask.array.stack([x2, y2])
    components = [u, v, u ** 2 + v ** 2]
    return [
        abs(c[0] - c[1]).mean(axis=-1)
        for c in components
    ]


# Visualize the optimized graph for a slice of the data, colored by
# the static ordering dask assigns to each task.
n = 125 * 4
dask.visualize(evaluate(x1[:n], y1[:n], x2[:n], y2[:n]), optimize_graph=True,
               color="order", cmap="autumn", node_attr={"penwidth": "4"})

[Task graph visualization with inline_array=True, colored by ordering]

versus this task graph with `inline_array=False` (the default):

[Task graph visualization with inline_array=False, showing worse ordering]


I guess there may be one remaining task from #6668:

[@shoyer] If so, perhaps it would make sense to think about applying this optimization automatically when not using the distributed scheduler.

I'm still thinking through that.

@TomAugspurger (Member, Author):

I added some documentation on this. @mrocklin, IIRC, in the past you've had some hesitation about documenting these failures. I believe your objections were along the lines of:

  1. It's hard to generalize from a particular example and apply it to your problem.
  2. It's possible (likely?) that any specific ordering problem can be fixed, rendering the example obsolete. (I'm kicking around some ideas about ignoring common leaf nodes in ordering that might fix Stephan's issue.)

I agree with those downsides, but I still think this is worth including with some explicit caveats:

  1. This is an advanced topic that most people shouldn't hit.
  2. Even when you hit an ordering issue, it may not be the end of the world.
  3. Making changes to achieve "ideal" ordering may have other downsides that slow down the overall computation.

[Screenshot: the new "Ordering" page in the Dask documentation]

@mrocklin (Member) commented Oct 28, 2020 via email

@TomAugspurger (Member, Author):

From #6203 (comment):

I'm asking what the default value for inlining should be for the `from_zarr`
function. Should `from_zarr` inline or not inline? Are there things about a
zarr array that could help us better choose a default value?

For example, if the zarr array holds data in memory, then it should
probably not inline. If it points to data on disk then maybe it's ok? (Or
maybe not, and we should consider caching like Xarray does.)

Is it surprising that even with inlined arrays, we see about the same serialized size?

In [8]: import zarr
   ...: import numpy as np
   ...: import dask.array as da
   ...: from distributed.protocol import serialize
   ...:
   ...: a = zarr.ones((12_500, 10_000), chunks=(125, 10_000))
   ...: a[:] = np.random.random(size=a.shape)
   ...:
   ...: x1 = da.from_zarr(a)
   ...:
   ...: x2 = da.from_zarr(a, inline_array=True)
   ...:
   ...: len(x1.dask), len(x2.dask)
   ...:
   ...: len(serialize(x1)[1][0]) / len(serialize(x2)[1][0])

Out[8]: 0.9999993250547557

I would have expected the size of the inlined version to be about 100x, since there are 100 references to the array. But someone (dask? pickle?) is clever enough to handle that case.

So I guess it comes down to the cost of creating these Zarr objects (including their stores?).

@shoyer (Member) commented Oct 29, 2020

I would have expected the size of the inlined version to be about 100x, since there are 100 references to the array. But someone (dask? pickle?) is clever enough to handle that case.

Right, pickle is very clever. Each Python object it encounters only gets serialized once.
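
A minimal sketch of that behaviour (not from the thread): repeated references to one object add almost nothing to the pickled payload.

import pickle

import numpy as np

big = np.zeros(1_000_000)   # ~8 MB of float64 data
many_refs = [big] * 100     # 100 references to the same object

# pickle memoizes by object identity, so the repeated references become
# back-references rather than 100 full copies.
print(len(pickle.dumps(big)))        # roughly 8 MB
print(len(pickle.dumps(many_refs)))  # still roughly 8 MB, not ~800 MB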

It's interesting that dask distributed seems to do the same thing when serializing graphs. I was imagining there might be cases where tasks get individually shipped off to different processes (requiring the Zarr array to be re-serialized), but maybe not? Or maybe the overhead in these cases doesn't matter?

In that case, perhaps it is always the right answer to use `inline_array=True` with Zarr, and for other uses of `from_array` with serializable arrays (e.g., in xarray). The only reason we can't do it generically is that many on-disk array types, like those from h5py, can't be pickled.
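
As a purely hypothetical heuristic (not something this PR implements), inlining could be gated on whether the array survives a pickle round trip, which is exactly the property that h5py-backed arrays lack:

import pickle

def safe_to_inline(array_like):
    # Hypothetical helper, not part of dask: only inline objects that can
    # be pickled, since inlining embeds the object in every chunk task.
    try:
        pickle.loads(pickle.dumps(array_like))
        return True
    except Exception:
        return False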

@martindurant (Member):

I wonder how this compares to an explicit `client.scatter(broadcast=True)` for the array object, which only serialises and sends once, but you get copies everywhere. Of course, that breaks the graph/execute boundary, but I'm wondering about the conceptual model here.
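
For reference, a rough sketch of the scatter-based alternative being described, assuming a running distributed cluster and the zarr files from the example above:

import zarr
from distributed import Client

client = Client()  # assumes a distributed scheduler is available

# Serialize the zarr array once and send a copy to every worker up front.
z = zarr.open('saved_x1.zarr', mode='r')
[z_future] = client.scatter([z], broadcast=True)

# Any graph would then have to be built against z_future rather than z,
# which is the "graph/execute boundary" being broken.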

@TomAugspurger (Member, Author) commented Nov 5, 2020

Just a guess, but I think the main (only?) difference would be that `client.scatter(broadcast=True)` creates just one instance on the client (before the scatter) and many instances on the cluster. With this PR and `inline_array=True`, I think you get many instances on both the client and the cluster.

That said, `client.scatter` is only a solution for the distributed scheduler. This issue affects the local schedulers too, and the pain is felt most acutely there, where static ordering tends to be even more important for keeping memory use down.

@martindurant (Member):

Thanks for the thoughts @TomAugspurger. I would certainly not advocate for special workarounds that only apply to distributed, unless there was a really compelling reason.

@TomAugspurger (Member, Author) commented Nov 16, 2020

The main outstanding issue right now is the default for `from_zarr`: it's possible that `from_zarr(..., inline_array=True)` is strictly better (we already know it's not strictly better for the general `from_array`).

Any objections to just merging this as is and getting feedback from users on if/when `inline_array=True` is worse? I'll be sure to make the Pangeo community aware of this option, and will try to gather feedback.

@jrbourbeau (Member) left a comment:

Thanks @TomAugspurger! This looks good overall; I've just got a few small comments.

@jrbourbeau merged commit e1301de into dask:master Dec 19, 2020
@TomAugspurger (Member, Author):

Thanks James!

Successfully merging this pull request may close: Independently chunked computation loads entire zarr arrays into memory