From 80923fde9672c37cdc85d5da40e8ffaa45239930 Mon Sep 17 00:00:00 2001 From: Luca Marconato Date: Wed, 13 May 2026 18:04:29 +0200 Subject: [PATCH] fix _search_for_backing_files_recursively() to support multifile parquet --- src/spatialdata/_io/_utils.py | 51 +++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/src/spatialdata/_io/_utils.py b/src/spatialdata/_io/_utils.py index 6690d111..81f95d09 100644 --- a/src/spatialdata/_io/_utils.py +++ b/src/spatialdata/_io/_utils.py @@ -27,6 +27,7 @@ from spatialdata._core.spatialdata import SpatialData from spatialdata._io.format import RasterFormatType, RasterFormatV01, RasterFormatV02, RasterFormatV03 +from spatialdata._logging import logger from spatialdata._utils import get_pyramid_levels from spatialdata.models._utils import ( MappingToCoordinateSystem_t, @@ -357,17 +358,45 @@ def _search_for_backing_files_recursively(subgraph: Any, files: list[str]) -> No # This occurs when for example points and images are mixed, the main task still starts with # read_parquet, but the execution happens through a subgraph which we iterate over to get the # actual read_parquet task. - for task in v.args[0].values(): - # Recursively go through tasks, this is required because differences between dask versions. - piece_dict = _find_piece_dict(task) - if isinstance(piece_dict, dict) and "piece" in piece_dict: - parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc] - if not parquet_file.endswith(".parquet") or check0 is not None or check1 is not None: - raise ValueError( - f"Unable to parse the parquet file from the dask subgraph {subgraph}. Please " - f"report this bug." - ) - files.append(os.path.realpath(parquet_file)) + # + # v.args[0] has two known shapes: + # dict – keys are task keys, values are Task objects (classic subgraph case) + # list – list of piece dicts produced when aggregate_files=True aggregates multiple + # parquet files into one partition; check0/check1 are row-group selectors + # ([0], []) rather than None, so only the file extension is validated. + args0 = v.args[0] + if isinstance(args0, dict): + for task in args0.values(): + # Recursively go through tasks, this is required because differences between dask + # versions. + piece_dict = _find_piece_dict(task) + if isinstance(piece_dict, dict) and "piece" in piece_dict: + parquet_file, check0, check1 = piece_dict["piece"] # type: ignore[misc] + if ( + not parquet_file.endswith(".parquet") + or check0 is not None + or check1 is not None + ): + raise ValueError( + f"Unable to parse the parquet file from the dask subgraph {subgraph}. " + f"Please report this bug." + ) + files.append(os.path.realpath(parquet_file)) + elif isinstance(args0, list): + for item in args0: + if isinstance(item, dict) and "piece" in item: + parquet_file = item["piece"][0] + if not parquet_file.endswith(".parquet"): + raise ValueError( + f"Unable to parse the parquet file from the dask subgraph {subgraph}. " + f"Please report this bug." + ) + files.append(os.path.realpath(parquet_file)) + else: + logger.warning( + f"Unexpected type {type(args0)} for v.args[0] in the read_parquet task graph. " + f"Backing files may not be detected correctly. Please report this as a bug." + ) def _backed_elements_contained_in_path(path: Path, object: SpatialData | SpatialElement | AnnData) -> list[bool]: