diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index e34fc7d2..6ed1c52a 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -132,6 +132,7 @@ def __init__(self, folder: Union[str, Path]) -> None: # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`! self._current_pack_id: Optional[int] = None self._config: Optional[dict] = None + self._additional_pack_locations: Optional[dict] = None def get_folder(self) -> str: """Return the path to the folder that will host the object-store container.""" @@ -247,10 +248,66 @@ def _get_pack_path_from_pack_id( """ pack_id = str(pack_id) assert self._is_valid_pack_id( - pack_id, allow_repack_pack=allow_repack_pack + pack_id, allow_repack_pack=False ), f"Invalid pack ID {pack_id}" + assert ( + allow_repack_pack is False + ), "Please use _get_repack_path_from_pack_id to get a repack path valid for a pack_id" + + # Not in the main repository folder - try to locate it from the additional storages + if not os.path.isfile(os.path.join(self._get_pack_folder(), pack_id)): + + # locate the file path from the cache + loc_cache = self._get_additional_pack_locations(refresh=False) + if pack_id in loc_cache and os.path.isfile(loc_cache[pack_id]): + return loc_cache[pack_id] + + # File moved? 
Refresh the cache again + loc_cache = self._get_additional_pack_locations(refresh=True) + # Try again + if pack_id in loc_cache: + return loc_cache[pack_id] + + # Use the default path in the main repo folder return os.path.join(self._get_pack_folder(), pack_id) + def _get_additional_pack_locations(self, refresh=False) -> Dict[str, str]: + """Get cached pack location and refresh the cache if needed""" + # Build a dictionary mapping pack name to pack's absolute path + if self._additional_pack_locations is None or refresh: + self._additional_pack_locations = {} + for path in self.additional_pack_repos: + packs_folder = Path(path) / "packs" + for filepath in packs_folder.glob("*"): + self._additional_pack_locations[filepath.name] = str( + filepath.resolve() + ) + + return self._additional_pack_locations + + def _is_pack_in_additional_storage(self, pack_id: Union[str, int]) -> bool: + """Return whether a pack is in additional storage""" + pack_id = str(pack_id) + if os.path.isfile(os.path.join(self._get_pack_folder(), str(pack_id))): + return False + if pack_id in self._get_additional_pack_locations(): + return True + if pack_id in self._get_additional_pack_locations(refresh=True): + return True + raise RuntimeError(f"Pack {pack_id} is missing!") + + def _get_repack_path_from_pack_id(self, pack_id: Union[int, str]) -> str: + """ + Return the path of a repack file for a given pack + + This ensures that the path is in the same repository folder as the pack file + of the given pack_id. + """ + pack_id = str(pack_id) + pack_path = Path(self._get_pack_path_from_pack_id(pack_id)) + repack_file = pack_path.parent / str(self._REPACK_PACK_ID) + return str(repack_file) + def _get_pack_index_path(self) -> str: """Return the path to the SQLite file containing the index of packed objects.""" return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}") @@ -267,12 +324,19 @@ def _get_pack_id_to_write_to(self) -> int: """ # Default to zero if not set (e.g. 
if it's None) pack_id = self._current_pack_id or 0 + additional_packs = list( + map(int, self._get_additional_pack_locations(refresh=True).keys()) + ) + max_addon_packid = max(additional_packs) if additional_packs else -10 while True: pack_path = self._get_pack_path_from_pack_id(pack_id) - if not os.path.exists(pack_path): + if not os.path.exists(pack_path) and pack_id > max_addon_packid: # Use this ID - the pack file does not exist yet break - if os.path.getsize(pack_path) < self.pack_size_target: + if ( + os.path.getsize(pack_path) < self.pack_size_target + and pack_id not in additional_packs + ): # Use this ID - the pack file is not "full" yet break # Try the next pack @@ -345,6 +409,7 @@ def init_container( # (at least the container_id, possibly the rest), and the other caches self._config = None self._current_pack_id = None + self._additional_pack_locations = None if self.is_initialised: raise FileExistsError( @@ -385,6 +450,7 @@ def init_container( "hash_type": hash_type, "container_id": container_id, "compression_algorithm": compression_algorithm, + "additional_pack_repos": [], }, fhandle, ) @@ -399,7 +465,7 @@ def init_container( self._get_session(create=True) - def _get_repository_config(self) -> Dict[str, Union[int, str]]: + def _get_repository_config(self) -> Dict[str, Union[int, str, List]]: """Return the repository config.""" if self._config is None: if not self.is_initialised: @@ -410,6 +476,33 @@ def _get_repository_config(self) -> Dict[str, Union[int, str]]: self._config = json.load(fhandle) return self._config + def add_additional_pack_repo(self, repo_path): + """Add additional pack repository to the container""" + repo_path = Path(repo_path).resolve() + repo_path.mkdir(exist_ok=True) + pack_path = repo_path / "packs" + pack_path.mkdir(exist_ok=True) + + if (repo_path / "config.json").is_file(): + # This repository already exists + with open(repo_path / "config.json") as fhandle: + repo_config = json.load(fhandle) + if repo_config["container_id"] 
!= self.container_id: + raise ValueError( + "Trying to add a pack repository not belonging to this container!" + ) + else: + # A new repository - store the container_id in the config.json file + with open(repo_path / "config.json", mode="w") as fhandle: + json.dump({"container_id": self.container_id}, fhandle) + # Add the folder to the list of additional packs + repo_list: List[str] = self._get_repository_config()["additional_pack_repos"] # type: ignore[assignment] + repo_list.append(str(repo_path)) + + # Save the configuration file + with open(self._get_config_file(), "w") as fhandle: + json.dump(self._config, fhandle) + @property def loose_prefix_len(self) -> int: """Return the length of the prefix of loose objects, when sharding. @@ -434,6 +527,12 @@ def hash_type(self) -> str: """ return self._get_repository_config()["hash_type"] # type: ignore[return-value] + @property + def additional_pack_repos(self) -> List[Path]: + """Additional pack repository paths""" + repo_list: List[str] = self._get_repository_config()["additional_pack_repos"] # type: ignore[assignment] + return [Path(repo) for repo in repo_list] + @property def container_id(self) -> str: """Return the repository unique ID. @@ -1134,7 +1233,7 @@ def get_total_size(self) -> Dict[str, int]: @contextmanager def lock_pack( - self, pack_id: str, allow_repack_pack: bool = False + self, pack_id: str, lock_repack: bool = False ) -> Iterator[StreamWriteBytesType]: """Lock the given pack id. Use as a context manager. @@ -1143,15 +1242,19 @@ def lock_pack( Important to use for avoiding concurrent access/append to the same file. :param pack_id: a string with a valid pack name. - :param allow_pack_repack: if True, allow to open the pack file used for repacking + :param lock_repack: if True, it is the corresponding repack file that will be locked and returned. 
""" - assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) + assert self._is_valid_pack_id(pack_id, allow_repack_pack=False) + + if lock_repack: + pack_file = self._get_repack_path_from_pack_id(pack_id) + else: + pack_file = self._get_pack_path_from_pack_id( + pack_id, allow_repack_pack=False + ) # Open file in exclusive mode - lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock") - pack_file = self._get_pack_path_from_pack_id( - pack_id, allow_repack_pack=allow_repack_pack - ) + lock_file = Path(pack_file).with_suffix(".lock") try: with open(lock_file, "x"): with open(pack_file, "ab") as pack_handle: @@ -1185,12 +1288,16 @@ def _list_loose(self) -> Iterator[str]: continue yield first_level - def _list_packs(self) -> Iterator[str]: + def _list_packs(self, only_main_repo=False) -> Iterator[str]: """Iterate over packs. .. note:: this returns a generator of the pack IDs. """ - for fname in os.listdir(self._get_pack_folder()): + + all_path = os.listdir(self._get_pack_folder()) + if not only_main_repo: + all_path += self._get_additional_pack_locations(refresh=True) + for fname in all_path: ## I actually check for pack index files # if not fname.endswith(self._PACK_INDEX_SUFFIX): # continue @@ -2514,7 +2621,7 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None: :param compress_mode: see docstring of ``repack_pack``. 
""" - for pack_id in self._list_packs(): + for pack_id in self._list_packs(only_main_repo=True): self.repack_pack(pack_id, compress_mode=compress_mode) self._vacuum() @@ -2540,11 +2647,13 @@ def repack_pack( pack_id ) - # Check that it does not exist + # Get the paths of the pack and repack files - they are not expected to change throughout this method + repack_path = self._get_repack_path_from_pack_id(pack_id) + pack_path = self._get_pack_path_from_pack_id(pack_id) + + # The repack file should not exist yet assert not os.path.exists( - self._get_pack_path_from_pack_id( - self._REPACK_PACK_ID, allow_repack_pack=True - ) + repack_path ), "The repack pack '{}' already exists, probably a previous repacking aborted?".format( self._REPACK_PACK_ID ) @@ -2556,17 +2665,15 @@ def repack_pack( ).all() if not one_object_in_pack: # No objects. Clean up the pack file, if it exists. - if os.path.exists(self._get_pack_path_from_pack_id(pack_id)): - os.remove(self._get_pack_path_from_pack_id(pack_id)) + if os.path.exists(pack_path): + os.remove(pack_path) return obj_dicts = [] # At least one object. Let's repack. We have checked before that the # REPACK_PACK_ID did not exist. - with self.lock_pack( - str(self._REPACK_PACK_ID), allow_repack_pack=True - ) as write_pack_handle: - with open(self._get_pack_path_from_pack_id(pack_id), "rb") as read_pack: + with self.lock_pack(pack_id, lock_repack=True) as write_pack_handle: + with open(pack_path, "rb") as read_pack: stmt = ( select( Obj.id, @@ -2612,9 +2719,7 @@ def repack_pack( obj_dicts.append(obj_dict) safe_flush_to_disk( write_pack_handle, - self._get_pack_path_from_pack_id( - self._REPACK_PACK_ID, allow_repack_pack=True - ), + repack_path, ) # We are done with data transfer. @@ -2638,17 +2743,15 @@ def repack_pack( pack_id=pack_id, repack_id=self._REPACK_PACK_ID ) ) - os.remove(self._get_pack_path_from_pack_id(pack_id)) + os.remove(pack_path) # I need now to move the file back. 
I need to be careful, to avoid conditions in which # I remain with inconsistent data. # Since hard links seem to be supported on all three platforms, I do a hard link # of -1 back to the correct pack ID. os.link( - self._get_pack_path_from_pack_id( - self._REPACK_PACK_ID, allow_repack_pack=True - ), - self._get_pack_path_from_pack_id(pack_id), + repack_path, + pack_path, ) # Before deleting the source (pack -1) I need now to update again all @@ -2664,10 +2767,8 @@ def repack_pack( # I am not doing this for now # I now can unlink/delete the original source os.unlink( - self._get_pack_path_from_pack_id( - self._REPACK_PACK_ID, allow_repack_pack=True - ) + repack_path, ) # We are now done. The temporary pack is gone, and the old `pack_id` - # has now been replaced with an udpated, repacked pack. + # has now been replaced with an updated, repacked pack. diff --git a/tests/test_container.py b/tests/test_container.py index a3103845..fa796d53 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3336,6 +3336,49 @@ def test_container_id(temp_container): assert old_container_id != temp_container.container_id +def test_additional_pack_repo(temp_container): + """Test adding additional pack locations""" + + temp_dir = tempfile.mkdtemp() + + temp_container: Container = temp_container + + # Insert some random data + expected = {} + for idx in range(100): + content = f"{idx}".encode() + expected[temp_container.add_object(content)] = content + + temp_container.pack_all_loose() + temp_container.add_additional_pack_repo(temp_dir) + + # Move the pack 0 to the additional repository + shutil.move( + os.path.join(temp_container._get_pack_folder(), "0"), + os.path.join(temp_dir, "packs"), + ) + new_pack = pathlib.Path(temp_container._get_pack_path_from_pack_id(0)).resolve() + assert new_pack.relative_to(pathlib.Path(temp_dir).resolve()) + + # Now the pack to write to is pack 1 + assert temp_container._get_pack_id_to_write_to() == 1 + retrieved = 
temp_container.get_objects_content(expected.keys()) + assert retrieved == expected + + # Adding data to pack 1 and validate + for idx in range(100, 200): + content = f"{idx}".encode() + expected[temp_container.add_object(content)] = content + temp_container.pack_all_loose() + retrieved = temp_container.get_objects_content(expected.keys()) + assert retrieved == expected + + # The first part should be in pack 0 and the second part in pack 1 + assert pathlib.Path(temp_container._get_pack_path_from_pack_id(1)).is_file() + assert temp_container.get_object_meta(list(expected.keys())[0])["pack_id"] == 0 + assert temp_container.get_object_meta(list(expected.keys())[-1])["pack_id"] == 1 + + @pytest.mark.parametrize( "compression_algorithm", [