diff --git a/pyproject.toml b/pyproject.toml index e39dda6a..223eacdf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ Pillow = "*" geopandas = "*" huggingface_hub = "*" openslide-bin = "*" +wsidicom = "*" # Optional dependencies (can be marked as optional in later versions) transformers = "*" diff --git a/run_batch_of_slides.py b/run_batch_of_slides.py index 7c68e624..e3522802 100644 --- a/run_batch_of_slides.py +++ b/run_batch_of_slides.py @@ -59,8 +59,8 @@ def build_parser() -> argparse.ArgumentParser: help='Custom keys used to store the resolution as MPP (micron per pixel) in your list of whole-slide image.') parser.add_argument('--custom_list_of_wsis', type=str, default=None, help='Custom list of WSIs specified in a csv file.') - parser.add_argument('--reader_type', type=str, choices=['openslide', 'image', 'cucim', 'sdpc'], default=None, - help='Force the use of a specific WSI image reader. Options are ["openslide", "image", "cucim", "sdpc"]. Defaults to None (auto-determine which reader to use).') + parser.add_argument('--reader_type', type=str, choices=['openslide', 'image', 'cucim', 'sdpc', "dicom"], default=None, + help='Force the use of a specific WSI image reader. Options are ["openslide", "image", "cucim", "sdpc", "dicom"]. Defaults to None (auto-determine which reader to use).') parser.add_argument("--search_nested", action="store_true", help=("If set, recursively search for whole-slide images (WSIs) within all subdirectories of " "`wsi_source`. Uses `os.walk` to include slides from nested folders. " diff --git a/trident/Processor.py b/trident/Processor.py index d3223fd7..791eebbd 100644 --- a/trident/Processor.py +++ b/trident/Processor.py @@ -107,6 +107,7 @@ def __init__( self.job_dir = job_dir self.wsi_source = wsi_source + # TODO: should I add DICOM here as well? self.wsi_ext = wsi_ext or (list(PIL_EXTENSIONS) + list(OPENSLIDE_EXTENSIONS) + list(SDPC_EXTENSIONS)) self.skip_errors = skip_errors self.custom_mpp_keys = custom_mpp_keys diff --git a/trident/__init__.py b/trident/__init__.py index 6cc936fd..5efd55cb 100644 --- a/trident/__init__.py +++ b/trident/__init__.py @@ -9,6 +9,7 @@ from trident.wsi_objects.CuCIMWSI import CuCIMWSI from trident.wsi_objects.ImageWSI import ImageWSI from trident.wsi_objects.SDPCWSI import SDPCWSI +from trident.wsi_objects.DICOMWSI import DICOMWSI from trident.wsi_objects.WSIFactory import load_wsi, WSIReaderType from trident.wsi_objects.WSIPatcher import OpenSlideWSIPatcher, WSIPatcher from trident.wsi_objects.WSIPatcherDataset import WSIPatcherDataset @@ -28,6 +29,7 @@ "ImageWSI", "CuCIMWSI", "SDPCWSI", + "DICOMWSI", "WSIPatcher", "OpenSlideWSIPatcher", "WSIPatcherDataset", diff --git a/trident/wsi_objects/DICOMWSI.py b/trident/wsi_objects/DICOMWSI.py new file mode 100644 index 00000000..23a5555b --- /dev/null +++ b/trident/wsi_objects/DICOMWSI.py @@ -0,0 +1,188 @@ +from __future__ import annotations +import numpy as np +from wsidicom import WsiDicom +from PIL import Image +from typing import List, Tuple, Union, Optional + +from trident.wsi_objects.WSI import WSI, ReadMode + + +class DICOMWSI(WSI): + + def __init__(self, slide_path, **kwargs) -> None: + """ + Initialize a DICOMWSI instance for DICOM whole-slide images. + + Parameters + ---------- + slide_path : str + Path to the DICOM WSI file or directory. + **kwargs : dict + Additional keyword arguments forwarded to the base `WSI` class. + - lazy_init (bool, default=True): Whether to defer loading WSI and metadata. + + Example + ------- + >>> wsi = DICOMWSI(slide_path="path/to/wsi", lazy_init=False) + >>> print(wsi) + + """ + super().__init__(slide_path, **kwargs) + + def _lazy_initialize(self) -> None: + """ + Lazily initialize the WSI using the DICOM backend. + + This method opens a whole-slide image using the wsidicom backend, extracting + key metadata including dimensions, magnification, and multiresolution pyramid + information. + + Raises + ------ + FileNotFoundError + If the DICOM WSI file cannot be found. + Exception + If an unexpected error occurs during WSI initialization. + + Notes + ----- + After initialization, the following attributes are set: + - `width` and `height`: spatial dimensions of the base level. + - `dimensions`: (width, height) tuple from the highest resolution. + - `level_count`: number of resolution levels in the image pyramid. + - `level_downsamples`: downsampling factors for each level. + - `level_dimensions`: image dimensions at each level. + - `mpp`: microns per pixel. + - `mag`: estimated magnification level. + """ + super()._lazy_initialize() + + if not self.lazy_init: + try: + self.img = WsiDicom.open(self.slide_path) + self.dimensions = self.get_dimensions() + self.width, self.height = self.dimensions + self.level_count = len(self.img.levels) + self.level_downsamples = self.get_downsamples() + self.level_dimensions = [level.size.to_tuple() for level in self.img.levels] + self.mpp = self.img.mpp.to_tuple()[0] + self.mag = self._fetch_magnification(self.custom_mpp_keys) + self.lazy_init = True + + except Exception as e: + raise RuntimeError(f"Failed to initialize WSI with DICOM backend: {e}") from e + + def read_region( + self, + location: Tuple[int, int], + level: int, + size: Tuple[int, int], + read_as: ReadMode = 'pil', + ) -> Union[Image.Image, np.ndarray]: + """ + Extract a specific region from the DICOM whole-slide image. + + Parameters + ---------- + location : Tuple[int, int] + (x, y) coordinates of the top-left corner of the region to extract, relative to the base level. + level : int + Pyramid level to read from. + size : Tuple[int, int] + (width, height) of the region to extract. + read_as : {'pil', 'numpy'}, optional + Output format for the region: + - 'pil': returns a PIL Image (default) + - 'numpy': returns a NumPy array (H, W, 3) + + Returns + ------- + Union[PIL.Image.Image, np.ndarray] + Extracted image region in the specified format. + + Raises + ------ + ValueError + If `read_as` is not one of 'pil' or 'numpy'. + + Notes + ----- + The `location` is automatically converted to the coordinate system of the requested pyramid level. + """ + # 'location' in wsidicom is relative to specified level as opposed to base level like in OpenSlide + location_ = (int(location[0] / self.level_downsamples[level]), int(location[1] / self.level_downsamples[level])) + + # Get slide dimensions for the requested level + level_shape = self.level_dimensions[level] + x, y = location_ + w, h = size + + # Calculate the region inside the slide + x_end = min(x + w, level_shape[0]) + y_end = min(y + h, level_shape[1]) + x_start = max(x, 0) + y_start = max(y, 0) + + # Read the valid region + region_w = max(0, x_end - x_start) + region_h = max(0, y_end - y_start) + region = None + if region_w > 0 and region_h > 0: + region = self.img.read_region((x_start, y_start), level, (region_w, region_h)) + region = np.array(region) + else: + region = np.zeros((h, w, 3), dtype=np.uint8) + + # Prepare output and place the valid region + output = np.zeros((h, w, 3), dtype=np.uint8) + x_off = x_start - x + y_off = y_start - y + output[y_off:y_off+region_h, x_off:x_off+region_w] = region[:region_h, :region_w] + + + if read_as == 'pil': + return Image.fromarray(output).convert("RGB") + elif read_as == 'numpy': + return output + else: + raise ValueError(f"Invalid `read_as` value: {read_as}. Must be 'pil', 'numpy'.") + + def get_dimensions(self) -> Tuple[int, int]: + """ + Return the dimensions (width, height) of the DICOM WSI at the highest resolution. + + Returns + ------- + tuple of int + (width, height) in pixels. + """ + return self.img.size.to_tuple() + + def get_downsamples(self) -> List[float]: + """ + Get the downsampling factors for each pyramid level in the DICOM WSI. + + Returns + ------- + list of float + Downsampling factors relative to the highest resolution level. + """ + base_mpp = self.img.mpp + downsamples = [np.floor((level.mpp / base_mpp).to_tuple()[0]) for level in self.img.levels] + return downsamples + + def get_thumbnail(self, size: tuple[int, int]) -> Image.Image: + """ + Generate a thumbnail of the DICOM WSI. + + Parameters + ---------- + size : tuple of int + Desired (width, height) of the thumbnail. + + Returns + ------- + PIL.Image.Image + RGB thumbnail as a PIL Image. + """ + return self.img.read_thumbnail(size).convert('RGB') diff --git a/trident/wsi_objects/WSIFactory.py b/trident/wsi_objects/WSIFactory.py index 1e75b0d4..7a7b3b06 100644 --- a/trident/wsi_objects/WSIFactory.py +++ b/trident/wsi_objects/WSIFactory.py @@ -1,4 +1,3 @@ - import os from typing import Optional, Literal, Union @@ -6,31 +5,40 @@ from trident.wsi_objects.ImageWSI import ImageWSI from trident.wsi_objects.CuCIMWSI import CuCIMWSI from trident.wsi_objects.SDPCWSI import SDPCWSI +from trident.wsi_objects.DICOMWSI import DICOMWSI + +WSIReaderType = Literal['openslide', 'image', 'cucim', 'sdpc', 'dicom'] -WSIReaderType = Literal['openslide', 'image', 'cucim', 'sdpc'] OPENSLIDE_EXTENSIONS = {'.svs', '.tif', '.tiff', '.ndpi', '.vms', '.vmu', '.scn', '.mrxs'} CUCIM_EXTENSIONS = {'.svs', '.tif', '.tiff'} SDPC_EXTENSIONS = {'.sdpc'} PIL_EXTENSIONS = {'.png', '.jpg', '.jpeg'} +DICOM_EXTENSIONS = {'.dcm', '.dicom'} +def _is_dicom_folder(path): + if not os.path.isdir(path): + return False + return any(fname.lower().endswith('.dcm') for fname in os.listdir(path)) def load_wsi( slide_path: str, reader_type: Optional[WSIReaderType] = None, **kwargs -) -> Union[OpenSlideWSI, ImageWSI, CuCIMWSI, SDPCWSI]: +) -> Union[OpenSlideWSI, ImageWSI, CuCIMWSI, SDPCWSI, DICOMWSI]: + """ Load a whole-slide image (WSI) using the appropriate backend. By default, uses OpenSlideWSI for OpenSlide-supported file extensions, - and ImageWSI for others. Users may override this behavior by explicitly + dicomwsi for DICOM files and ImageWSI for others. + Users may override this behavior by explicitly specifying a reader using the `reader_type` argument. Parameters ---------- slide_path : str Path to the whole-slide image. - reader_type : {'openslide', 'image', 'cucim', 'sdpc'}, optional + reader_type : {'openslide', 'image', 'cucim', 'sdpc', 'dicom'}, optional Manually specify the WSI reader to use. If None (default), selection is automatic based on file extension. **kwargs : dict @@ -38,7 +46,7 @@ def load_wsi( Returns ------- - Union[OpenSlideWSI, ImageWSI, CuCIMWSI, SDPCWSI] + Union[OpenSlideWSI, ImageWSI, CuCIMWSI, SDPCWSI, DICOMWSI] An instance of the appropriate WSI reader. Raises @@ -75,11 +83,26 @@ def load_wsi( f"Unsupported file format '{ext}' for CuCIM. " f"Supported whole-slide image formats are: {', '.join(CUCIM_EXTENSIONS)}." ) + + elif reader_type == 'dicom': + if not os.path.isdir(slide_path): + raise ValueError("For DICOM, slide_path must be a directory containing .dcm files.") + if not _is_dicom_folder(slide_path): + raise ValueError("Provided directory does not contain .dcm files.") + name = os.path.basename(os.path.normpath(slide_path)) + ".dcm" + return DICOMWSI(slide_path=slide_path, name=name, **kwargs) elif reader_type is None: if ext in OPENSLIDE_EXTENSIONS: return OpenSlideWSI(slide_path=slide_path, **kwargs) elif ext in SDPC_EXTENSIONS: return SDPCWSI(slide_path=slide_path, **kwargs) + elif os.path.isdir(slide_path) and _is_dicom_folder(slide_path): + name = os.path.basename(os.path.normpath(slide_path)) + ".dcm" + return DICOMWSI(slide_path=slide_path, name=name, **kwargs) else: return ImageWSI(slide_path=slide_path, **kwargs) + + else: + raise ValueError(f"Unknown reader_type: {reader_type}. Choose from 'openslide', 'image', 'cucim', 'sdpc', or 'dicom'.") +