Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions src/spatialdata/_io/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from spatialdata._core.spatialdata import SpatialData
from spatialdata._io.format import RasterFormatType, RasterFormatV01, RasterFormatV02, RasterFormatV03
from spatialdata._logging import logger
from spatialdata._utils import get_pyramid_levels
from spatialdata.models._utils import (
MappingToCoordinateSystem_t,
Expand Down Expand Up @@ -357,17 +358,45 @@ def _search_for_backing_files_recursively(subgraph: Any, files: list[str]) -> No
# This occurs when for example points and images are mixed, the main task still starts with
# read_parquet, but the execution happens through a subgraph which we iterate over to get the
# actual read_parquet task.
for task in v.args[0].values():
# Recursively go through tasks, this is required because differences between dask versions.
piece_dict = _find_piece_dict(task)
if isinstance(piece_dict, dict) and "piece" in piece_dict:
parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc]
if not parquet_file.endswith(".parquet") or check0 is not None or check1 is not None:
raise ValueError(
f"Unable to parse the parquet file from the dask subgraph {subgraph}. Please "
f"report this bug."
)
files.append(os.path.realpath(parquet_file))
#
# v.args[0] has two known shapes:
# dict – keys are task keys, values are Task objects (classic subgraph case)
# list – list of piece dicts produced when aggregate_files=True aggregates multiple
# parquet files into one partition; check0/check1 are row-group selectors
# ([0], []) rather than None, so only the file extension is validated.
args0 = v.args[0]
if isinstance(args0, dict):
for task in args0.values():
# Recursively go through tasks, this is required because differences between dask
# versions.
piece_dict = _find_piece_dict(task)
if isinstance(piece_dict, dict) and "piece" in piece_dict:
parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc]
if (
not parquet_file.endswith(".parquet")
or check0 is not None
or check1 is not None
):
raise ValueError(
f"Unable to parse the parquet file from the dask subgraph {subgraph}. "
f"Please report this bug."
)
files.append(os.path.realpath(parquet_file))
elif isinstance(args0, list):
for item in args0:
if isinstance(item, dict) and "piece" in item:
parquet_file = item["piece"][0]
if not parquet_file.endswith(".parquet"):
raise ValueError(
f"Unable to parse the parquet file from the dask subgraph {subgraph}. "
f"Please report this bug."
)
files.append(os.path.realpath(parquet_file))
else:
logger.warning(
f"Unexpected type {type(args0)} for v.args[0] in the read_parquet task graph. "
f"Backing files may not be detected correctly. Please report this as a bug."
)


def _backed_elements_contained_in_path(path: Path, object: SpatialData | SpatialElement | AnnData) -> list[bool]:
Expand Down
Loading