
Speed up distributed Dask by removing reference to array_container in FITSLoader#455

Merged
Cadair merged 2 commits into DKISTDC:main from svank:streamline_loaders
Oct 28, 2024

Conversation

@svank
Contributor

@svank svank commented Oct 26, 2024

I've been finding that the Dask arrays generated by this package don't give me the speed I would hope for, and this PR helps address that. (I'm working on another, too.)

My use case right now (my first with DKIST) is taking a ViSP data set and summing over wavelength to get a quasi-broadband view of what ViSP was seeing.

Doing this serially takes 1 m 53 s:

import dkist
import numpy as np
from astropy.io import fits

ds = dkist.load_dataset("/home/svankooten/BXXDZ/")

def comp_slice(file):
    data = fits.getdata("/home/svankooten/BXXDZ/" + file)
    data = data[0]  # select the first slice along the leading axis
    return data.sum(axis=0)

image = np.stack(list(map(comp_slice, ds.files.filenames)))

Doing this with multiprocessing takes 16 s with 8 cores:

from multiprocessing import Pool

with Pool(8) as p:
    image = np.stack(p.map(comp_slice, ds.files.filenames))

The default Dask scheduler uses threads, so with the GIL I get only 250-300% CPU usage, and this takes 1 m 5 s:

ds.data.sum(axis=-2).compute();

Switching to Dask's distributed scheduler evades the GIL and should ideally look like the multiprocessing case, but it actually takes longer than the serial case, at 2 m 33 s:

from dask.distributed import Client
client = Client(threads_per_worker=1, n_workers=8)
ds.data.sum(axis=-2).compute()

The task stream in the scheduler shows that there are lots of gaps between tasks
[screenshot: distributed task stream with gaps between tasks]

The cause turned out to be that each AstropyFITSLoader keeps a reference to its parent StripedExternalArray. As I understand it, the loaders get pickled and sent to the worker processes before they're used, and this reference makes each pickle payload several megabytes. This PR avoids pickling the StripedExternalArray. That makes the task stream much denser and lets the computation complete in 47 s:
[screenshot: much denser task stream after the change]
(The x scale is different in this screenshot)
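The payload-bloat effect described above can be reproduced in miniature. This is a toy sketch with hypothetical Parent/Loader classes, not the real dkist internals:

```python
import pickle

class Parent:
    """Stand-in for StripedExternalArray: carries bulky shared state."""
    def __init__(self):
        self.big_state = list(range(100_000))  # bulky payload
        self.basepath = "/data/BXXDZ"

class LoaderWithRef:
    """Like the old loader: keeps a reference to its parent."""
    def __init__(self, parent):
        self.parent = parent

class LoaderWithPath:
    """Like the new loader: records only the basepath at creation."""
    def __init__(self, parent):
        self.basepath = parent.basepath

parent = Parent()
with_ref = len(pickle.dumps(LoaderWithRef(parent)))
with_path = len(pickle.dumps(LoaderWithPath(parent)))

# The parent reference drags all of big_state into every pickle,
# so with_ref is orders of magnitude larger than with_path.
assert with_path < with_ref
```

When one such loader is pickled per chunk, that per-task serialization cost is paid over and over, which is consistent with the gaps seen in the task stream.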

This beats the serial case (!) and the default threaded scheduler, but doesn't match multiprocessing. (Not sure how close this can get, I assume Dask has more overhead.) This PR doesn't change the run time for the default scheduler, which I suppose doesn't need to do any pickling.

This PR has the loaders just record the basepath when they're created. If FileManager.basepath is updated later on, the setter in StripedExternalArray now iterates through loader_array and updates each one. This has the downside of no longer having a single source of truth for basepath, but I think it's worth the speedup. (And it seems like a user would have to dig deep into the internal API to change one but not the others.)
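A minimal sketch of that setter pattern, using simplified stand-in classes rather than the actual StripedExternalArray API:

```python
from pathlib import Path

class Loader:
    """Simplified loader that snapshots basepath at creation."""
    def __init__(self, basepath, fname):
        self.basepath = Path(basepath)
        self.fname = fname

class StripedArray:
    """Owns the loaders; its basepath setter keeps them in sync."""
    def __init__(self, basepath, fnames):
        self._basepath = Path(basepath)
        self.loader_array = [Loader(basepath, f) for f in fnames]

    @property
    def basepath(self):
        return self._basepath

    @basepath.setter
    def basepath(self, value):
        self._basepath = Path(value)
        # Propagate so every loader's snapshot stays current
        for loader in self.loader_array:
            loader.basepath = self._basepath

arr = StripedArray("/old/path", ["a.fits", "b.fits"])
arr.basepath = "/new/path"
assert all(l.basepath == Path("/new/path") for l in arr.loader_array)
```

Since each loader now holds only a small Path, pickling a loader no longer drags the whole container along.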

An alternate design I tried was to have BaseFITSLoader keep its array_container reference, but to override BaseFITSLoader's pickling behavior so that only array_container.basepath is sent, not array_container itself. The unpickled loaders in the worker processes then set a _basepath value (a snapshot of the "live" value at the time of pickling, which is when the Dask calculation is triggered) but don't set array_container, and the basepath property returns whichever is set. This lets basepath be stored in only one location in the main process, but it has the side effect that if a user loads a dataset, pickles it for some reason, unpickles it, updates the basepath, and then triggers the Dask calculation, the pickling will have caused the loaders to drop their references and they won't see the updated basepath.

Hopefully this PR doesn't conflict with any future plans for FITSLoader!

@svank svank changed the title Don't keep reference to array_container in FITSLoader Speed up distributed Dask by removing reference to array_container in FITSLoader Oct 26, 2024
@codspeed-hq

codspeed-hq bot commented Oct 26, 2024

CodSpeed Performance Report

Merging #455 will not alter performance

Comparing svank:streamline_loaders (049d1ab) with main (59edd53)

Summary

✅ 9 untouched benchmarks

@Cadair Cadair force-pushed the streamline_loaders branch from 234792d to 516f7a8 Compare October 28, 2024 13:20
@Cadair
Member

Cadair commented Oct 28, 2024

Thanks a lot for your PR @svank 🚀

This is a great change; we haven't really had time to do any significant optimisation of the loading code, so anything you can find is very welcome.

I was curious to see the comparison between using distributed and using the local processes scheduler, so I checked out your branch and loaded your data.

Setup
import dask
from multiprocessing import Pool
import dkist
from astropy.io import fits
import numpy as np

ds = dkist.load_dataset("~/dkist_data/pid_2_86/BXXDZ")

ds_rebin = ds.rebin((1, 1, 936, 1))

def comp_slice(file):
    data = fits.getdata(ds.files.basepath / file)
    data = data[0]
    return data.sum(axis=0)
# baseline serial, numpy
%timeit -n 1 -r 1 np.stack(list(map(comp_slice, ds.files.filenames)))
# 1min 57s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
# simple multi-processing
p = Pool(16)
%timeit -n 1 -r 1 np.stack(p.map(comp_slice, ds.files.filenames))
# 12.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
p.close()
# Processes

dask.config.set(scheduler='processes')

%timeit -n 1 -r 1 ds.data.sum(axis=-2).compute();
# 18.1 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

%timeit -n 1 -r 1 ds_rebin.data.compute()
# 18.3 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

It's quite close; certainly good enough, IMO.

Without this PR, the last example took: 4min 34s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each) so this looks to be a massive improvement to me.

@Cadair Cadair force-pushed the streamline_loaders branch from 516f7a8 to 049d1ab Compare October 28, 2024 14:14
@Cadair Cadair added the No Changelog Entry Needed This pull request does not need a changelog entry label Oct 28, 2024
@Cadair Cadair enabled auto-merge (squash) October 28, 2024 14:17
@Cadair Cadair merged commit c696a85 into DKISTDC:main Oct 28, 2024
@svank svank deleted the streamline_loaders branch October 29, 2024 00:02
@svank
Contributor Author

svank commented Oct 29, 2024

Interesting to see! With this PR I also have the processes scheduler coming in a bit slower than DIY parallelism, but twice as fast as the distributed scheduler. I wonder why; distributed's task stream has gaps, but it's still mostly full. Maybe it has more overhead around starting and stopping each task?

@Cadair
Member

Cadair commented Oct 29, 2024

I suspect there's more overhead, but I don't know the details. 2x as fast without distributed is certainly eyebrow-raising.

@Cadair
Member

Cadair commented Oct 29, 2024

@astrofrog suggested to me that it might be something to do with differences in how local process and distributed do or do not use threads on each worker.

@astrofrog

As I suggested to @Cadair, right now I think the issue is that the chunks are very small and take <100 ms to run, so distributed is going to be inefficient. I think it would be worth trying .rechunk to see if that helps.
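For illustration, merging many small chunks into fewer larger ones with dask.array's rechunk (a generic example, not tied to the dkist loaders):

```python
import dask.array as da

# 100 tiny chunks, each fast to process: per-task scheduling
# overhead can dominate the actual compute time
x = da.ones((1000, 1000), chunks=(10, 1000))

# Merge into 4 larger chunks so each task does more work per dispatch
y = x.rechunk((250, 1000))

print(x.npartitions, y.npartitions)
```

Rechunking adds its own data-movement tasks, so whether it pays off depends on how small the original chunks are relative to the scheduler's per-task overhead.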
