Skip to content

Home / dask

Distributed Writes with dask#

You can use Icechunk in conjunction with Xarray and Dask to perform large-scale distributed writes from a multi-node cluster. However, because of how Icechunk works, it's not possible to use the existing Dask.Array.to_zarr or Xarray.Dataset.to_zarr functions with either the Dask multiprocessing or distributed schedulers. (It is fine with the multithreaded scheduler.)

Instead, Icechunk provides its own specialized functions to make distributed writes with Dask and Xarray. This page explains how to use these specialized functions.

Start with an icechunk store and dask arrays.

import icechunk
import tempfile

# initialize the icechunk store
storage = icechunk.local_filesystem_storage(tempfile.TemporaryDirectory().name)
repo = icechunk.Repository.create(storage)
session = repo.writable_session("main")

Icechunk + Dask#

Use icechunk.dask.store_dask to write a Dask array to an Icechunk store. The API follows that of dask.array.store without support for the compute kwarg.

First create a dask array to write:

import dask.array as da
shape = (100, 100)
dask_chunks = (20, 20)
dask_array = da.random.random(shape, chunks=dask_chunks)

Now create the Zarr array you will write to.

import zarr

zarr_chunks = (10, 10)
group = zarr.group(store=session.store, overwrite=True)

zarray = group.create_array(
    "array",
    shape=shape,
    chunks=zarr_chunks,
    dtype="f8",
    fill_value=float("nan"),
)
session.commit("initialize array")
Note that the chunks in the store are a divisor of the dask chunks. This means each individual write task is independent, and will not conflict. It is your responsibility to ensure that such conflicts are avoided.

First remember to fork the session before re-opening the Zarr array. store_dask will merge all the remote write sessions on the cluster before returning back a single merged ForkSession.

import icechunk.dask

session = repo.writable_session("main")
fork = session.fork()
zarray = zarr.open_array(fork.store, path="array")
remote_session = icechunk.dask.store_dask(
    sources=[dask_array],
    targets=[zarray]
)

Merge the remote session in to the local Session
session.merge(remote_session)

Finally commit your changes!

print(session.commit("wrote a dask array!"))

PWC6XCS9HR4AXJV1BAH0

Icechunk + Dask + Xarray#

The icechunk.xarray.to_icechunk is functionally identical to Xarray's Dataset.to_zarr, including many of the same keyword arguments. Notably the compute kwarg is not supported. Now roundtrip an xarray dataset

import distributed
import icechunk.xarray
import xarray as xr

client = distributed.Client()

session = repo.writable_session("main")
dataset = xr.tutorial.open_dataset(
    "rasm",
    chunks={"time": 1}).isel(time=slice(24)
    )

# `to_icechunk` takes care of handling the forking
icechunk.xarray.to_icechunk(dataset, session, mode="w")
# remember you must commit before executing a distributed read.
print(session.commit("wrote an Xarray dataset!"))

roundtripped = xr.open_zarr(session.store, consolidated=False)
print(dataset.identical(roundtripped))
2MECMXNZ8FG7YSBZMW00
True