Description
What happened:
The following example creates four 1 GB zarr arrays on disk, each split into 100 chunks along only its first axis. The computation graph is entirely independent along the chunked axis, yet the dask multi-threaded scheduler loads two of these arrays (2 GB of data) into memory.
What you expected to happen:
I expect this computation never to use more than a small amount of memory (e.g., perhaps 10x the chunk size), because each CPU never needs to load more than one chunk at a time.
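To put rough numbers on that heuristic (my own back-of-the-envelope bound, using the chunk size from the example below):

chunk_size_mb = 10                      # chunks=('10MB', -1) in the example below
expected_peak_mb = 10 * chunk_size_mb   # ~100 MB by the rough 10x heuristic above
observed_peak_mb = 2000                 # vs. ~2 GB actually observed (two full arrays)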
Minimal Complete Verifiable Example:
import dask.array
from dask.diagnostics import ResourceProfiler
from multiprocessing.pool import ThreadPool
import dask.config
import matplotlib.pyplot as plt
import numpy as np
# create data on disk
x = dask.array.zeros((12500, 10000), chunks=('10MB', -1))
dask.array.to_zarr(x, 'saved_x1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_x2.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y2.zarr', overwrite=True)
# load and profile data
x1 = dask.array.from_zarr('saved_x1.zarr')
y1 = dask.array.from_zarr('saved_y1.zarr')
x2 = dask.array.from_zarr('saved_x2.zarr')
y2 = dask.array.from_zarr('saved_y2.zarr')
def evaluate(x1, y1, x2, y2):
    u = dask.array.stack([x1, y1])
    v = dask.array.stack([x2, y2])
    components = [u, v, u ** 2 + v ** 2]
    return [
        abs(c[0] - c[1]).mean(axis=-1)
        for c in components
    ]
rprof = ResourceProfiler(dt=0.1)
with dask.config.set(pool=ThreadPool(1)):
    with rprof:
        results = evaluate(x1, y1, x2, y2)
        dask.compute(results)

Here are the memory profiling results:

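For reference, the profile (which peaks around 2 GB, matching the two arrays mentioned above) can be plotted with the matplotlib/numpy imports from the example; a minimal sketch, assuming rprof.results records expose time and mem fields:

# Plot memory usage over time from the ResourceProfiler samples
# (field names assumed from memory; mem is reported in MB as far as I know).
times = np.array([r.time for r in rprof.results])
mem = [r.mem for r in rprof.results]
plt.plot(times - times[0], mem)
plt.xlabel('time (s)')
plt.ylabel('memory (MB)')
plt.show()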
Here's what the optimized computational graph looks like, showing just the first two chunks along the first axis:
dask.visualize(evaluate(x1[:250], y1[:250], x2[:250], y2[:250]), optimize_graph=True)

See this notebook for full details: https://nbviewer.jupyter.org/gist/shoyer/181ae78ec2f22df815e02ab0f5db3603
Anything else we need to know?:
It is easy to make this computation work in a streaming fashion, e.g., by removing any one of the listed components (see the sketch below). But my understanding was that unless there is overlap in the computation graph (à la #874), dask is supposed to always do depth-first evaluation of the task graph, which should ensure that a computation like this is always done in a streaming way. That does not appear to be the case here.
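For concreteness, here is one such streaming variant (dropping the last component; per the note above, removing either of the other two works just as well):

def evaluate_streaming(x1, y1, x2, y2):
    # Same as evaluate() above, but without the u ** 2 + v ** 2 component;
    # with only these two components the computation proceeds chunk-by-chunk
    # with flat memory usage.
    u = dask.array.stack([x1, y1])
    v = dask.array.stack([x2, y2])
    components = [u, v]
    return [abs(c[0] - c[1]).mean(axis=-1) for c in components]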
Note: This is a simplified example from a real use case involving xarray, which does not use from_zarr but rather its own system for feeding loaded zarr arrays into dask.
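For illustration, one alternative way to feed an already-opened zarr array into dask (a rough sketch only; this is not xarray's actual code path):

import zarr

# Wrap a zarr array directly with dask.array.from_array instead of from_zarr
# (illustrative sketch; xarray's internal machinery is more involved).
z = zarr.open('saved_x1.zarr', mode='r')
x1_alt = dask.array.from_array(z, chunks=z.chunks)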
Environment:
- Dask version: 2.27 / master
- Python version: Python 3.6 / 3.8
- Operating System: Linux / OS X
- Install method (conda, pip, source): source