Parallel Writes#
A common pattern with large distributed write jobs is to first initialize the dataset on disk with all appropriate metadata and any coordinate variables. Following this, a large write job is kicked off in a distributed setting, where each worker is responsible for an independent "region" of the output.
Why is Icechunk different from any other Zarr store?#
The reason is that, unlike Zarr, Icechunk is a "stateful" store. The Session object keeps a record of all writes, which is then bundled together in a commit. Thus Session.commit must be executed on a Session object that knows about all writes, including those executed remotely in a multiprocessing or any other remote execution context.
Info
Learn about Icechunk consistency with a clichéd but instructive example in this blog post
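To make this statefulness concrete, here is a minimal sketch of the single-process write lifecycle, using an in-memory repository (this assumes Icechunk's in_memory_storage helper and a throwaway example array): every change made through session.store is recorded on that Session object, and commit bundles the recorded changes into a single snapshot.
import zarr
from icechunk import Repository, in_memory_storage

repo = Repository.create(in_memory_storage())
session = repo.writable_session("main")
# writes made through session.store are recorded on this Session object
array = zarr.create(shape=(10,), chunks=(5,), store=session.store, overwrite=True)
array[:] = 1.0
# commit bundles all recorded changes into one snapshot and returns its id
print(session.commit("write example data"))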
Example#
Here is how you can execute such writes with Icechunk, illustrated with a ThreadPoolExecutor. First read some example data and create an Icechunk Repository.
import xarray as xr
import tempfile
from icechunk import Repository, local_filesystem_storage
ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
repo = Repository.create(local_filesystem_storage(tempfile.TemporaryDirectory().name))
session = repo.writable_session("main")
We will orchestrate the writes so that each task writes one timestep. This is an arbitrary choice, but it determines what we set for the Zarr chunk size.
Initialize the dataset using Dataset.to_zarr with compute=False. This will NOT write any chunked array data, but will write all array metadata and any in-memory arrays (only time in this case).
# one timestep per Zarr chunk, matching the write tasks below
chunks = (1, *ds.Tair.shape[1:])
ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w", consolidated=False)
# this commit is optional, but may be useful in your workflow
print(session.commit("initialize store"))
B40PH9H1N203WCAJ8SJG
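At this point the store contains the full array metadata and the eagerly written time coordinate, but no Tair chunk data yet. An optional sanity check:
init = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
print(init.Tair.shape)       # the full shape is already declared in the metadata
print(init.time.values[:3])  # the in-memory time coordinate was written eagerly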
Multi-threading#
First define a function that constitutes one "write task".
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> None:
    # pass a list to isel to preserve the time dimension
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
    ds.to_zarr(session.store, region="auto", consolidated=False)
Now execute the writes.
from concurrent.futures import ThreadPoolExecutor, wait

session = repo.writable_session("main")
with ThreadPoolExecutor() as executor:
    # submit the writes
    futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
    wait(futures)

print(session.commit("finished writes"))
Verify that the writes worked as expected:
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
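You can also inspect the commit history of the main branch to confirm that the initialization and write commits are both present:
for snapshot in repo.ancestry(branch="main"):
    print(snapshot.id, snapshot.message)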
Distributed writes#
There are fundamentally two different modes for distributed writes in Icechunk:
- "Cooperative" distributed writes, in which all of the changes being written are part of the same transaction. The point of this is to allow large-scale, massively-parallel writing to the store as part of a single coordinated job. In this scenario, it's the user's job to align the writing process with the Zarr chunks and avoid inconsistent metadata updates.
- "Uncooperative" writes, in which multiple workers are attempting to write to the same store in an uncoordinated way. This path relies on the optimistic concurrency mechanism to detect and resolve conflicts.
Info
This code will not execute with a ProcessPoolExecutor without some changes. Specifically, it requires wrapping the code in an if __name__ == "__main__": block. See a full executable example here.
Cooperative distributed writes#
Any task execution framework (e.g. ProcessPoolExecutor, Joblib, Lithops, Dask Distributed, Ray, etc.) can be used instead of the ThreadPoolExecutor. However, such workloads should account for Icechunk being a "stateful" store that records changes executed in a write session.
There are three key points to keep in mind:
- The write_task function must return the Session. It contains a record of the changes executed by this task. These changes must be manually communicated back to the coordinating process, since each of the distributed processes is working with its own independent Session instance.
- Icechunk requires that users obtain a distributable writable Session using Session.fork(). This creates a new ForkSession object that can be pickled. Sessions can be forked only when they have no uncommitted changes.
- The user must manually merge the ForkSession objects into the Session to create a meaningful commit.
First we modify write_timestamp to return its ForkSession:
from icechunk import Session
from icechunk.session import ForkSession

def write_timestamp(*, itime: int, session: ForkSession) -> ForkSession:
    # pass a list to isel to preserve the time dimension
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
    ds.to_zarr(session.store, region="auto", consolidated=False)
    return session
The steps for making a distributed write are as follows:
- fork the Session with Session.fork,
- gather the ForkSessions from individual tasks,
- merge the Session with the gathered ForkSessions using Session.merge, and finally
- make a successful commit using Session.commit.
from concurrent.futures import ProcessPoolExecutor

session = repo.writable_session("main")
with ProcessPoolExecutor() as executor:
    # obtain a writable session that can be pickled
    fork = session.fork()
    # submit the writes
    futures = [
        executor.submit(write_timestamp, itime=i, session=fork)
        for i in range(ds.sizes["time"])
    ]
    # grab the ForkSession objects from each individual write task
    remote_sessions = [f.result() for f in futures]

# manually merge the remote sessions into the local session
session.merge(*remote_sessions)
print(session.commit("finished writes"))
Verify that the writes worked as expected:
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
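The same fork/merge pattern carries over to the other frameworks mentioned above. As a minimal sketch (not a definitive recipe), here is what it could look like with dask.distributed, assuming the dask.distributed package is installed; Client() starts a local cluster if no address is given:
from dask.distributed import Client

client = Client()
session = repo.writable_session("main")
fork = session.fork()
futures = [client.submit(write_timestamp, itime=i, session=fork) for i in range(ds.sizes["time"])]
# gather the ForkSessions returned by the tasks, merge them, then commit
session.merge(*client.gather(futures))
print(session.commit("finished writes with dask"))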
Uncooperative distributed writes#
Warning
Using the multiprocessing start method 'fork' will result in a deadlock when trying to open an existing repository. This happens because the files behind the repository need to be locked. The 'fork' start method copies not only the lock, but also the state of the lock. Thus all child processes inherit the file lock in an acquired state, leaving them hanging indefinitely waiting for the file lock to be released, which never happens. Polars has a similar issue, which is described in their documentation about multiprocessing. Putting mp.set_start_method('forkserver') at the beginning of the script will solve this issue. This is only necessary on POSIX systems other than MacOS, because MacOS and Windows do not default to the 'fork' start method.
Here is an example of uncooperative distributed writes using multiprocessing, based on this discussion.
import multiprocessing as mp
import tempfile

import icechunk as ic
import zarr

def get_storage(path):
    # every process must point at the same repository location
    return ic.local_filesystem_storage(path)

def worker(i, path):
    print(f"Started worker {i}")
    storage = get_storage(path)
    repo = ic.Repository.open(storage)
    # keep trying until it succeeds
    while True:
        try:
            session = repo.writable_session("main")
            z = zarr.open(session.store, mode="r+")
            print(f"Opened store for {i} | {dict(z.attrs)}")
            a = z.attrs.get("done", [])
            a.append(i)
            z.attrs["done"] = a
            session.commit(f"wrote from worker {i}")
            break
        except ic.ConflictError:
            print(f"Conflict for {i}, retrying")

def main():
    # 'forkserver' avoids the fork deadlock described above; necessary on Linux, where 'fork' is the default
    mp.set_start_method('forkserver')
    # a single repository location shared by the parent and all workers
    path = tempfile.mkdtemp()
    storage = get_storage(path)
    repo = ic.Repository.create(storage)
    session = repo.writable_session("main")
    zarr.create(
        shape=(10, 10),
        chunks=(5, 5),
        store=session.store,
        overwrite=True,
    )
    session.commit("initialized dataset")
    p1 = mp.Process(target=worker, args=(1, path))
    p2 = mp.Process(target=worker, args=(2, path))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    session = repo.readonly_session(branch="main")
    z = zarr.open(session.store, mode="r")
    print(z.attrs["done"])
    print(list(repo.ancestry(branch="main")))

if __name__ == "__main__":
    main()
This should output something like the following. (Note that the order of the writes is not guaranteed.)
Started worker 1
Started worker 2
Opened store for 1 | {}
Opened store for 2 | {}
Conflict for 1, retrying
Opened store for 1 | {'done': [2]}
[2, 1]
[SnapshotInfo(id="MGPV1YE1SY0799AZFFB0", parent_id=YAN3D2N7ANCNKCFN3JSG, written_at=datetime.datetime(2025,3,4,21,40,57,19985, tzinfo=datetime.timezone.utc), message="wrote from..."), SnapshotInfo(id="YAN3D2N7ANCNKCFN3JSG", parent_id=0M5H3J6SC8MYBQYWACC0, written_at=datetime.datetime(2025,3,4,21,40,56,734126, tzinfo=datetime.timezone.utc), message="wrote from..."), SnapshotInfo(id="0M5H3J6SC8MYBQYWACC0", parent_id=WKKQ9K7ZFXZER26SES5G, written_at=datetime.datetime(2025,3,4,21,40,56,47192, tzinfo=datetime.timezone.utc), message="initialize..."), SnapshotInfo(id="WKKQ9K7ZFXZER26SES5G", parent_id=None, written_at=datetime.datetime(2025,3,4,21,40,55,868277, tzinfo=datetime.timezone.utc), message="Repository...")]
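The workers above resolve a ConflictError by discarding their session and retrying the whole transaction. When the competing commits touch disjoint data (for example, workers writing different chunks), the failed session can often be rebased onto the new branch tip instead of being replayed from scratch. Here is a minimal sketch of that pattern, assuming Icechunk's ConflictDetector solver; the rebase raises an error if both commits modified the same chunks or metadata (as the attribute updates above would):
import icechunk as ic

def commit_with_rebase(session: ic.Session, message: str) -> str:
    try:
        return session.commit(message)
    except ic.ConflictError:
        # someone else committed first: replay this session's changes on top of the latest snapshot
        session.rebase(ic.ConflictDetector())
        return session.commit(message)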