
Independently chunked computation loads entire zarr arrays into memory #6668

@shoyer

Description


What happened:

The following example creates four 1 GB zarr arrays on disk, each chunked into 100 chunks along only its first axis. The computation graph is entirely independent along the chunked axis, yet dask's multi-threaded scheduler loads two of these arrays (2 GB of data) entirely into memory.

What you expected to happen:

I expect this computation never to use more than a small amount of memory (e.g., perhaps 10x the chunk size), because each CPU never needs to load more than one chunk at a time.
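For concreteness, here is the back-of-the-envelope arithmetic behind that expectation, derived from the array shape and chunking used in the example below:

# ~1 GB per float64 array, split into 100 chunks of ~10 MB along the first axis,
# so a streaming evaluation should peak at roughly 10 chunks (~100 MB),
# not the ~2 GB actually observed.
array_bytes = 12500 * 10000 * 8      # 1_000_000_000 bytes, ~1 GB per array
chunk_bytes = array_bytes // 100     # ~10 MB per chunk
expected_peak = 10 * chunk_bytes     # ~100 MB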

Minimal Complete Verifiable Example:

import dask.array
from dask.diagnostics import ResourceProfiler
from multiprocessing.pool import ThreadPool
import dask.config
import matplotlib.pyplot as plt
import numpy as np

# create data on disk
x = dask.array.zeros((12500, 10000), chunks=('10MB', -1))
dask.array.to_zarr(x, 'saved_x1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y1.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_x2.zarr', overwrite=True)
dask.array.to_zarr(x, 'saved_y2.zarr', overwrite=True)

# load and profile data
x1 = dask.array.from_zarr('saved_x1.zarr')
y1 = dask.array.from_zarr('saved_y1.zarr')
x2 = dask.array.from_zarr('saved_x2.zarr')
y2 = dask.array.from_zarr('saved_y2.zarr')

def evaluate(x1, y1, x2, y2):
  u = dask.array.stack([x1, y1])
  v = dask.array.stack([x2, y2])
  components = [u, v, u ** 2 + v ** 2]
  return [
      abs(c[0] - c[1]).mean(axis=-1)
      for c in components
  ]

rprof = ResourceProfiler(dt=0.1)

with dask.config.set(pool=ThreadPool(1)):
  with rprof:
    results = evaluate(x1, y1, x2, y2)
    dask.compute(results)

Here are the memory profiling results:
[Memory profile plot from ResourceProfiler omitted; it shows peak usage around 2 GB.]

Here's what the optimized computational graph looks like, showing just the first two chunks along the first axis:
dask.visualize(evaluate(x1[:250], y1[:250], x2[:250], y2[:250]), optimize_graph=True)
[Optimized task graph visualization omitted.]

See this notebook for full details: https://nbviewer.jupyter.org/gist/shoyer/181ae78ec2f22df815e02ab0f5db3603

Anything else we need to know?:

It is easy to make this computation work in a streaming fashion, e.g., by removing any one of the listed components (a sketch of one such variant follows below). But my understanding was that, unless there is overlap in the computational graph (à la #874), dask always performs depth-first evaluation of the task graph, which should ensure that a computation like this runs in a streaming way. That does not appear to be the case here.
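As an illustration, here is the same evaluate function with the last component dropped (a sketch mirroring the MCVE above); per the observation above, removing any one of the three components keeps the computation streaming with a small, roughly constant memory footprint:

def evaluate_streaming(x1, y1, x2, y2):
  # Identical to evaluate() above, but without the u ** 2 + v ** 2 component.
  u = dask.array.stack([x1, y1])
  v = dask.array.stack([x2, y2])
  components = [u, v]
  return [
      abs(c[0] - c[1]).mean(axis=-1)
      for c in components
  ]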

Note: This is a simplified example from a real use case involving xarray, which does not use from_zarr but rather its own system for feeding loaded zarr arrays into dask.
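For reference, a rough sketch of what that xarray-based path might look like; this is purely hypothetical (the store and variable names are invented), and xarray builds the dask arrays itself rather than going through dask.array.from_zarr:

import xarray as xr

# Hypothetical sketch only: open a zarr store lazily with xarray, then pull out
# the underlying dask arrays and run the same computation as in the MCVE above.
ds = xr.open_zarr('saved_fields.zarr')      # lazily wraps zarr arrays in dask
x1, y1 = ds['x1'].data, ds['y1'].data       # underlying dask arrays
x2, y2 = ds['x2'].data, ds['y2'].data
results = evaluate(x1, y1, x2, y2)
dask.compute(results)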

Environment:

  • Dask version: 2.27 / master
  • Python version: Python 3.6 / 3.8
  • Operating System: Linux / OS X
  • Install method (conda, pip, source): source
