Speed up distributed Dask by removing reference to array_container in FITSLoader #455
Cadair merged 2 commits into DKISTDC:main
Conversation
CodSpeed Performance Report: Merging #455 will not alter performance.
Thanks a lot for your PR @svank 🚀 This is a great change; we haven't really had the time to do any significant optimisation on the loading code, so anything you can find would be very welcome. I was curious to see the comparison between using distributed and using the local processes scheduler, so I checked out your branch and loaded your data.

**Setup**

```python
import dask
from multiprocessing import Pool

import dkist
from astropy.io import fits
import numpy as np

ds = dkist.load_dataset("~/dkist_data/pid_2_86/BXXDZ")
ds_rebin = ds.rebin((1, 1, 936, 1))

def comp_slice(file):
    data = fits.getdata(ds.files.basepath / file)
    data = data[0]
    return data.sum(axis=0)
```

```python
# baseline serial, numpy
%timeit -n 1 -r 1 np.stack(list(map(comp_slice, ds.files.filenames)))
# 1min 57s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# simple multi-processing
p = Pool(16)
%timeit -n 1 -r 1 np.stack(p.map(comp_slice, ds.files.filenames))
# 12.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
p.close()

# processes scheduler
dask.config.set(scheduler='processes')
%timeit -n 1 -r 1 ds.data.sum(axis=-2).compute();
# 18.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
%timeit -n 1 -r 1 ds_rebin.data.compute()
# 18.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```

It's quite close, certainly good enough imo. Without this PR, the last example took:
Interesting to see! With this PR I also have the
I suspect there's more overhead, but I don't know the details. 2x as fast without distributed is certainly eyebrow-raising.
@astrofrog suggested to me that it might be something to do with differences in how the local processes scheduler and distributed do or do not use threads on each worker.
As suggested to @Cadair, right now I think the issue is that the chunks are very small and take <100 ms to run, so distributed is going to be inefficient. I think it would be worth trying calling
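The small-chunks idea can be sketched with a stand-alone example. The array shapes here are hypothetical stand-ins for `ds.data` (which dkist builds from the FITS files), not the real dataset:

```python
import dask.array as da

# Hypothetical stand-in for ds.data: one small chunk per FITS file,
# so each task finishes in well under 100 ms
arr = da.random.random((100, 1000), chunks=(1, 1000))

# Merge several files' worth of work into each task, so the per-task
# scheduler overhead is amortised over more compute
arr = arr.rechunk((25, 1000))
result = arr.sum(axis=0).compute()
```

Rechunking adds its own data movement, so whether it wins depends on how expensive each task is relative to the scheduler overhead.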
I've been finding that the Dask arrays generated by this package don't give me the speed I would hope for, and this PR helps address that. (I'm working on another, too.)
My use case right now (my first with DKIST) is taking a ViSP data set and summing over wavelength to get a quasi-broadband view of what ViSP was seeing.
- Doing this serially takes 1m 53s.
- Doing this with multiprocessing takes 16 s with 8 cores.
- The default Dask scheduler uses threads; with the GIL I get 250-300% CPU usage and it takes 1m 5s.
- Switching to Dask's distributed scheduler evades the GIL and ideally would look like the multiprocessing case, but it actually takes longer than the serial case, at 2m 33s.
The task stream in the scheduler shows that there are lots of gaps between tasks:

*(screenshot: task stream with gaps between tasks)*

What I finally found as the cause is that each `AstropyFITSLoader` keeps a reference to its parent `StripedExternalArray`. As I understand it, the loaders get pickled and sent to the worker processes before they're used, and this reference causes these pickle payloads to be several megabytes each. This PR avoids pickling the `StripedExternalArray`. This makes the task stream much denser, and lets the computation complete in 47 s:

*(screenshot: denser task stream; the x scale is different in this screenshot)*
This beats the serial case (!) and the default threaded scheduler, but doesn't match multiprocessing. (Not sure how close this can get; I assume Dask has more overhead.) This PR doesn't change the run time for the default scheduler, which I suppose doesn't need to do any pickling.
This PR has the `Loader`s just record the `basepath` when they're created. If `FileManager.basepath` is updated later on, the setter in `StripedExternalArray` now iterates through `loader_array` and updates each one. This has the downside of no longer having a single source of truth for `basepath`, but I think it's worth the speedup. (And it seems like a user would have to dig deep into the internal API to change one but not the others.)

An alternate design I tried was to have `BaseFITSLoader` keep its `array_container` reference, but I overrode `BaseFITSLoader`'s pickling behavior so that only `array_container.basepath` is sent, not `array_container` itself. The unpickled loaders in the worker processes then set a `_basepath` value (a snapshot of the "live" value at the time of pickling, which is when the Dask calculation is triggered) but don't set `array_container`, and the `basepath` property returns whichever is set. This lets `basepath` be stored in only one location in the main process, but I think it has the side effect that if a user loads a dataset, pickles it for some reason, unpickles it, updates the `basepath`, and then triggers the Dask calculation, the pickling will have caused the loaders to drop their references and they won't get the updated `basepath`.

Hopefully this PR doesn't conflict with any future plans for `FITSLoader`!
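For reference, the alternate pickling design described above can be sketched roughly like this. These are toy classes standing in for dkist's, to show both the mechanism and the snapshot side effect:

```python
import pickle

class Container:
    """Toy stand-in for StripedExternalArray."""
    def __init__(self, basepath):
        self.basepath = basepath

class Loader:
    """Toy sketch: keep the live container reference in the main
    process, but pickle only a snapshot of its basepath."""
    def __init__(self, array_container):
        self.array_container = array_container
        self._basepath = None

    @property
    def basepath(self):
        # Prefer the live container; fall back to the snapshot after unpickling
        if self.array_container is not None:
            return self.array_container.basepath
        return self._basepath

    def __getstate__(self):
        state = self.__dict__.copy()
        state["_basepath"] = self.basepath   # snapshot the live value
        state["array_container"] = None      # drop the heavy reference
        return state

container = Container("/data/dkist")
loader = Loader(container)
clone = pickle.loads(pickle.dumps(loader))

# The clone still resolves basepath, from its snapshot...
assert clone.basepath == "/data/dkist"
# ...but updates to the container after pickling no longer reach it
container.basepath = "/new/path"
assert loader.basepath == "/new/path"
assert clone.basepath == "/data/dkist"
```

The last two assertions are exactly the pickle-then-update side effect noted in the description: once a loader round-trips through pickle, it has dropped its container reference and keeps the stale snapshot.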