Description
If I create a chunked, lazily-computed array, writing it to disk with to_netcdf() computes and writes it under the threaded and distributed schedulers, but not under the multiprocessing scheduler. The only reference I've found when searching for the exception message is this StackOverflow question.
MCVE Code Sample
import dask
import dask.distributed
import numpy as np
import xarray as xr

if __name__ == "__main__":
    # Simple worker function.
    def inner(ds):
        if sum(ds.dims.values()) == 0:
            return ds
        return ds**2

    # Some random data to work with.
    ds = xr.Dataset(
        {"test": (("a", "b"), np.random.uniform(size=(1000, 1000)))},
        {"a": np.arange(1000), "b": np.arange(1000)},
    )

    # Chunk it and apply the worker to each chunk.
    ds_chunked = ds.chunk({"a": 100, "b": 200})
    ds_squared = ds_chunked.map_blocks(inner)

    # Thread pool scheduler can compute while writing.
    dask.config.set(scheduler="threads")
    print("Writing thread pool test to disk.")
    ds_squared.to_netcdf("test-threads.nc")

    # Local cluster with distributed works too.
    c = dask.distributed.Client()
    dask.config.set(scheduler=c)
    print("Writing local cluster test to disk.")
    ds_squared.to_netcdf("test-localcluster.nc")

    # Process pool scheduler can compute.
    dask.config.set(scheduler="processes")
    print("Computing with process pool scheduler.")
    ds_squared.compute()

    # But it cannot compute while writing.
    print("Trying to write process pool test to disk.")
    ds_squared.to_netcdf("test-process.nc")
Expected Output
Complete netCDF files should be created by all three schedulers.
Problem Description
The thread pool and distributed local cluster schedulers result in a complete output. The process pool scheduler fails when trying to write (note that test-process.nc is created with the header and coordinate information, but no actual data is written). The traceback is:
Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1097, in to_netcdf
    writes = writer.sync(compute=compute)
  File "/usr/lib/python3.8/site-packages/xarray/backends/common.py", line 198, in sync
    delayed_store = da.store(
  File "/usr/lib/python3.8/site-packages/dask/array/core.py", line 923, in store
    result.compute(**kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/multiprocessing.py", line 212, in get
    result = get_async(
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 494, in get_async
    fire_task()
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 460, in fire_task
    dumps((dsk[key], data)),
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
    cp.dump(obj)
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 538, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 101, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 363, in assert_spawning
    raise err
RuntimeError: Lock objects should only be shared between processes through inheritance
With a bit of editing of the system multiprocessing module, I was able to determine that the lock being reported by this exception was the first Lock created. I then added a breakpoint to the Lock constructor to get a traceback of what was creating it:
| File | Line | Function |
|---|---|---|
| core/dataset.py | 1535 | Dataset.to_netcdf |
| backends/api.py | 1071 | to_netcdf |
| backends/netCDF4_.py | 350 | open |
| backends/locks.py | 114 | get_write_lock |
| backends/locks.py | 39 | _get_multiprocessing_lock |
This last function creates the offending multiprocessing.Lock() object. Note that six Locks are constructed in total, so it's possible that the later-created ones would also cause an issue.
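The underlying limitation seems easy to reproduce in isolation. Here is a minimal sketch (independent of xarray and dask) that just tries to pickle a lock directly, which on Python 3.8 fails with the same message:

import multiprocessing
import pickle

lock = multiprocessing.Lock()

try:
    # Pickling a multiprocessing.Lock outside of process spawning is not allowed.
    pickle.dumps(lock)
except RuntimeError as err:
    # "Lock objects should only be shared between processes through inheritance"
    print(err)

So any task that carries one of these locks cannot be serialised by the scheduler's dumps step shown in the traceback above.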
The h5netcdf backend has the same problem with the Lock. However, the SciPy backend raises a NotImplementedError for this case:
ds_squared.to_netcdf("test-process.nc", engine="scipy")
Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc", engine="scipy")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1056, in to_netcdf
    raise NotImplementedError(
NotImplementedError: Writing netCDF files with the scipy backend is not currently supported with dask's multiprocessing scheduler
I'm not sure how simple it would be to get this working with the multiprocessing scheduler, or how vital it is given that the distributed scheduler works. If nothing else, it would be good to get the same NotImplementedError as with the SciPy backend.
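In the meantime, a possible workaround (just a sketch, assuming the computed result fits in memory) is to do the computation under the process pool scheduler and only then write the already-computed result, so no locks need to cross process boundaries during the write:

# Workaround sketch: compute with the process pool first, then write the
# fully-computed (non-lazy) dataset with the default scheduler.
with dask.config.set(scheduler="processes"):
    ds_computed = ds_squared.compute()

ds_computed.to_netcdf("test-process.nc")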
Output of xr.show_versions()
commit: None
python: 3.8.1 (default, Jan 22 2020, 06:38:00)
[GCC 9.2.0]
python-bits: 64
OS: Linux
OS-release: 5.5.4-arch1-1
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_NZ.UTF-8
LOCALE: en_NZ.UTF-8
libhdf5: 1.10.5
libnetcdf: 4.7.3
xarray: 0.15.0
pandas: 1.0.1
numpy: 1.18.1
scipy: 1.4.1
netCDF4: 1.5.3
pydap: None
h5netcdf: 0.7.4
h5py: 2.10.0
Nio: None
zarr: None
cftime: 1.1.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.10.1
distributed: 2.10.0
matplotlib: 3.1.3
cartopy: 0.17.0
seaborn: None
numbagg: None
setuptools: 45.2.0
pip: 19.3
conda: None
pytest: 5.3.5
IPython: 7.12.0
sphinx: 2.4.2