to_netcdf() doesn't work with multiprocessing scheduler #3781

@bcbnz

Description

If I create a chunked lazily-computed array, writing it to disk with to_netcdf() computes and writes it with the threading and distributed schedulers, but not with the multiprocessing scheduler. The only reference I've found when searching for the exception message comes from this StackOverflow question.

MCVE Code Sample

import dask
import dask.distributed  # explicit import so dask.distributed.Client resolves
import numpy as np
import xarray as xr

if __name__ == "__main__":
    # Simple worker function.
    def inner(ds):
        if sum(ds.dims.values()) == 0:
            return ds
        return ds**2

    # Some random data to work with.
    ds = xr.Dataset(
            {"test": (("a", "b"), np.random.uniform(size=(1000, 1000)))},
            {"a": np.arange(1000), "b": np.arange(1000)}
    )

    # Chunk it and apply the worker to each chunk.
    ds_chunked = ds.chunk({"a": 100, "b": 200})
    ds_squared = ds_chunked.map_blocks(inner)

    # Thread pool scheduler can compute while writing.
    dask.config.set(scheduler="threads")
    print("Writing thread pool test to disk.")
    ds_squared.to_netcdf("test-threads.nc")

    # Local cluster with distributed works too.
    c = dask.distributed.Client()
    dask.config.set(scheduler=c)
    print("Writing local cluster test to disk.")
    ds_squared.to_netcdf("test-localcluster.nc")

    # Process pool scheduler can compute.
    dask.config.set(scheduler="processes")
    print("Computing with process pool scheduler.")
    ds_squared.compute()

    # But it cannot compute while writing.
    print("Trying to write process pool test to disk.")
    ds_squared.to_netcdf("test-process.nc")

Expected Output

Complete netCDF files should be created from all three schedulers.

Problem Description

The thread pool and distributed local cluster schedulers result in a complete output. The process pool scheduler fails when trying to write (note that test-process.nc is created with the header and coordinate information, but no actual data is written). The traceback is:

Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1097, in to_netcdf
    writes = writer.sync(compute=compute)
  File "/usr/lib/python3.8/site-packages/xarray/backends/common.py", line 198, in sync
    delayed_store = da.store(
  File "/usr/lib/python3.8/site-packages/dask/array/core.py", line 923, in store
    result.compute(**kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/multiprocessing.py", line 212, in get
    result = get_async(
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 494, in get_async
    fire_task()
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 460, in fire_task
    dumps((dsk[key], data)),
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
    cp.dump(obj)
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 538, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 101, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 363, in assert_spawning
    raise err
RuntimeError: Lock objects should only be shared between processes through inheritance

With a bit of editing of the system multiprocessing module I was able to determine that the lock being reported by this exception was the first lock created. I then added a breakpoint to the Lock constructor to get a traceback of what was creating it:

File                  Line  Function
core/dataset.py       1535  Dataset.to_netcdf
backends/api.py       1071  to_netcdf
backends/netCDF4_.py   350  open
backends/locks.py      114  get_write_lock
backends/locks.py       39  _get_multiprocessing_lock

This last function creates the offending multiprocessing.Lock() object. Note that six Locks are constructed in total, so it's possible that the later ones would also cause an issue.
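The exception itself can be reproduced without xarray or dask. The following is a minimal sketch using only the standard library; it assumes that the scheduler's cloudpickle call hits the same code path as plain pickle (which the traceback above suggests, since the error is raised from the lock's __getstate__). The variable name pickle_error is just for illustration.

```python
import multiprocessing
import pickle

# A plain multiprocessing lock, like the one created in
# _get_multiprocessing_lock.
lock = multiprocessing.Lock()

# Pickling the lock outside of process start-up is refused by
# multiprocessing: locks may only reach child processes by inheritance.
try:
    pickle.dumps(lock)
except RuntimeError as err:
    pickle_error = str(err)
else:
    pickle_error = None

print(pickle_error)
```

This suggests that any task graph holding such a lock cannot be shipped to a process pool worker, whereas the thread pool never needs to pickle it.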

The h5netcdf backend has the same problem with Lock. However, the SciPy backend raises a NotImplementedError instead:

ds_squared.to_netcdf("test-process.nc", engine="scipy")
Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc", engine="scipy")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1056, in to_netcdf
    raise NotImplementedError(
NotImplementedError: Writing netCDF files with the scipy backend is not currently supported with dask's multiprocessing scheduler

I'm not sure how simple it would be to get this working with the multiprocessing scheduler, or how vital it is given that the distributed scheduler works. If nothing else, it would be good to get the same NotImplementedError as with the SciPy backend.
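As a rough illustration of that last suggestion, an early guard could look something like the sketch below. Everything here is hypothetical: the helper name check_scheduler_supported, the UNSUPPORTED_SCHEDULERS set and the scheduler name strings are assumptions for illustration, not xarray's actual code. Only the wording of the message is taken from the SciPy traceback above.

```python
# Hypothetical guard; none of these names exist in xarray.
UNSUPPORTED_SCHEDULERS = {"multiprocessing"}


def check_scheduler_supported(engine, scheduler):
    """Fail early if this engine/scheduler pair cannot write netCDF files."""
    if scheduler in UNSUPPORTED_SCHEDULERS:
        raise NotImplementedError(
            f"Writing netCDF files with the {engine} backend is not "
            "currently supported with dask's multiprocessing scheduler"
        )


# A scheduler that is not in the unsupported set passes silently.
check_scheduler_supported("netcdf4", "threaded")

# The multiprocessing scheduler is rejected before any file is touched.
try:
    check_scheduler_supported("netcdf4", "multiprocessing")
except NotImplementedError as err:
    guard_message = str(err)
    print(guard_message)
```

Raising before the file is opened would also avoid leaving behind a partially written file containing only the header and coordinates.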

Output of xr.show_versions()

commit: None
python: 3.8.1 (default, Jan 22 2020, 06:38:00)
[GCC 9.2.0]
python-bits: 64
OS: Linux
OS-release: 5.5.4-arch1-1
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_NZ.UTF-8
LOCALE: en_NZ.UTF-8
libhdf5: 1.10.5
libnetcdf: 4.7.3

xarray: 0.15.0
pandas: 1.0.1
numpy: 1.18.1
scipy: 1.4.1
netCDF4: 1.5.3
pydap: None
h5netcdf: 0.7.4
h5py: 2.10.0
Nio: None
zarr: None
cftime: 1.1.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.10.1
distributed: 2.10.0
matplotlib: 3.1.3
cartopy: 0.17.0
seaborn: None
numbagg: None
setuptools: 45.2.0
pip: 19.3
conda: None
pytest: 5.3.5
IPython: 7.12.0
sphinx: 2.4.2
