Skip to content

File written by to_netcdf() not closed when Dataset is generated from dask delayed object using a dask Client() #2376

@astoeriko

Description

@astoeriko

Code Sample

import numpy as np
import xarray as xr
import dask.array as da
import dask
from dask.distributed import Client

@dask.delayed
def run_sim(n_time):
    """Simulate one realization: draw n_time standard-normal samples.

    Returns an array of shape (1, n_time) so that individual
    realizations can later be stacked along a new leading axis.
    """
    samples = np.random.randn(n_time)
    return np.array([samples])

client = Client()

# Simulation parameters
n_sims = 5
n_time = 100
output_file = 'out.nc'

# If I use this dask-backed array as output, computing the data after
# reopening the file produces an error.
delayed_sims = [da.from_delayed(run_sim(n_time), (1, n_time,), np.float64)
                for i in range(n_sims)]
out = da.stack(delayed_sims)

# If I use this as output, reopening the netcdf file is no problem
#out = np.random.randn(n_sims,2,n_time) 

ds = xr.Dataset({'var1': (['realization', 'time'], out[:,0,:])},
                 coords={'realization': np.arange(n_sims),
                         'time': np.arange(n_time)*.1})

# Save to a netcdf file -> at this point, computations will be carried out
ds.to_netcdf(output_file, engine='netcdf4')

# Reopen the file and force evaluation of the lazily loaded data
with xr.open_dataset(output_file, chunks={'realization': 2},
                     engine='netcdf4') as ds:
    ds.compute()

Problem description

When I generate a Dataset using a dask delayed object and save the Dataset to a netcdf file, it seems that the file is not properly closed. When trying to reopen it, I get an error (see below). Also, ncdump -h fails on the output file after it has been written. However, after the first unsuccessful attempt to open the file, the file seems to be closed. I can run ncdump -h on it and a second attempt to open it works.

Note that the problem only arises if I

  • store output from a dask delayed object in the Dataset (not if I store a simple numpy array of random numbers)
  • start a dask.distributed.Client()

This issue is related to my question on stackoverflow.

Traceback of the python code:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-2-83478559c186> in <module>()
     36 with xr.open_dataset(output_file, chunks={'realization': 2}, engine='netcdf4')as ds:
     37     # Now acces the data
---> 38     ds.compute()

~/miniconda3/lib/python3.6/site-packages/xarray/core/dataset.py in compute(self, **kwargs)
    592         """
    593         new = self.copy(deep=False)
--> 594         return new.load(**kwargs)
    595 
    596     def _persist_inplace(self, **kwargs):

~/miniconda3/lib/python3.6/site-packages/xarray/core/dataset.py in load(self, **kwargs)
    489 
    490             # evaluate all the dask arrays simultaneously
--> 491             evaluated_data = da.compute(*lazy_data.values(), **kwargs)
    492 
    493             for k, data in zip(lazy_data, evaluated_data):

~/miniconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    400     keys = [x.__dask_keys__() for x in collections]
    401     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 402     results = schedule(dsk, keys, **kwargs)
    403     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    404 

~/miniconda3/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2191             try:
   2192                 results = self.gather(packed, asynchronous=asynchronous,
-> 2193                                       direct=direct)
   2194             finally:
   2195                 for f in futures.values():

~/miniconda3/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1566             return self.sync(self._gather, futures, errors=errors,
   1567                              direct=direct, local_worker=local_worker,
-> 1568                              asynchronous=asynchronous)
   1569 
   1570     @gen.coroutine

~/miniconda3/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    651             return future
    652         else:
--> 653             return sync(self.loop, func, *args, **kwargs)
    654 
    655     def __repr__(self):

~/miniconda3/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    275             e.wait(10)
    276     if error[0]:
--> 277         six.reraise(*error[0])
    278     else:
    279         return result[0]

~/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/miniconda3/lib/python3.6/site-packages/distributed/utils.py in f()
    260             if timeout is not None:
    261                 future = gen.with_timeout(timedelta(seconds=timeout), future)
--> 262             result[0] = yield future
    263         except Exception as exc:
    264             error[0] = sys.exc_info()

~/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/miniconda3/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

~/miniconda3/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1445                             six.reraise(type(exception),
   1446                                         exception,
-> 1447                                         traceback)
   1448                     if errors == 'skip':
   1449                         bad_keys.add(key)

~/miniconda3/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/miniconda3/lib/python3.6/site-packages/dask/array/core.py in getter()
     87         c = a[b]
     88         if asarray:
---> 89             c = np.asarray(c)
     90     finally:
     91         if lock:

~/miniconda3/lib/python3.6/site-packages/numpy/core/numeric.py in asarray()
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/lib/python3.6/site-packages/xarray/core/indexing.py in __array__()
    600 
    601     def __array__(self, dtype=None):
--> 602         return np.asarray(self.array, dtype=dtype)
    603 
    604     def __getitem__(self, key):

~/miniconda3/lib/python3.6/site-packages/numpy/core/numeric.py in asarray()
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/lib/python3.6/site-packages/xarray/core/indexing.py in __array__()
    506     def __array__(self, dtype=None):
    507         array = as_indexable(self.array)
--> 508         return np.asarray(array[self.key], dtype=None)
    509 
    510     def transpose(self, order):

~/miniconda3/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__()
     62             getitem = operator.getitem
     63 
---> 64         with self.datastore.ensure_open(autoclose=True):
     65             try:
     66                 array = getitem(self.get_array(), key.tuple)

~/miniconda3/lib/python3.6/contextlib.py in __enter__()
     79     def __enter__(self):
     80         try:
---> 81             return next(self.gen)
     82         except StopIteration:
     83             raise RuntimeError("generator didn't yield") from None

~/miniconda3/lib/python3.6/site-packages/xarray/backends/common.py in ensure_open()
    502         if not self._isopen:
    503             try:
--> 504                 self._ds = self._opener()
    505                 self._isopen = True
    506                 yield

~/miniconda3/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in _open_netcdf4_group()
    229     import netCDF4 as nc4
    230 
--> 231     ds = nc4.Dataset(filename, mode=mode, **kwargs)
    232 
    233     with close_on_error(ds):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Dataset.__init__()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._ensure_nc_success()

OSError: [Errno -101] NetCDF: HDF error: b'/home/user/code/test/out.nc'

Output of ncdump -h after writing the file (before reopening):

HDF5-DIAG: Error detected in HDF5 (1.10.2) thread 139952254916352:
  #000: H5F.c line 511 in H5Fopen(): unable to open file
    major: File accessibilty
    minor: Unable to open file
  #001: H5Fint.c line 1519 in H5F_open(): unable to lock the file
    major: File accessibilty
    minor: Unable to open file
  #002: H5FD.c line 1650 in H5FD_lock(): driver lock request failed
    major: Virtual File Layer
    minor: Can't update object
  #003: H5FDsec2.c line 941 in H5FD_sec2_lock(): unable to lock file, errno = 11, error message = 'Resource temporarily unavailable'
    major: File accessibilty
    minor: Bad file ID accessed
ncdump: out.nc: NetCDF: HDF error

Expected Output

The netcdf-file is closed after writing it with to_netcdf().

Output of xr.show_versions()

Details

INSTALLED VERSIONS

commit: None
python: 3.6.5.final.0
python-bits: 64
OS: Linux
OS-release: 4.4.0-133-generic
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8

xarray: 0.10.8
pandas: 0.23.3
numpy: 1.14.5
scipy: 1.1.0
netCDF4: 1.4.1
h5netcdf: 0.6.2
h5py: 2.8.0
Nio: None
zarr: None
bottleneck: 1.2.1
cyordereddict: None
dask: 0.18.2
distributed: 1.22.1
matplotlib: 2.2.2
cartopy: None
seaborn: 0.9.0
setuptools: 40.0.0
pip: 18.0
conda: 4.5.10
pytest: 3.6.4
IPython: 6.5.0
sphinx: 1.7.5

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions