Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
8f6d4a7
adding async for datatrees
aladinor Sep 13, 2025
77d4357
adding async method to _maybe_create_index
aladinor Sep 13, 2025
abb5f8d
using async as complete instead of gathering results
aladinor Sep 13, 2025
0c9c66c
adding tests for open_group, open_dtree and _maybe_create_index using…
aladinor Sep 14, 2025
7e174bf
ensuring _maybe_create_default_indexes_async is compatible with zarr v2
aladinor Sep 14, 2025
182c794
resolving the mypy type errors
aladinor Sep 14, 2025
db10454
attempt 2: resolving mypy type errors
aladinor Sep 14, 2025
dece3de
refactor: consolidate async index creation for DataTree opening
aladinor Dec 12, 2025
b48e8ea
perf: remove unnecessary semaphore from async datatree opening
aladinor Dec 12, 2025
e20b386
fix: add zarr v2 fallback for datatree opening
aladinor Dec 12, 2025
c2cb527
updating whats-new.rst file
aladinor Dec 13, 2025
6b5b4d3
fix: re-add semaphore to async datatree opening to prevent deadlocks
aladinor Dec 13, 2025
5d69cee
refactor: use async index creation in sync open_datatree for zarr
aladinor Jan 10, 2026
b0a1e5f
fix: add type ignore for mypy arg-type error in open_datatree_async
aladinor Jan 10, 2026
3f8f223
fix: add type annotations and fix Windows path in test
aladinor Jan 10, 2026
ff5cd7e
fix: add type annotations to nested async functions for mypy
aladinor Jan 10, 2026
ff2bd20
refactor: remove public open_datatree_async API per review feedback
aladinor Jan 14, 2026
d68d44c
refactor: convert _build_group_members to module-level helper function
aladinor Jan 16, 2026
6f78621
fix: add cast for mypy type checking in _build_group_members
aladinor Jan 16, 2026
16a5558
Update xarray/backends/api.py
aladinor Jan 16, 2026
df8b61f
refactor: use sync index creation in _maybe_create_default_indexes_async
aladinor Jan 16, 2026
05afdae
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 16, 2026
d4880b5
Update xarray/backends/api.py
aladinor Jan 16, 2026
6a4e87a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 16, 2026
89d9721
Update xarray/backends/api.py
aladinor Jan 16, 2026
980c882
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 16, 2026
27ba9f6
Address PR review: TaskGroup, max_concurrency, and open_dataset_async
aladinor Feb 3, 2026
6f4219c
Fix async deadlock risks: use zarr built-in members(), run_in_executo…
aladinor Feb 16, 2026
f5cb80c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 16, 2026
cd7bd60
Fix zarr v2 fallback in _iter_zarr_groups_async
aladinor Feb 16, 2026
15203d9
Trim verbose comments to match xarray style
aladinor Feb 16, 2026
06f3df8
Remove double index creation from async DataTree open
aladinor Feb 17, 2026
06a4a55
Use native async for DataTree open, threads only for CPU decode
aladinor Feb 18, 2026
ebf203e
Support glob patterns in open_datatree(group=...) for selective group…
aladinor Feb 22, 2026
31174a7
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ New Features

- Added ``inherit='all_coords'`` option to :py:meth:`DataTree.to_dataset` to inherit
all parent coordinates, not just indexed ones (:issue:`10812`, :pull:`11230`).
- Added ``max_concurrency`` parameter to :py:func:`open_datatree` to control
the maximum number of concurrent I/O operations when opening groups in parallel
with the Zarr backend (:pull:`10742`).
By `Alfonso Ladino <https://github.com/aladinor>`_.

Breaking Changes
Expand Down Expand Up @@ -337,6 +340,9 @@ Documentation
Performance
~~~~~~~~~~~

- Improve performance of :py:func:`open_datatree` for zarr stores by using async/concurrent
loading of groups and indexes (:pull:`10742`).
By `Alfonso Ladino <https://github.com/aladinor>`_.
- Add a fastpath to the backend plugin system for standard engines (:issue:`10178`, :pull:`10937`).
By `Sam Levang <https://github.com/slevang>`_.
- Optimize :py:class:`~xarray.coding.variables.CFMaskCoder` decoder (:pull:`11105`).
Expand Down
97 changes: 92 additions & 5 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
NestedSequence,
T_Chunks,
)
from xarray.core.variable import Variable

T_NetcdfEngine = Literal["netcdf4", "scipy", "h5netcdf"]
T_Engine = Union[
Expand Down Expand Up @@ -349,7 +350,47 @@ def _datatree_from_backend_datatree(

_protect_datatree_variables_inplace(backend_tree, cache)
if create_default_indexes:
tree = backend_tree.map_over_datasets(_maybe_create_default_indexes)
_use_zarr_async = False
if engine == "zarr":
from xarray.backends.zarr import _zarr_v3

_use_zarr_async = _zarr_v3()

if _use_zarr_async:
from zarr.core.sync import sync as zarr_sync

async def create_indexes_async() -> dict[str, Dataset]:
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(
max_workers=10, thread_name_prefix="xarray-idx"
)
try:
results: dict[str, Dataset] = {}

async def _create_index_for_node(
path: str, ds: Dataset
) -> tuple[str, Dataset]:
return path, await _maybe_create_default_indexes_async(
ds, executor=executor
)

tasks = [
_create_index_for_node(path, node.dataset)
for path, [node] in group_subtrees(backend_tree)
]
for fut in asyncio.as_completed(tasks):
path, ds = await fut
results[path] = ds
return results
finally:
executor.shutdown(wait=True, cancel_futures=True)

results = zarr_sync(create_indexes_async())
tree = DataTree.from_dict(results, name=backend_tree.name)
else:
tree = backend_tree.map_over_datasets(_maybe_create_default_indexes)
else:
tree = backend_tree
if chunks is not None:
Expand Down Expand Up @@ -386,6 +427,33 @@ def _datatree_from_backend_datatree(
return tree


async def _maybe_create_default_indexes_async(ds: Dataset, executor=None) -> Dataset:
    """Create default indexes for unindexed dimension coordinates, loading
    the backing variables concurrently.

    Native async loading (``load_async``) is preferred; arrays that do not
    implement it fall back to a synchronous load on *executor* (or the event
    loop's default executor when *executor* is None).
    """
    import asyncio

    # Dimension coordinates (coord name == its single dim) lacking an index.
    pending = [
        name
        for name, coord in ds.coords.items()
        if coord.dims == (name,) and name not in ds.xindexes
    ]
    if not pending:
        return ds

    loop = asyncio.get_running_loop()

    async def _load(var: Variable) -> Variable:
        # Prefer the backend's native async path; fall back to a thread.
        try:
            return await var.load_async()
        except NotImplementedError:
            return await loop.run_in_executor(executor, var.load)

    await asyncio.gather(*(_load(ds.variables[name]) for name in pending))

    # assign_coords builds the default (pandas) indexes for the loaded coords.
    loaded = {name: ds.variables[name] for name in pending}
    return ds.assign_coords(Coordinates(loaded))


def open_dataset(
filename_or_obj: T_PathFileOrDataStore,
*,
Expand Down Expand Up @@ -882,6 +950,7 @@ def open_datatree(
chunked_array_type: str | None = None,
from_array_kwargs: dict[str, Any] | None = None,
backend_kwargs: dict[str, Any] | None = None,
max_concurrency: int | None = None,
**kwargs,
) -> DataTree:
"""
Expand Down Expand Up @@ -1014,15 +1083,26 @@ def open_datatree(
chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
For example if :py:func:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
max_concurrency : int, optional
Maximum number of concurrent I/O operations when opening groups in
parallel. This limits the number of groups that are loaded simultaneously.
Useful for controlling resource usage with large datatrees or stores
that may have limitations on concurrent access (e.g., icechunk).
Only used by backends that support parallel loading (currently Zarr v3).
If None (default), the backend uses its default value (typically 10).
backend_kwargs: dict
Additional keyword arguments passed on to the engine open function,
equivalent to `**kwargs`.
**kwargs: dict
Additional keyword arguments passed on to the engine open function.
For example:

- 'group': path to the group in the given file to open as the root group as
a str.
- 'group': path to the group in the given file to open as the root
group as a str. If the string contains glob metacharacters
(``*``, ``?``, ``[``), it is interpreted as a pattern and only
groups whose paths match are loaded (along with their ancestors).
For example, ``group="*/sweep_0"`` loads every ``sweep_0`` one
level deep while skipping sibling groups.
- 'lock': resource lock to use when reading data from disk. Only
relevant when using dask or another form of parallelism. By default,
appropriate locks are chosen to safely read and write files with the
Expand Down Expand Up @@ -1074,6 +1154,9 @@ def open_datatree(
)
overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)

if max_concurrency is not None:
kwargs["max_concurrency"] = max_concurrency

backend_tree = backend.open_datatree(
filename_or_obj,
drop_variables=drop_variables,
Expand Down Expand Up @@ -1265,8 +1348,12 @@ def open_groups(
Additional keyword arguments passed on to the engine open function.
For example:

- 'group': path to the group in the given file to open as the root group as
a str.
- 'group': path to the group in the given file to open as the root
group as a str. If the string contains glob metacharacters
(``*``, ``?``, ``[``), it is interpreted as a pattern and only
groups whose paths match are loaded (along with their ancestors).
For example, ``group="*/sweep_0"`` loads every ``sweep_0`` one
level deep while skipping sibling groups.
- 'lock': resource lock to use when reading data from disk. Only
relevant when using dask or another form of parallelism. By default,
appropriate locks are chosen to safely read and write files with the
Expand Down
31 changes: 31 additions & 0 deletions xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,37 @@ def _iter_nc_groups(root, parent="/"):
yield from _iter_nc_groups(group, parent=gpath)


def _is_glob_pattern(pattern: str) -> bool:
return any(c in pattern for c in "*?[")


def _filter_group_paths(group_paths: Iterable[str], pattern: str) -> list[str]:
    """Return the subset of *group_paths* whose path matches *pattern*,
    plus the ancestors of every match (so the tree stays connected).

    Order of *group_paths* is preserved.  Matching is delegated to
    ``NodePath.match`` (PurePath semantics: a relative pattern is matched
    against the right-hand components of the path).
    """
    from xarray.core.treenode import NodePath

    # Materialize first: the annotation allows any Iterable, and we must
    # traverse the input twice (match pass + ordered output pass) — a
    # generator would be exhausted after the first loop.
    paths = list(group_paths)

    matched: set[str] = {"/"}
    for path in paths:
        np_ = NodePath(path)
        if np_.match(pattern):
            matched.add(path)
            # Keep every ancestor so matched groups remain reachable.
            for parent in np_.parents:
                p = str(parent)
                if p:
                    matched.add(p)

    return [p for p in paths if p in matched]


def _resolve_group_and_filter(
group: str | None,
all_group_paths: list[str],
) -> tuple[str | None, list[str]]:
if group is None:
return None, all_group_paths
if _is_glob_pattern(group):
return None, _filter_group_paths(all_group_paths, group)
return group, all_group_paths


def find_root_and_group(ds):
"""Find the root and group name of a netCDF4/h5netcdf dataset."""
hierarchy = ()
Expand Down
22 changes: 15 additions & 7 deletions xarray/backends/h5netcdf_.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,11 @@ def open_groups_as_dict(
open_kwargs: dict[str, Any] | None = None,
**kwargs,
) -> dict[str, Dataset]:
from xarray.backends.common import _iter_nc_groups
from xarray.backends.common import (
_is_glob_pattern,
_iter_nc_groups,
_resolve_group_and_filter,
)
from xarray.core.treenode import NodePath
from xarray.core.utils import close_on_error

Expand All @@ -664,10 +668,12 @@ def open_groups_as_dict(
emit_phony_dims_warning, phony_dims = _check_phony_dims(phony_dims)

filename_or_obj = _normalize_filename_or_obj(filename_or_obj)

effective_group = None if (group and _is_glob_pattern(group)) else group
store = H5NetCDFStore.open(
filename_or_obj,
format=format,
group=group,
group=effective_group,
lock=lock,
invalid_netcdf=invalid_netcdf,
phony_dims=phony_dims,
Expand All @@ -678,15 +684,17 @@ def open_groups_as_dict(
open_kwargs=open_kwargs,
)

# Check for a group and make it a parent if it exists
if group:
parent = NodePath("/") / NodePath(group)
if effective_group:
parent = NodePath("/") / NodePath(effective_group)
else:
parent = NodePath("/")

manager = store._manager
all_group_paths = list(_iter_nc_groups(store.ds, parent=parent))
_, filtered_paths = _resolve_group_and_filter(group, all_group_paths)

groups_dict = {}
for path_group in _iter_nc_groups(store.ds, parent=parent):
for path_group in filtered_paths:
group_store = H5NetCDFStore(manager, group=path_group, **kwargs)
store_entrypoint = StoreBackendEntrypoint()
with close_on_error(group_store):
Expand All @@ -701,7 +709,7 @@ def open_groups_as_dict(
decode_timedelta=decode_timedelta,
)

if group:
if effective_group:
group_name = str(NodePath(path_group).relative_to(parent))
else:
group_name = str(NodePath(path_group))
Expand Down
22 changes: 15 additions & 7 deletions xarray/backends/netCDF4_.py
Original file line number Diff line number Diff line change
Expand Up @@ -859,13 +859,19 @@ def open_groups_as_dict(
autoclose=False,
**kwargs,
) -> dict[str, Dataset]:
from xarray.backends.common import _iter_nc_groups
from xarray.backends.common import (
_is_glob_pattern,
_iter_nc_groups,
_resolve_group_and_filter,
)
from xarray.core.treenode import NodePath

filename_or_obj = _normalize_path(filename_or_obj)

effective_group = None if (group and _is_glob_pattern(group)) else group
store = NetCDF4DataStore.open(
filename_or_obj,
group=group,
group=effective_group,
format=format,
clobber=clobber,
diskless=diskless,
Expand All @@ -875,15 +881,17 @@ def open_groups_as_dict(
autoclose=autoclose,
)

# Check for a group and make it a parent if it exists
if group:
parent = NodePath("/") / NodePath(group)
if effective_group:
parent = NodePath("/") / NodePath(effective_group)
else:
parent = NodePath("/")

manager = store._manager
all_group_paths = list(_iter_nc_groups(store.ds, parent=parent))
_, filtered_paths = _resolve_group_and_filter(group, all_group_paths)

groups_dict = {}
for path_group in _iter_nc_groups(store.ds, parent=parent):
for path_group in filtered_paths:
group_store = NetCDF4DataStore(manager, group=path_group, **kwargs)
store_entrypoint = StoreBackendEntrypoint()
with close_on_error(group_store):
Expand All @@ -897,7 +905,7 @@ def open_groups_as_dict(
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)
if group:
if effective_group:
group_name = str(NodePath(path_group).relative_to(parent))
else:
group_name = str(NodePath(path_group))
Expand Down
33 changes: 33 additions & 0 deletions xarray/backends/store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from collections.abc import Iterable
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -72,5 +73,37 @@ def open_dataset(

return ds

async def open_dataset_async(
self,
filename_or_obj: T_PathFileOrDataStore,
*,
mask_and_scale=True,
decode_times=True,
concat_characters=True,
decode_coords=True,
drop_variables: str | Iterable[str] | None = None,
set_indexes: bool = True,
use_cftime=None,
decode_timedelta=None,
) -> Dataset:
"""Async version of open_dataset.

Offloads the entire open_dataset operation to a thread to avoid blocking
the event loop. This is necessary because decode_cf_variables can trigger
data reads (e.g., for time decoding) which may use synchronous I/O.
"""
return await asyncio.to_thread(
self.open_dataset,
filename_or_obj,
mask_and_scale=mask_and_scale,
decode_times=decode_times,
concat_characters=concat_characters,
decode_coords=decode_coords,
drop_variables=drop_variables,
set_indexes=set_indexes,
use_cftime=use_cftime,
decode_timedelta=decode_timedelta,
)


BACKEND_ENTRYPOINTS["store"] = (None, StoreBackendEntrypoint)
Loading
Loading