0% found this document useful (0 votes)
80 views41 pages

Chapter2-Working With Dask Arrays

The document discusses chunking NumPy arrays in Dask to enable parallel processing. It introduces working with Dask arrays, including chunking a NumPy array when creating a Dask array. Methods like sum(), mean(), and visualization are demonstrated on Dask arrays. The document also covers extracting a Dask array from an HDF5 file for distributed computation on large weather data.
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
0% found this document useful (0 votes)
80 views41 pages

Chapter2-Working With Dask Arrays

The document discusses chunking NumPy arrays in Dask to enable parallel processing. It introduces working with Dask arrays, including chunking a NumPy array when creating a Dask array. Methods like sum(), mean(), and visualization are demonstrated on Dask arrays. The document also covers extracting a Dask array from an HDF5 file for distributed computation on large weather data.
Copyright
© © All Rights Reserved
We take content rights seriously. If you suspect this is your content, claim it here.
Available Formats
Download as PDF, TXT or read online on Scribd
You are on page 1/ 41

Chunking Arrays in

Dask
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N

Dhavide Aruliah
Director of Training, Anaconda
What we've seen so far...
Measuring memory usage

Reading large les in chunks

Computing with generators

Computing with
dask.delayed

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Working with Numpy arrays
import numpy as np
a = np.random.rand(10000)
print(a.shape, a.dtype)

(10000,) float64

print(a.sum())

5017.32043995

print(a.mean())

0.501732043995

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Working with Dask arrays
import dask.array as da
a_dask = da.from_array(a, chunks=len(a) // 4)
a_dask.chunks

((2500, 2500, 2500, 2500),)

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating in chunks
n_chunks = 4
chunk_size = len(a) // n_chunks
result = 0 # Accumulate sum
for k in range(n_chunks):
offset = k * chunk_size # Track offset
a_chunk= a[offset:offset + chunk_size] # Slice chunk
result += a_chunk.sum()
print(result)

5017.32043995

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating with Dask arrays
a_dask = da.from_array(a, chunks=len(a)//n_chunks)
result = a_dask.sum()
result

dask.array<sum-aggregate, shape=(), dtype=float64, chunksize=()>

print(result.compute())

5017.32043995

result.visualize(rankdir='LR')

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Task graph

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Dask array methods/attributes
A ributes: shape , ndim , nbytes , dtype , size , etc.

Aggregations: max , min , mean , std , var , sum , prod , etc.

Array transformations: reshape , repeat , stack , flatten ,


transpose , T , etc.

Mathematical operations: round , real , imag , conj , dot ,


etc.

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Timing array computations
import h5py, time

with h5py.File('dist.hdf5', 'r') as dset:


...: dist = dset['dist'][:]
dist_dask8 = da.from_array(dist, chunks=dist.shape[0]//8)
t_start = time.time(); \
...: mean8 = dist_dask8.mean().compute(); \
...: t_end = time.time()
t_elapsed = (t_end - t_start) * 1000 # Elapsed time in ms
print('Elapsed time: {} ms'.format(t_elapsed))

Elapsed time: 180.96423149108887 ms

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Let's practice!
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N
Computing with
Multidimensional
Arrays
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N

Dhavide Aruliah
Director of Training, Anaconda
A Numpy array of time series data
import numpy as np
time_series = np.loadtxt('max_temps.csv', dtype=np.int64)
print(time_series.dtype)

int64

print(time_series.shape)

(21,)

print(time_series.ndim)

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Reshaping time series data
print(time_series)

[49 51 60 54 47 50 64 58 47 43 50 63 67 68 64 48 55 46 66 51 52]

table = time_series.reshape((3,7)) # Reshaped row-wise


print(table) # Display the result

[[49 51 60 54 47 50 64]
[58 47 43 50 63 67 68]
[64 48 55 46 66 51 52]]

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Reshaping: Getting the order correct!
print(time_series) # Column-wise: correct
time_series.reshape((7,3),
order='F')
[49 51 60 54 47 ... 46 66 51 52]

array([[49, 58, 64],


# Incorrect!
[51, 47, 48],
time_series.reshape((7,3))
[60, 43, 55],
[54, 50, 46],
array([[49, 51, 60], [47, 63, 66],
[54, 47, 50], [50, 67, 51],
[64, 58, 47], [64, 68, 52]])
[43, 50, 63],
[67, 68, 64],
[48, 55, 46],
[66, 51, 52]])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Using reshape: Row- & column-major ordering
Row-major ordering (outermost index changes fastest)
order='C' (consistent with C; default)

Column-major ordering (innermost index changes fastest)


order='F' (consistent with FORTRAN)

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Indexing in multiple dimensions
print(table) # Display the result

[[49 51 60 54 47 50 64]
[58 47 43 50 63 67 68]
[64 48 55 46 66 51 52]]

table[0, 4] # value from Week 0, Day 4

47

table[1, 2:5] # values from Week 1, Days 2, 3, & 4

array([43, 50, 63])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Indexing in multiple dimensions
table[0::2, ::3] # values from Weeks 0 & 2, Days 0, 3, & 6

array([[49, 54, 64],


[64, 46, 52]])

table[0] # Equivalent to table[0, :]

array([49, 51, 60, 54, 47, 50, 64])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating multidimensional arrays
print(table)

[[49 51 60 54 47 50 64]
[58 47 43 50 63 67 68]
[64 48 55 46 66 51 52]]

table.mean() # mean of *every* entry in table

54.904761904761905

# Averages for days


daily_means = table.mean(axis=0)

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating multidimensional arrays
daily_means # Mean computed of rows (for each day)

array([ 57. , 48.66666667, 52.66666667, 50. ,


58.66666667, 56. , 61.33333333])

weekly_means = table.mean(axis=1)
weekly_means # mean computed of columns (for each week)

array([ 53.57142857, 56.57142857, 54.57142857])

table.mean(axis=(0,1)) # mean of rows, then columns

54.904761904761905

PARALLEL PROGRAMMING WITH DASK IN PYTHON


table - daily_means # This works!

array([[ -8. , 2.33333333, 7.33333333, 4. ,


-11.66666667, -6. , 2.66666667],
[ 1. , -1.66666667, -9.66666667, 0. ,
4.33333333, 11. , 6.66666667],
[ 7. , -0.66666667, 2.33333333, -4. ,
7.33333333, -5. , -9.33333333]])

table - weekly_means # This doesn't!

ValueError Traceback (most recent call last)


---> 1 table - weekly_means # This doesn't!

ValueError: operands could not be broadcast together with shapes


(3,7) (3,)

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Broadcasting rules
Compatible Arrays:
1. same ndim : all dimensions same or 1

2. di erent ndim : smaller shape prepended with ones & #1.


applies

Broadcasting: copy array values to missing dimensions, then


do arithmetic

PARALLEL PROGRAMMING WITH DASK IN PYTHON


PARALLEL PROGRAMMING WITH DASK IN PYTHON
print(table.shape) table - daily_means :
(3,7) - (7,) →
(3, 7) (3,7) - (1,7) : compatible

table - weekly_means :
print(daily_means.shape)
(3,7) - (3,) →
(3,7) - (1,3) :
(7,)
incompatible
print(weekly_means.shape)

table -
(3,) weekly_means.reshape((3,1
: (3,7) - (3,1) :
# This works now! compatible
result = table -
weekly_means.reshape((3,1))

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Connecting with Dask
data = np.loadtxt('', usecols=(1,2,3,4), dtype=np.int64)
data.shape

(366, 4)

type(data)

numpy.ndarray

data_dask = da.from_array(data, chunks=(366,2))


result = data_dask.std(axis=0) # Standard deviation down columns
result.compute()

array([ 15.08196053, 14.9456851 , 15.52548285, 14.47228351])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Let's practice!
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N
Analyzing Weather
Data
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N

Dhavide Aruliah
Director of Training, Anaconda
PARALLEL PROGRAMMING WITH DASK IN PYTHON
HDF5 format

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Using HDF5 files
import h5py # import module for reading HDF5 files

# Open HDF5 File object


data_store = h5py.File('tmax.2008.hdf5')
for key in data_store.keys(): # iterate over keys
print(key)

tmax

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Extracting Dask array from HDF5
data = data_store['tmax'] # bind to data for introspection
type(data)

h5py._hl.dataset.Dataset

data.shape # Aha, 3D array: (2D for each month)

(12, 444, 922)

import dask.array as da
data_dask = da.from_array(data, chunks=(1, 444, 922))

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating while ignoring NaNs
data_dask.min() # Yields unevaluated Dask Array

dask.array<amin-aggregate, shape=(), dtype=float64, chunksize=()>

data_dask.min().compute() # Force computation

nan

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Aggregating while ignoring NaNs
da.nanmin(data_dask).compute() # Ignoring nans

-22.329354809176536

lo = da.nanmin(data_dask).compute()
hi = da.nanmax(data_dask).compute()
print(lo, hi)

-22.3293548092 47.7625806255

PARALLEL PROGRAMMING WITH DASK IN PYTHON


PARALLEL PROGRAMMING WITH DASK IN PYTHON
Producing a visualization of data_dask
N_months = data_dask.shape[0] # Number of images
import matplotlib.pyplot as plt
fig, panels = plt.subplots(nrows=4, ncols=3)
for month, panel in zip(range(N_months), panels.flatten()):
im = panel.imshow(data_dask[month, :, :],
origin='lower',
vmin=lo, vmax=hi)
panel.set_title('2008-{:02d}'.format(month+1))
panel.axis('off')

plt.suptitle('Monthly averages (max. daily temperature [C])');


plt.colorbar(im, ax=panels.ravel().tolist()); # Common colorbar
plt.show()

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Stacking arrays
import numpy as np
a = np.ones(3); b = 2 * a; c = 3 * a
print(a, '\n'); print(b, '\n'); print(c)

[ 1. 1. 1.]

[ 2. 2. 2.]

[ 3. 3. 3.]

PARALLEL PROGRAMMING WITH DASK IN PYTHON


np.stack([a, b]) # Makes 2D array of shape (2,3)

array([[ 1., 1., 1.],


[ 2., 2., 2.]])

np.stack([a, b], axis=0) # Same as above

array([[ 1., 1., 1.],


[ 2., 2., 2.]])

np.stack([a, b], axis=1) # Makes 2D array of shape (3,2)

array([[ 1., 2.],


[ 1., 2.],
[ 1., 2.]])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Stacking one-dimensional arrays
X = np.stack([a, b]); \
Y = np.stack([b, c]); \
Z = np.stack([c, a])
print(X, '\n'); print(Y, '\n'); print(Z, '\n')

[[ 1. 1. 1.]
[ 2. 2. 2.]]

[[ 2. 2. 2.]
[ 3. 3. 3.]]

[[ 3. 3. 3.]
[ 1. 1. 1.]]

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Stacking two-dimensional arrays
np.stack([X, Y, Z]) # Makes 3D array of shape (3, 2, 3)

array([[[ 1., 1., 1.],


[ 2., 2., 2.]],

[[ 2., 2., 2.],


[ 3., 3., 3.]],

[[ 3., 3., 3.],


[ 1., 1., 1.]]])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Stacking two-dimensional arrays
# Makes 3D array of shape (2, 3, 3)
np.stack([X, Y, Z], axis=1)

array([[[ 1., 1., 1.],


[ 2., 2., 2.],
[ 3., 3., 3.]],

[[ 2., 2., 2.],


[ 3., 3., 3.],
[ 1., 1., 1.]]])

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Putting array blocks together

PARALLEL PROGRAMMING WITH DASK IN PYTHON


Let's practice!
PA R A L L E L P R O G R A M M I N G W I T H D A S K I N P Y T H O N

You might also like