
Conversation

@TomAugspurger
Member

Inlining means that we take a graph like

{
  "zarr-array": (zarr.Array, ...),
  "arr-0-0": (getitem, "zarr-array", 0, 0),
  "arr-0-1": (getitem, "zarr-array", 0, 1),
}

and replace it with something like

zarr_array = zarr.Array(...)
{
  "arr-0-0": (getitem, zarr_array, 0, 0),
  "arr-0-1": (getitem, zarr_array, 0, 1),
}
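
For concreteness, here is a minimal runnable sketch of that substitution using dask.optimization.inline_functions, the helper dask's optimizer applies to inlineable getters. Here open_store is a hypothetical stand-in for the zarr.Array root task:

from operator import getitem
from dask.optimization import inline_functions

def open_store():
    # hypothetical stand-in for zarr.Array(...)
    return {(0, 0): "block-00", (0, 1): "block-01"}

dsk = {
    "zarr-array": (open_store,),
    "arr-0-0": (getitem, "zarr-array", (0, 0)),
    "arr-0-1": (getitem, "zarr-array", (0, 1)),
}

# Substitute the root task into each dependent and drop the shared root node
inlined = inline_functions(
    dsk, output=["arr-0-0", "arr-0-1"], fast_functions={open_store}
)
# inlined == {
#     "arr-0-0": (getitem, (open_store,), (0, 0)),
#     "arr-0-1": (getitem, (open_store,), (0, 1)),
# }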

In this case, we want to inline the zarr.Array to ensure that we get good task fusion
later on. For reasons currently beyond me, the presence of a root node confuses our
fusion algorithms. Compare the two

# setup (display() below assumes an IPython/Jupyter session)
import dask
import dask.array as da
original = "original.zarr"

a = da.random.random(size=(4, 4000, 4000), chunks=(1, 4000, 4000))
a.to_zarr(original, overwrite=True)
b = da.from_zarr(original)
c = b.rechunk({1: 2000, 2: 2000})
with dask.config.set(**{"optimization.fuse.ave-width": 4}):
    display(c.visualize(optimize_graph=True))

On master:

[task graph image: master]

On this PR:

[task graph image: pr]

Failure to fuse tends to cause excess communication and memory issues on the distributed scheduler. Compare the task graph panes in these two reports.

c.to_zarr() on master:

https://gistcdn.rawgit.org/TomAugspurger/1ebffe84db72e630229efb3a4b9cd7d2/19f63c5b972332708ffec041927f0cd764a302a6/master.html

c.to_zarr() on this PR:

https://gistcdn.rawgit.org/TomAugspurger/1fae16981f23a8ff892f120da7dbdba4/c1101da0e41759d2f81642369ed48e1c910147f7/pr.html

xref #5105. There's one more issue with to_zarr that I want to track down. I'm seeing what looks like unnecessary data moving around, but need to verify.

cc @alimanfoo, @rabernat, and @rsignell-usgs since this has affected you.

Setting the fuse_ave_width to be greater than or equal to the rechunk factor (4 in this case) is quite important right now. I'm hopeful we'll be able to do this automatically in the future.
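
As a small sketch of that rule of thumb (assuming the rechunk in the example above, where each source chunk splits into four output chunks), the width can also be set globally instead of inside a context manager:

import dask

# The rechunk above splits each source chunk into four output chunks,
# so fuse needs ave-width >= 4 to collapse those getter chains.
dask.config.set({"optimization.fuse.ave-width": 4})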

This improves task fusion.
assert "auto" in str(info.value)


def test_from_zarr_unique_name():
Member Author

Moved all the Zarr tests to their own file. Hope that's OK.

from dask.utils import tmpdir


def test_from_zarr_unique_name():
Member Author

These are just moved tests.

assert a2.chunks == a.chunks


def test_from_zarr_inline_getter():
Member Author

This is the only new test, asserting that we're using getter_inline in from_zarr.

@mrocklin
Member

This functionality has flopped back and forth a couple of times. I don't recall the PR/issue where it was brought up before, but I think the problem was that people were using data stores that were not as trivial to move around as Zarr files. Perhaps git blame can point to the previous conversation.

@rabernat
Contributor

Thanks for investigating this, Tom. It seems like you are on the right track to solving some long-standing performance issues related to large distributed reads.

@TomAugspurger
Member Author

I didn't see much in the git blame. In the original implementation https://github.com/dask/dask/pull/3561/files @jakirkham didn't inline the Array. Perhaps it was HDF?

I'll try this out with some zarr datasets in cloud storage later today.

@mrocklin
Member

mrocklin commented May 13, 2020 via email

@TomAugspurger
Member Author

Hmmm, unfortunately this doesn't look like an unambiguous win. Whether or not it's good to inline the getitem depends on the downstream operations.

I'm going to explore why the presence of the root node causes issues with fusion in the first place.

@jakirkham
Member

I didn't see much in the git blame. In the original implementation https://github.com/dask/dask/pull/3561/files @jakirkham didn't inline the Array. Perhaps it was HDF?

My guess is I just did whatever was easiest to get us started :)

@eriknw
Member

eriknw commented May 20, 2020

@TomAugspurger, I'm confused by what you say you want to do and what you did in this PR. To me, it looks like the Zarr array wasn't inlined. The getters were inlined. Please let me know if I misunderstood something.

Also, I wouldn't expect fuse to do anything for this example. fuse works on reductions or linear chains. The given example is the opposite of that.
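
As a toy sketch of that point (not code from the PR): fuse collapses a linear chain into a single task, but a root node fanning out to many getitems gives it nothing linear to collapse:

from dask.optimization import fuse

def inc(x):
    return x + 1

# a linear chain: b feeds only c, so the inc calls fuse into one task
chain = {"a": 1, "b": (inc, "a"), "c": (inc, "b")}
fused, dependencies = fuse(chain, keys=["c"])

# a root with many dependents, like the zarr.Array node above, is neither
# a reduction nor a linear chain, so fuse leaves that structure alone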

Looking at the example from #5105 (comment)

import dask
import dask.array as da
a = da.random.random((4, 4000, 4000), chunks=(1, 4000, 4000))
da.to_zarr(a, "/tmp/data/original.zarr", overwrite=True)
data = da.from_zarr("/tmp/data/original.zarr")
desired = data.rechunk({0: 4, 1: 2000, 2: 2000})
data2 = data.rechunk({1: 2000, 2: 2000})
data2.to_zarr("/tmp/data/temp.zarr", overwrite=True)
data3 = da.from_zarr("/tmp/data/temp.zarr")
data4 = data3.rechunk({0: 4})
with dask.config.set(**{"optimization.fuse.ave-width": 4}):
    display(data4.visualize(optimize_graph=True))

Master gives:

[task graph image: master]

This PR gives the following, due to inlining, not fuse (so {"optimization.fuse.ave-width": 1} also gives this graph):

[task graph image]

For this example, fuse refuses to fuse on master because hold_keys specifically tells fuse not to fuse the getitems. We can trick hold_keys with a hack:

diff --git a/dask/array/core.py b/dask/array/core.py
index 4d38c63c..c4dda7b7 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -2861,7 +2861,11 @@ def from_zarr(
     chunks = chunks if chunks is not None else z.chunks
     if name is None:
         name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs)
-    return from_array(z, chunks, name=name)
+    arr = from_array(z, chunks, name=name)
+    alias = 'alias-' + name
+    arr.dask.layers[name][alias] = arr.dask.layers[name][name]
+    arr.dask.layers[name][name] = alias
+    return arr


 def to_zarr(

which gives:

[task graph image: alias]

As an observation, inlining is much more overzealous than fusing and more difficult to control. Is there a getter that isn't held by hold_keys (so is always fuse-able) but also doesn't get inlined?

@eriknw
Member

eriknw commented May 20, 2020

Is there a getter that isn't held by hold_keys (so is always fuse-able) but also doesn't get inlined?

Answering my own question... here's a less hacky way (learned from this PR) to avoid both hold_keys and inline_functions for Zarr getters. This may have more reasonable and controllable behavior:

diff --git a/dask/array/core.py b/dask/array/core.py
index 4d38c63c..02b91d6a 100644
--- a/dask/array/core.py
+++ b/dask/array/core.py
@@ -135,6 +135,10 @@ def getter_inline(a, b, asarray=True, lock=None):
     return getter(a, b, asarray=asarray, lock=lock)


+def getter_nohold(a, b, asarray=True, lock=None):
+    return getter(a, b, asarray=asarray, lock=lock)
+
+
 from .optimization import optimize, fuse_slice


@@ -2861,7 +2865,7 @@ def from_zarr(
     chunks = chunks if chunks is not None else z.chunks
     if name is None:
         name = "from-zarr-" + tokenize(z, component, storage_options, chunks, **kwargs)
-    return from_array(z, chunks, name=name)
+    return from_array(z, chunks, name=name, getitem=getter_nohold)


 def to_zarr(

@TomAugspurger
Member Author

TomAugspurger commented May 20, 2020

FWIW, I may be abandoning this PR. It's not obviously better on all workloads. I also think that Jim's PR to fuse Delayed achieves what I wanted, but will need to double check.

To me, it looks like the Zarr array wasn't inlined. The getters were inlined.

Yes, I think that you're correct.

@jakirkham
Member

Which PR is that?

@eriknw
Member

eriknw commented May 20, 2020

Must be #6222, and I'm curious how you expect that to help here.

What workloads perform poorly with this PR?

@jakirkham
Member

Thanks Erik!

Would also be curious to know. Though guessing these are some of the issues.

@jakirkham
Member

FWIW, I may be abandoning this PR. It's not obviously better on all workloads.

I'm a bit curious about this point, @TomAugspurger. Is there a workload where this would behave worse? If so, by how much?

@TomAugspurger
Member Author

Sorry for the delay in responding here. #6773 makes this a user-facing keyword: da.from_zarr(..., inline_array=True). Taking my original example:

import dask
import dask.array as da
original = "original.zarr"

a = da.random.random(size=(4, 4000, 4000), chunks=(1, 4000, 4000))
a.to_zarr(original, overwrite=True)
b = da.from_zarr(original)
c = b.rechunk({1: 2000, 2: 2000})

with dask.config.set(**{"optimization.fuse.ave-width": 4}):
    c.visualize(optimize_graph=True, color="order", cmap="autumn", filename="bad")

[task graph image: bad]

order doesn't do well there. We finish the first chain, then jump between the remaining three, since the only tie-breaker deciding what to work on next is the key name. We can hopefully fix that in order, but for now, with the array inlined, we finish off each chain before moving on to the next:

b = da.from_zarr(original, inline_array=True)
c = b.rechunk({1: 2000, 2: 2000})

with dask.config.set(**{"optimization.fuse.ave-width": 4}):
    c.visualize(optimize_graph=True, color="order", cmap="autumn", filename="good")

[task graph image: good]

@mrocklin
Member

mrocklin commented Oct 28, 2020 via email

@jakirkham
Member

I might not be following the question. Could you please provide an example? Think I'm just missing context 😅

@mrocklin
Member

mrocklin commented Oct 28, 2020 via email
