From 92ce21218af673622734629c2fd1d54b8ae6fb96 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 18 Aug 2022 17:57:14 +0200 Subject: [PATCH 01/19] add sharding storage transformer --- zarr/_storage/store.py | 86 +++++++++++++++++++++++++---------- zarr/core.py | 12 ++--- zarr/meta.py | 3 +- zarr/tests/test_core.py | 45 +++++++++++++++++- zarr/tests/test_storage_v3.py | 14 +++++- 5 files changed, 128 insertions(+), 32 deletions(-) diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 6e4076d23c..34bb6fcd67 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -261,12 +261,17 @@ def rmdir(self, path=None): """Remove a data path and all its subkeys and related metadata. Expects a path without the data or meta root prefix.""" + def supports_efficient_get_partial_values(self): + return False + def get_partial_values(self, key_ranges): """Get multiple partial values. key_ranges can be an iterable of key, range pairs, where a range specifies two integers range_start and range_length as a tuple, (range_start, range_length). - Length may be None to indicate to read until the end. + range_length may be None to indicate to read until the end. + range_start may be negative to start reading range_start bytes + from the end of the file. A key may occur multiple times with different ranges.""" results = [None] * len(key_ranges) indexed_ranges_by_key = defaultdict(list) @@ -281,12 +286,17 @@ def get_partial_values(self, key_ranges): results[i] = value[range_from:range_from + range_length] return results + def supports_efficient_set_partial_values(self): + return False + def set_partial_values(self, key_start_values): """Set multiple partial values. key_start_values can be an iterable of key, start and value triplets as tuples, (key, start, value), where start defines the offset in bytes. A key may occur multiple times with different starts and non-overlapping values. 
- Also, start may only be beyond the current value if other values fill the gap.""" + Also, start may only be beyond the current value if other values fill the gap. + start may be negative to start writing start bytes from the current + end of the file, ending the file with the new value.""" unique_keys = set(next(zip(*key_start_values))) values = {} for key in unique_keys: @@ -303,7 +313,10 @@ def set_partial_values(self, key_start_values): + f"since it is beyond the data at key {key}, " + f"having length {len(values[key])}." ) - values[key][start:start + len(value)] = value + if start < 0: + values[key][start:] = value + else: + values[key][start:start + len(value)] = value for key, value in values.items(): self[key] = value @@ -372,7 +385,7 @@ def __init__(self, _type) -> None: self.type = _type self._inner_store = None - def _copy_for_array(self, inner_store): + def _copy_for_array(self, array, inner_store): transformer_copy = copy(self) transformer_copy._inner_store = inner_store return transformer_copy @@ -412,6 +425,40 @@ def inner_store(self) -> Union["StorageTransformer", StoreV3]: ) return self._inner_store + # The following implementations are usually fine to keep as-is: + + def __eq__(self, other): + return ( + type(self) == type(other) and + self._inner_store == other._inner_store and + self.get_config() == other.get_config() + ) + + def erase(self, key): + self.__delitem__(key) + + def list(self): + return list(self.keys()) + + def list_dir(self, prefix): + """ + TODO: carefully test this with trailing/leading slashes + """ + if prefix: # allow prefix = "" ? 
+ assert prefix.endswith("/") + + all_keys = self.list_prefix(prefix) + len_prefix = len(prefix) + keys = [] + prefixes = [] + for k in all_keys: + trail = k[len_prefix:] + if "/" not in trail: + keys.append(prefix + trail) + else: + prefixes.append(prefix + trail.split("/", maxsplit=1)[0] + "/") + return keys, list(set(prefixes)) + def is_readable(self): return self.inner_store.is_readable() @@ -424,6 +471,9 @@ def is_listable(self): def is_erasable(self): return self.inner_store.is_erasable() + def clear(self): + return self.inner_store.clear() + def __enter__(self): return self.inner_store.__enter__() @@ -433,27 +483,21 @@ def __exit__(self, exc_type, exc_value, traceback): def close(self) -> None: return self.inner_store.close() + # The following implementations might need to be re-implemented + # by subclasses implementing storage transformers: + def rename(self, src_path: str, dst_path: str) -> None: return self.inner_store.rename(src_path, dst_path) def list_prefix(self, prefix): return self.inner_store.list_prefix(prefix) - def erase(self, key): - return self.inner_store.erase(key) - def erase_prefix(self, prefix): return self.inner_store.erase_prefix(prefix) def rmdir(self, path=None): return self.inner_store.rmdir(path) - def list_dir(self, prefix): - return self.inner_store.list_dir(prefix) - - def list(self): - return self.inner_store.list() - def __contains__(self, key): return self.inner_store.__contains__(key) @@ -472,22 +516,18 @@ def __iter__(self): def __len__(self): return self.inner_store.__len__() + def supports_efficient_get_partial_values(self): + return self.inner_store.supports_efficient_get_partial_values() + def get_partial_values(self, key_ranges): return self.inner_store.get_partial_values(key_ranges) + def supports_efficient_set_partial_values(self): + return self.inner_store.supports_efficient_set_partial_values() + def set_partial_values(self, key_start_values): return self.inner_store.set_partial_values(key_start_values) - def 
clear(self): - return self.inner_store.clear() - - def __eq__(self, other): - return ( - type(self) == type(other) and - self._inner_store == other._inner_store and - self.get_config() == other.get_config() - ) - # allow MutableMapping for backwards compatibility StoreLike = Union[BaseStore, MutableMapping] diff --git a/zarr/core.py b/zarr/core.py index 4faa19727b..9ecdf9c51e 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -284,15 +284,15 @@ def _load_metadata_nosync(self): storage_transformers = meta.get('storage_transformers', []) transformed_store = self._store for storage_transformer in storage_transformers: - transformed_store = storage_transformer._copy_for_array(transformed_store) - self._store = transformed_store + transformed_store = storage_transformer._copy_for_array(self, transformed_store) + self._transformed_store = transformed_store if self._chunk_store is not None: transformed_chunk_store = self._chunk_store for storage_transformer in storage_transformers: transformed_chunk_store = ( - storage_transformer._copy_for_array(transformed_chunk_store) + storage_transformer._copy_for_array(self, transformed_chunk_store) ) - self._chunk_store = transformed_chunk_store + self._transformed_chunk_store = transformed_chunk_store def _refresh_metadata(self): if not self._cache_metadata: @@ -336,7 +336,7 @@ def _flush_metadata_nosync(self): @property def store(self): """A MutableMapping providing the underlying storage for the array.""" - return self._store + return self._transformed_store @property def path(self): @@ -376,7 +376,7 @@ def chunk_store(self): if self._chunk_store is None: return self._store else: - return self._chunk_store + return self._transformed_chunk_store @property def shape(self): diff --git a/zarr/meta.py b/zarr/meta.py index 41a90101b5..b493e833f0 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -477,9 +477,10 @@ def _encode_storage_transformer_metadata( @classmethod def _decode_storage_transformer_metadata(cls, meta: Mapping) -> 
"StorageTransformer": from zarr.tests.test_storage_v3 import DummyStorageTransfomer + from zarr._storage.v3_storage_transformers import ShardingStorageTransformer # This might be changed to a proper registry in the future - KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer] + KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer, ShardingStorageTransformer] conf = meta.get('configuration', {}) extension_uri = meta['extension'] diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 484fb3f286..d612146575 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -23,6 +23,7 @@ _prefix_to_attrs_key, _prefix_to_group_key ) +from .._storage.v3_storage_transformers import ShardingStorageTransformer from zarr.core import Array from zarr.errors import ArrayNotFoundError, ContainsGroupError from zarr.meta import json_loads @@ -811,7 +812,6 @@ def test_pickle(self): attrs_cache = z.attrs.cache a = np.random.randint(0, 1000, 1000) z[:] = a - # round trip through pickle dump = pickle.dumps(z) # some stores cannot be opened twice at the same time, need to close @@ -3380,6 +3380,49 @@ def expected(self): ] +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +class TestArrayWithShardingStorageTransformerV3(TestArrayWithPathV3): + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + store = KVStoreV3(dict()) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"]) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=(2, ) * num_dims + ) + init_array(store, path=array_path, storage_transformers=[sharding_transformer], **kwargs) + return Array(store, path=array_path, read_only=read_only, + cache_metadata=cache_metadata, + cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks) + + # def 
test_nbytes_stored(self): + # pass # not implemented + + def test_nbytes_stored(self): + z = self.create_array(shape=1000, chunks=100) + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + z[:] = 42 + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + + # mess with store + z.chunk_store[data_root + z._key_prefix + 'foo'] = list(range(10)) + assert -1 == z.nbytes_stored + + def expected(self): + return [ + "b46294e25b1d816055e7937780265c0d8d5d6c47", + "5b52b03dde558c4c2313e55cf7ed9898d397e485", + "ef7f726387c1bc235ac205f77567276d28872477", + "fd944727c0d058e594d7b3800781a4786af5f0de", + "4ce1eebc42dc03690d917b7ff4363df6946c2745", + ] + + @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") def test_array_mismatched_store_versions(): store_v3 = KVStoreV3(dict()) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index a6fef788db..1f45157459 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -263,7 +263,9 @@ def test_get_partial_values(self): (data_root + 'foo', (0, 1)) ] ) - assert [b'd', b'b', b'z', b'abc', b'defg', b'defg'] == store.get_partial_values( + assert [ + b'd', b'b', b'z', b'abc', b'defg', b'defg', b'g', b'ef' + ] == store.get_partial_values( [ (data_root + 'foo', (3, 1)), (data_root + 'foo', (1, 1)), @@ -271,6 +273,8 @@ def test_get_partial_values(self): (data_root + 'foo', (0, 3)), (data_root + 'foo', (3, 4)), (data_root + 'foo', (3, None)), + (data_root + 'foo', (-1, None)), + (data_root + 'foo', (-3, 2)), ] ) @@ -300,6 +304,14 @@ def test_set_partial_values(self): ) assert store[data_root + 'foo'] == b'hoodefdone' assert store[data_root + 'baz'] == b'zzzzaaaa' + store.set_partial_values( + [ + (data_root + 'foo', -2, b'NE'), + (data_root + 'baz', -5, b'q'), + ] + ) + assert store[data_root + 'foo'] == 
b'hoodefdoNE' + assert store[data_root + 'baz'] == b'zzzq' class TestMappingStoreV3(StoreV3Tests): From f6c87b4992d029faf482ee0392b3c6c8cd360db6 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 18 Aug 2022 18:00:30 +0200 Subject: [PATCH 02/19] add actual transformer --- zarr/_storage/v3_storage_transformers.py | 339 +++++++++++++++++++++++ 1 file changed, 339 insertions(+) create mode 100644 zarr/_storage/v3_storage_transformers.py diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py new file mode 100644 index 0000000000..9e8c3967ea --- /dev/null +++ b/zarr/_storage/v3_storage_transformers.py @@ -0,0 +1,339 @@ +import functools +import itertools +from typing import NamedTuple, Tuple, Optional, Union, Iterator + +import numpy as np + +from zarr._storage.store import StorageTransformer, StoreV3, _rmdir_from_keys_v3 +from zarr.util import normalize_storage_path + + +MAX_UINT_64 = 2 ** 64 - 1 + + +def _is_data_key(key: str) -> bool: + return key.startswith("data/root") + + +class _ShardIndex(NamedTuple): + store: "ShardingStorageTransformer" + # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) + offsets_and_lengths: np.ndarray + + def __localize_chunk__(self, chunk: Tuple[int, ...]) -> Tuple[int, ...]: + return tuple( + chunk_i % shard_i + for chunk_i, shard_i in zip(chunk, self.store.chunks_per_shard) + ) + + def is_all_empty(self) -> bool: + return np.array_equiv(self.offsets_and_lengths, MAX_UINT_64) + + def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: + localized_chunk = self.__localize_chunk__(chunk) + chunk_start, chunk_len = self.offsets_and_lengths[localized_chunk] + if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): + return None + else: + return slice(chunk_start, chunk_start + chunk_len) + + def set_chunk_slice( + self, chunk: Tuple[int, ...], chunk_slice: Optional[slice] + ) -> None: + localized_chunk = self.__localize_chunk__(chunk) + if chunk_slice is 
None: + self.offsets_and_lengths[localized_chunk] = (MAX_UINT_64, MAX_UINT_64) + else: + self.offsets_and_lengths[localized_chunk] = ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start, + ) + + def to_bytes(self) -> bytes: + return self.offsets_and_lengths.tobytes(order="C") + + @classmethod + def from_bytes( + cls, buffer: Union[bytes, bytearray], store: "ShardingStorageTransformer" + ) -> "_ShardIndex": + try: + return cls( + store=store, + offsets_and_lengths=np.frombuffer(bytearray(buffer), dtype=" None: + super().__init__(_type) + chunks_per_shard = tuple(int(i) for i in chunks_per_shard) + if chunks_per_shard == (): + chunks_per_shard = (1, ) + self._dimension_separator = None + self.chunks_per_shard = chunks_per_shard + self._num_chunks_per_shard = functools.reduce( + lambda x, y: x * y, chunks_per_shard, 1 + ) + + def _copy_for_array(self, array, inner_store): + transformer_copy = super()._copy_for_array(array, inner_store) + transformer_copy._dimension_separator = array._dimension_separator + if len(array._shape) > len(self.chunks_per_shard): + # The array shape might be longer when initialized with subdtypes. + # subdtypes dimensions come last, therefore padding chunks_per_shard + # with ones, effectively disabling sharding on the unlisted dimensions. + transformer_copy.chunks_per_shard += ( + (1, ) * (len(array._shape) - len(self.chunks_per_shard)) + ) + return transformer_copy + + @property + def dimension_separator(self) -> str: + assert self._dimension_separator is not None, ( + "dimension_separator is not initialized, first get a copy via _copy_for_array." 
+ ) + return self._dimension_separator + + def _key_to_shard(self, chunk_key: str) -> Tuple[str, Tuple[int, ...]]: + prefix, _, chunk_string = chunk_key.rpartition("c") + chunk_subkeys = tuple( + map(int, chunk_string.split(self.dimension_separator)) + ) if chunk_string else (0, ) + shard_key_tuple = ( + subkey // shard_i + for subkey, shard_i in zip(chunk_subkeys, self.chunks_per_shard) + ) + shard_key = ( + prefix + "c" + self.dimension_separator.join(map(str, shard_key_tuple)) + ) + return shard_key, chunk_subkeys + + def _get_index_from_store(self, shard_key: str) -> _ShardIndex: + # At the end of each shard 2*64bit per chunk for offset and length define the index: + return _ShardIndex.from_bytes( + self.inner_store.get_partial_values( + [(shard_key, (-16 * self._num_chunks_per_shard, None))] + )[0], + self, + ) + + def _get_index_from_buffer(self, buffer: Union[bytes, bytearray]) -> _ShardIndex: + # At the end of each shard 2*64bit per chunk for offset and length define the index: + return _ShardIndex.from_bytes(buffer[-16 * self._num_chunks_per_shard:], self) + + def _get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: + _, _, chunk_string = shard_key.rpartition("c") + shard_key_tuple = tuple( + map(int, chunk_string.split(self.dimension_separator)) + ) if chunk_string else (0, ) + for chunk_offset in itertools.product( + *(range(i) for i in self.chunks_per_shard) + ): + yield tuple( + shard_key_i * shards_i + offset_i + for shard_key_i, offset_i, shards_i in zip( + shard_key_tuple, chunk_offset, self.chunks_per_shard + ) + ) + + def __getitem__(self, key): + if _is_data_key(key): + if self.supports_efficient_get_partial_values(): + # Use the partial implementation, which fetches the index seperately + return self.get_partial_values([(key, (0, None))])[0] + shard_key, chunk_subkey = self._key_to_shard(key) + try: + full_shard_value = self.inner_store[shard_key] + except KeyError: + raise KeyError(key) + index = 
self._get_index_from_buffer(full_shard_value) + chunk_slice = index.get_chunk_slice(chunk_subkey) + if chunk_slice is not None: + return full_shard_value[chunk_slice] + else: + raise KeyError(key) + else: + return self.inner_store.__getitem__(key) + + def __setitem__(self, key, value): + if _is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + chunks_to_read = set(self._get_chunks_in_shard(shard_key)) + chunks_to_read.remove(chunk_subkey) + new_content = {chunk_subkey: value} + try: + if self.supports_efficient_get_partial_values(): + index = self._get_index_from_store(shard_key) + full_shard_value = None + else: + full_shard_value = self.inner_store[shard_key] + index = self._get_index_from_buffer(full_shard_value) + except KeyError: + index = _ShardIndex.create_empty(self) + else: + chunk_slices = [ + (chunk_to_read, index.get_chunk_slice(chunk_to_read)) + for chunk_to_read in chunks_to_read + ] + valid_chunk_slices = [ + (chunk_to_read, chunk_slice) + for chunk_to_read, chunk_slice in chunk_slices + if chunk_slice is not None + ] + # use get_partial_values if less than half of the available chunks must be read: + # (This can be changed when set_partial_values can be used efficiently.) 
+ use_partial_get = ( + self.supports_efficient_get_partial_values() + and len(valid_chunk_slices) < len(chunk_slices) / 2 + ) + + if use_partial_get: + chunk_values = self.inner_store.get_partial_values( + [ + ( + shard_key, + ( + chunk_slice.start, + chunk_slice.stop - chunk_slice.start, + ), + ) + for _, chunk_slice in valid_chunk_slices + ] + ) + for chunk_value, (chunk_to_read, _) in zip( + chunk_values, valid_chunk_slices + ): + new_content[chunk_to_read] = chunk_value + else: + if full_shard_value is None: + full_shard_value = self.inner_store[shard_key] + for chunk_to_read, chunk_slice in valid_chunk_slices: + if chunk_slice is not None: + new_content[chunk_to_read] = full_shard_value[chunk_slice] + + shard_content = b"" + for chunk_subkey, chunk_content in new_content.items(): + chunk_slice = slice( + len(shard_content), len(shard_content) + len(chunk_content) + ) + index.set_chunk_slice(chunk_subkey, chunk_slice) + shard_content += chunk_content + # Appending the index at the end of the shard: + shard_content += index.to_bytes() + self.inner_store[shard_key] = shard_content + else: + self.inner_store[key] = value + + def __delitem__(self, key): + if _is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + try: + index = self._get_index_from_store(shard_key) + except KeyError: + raise KeyError(key) + + index.set_chunk_slice(chunk_subkey, None) + + if index.is_all_empty(): + del self.inner_store[shard_key] + else: + index_bytes = index.to_bytes() + self.set_partial_values([(shard_key, -len(index_bytes), index_bytes)]) + else: + del self.inner_store[key] + + def _shard_key_to_original_keys(self, key: str) -> Iterator[str]: + if _is_data_key(key): + index = self._get_index_from_store(key) + prefix, _, _ = key.rpartition("c") + for chunk_tuple in self._get_chunks_in_shard(key): + if index.get_chunk_slice(chunk_tuple) is not None: + yield prefix + "c" + self.dimension_separator.join( + map(str, chunk_tuple) + ) + else: + yield key + + def 
__iter__(self) -> Iterator[str]: + for key in self.inner_store: + yield from self._shard_key_to_original_keys(key) + + def __len__(self): + return sum(1 for _ in self.keys()) + + def get_partial_values(self, key_ranges): + if self.supports_efficient_get_partial_values(): + transformed_key_ranges = [] + cached_indices = {} + for key, range_ in key_ranges: + if _is_data_key(key): + shard_key, chunk_subkey = self._key_to_shard(key) + try: + index = cached_indices[shard_key] + except KeyError: + index = self._get_index_from_store(shard_key) + cached_indices[shard_key] = index + chunk_slice = index.get_chunk_slice(chunk_subkey) + range_start, range_length = range_ + transformed_key_ranges.append( + (shard_key, (range_start + chunk_slice.satrt, range_length)) + ) + else: + transformed_key_ranges.append((key, range_)) + return self.inner_store.get_partial_values(transformed_key_ranges) + else: + return StoreV3.get_partial_values(self, key_ranges) + + def supports_efficient_set_partial_values(self): + return False + + def set_partial_values(self, key_start_values): + # This does not yet implement efficient set_partial_values + return StoreV3.set_partial_values(self, key_start_values) + + def rename(self, src_path: str, dst_path: str) -> None: + return StoreV3.rename(self, src_path, dst_path) # type: ignore[arg-type] + + def list_prefix(self, prefix): + if _is_data_key(prefix): + return StoreV3.list_prefix(self, prefix) + else: + return self.inner_store.list_prefix(prefix) + + def erase_prefix(self, prefix): + if _is_data_key(prefix): + return StoreV3.erase_prefix(self, prefix) + else: + return self.inner_store.erase_prefix(prefix) + + def rmdir(self, path=None): + path = normalize_storage_path(path) + _rmdir_from_keys_v3(self, path) # type: ignore + + def __contains__(self, key): + if _is_data_key(key): + shard_key, chunk_subkeys = self._key_to_shard(key) + try: + index = self._get_index_from_store(shard_key) + except KeyError: + return False + chunk_slice = 
index.get_chunk_slice(chunk_subkeys) + return chunk_slice is not None + else: + return self._inner_store.__contains__(key) From df2dd71b779ce9b940a0d28f50173d96768f48af Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 22 Aug 2022 09:57:46 +0200 Subject: [PATCH 03/19] fixe, and allow partial reads for uncompressed v3 arrays --- zarr/_storage/store.py | 8 ++- zarr/_storage/v3.py | 31 +++++++++ zarr/_storage/v3_storage_transformers.py | 40 ++++++++--- zarr/core.py | 84 +++++++++++++++++------- zarr/tests/test_core.py | 63 ++++++++++++++++-- zarr/tests/test_creation.py | 4 +- zarr/tests/test_storage_v3.py | 2 +- zarr/util.py | 19 ++++++ 8 files changed, 207 insertions(+), 44 deletions(-) diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 34bb6fcd67..a55de9c799 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -272,13 +272,17 @@ def get_partial_values(self, key_ranges): range_length may be None to indicate to read until the end. range_start may be negative to start reading range_start bytes from the end of the file. - A key may occur multiple times with different ranges.""" + A key may occur multiple times with different ranges. 
+ Inserts None for missing keys into the returned list.""" results = [None] * len(key_ranges) indexed_ranges_by_key = defaultdict(list) for i, (key, range_) in enumerate(key_ranges): indexed_ranges_by_key[key].append((i, range_)) for key, indexed_ranges in indexed_ranges_by_key.items(): - value = self[key] + try: + value = self[key] + except KeyError: + continue for i, (range_from, range_length) in indexed_ranges: if range_length is None: results[i] = value[range_from:] diff --git a/zarr/_storage/v3.py b/zarr/_storage/v3.py index 540b62ef7e..47bf813dc8 100644 --- a/zarr/_storage/v3.py +++ b/zarr/_storage/v3.py @@ -182,6 +182,37 @@ def rmdir(self, path=None): if self.fs.isdir(store_path): self.fs.rm(store_path, recursive=True) + def supports_efficient_get_partial_values(self): + return True + + def get_partial_values(self, key_ranges): + """Get multiple partial values. + key_ranges can be an iterable of key, range pairs, + where a range specifies two integers range_start and range_length + as a tuple, (range_start, range_length). + range_length may be None to indicate to read until the end. + range_start may be negative to start reading range_start bytes + from the end of the file. + A key may occur multiple times with different ranges. 
+ Inserts None for missing keys into the returned list.""" + results = [] + for key, (range_start, range_length) in key_ranges: + key = self._normalize_key(key) + path = self.dir_path(key) + try: + if range_start < 0: + if range_length is None: + result = self.fs.tail(path, size=-range_start) + else: + size = self.fs.size(path) + result = self.fs.read_block(path, size + range_start, range_length) + else: + result = self.fs.read_block(path, range_start, range_length) + except self.map.missing_exceptions: + result = None + results.append(result) + return results + class MemoryStoreV3(MemoryStore, StoreV3): diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 9e8c3967ea..cffba81680 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -35,7 +35,7 @@ def get_chunk_slice(self, chunk: Tuple[int, ...]) -> Optional[slice]: if (chunk_start, chunk_len) == (MAX_UINT_64, MAX_UINT_64): return None else: - return slice(chunk_start, chunk_start + chunk_len) + return slice(int(chunk_start), int(chunk_start + chunk_len)) def set_chunk_slice( self, chunk: Tuple[int, ...], chunk_slice: Optional[slice] @@ -126,10 +126,13 @@ def _key_to_shard(self, chunk_key: str) -> Tuple[str, Tuple[int, ...]]: def _get_index_from_store(self, shard_key: str) -> _ShardIndex: # At the end of each shard 2*64bit per chunk for offset and length define the index: + index_bytes = self.inner_store.get_partial_values( + [(shard_key, (-16 * self._num_chunks_per_shard, None))] + )[0] + if index_bytes is None: + raise KeyError(shard_key) return _ShardIndex.from_bytes( - self.inner_store.get_partial_values( - [(shard_key, (-16 * self._num_chunks_per_shard, None))] - )[0], + index_bytes, self, ) @@ -156,7 +159,11 @@ def __getitem__(self, key): if _is_data_key(key): if self.supports_efficient_get_partial_values(): # Use the partial implementation, which fetches the index seperately - return 
self.get_partial_values([(key, (0, None))])[0] + value = self.get_partial_values([(key, (0, None))])[0] + if value is None: + raise KeyError(key) + else: + return value shard_key, chunk_subkey = self._key_to_shard(key) try: full_shard_value = self.inner_store[shard_key] @@ -254,7 +261,7 @@ def __delitem__(self, key): del self.inner_store[shard_key] else: index_bytes = index.to_bytes() - self.set_partial_values([(shard_key, -len(index_bytes), index_bytes)]) + self.inner_store.set_partial_values([(shard_key, -len(index_bytes), index_bytes)]) else: del self.inner_store[key] @@ -281,22 +288,35 @@ def get_partial_values(self, key_ranges): if self.supports_efficient_get_partial_values(): transformed_key_ranges = [] cached_indices = {} - for key, range_ in key_ranges: + none_indices = [] + for i, (key, range_) in enumerate(key_ranges): if _is_data_key(key): shard_key, chunk_subkey = self._key_to_shard(key) try: index = cached_indices[shard_key] except KeyError: - index = self._get_index_from_store(shard_key) + try: + index = self._get_index_from_store(shard_key) + except KeyError: + none_indices.append(i) + continue cached_indices[shard_key] = index chunk_slice = index.get_chunk_slice(chunk_subkey) + if chunk_slice is None: + none_indices.append(i) + continue range_start, range_length = range_ + if range_length is None: + range_length = chunk_slice.stop - chunk_slice.start transformed_key_ranges.append( - (shard_key, (range_start + chunk_slice.satrt, range_length)) + (shard_key, (range_start + chunk_slice.start, range_length)) ) else: transformed_key_ranges.append((key, range_)) - return self.inner_store.get_partial_values(transformed_key_ranges) + values = self.inner_store.get_partial_values(transformed_key_ranges) + for i in none_indices: + values.insert(i, None) + return values else: return StoreV3.get_partial_values(self, key_ranges) diff --git a/zarr/core.py b/zarr/core.py index 9ecdf9c51e..b434460a68 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -51,6 +51,7 @@ 
normalize_shape, normalize_storage_path, PartialReadBuffer, + UncompressedPartialReadBufferV3, ) @@ -180,6 +181,7 @@ def __init__( self._store = store self._chunk_store = chunk_store + self._transformed_chunk_store = None self._path = normalize_storage_path(path) if self._path: self._key_prefix = self._path + '/' @@ -282,17 +284,13 @@ def _load_metadata_nosync(self): if self._version == 3: storage_transformers = meta.get('storage_transformers', []) - transformed_store = self._store - for storage_transformer in storage_transformers: - transformed_store = storage_transformer._copy_for_array(self, transformed_store) - self._transformed_store = transformed_store - if self._chunk_store is not None: - transformed_chunk_store = self._chunk_store + if storage_transformers: + transformed_store = self._chunk_store or self._store for storage_transformer in storage_transformers: - transformed_chunk_store = ( - storage_transformer._copy_for_array(self, transformed_chunk_store) + transformed_store = storage_transformer._copy_for_array( + self, transformed_store ) - self._transformed_chunk_store = transformed_chunk_store + self._transformed_chunk_store = transformed_store def _refresh_metadata(self): if not self._cache_metadata: @@ -336,7 +334,7 @@ def _flush_metadata_nosync(self): @property def store(self): """A MutableMapping providing the underlying storage for the array.""" - return self._transformed_store + return self._store @property def path(self): @@ -373,10 +371,12 @@ def read_only(self, value): @property def chunk_store(self): """A MutableMapping providing the underlying storage for array chunks.""" - if self._chunk_store is None: - return self._store - else: + if self._transformed_chunk_store is not None: return self._transformed_chunk_store + elif self._chunk_store is not None: + return self._chunk_store + else: + return self._store @property def shape(self): @@ -1252,8 +1252,12 @@ def _get_selection(self, indexer, out=None, fields=None): check_array_shape('out', 
out, out_shape) # iterate over chunks - if not hasattr(self.chunk_store, "getitems") or \ - any(map(lambda x: x == 0, self.shape)): + if ( + not hasattr(self.chunk_store, "getitems") and not ( + hasattr(self.chunk_store, "get_partial_values") and + self.chunk_store.supports_efficient_get_partial_values() + ) + ) or any(map(lambda x: x == 0, self.shape)): # sequentially get one key at a time from storage for chunk_coords, chunk_selection, out_selection in indexer: @@ -1790,7 +1794,7 @@ def _set_selection(self, indexer, value, fields=None): check_array_shape('value', value, sel_shape) # iterate over chunks in range - if not hasattr(self.store, "setitems") or self._synchronizer is not None \ + if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \ or any(map(lambda x: x == 0, self.shape)): # iterative approach for chunk_coords, chunk_selection, out_selection in indexer: @@ -1868,10 +1872,12 @@ def _process_chunk( # contiguous, so we can decompress directly from the chunk # into the destination array if self._compressor: - if isinstance(cdata, PartialReadBuffer): + if isinstance(cdata, (PartialReadBuffer, UncompressedPartialReadBufferV3)): cdata = cdata.read_full() self._compressor.decode(cdata, dest) else: + if isinstance(cdata, UncompressedPartialReadBufferV3): + cdata = cdata.read_full() chunk = ensure_ndarray(cdata).view(self._dtype) chunk = chunk.reshape(self._chunks, order=self._order) np.copyto(dest, chunk) @@ -1893,13 +1899,21 @@ def _process_chunk( else dim for i, dim in enumerate(self.chunks) ] - cdata.read_part(start, nitems) - chunk_partial = self._decode_chunk( - cdata.buff, - start=start, - nitems=nitems, - expected_shape=expected_shape, - ) + if isinstance(cdata, UncompressedPartialReadBufferV3): + chunk_partial = self._decode_chunk( + cdata.read_part(start, nitems), + start=start, + nitems=nitems, + expected_shape=expected_shape, + ) + else: + cdata.read_part(start, nitems) + chunk_partial = self._decode_chunk( + cdata.buff, + 
start=start, + nitems=nitems, + expected_shape=expected_shape, + ) tmp[partial_out_selection] = chunk_partial out[out_selection] = tmp[chunk_selection] return @@ -1994,9 +2008,29 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, for ckey in ckeys if ckey in self.chunk_store } + elif ( + self._partial_decompress + and not self._compressor + and not fields + and self.dtype != object + and hasattr(self.chunk_store, "get_partial_values") + and self.chunk_store.supports_efficient_get_partial_values() + ): + partial_read_decode = True + cdatas = { + ckey: UncompressedPartialReadBufferV3( + ckey, self.chunk_store, itemsize=self.itemsize + ) + for ckey in ckeys + if ckey in self.chunk_store + } else: partial_read_decode = False - cdatas = self.chunk_store.getitems(ckeys, on_error="omit") + if not hasattr(self.chunk_store, "getitems"): + values = self.chunk_store.get_partial_values([(ckey, (0, None)) for ckey in ckeys]) + cdatas = {key: value for key, value in zip(ckeys, values) if value is not None} + else: + cdatas = self.chunk_store.getitems(ckeys, on_error="omit") for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection): if ckey in cdatas: self._process_chunk( diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index d612146575..56b7cc9cd4 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -3287,6 +3287,54 @@ def expected(self): ] +@pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +class TestArrayWithFSStoreV3PartialReadUncompressedSharded( + TestArrayWithPathV3, TestArrayWithFSStorePartialRead +): + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + path = mkdtemp() + atexit.register(shutil.rmtree, path) + store = FSStoreV3(path) + cache_metadata = kwargs.pop("cache_metadata", True) + cache_attrs = kwargs.pop("cache_attrs", True) + write_empty_chunks = 
kwargs.pop('write_empty_chunks', True) + kwargs.setdefault('compressor', None) + num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"]) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=(2, ) * num_dims + ) + init_array(store, path=array_path, storage_transformers=[sharding_transformer], **kwargs) + return Array( + store, + path=array_path, + read_only=read_only, + cache_metadata=cache_metadata, + cache_attrs=cache_attrs, + partial_decompress=True, + write_empty_chunks=write_empty_chunks, + ) + + def test_nbytes_stored(self): + z = self.create_array(shape=1000, chunks=100) + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + z[:] = 42 + expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') + assert expect_nbytes_stored == z.nbytes_stored + + def expected(self): + return [ + "90109fc2a4e17efbcb447003ea1c08828b91f71e", + "2b73519f7260dba3ddce0d2b70041888856fec6b", + "bca5798be2ed71d444f3045b05432d937682b7dd", + "9ff1084501e28520e577662a6e3073f1116c76a2", + "882a97cad42417f90f111d0cb916a21579650467", + ] + + @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") class TestArrayWithFSStoreV3Nested(TestArrayWithPathV3, TestArrayWithFSStoreNested): @@ -3398,9 +3446,6 @@ def create_array(array_path='arr1', read_only=False, **kwargs): cache_metadata=cache_metadata, cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks) - # def test_nbytes_stored(self): - # pass # not implemented - def test_nbytes_stored(self): z = self.create_array(shape=1000, chunks=100) expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') @@ -3410,9 +3455,19 @@ def test_nbytes_stored(self): assert expect_nbytes_stored == z.nbytes_stored # mess with store - z.chunk_store[data_root + z._key_prefix + 
'foo'] = list(range(10)) + z.store[data_root + z._key_prefix + 'foo'] = list(range(10)) assert -1 == z.nbytes_stored + def test_keys_inner_store(self): + z = self.create_array(shape=1000, chunks=100) + assert z.chunk_store.keys() == z._store.keys() + meta_keys = set(z.store.keys()) + z[:] = 42 + assert len(z.chunk_store.keys() - meta_keys) == 10 + # inner store should have half the data keys, + # since chunks_per_shard is 2: + assert len(z._store.keys() - meta_keys) == 5 + def expected(self): return [ "b46294e25b1d816055e7937780265c0d8d5d6c47", diff --git a/zarr/tests/test_creation.py b/zarr/tests/test_creation.py index c289fbc639..fea146d832 100644 --- a/zarr/tests/test_creation.py +++ b/zarr/tests/test_creation.py @@ -734,5 +734,5 @@ def test_create_with_storage_transformers(): test_value=DummyStorageTransfomer.TEST_CONSTANT ) z = create(1000000000, chunks=True, storage_transformers=[transformer], **kwargs) - assert isinstance(z._store, DummyStorageTransfomer) - assert z._store.test_value == DummyStorageTransfomer.TEST_CONSTANT + assert isinstance(z.chunk_store, DummyStorageTransfomer) + assert z.chunk_store.test_value == DummyStorageTransfomer.TEST_CONSTANT diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 1f45157459..3666c01594 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -528,7 +528,7 @@ def create_store(self, **kwargs): storage_transformer = DummyStorageTransfomer( "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT ) - return storage_transformer._copy_for_array(inner_store) + return storage_transformer._copy_for_array(None, inner_store) def test_method_forwarding(self): store = self.create_store() diff --git a/zarr/util.py b/zarr/util.py index cc3bd50356..9c313f4915 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -640,6 +640,25 @@ def read_full(self): return self.chunk_store[self.store_key] +class UncompressedPartialReadBufferV3: + def __init__(self, store_key, chunk_store, 
itemsize): + assert chunk_store.supports_efficient_get_partial_values() + self.chunk_store = chunk_store + self.store_key = store_key + self.itemsize = itemsize + + def prepare_chunk(self): + pass + + def read_part(self, start, nitems): + return self.chunk_store.get_partial_values( + [(self.store_key, (start * self.itemsize, nitems * self.itemsize))] + )[0] + + def read_full(self): + return self.chunk_store[self.store_key] + + def retry_call(callabl: Callable, args=None, kwargs=None, From 83c9389cafabec55a3f2be072e64a22812631012 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 22 Aug 2022 18:26:15 +0200 Subject: [PATCH 04/19] make lgtm happy --- zarr/_storage/v3_storage_transformers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index cffba81680..5b47f7e001 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -76,7 +76,7 @@ def create_empty(cls, store: "ShardingStorageTransformer"): ) -class ShardingStorageTransformer(StorageTransformer): +class ShardingStorageTransformer(StorageTransformer): # lgtm[py/missing-equals] extension_uri = "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" valid_types = ["indexed"] @@ -325,10 +325,10 @@ def supports_efficient_set_partial_values(self): def set_partial_values(self, key_start_values): # This does not yet implement efficient set_partial_values - return StoreV3.set_partial_values(self, key_start_values) + StoreV3.set_partial_values(self, key_start_values) def rename(self, src_path: str, dst_path: str) -> None: - return StoreV3.rename(self, src_path, dst_path) # type: ignore[arg-type] + StoreV3.rename(self, src_path, dst_path) # type: ignore[arg-type] def list_prefix(self, prefix): if _is_data_key(prefix): @@ -338,9 +338,9 @@ def list_prefix(self, prefix): def erase_prefix(self, prefix): if _is_data_key(prefix): - return 
StoreV3.erase_prefix(self, prefix) + StoreV3.erase_prefix(self, prefix) else: - return self.inner_store.erase_prefix(prefix) + self.inner_store.erase_prefix(prefix) def rmdir(self, path=None): path = normalize_storage_path(path) From fde61e86e59d8743ec7b6b90131f3cd4d58b5b42 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 22 Aug 2022 18:28:29 +0200 Subject: [PATCH 05/19] add release note --- docs/release.rst | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/release.rst b/docs/release.rst index 0b90ea8da1..6b8a2d85dc 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -38,11 +38,13 @@ Bug fixes Enhancements ~~~~~~~~~~~~ -* **Improve Zarr V3 support, adding partial store read/write and storage transformers.** +* **Improve Zarr V3 support, adding partial store read/write, storage transformers and sharding.** Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): - * storage transformers * `get_partial_values` and `set_partial_values` - By :user:`Jonathan Striebel `; :issue:`1096`. + * efficient `get_partial_values` implementation for `FSStoreV3` + * storage transformers interface + * sharding storage transformer + By :user:`Jonathan Striebel `; :issue:`1096`, :issue:`1111`. 
Documentation From de4de18c31f21b34bbf2d6f97f27d2519a1b48f6 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Tue, 23 Aug 2022 13:48:30 +0200 Subject: [PATCH 06/19] better coverage --- zarr/_storage/v3_storage_transformers.py | 8 ++++---- zarr/tests/test_core.py | 11 +++++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 5b47f7e001..9ed4104e07 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -63,7 +63,7 @@ def from_bytes( *store.chunks_per_shard, 2, order="C" ), ) - except ValueError as e: + except ValueError as e: # pragma: no cover raise RuntimeError from e @classmethod @@ -244,7 +244,7 @@ def __setitem__(self, key, value): # Appending the index at the end of the shard: shard_content += index.to_bytes() self.inner_store[shard_key] = shard_content - else: + else: # pragma: no cover self.inner_store[key] = value def __delitem__(self, key): @@ -262,7 +262,7 @@ def __delitem__(self, key): else: index_bytes = index.to_bytes() self.inner_store.set_partial_values([(shard_key, -len(index_bytes), index_bytes)]) - else: + else: # pragma: no cover del self.inner_store[key] def _shard_key_to_original_keys(self, key: str) -> Iterator[str]: @@ -311,7 +311,7 @@ def get_partial_values(self, key_ranges): transformed_key_ranges.append( (shard_key, (range_start + chunk_slice.start, range_length)) ) - else: + else: # pragma: no cover transformed_key_ranges.append((key, range_)) values = self.inner_store.get_partial_values(transformed_key_ranges) for i in none_indices: diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 56b7cc9cd4..0f2cf6ea51 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -3325,6 +3325,11 @@ def test_nbytes_stored(self): expect_nbytes_stored = sum(buffer_size(v) for k, v in z._store.items() if k != 'zarr.json') assert expect_nbytes_stored == z.nbytes_stored + 
def test_supports_efficient_get_set_partial_values(self): + z = self.create_array(shape=100, chunks=10) + assert z.chunk_store.supports_efficient_get_partial_values() + assert not z.chunk_store.supports_efficient_set_partial_values() + def expected(self): return [ "90109fc2a4e17efbcb447003ea1c08828b91f71e", @@ -3437,6 +3442,7 @@ def create_array(array_path='arr1', read_only=False, **kwargs): cache_metadata = kwargs.pop('cache_metadata', True) cache_attrs = kwargs.pop('cache_attrs', True) write_empty_chunks = kwargs.pop('write_empty_chunks', True) + kwargs.setdefault('compressor', None) num_dims = 1 if isinstance(kwargs["shape"], int) else len(kwargs["shape"]) sharding_transformer = ShardingStorageTransformer( "indexed", chunks_per_shard=(2, ) * num_dims @@ -3468,6 +3474,11 @@ def test_keys_inner_store(self): # since chunks_per_shard is 2: assert len(z._store.keys() - meta_keys) == 5 + def test_supports_efficient_get_set_partial_values(self): + z = self.create_array(shape=100, chunks=10) + assert not z.chunk_store.supports_efficient_get_partial_values() + assert not z.chunk_store.supports_efficient_set_partial_values() + def expected(self): return [ "b46294e25b1d816055e7937780265c0d8d5d6c47", From 0deb2b63935b4fe4bc8138ff9817ea204474bfa4 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Tue, 23 Aug 2022 14:07:02 +0200 Subject: [PATCH 07/19] fix hexdigest --- zarr/tests/test_core.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 0f2cf6ea51..2f9fab4934 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -3481,11 +3481,11 @@ def test_supports_efficient_get_set_partial_values(self): def expected(self): return [ - "b46294e25b1d816055e7937780265c0d8d5d6c47", - "5b52b03dde558c4c2313e55cf7ed9898d397e485", - "ef7f726387c1bc235ac205f77567276d28872477", - "fd944727c0d058e594d7b3800781a4786af5f0de", - "4ce1eebc42dc03690d917b7ff4363df6946c2745", + 
'90109fc2a4e17efbcb447003ea1c08828b91f71e', + '2b73519f7260dba3ddce0d2b70041888856fec6b', + 'bca5798be2ed71d444f3045b05432d937682b7dd', + '9ff1084501e28520e577662a6e3073f1116c76a2', + '882a97cad42417f90f111d0cb916a21579650467', ] From d3eda71a9bbc201f58169101e6107c37d9e2627d Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Tue, 23 Aug 2022 17:17:26 +0200 Subject: [PATCH 08/19] improve tests --- zarr/_storage/v3_storage_transformers.py | 47 ++++++++++++++---------- zarr/tests/test_storage_v3.py | 26 +++++++++---- 2 files changed, 47 insertions(+), 26 deletions(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 9ed4104e07..72c3178e86 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -11,10 +11,6 @@ MAX_UINT_64 = 2 ** 64 - 1 -def _is_data_key(key: str) -> bool: - return key.startswith("data/root") - - class _ShardIndex(NamedTuple): store: "ShardingStorageTransformer" # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) @@ -77,23 +73,33 @@ def create_empty(cls, store: "ShardingStorageTransformer"): class ShardingStorageTransformer(StorageTransformer): # lgtm[py/missing-equals] + """Implements sharding as a storage transformer, as described in the spec: + https://zarr-specs.readthedocs.io/en/latest/extensions/storage-transformers/sharding/v1.0.html + https://purl.org/zarr/spec/storage_transformers/sharding/1.0 + """ + extension_uri = "https://purl.org/zarr/spec/storage_transformers/sharding/1.0" valid_types = ["indexed"] def __init__(self, _type, chunks_per_shard) -> None: super().__init__(_type) - chunks_per_shard = tuple(int(i) for i in chunks_per_shard) - if chunks_per_shard == (): - chunks_per_shard = (1, ) - self._dimension_separator = None + if isinstance(chunks_per_shard, int): + chunks_per_shard = (chunks_per_shard, ) + else: + chunks_per_shard = tuple(int(i) for i in chunks_per_shard) + if chunks_per_shard == (): + 
chunks_per_shard = (1, ) self.chunks_per_shard = chunks_per_shard self._num_chunks_per_shard = functools.reduce( lambda x, y: x * y, chunks_per_shard, 1 ) + self._dimension_separator = None + self._data_key_prefix = None def _copy_for_array(self, array, inner_store): transformer_copy = super()._copy_for_array(array, inner_store) transformer_copy._dimension_separator = array._dimension_separator + transformer_copy._data_key_prefix = array._data_key_prefix if len(array._shape) > len(self.chunks_per_shard): # The array shape might be longer when initialized with subdtypes. # subdtypes dimensions come last, therefore padding chunks_per_shard @@ -110,6 +116,12 @@ def dimension_separator(self) -> str: ) return self._dimension_separator + def _is_data_key(self, key: str) -> bool: + assert self._data_key_prefix is not None, ( + "data_key_prefix is not initialized, first get a copy via _copy_for_array." + ) + return key.startswith(self._data_key_prefix) + def _key_to_shard(self, chunk_key: str) -> Tuple[str, Tuple[int, ...]]: prefix, _, chunk_string = chunk_key.rpartition("c") chunk_subkeys = tuple( @@ -156,7 +168,7 @@ def _get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: ) def __getitem__(self, key): - if _is_data_key(key): + if self._is_data_key(key): if self.supports_efficient_get_partial_values(): # Use the partial implementation, which fetches the index seperately value = self.get_partial_values([(key, (0, None))])[0] @@ -179,7 +191,7 @@ def __getitem__(self, key): return self.inner_store.__getitem__(key) def __setitem__(self, key, value): - if _is_data_key(key): + if self._is_data_key(key): shard_key, chunk_subkey = self._key_to_shard(key) chunks_to_read = set(self._get_chunks_in_shard(shard_key)) chunks_to_read.remove(chunk_subkey) @@ -248,7 +260,7 @@ def __setitem__(self, key, value): self.inner_store[key] = value def __delitem__(self, key): - if _is_data_key(key): + if self._is_data_key(key): shard_key, chunk_subkey = self._key_to_shard(key) 
try: index = self._get_index_from_store(shard_key) @@ -266,7 +278,7 @@ def __delitem__(self, key): del self.inner_store[key] def _shard_key_to_original_keys(self, key: str) -> Iterator[str]: - if _is_data_key(key): + if self._is_data_key(key): index = self._get_index_from_store(key) prefix, _, _ = key.rpartition("c") for chunk_tuple in self._get_chunks_in_shard(key): @@ -290,7 +302,7 @@ def get_partial_values(self, key_ranges): cached_indices = {} none_indices = [] for i, (key, range_) in enumerate(key_ranges): - if _is_data_key(key): + if self._is_data_key(key): shard_key, chunk_subkey = self._key_to_shard(key) try: index = cached_indices[shard_key] @@ -331,13 +343,10 @@ def rename(self, src_path: str, dst_path: str) -> None: StoreV3.rename(self, src_path, dst_path) # type: ignore[arg-type] def list_prefix(self, prefix): - if _is_data_key(prefix): - return StoreV3.list_prefix(self, prefix) - else: - return self.inner_store.list_prefix(prefix) + return StoreV3.list_prefix(self, prefix) def erase_prefix(self, prefix): - if _is_data_key(prefix): + if self._is_data_key(prefix): StoreV3.erase_prefix(self, prefix) else: self.inner_store.erase_prefix(prefix) @@ -347,7 +356,7 @@ def rmdir(self, path=None): _rmdir_from_keys_v3(self, path) # type: ignore def __contains__(self, key): - if _is_data_key(key): + if self._is_data_key(key): shard_key, chunk_subkeys = self._key_to_shard(key) try: index = self._get_index_from_store(shard_key) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 8d077ef129..98ef47b5b5 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -10,6 +10,8 @@ import zarr from zarr._storage.store import _get_hierarchy_metadata, v3_api_available, StorageTransformer +from zarr._storage.v3_storage_transformers import ShardingStorageTransformer +from zarr.core import Array from zarr.meta import _default_entry_point_metadata_v3 from zarr.storage import (atexit_rmglob, atexit_rmtree, data_root, 
default_compressor, getsize, init_array, meta_root, @@ -527,22 +529,32 @@ class TestStorageTransformerV3(TestMappingStoreV3): def create_store(self, **kwargs): inner_store = super().create_store(**kwargs) - storage_transformer = DummyStorageTransfomer( + dummy_transformer = DummyStorageTransfomer( "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT ) - return storage_transformer._copy_for_array(None, inner_store) + sharding_transformer = ShardingStorageTransformer( + "indexed", chunks_per_shard=(2, ), + ) + path = 'bla' + init_array(inner_store, path=path, shape=1000, chunks=100, + dimension_separator=".", + storage_transformers=[dummy_transformer, sharding_transformer]) + store = Array(store=inner_store, path=path).chunk_store + store.clear() + return store def test_method_forwarding(self): store = self.create_store() - assert store.list() == store.inner_store.list() - assert store.list_dir(data_root) == store.inner_store.list_dir(data_root) + inner_store = store.inner_store.inner_store + assert store.list() == inner_store.list() + assert store.list_dir(data_root) == inner_store.list_dir(data_root) assert store.is_readable() assert store.is_writeable() assert store.is_listable() - store.inner_store._readable = False - store.inner_store._writeable = False - store.inner_store._listable = False + inner_store._readable = False + inner_store._writeable = False + inner_store._listable = False assert not store.is_readable() assert not store.is_writeable() assert not store.is_listable() From 093926c8e73c90e0b4cf9e61b42a306879b0ef49 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Wed, 24 Aug 2022 15:26:10 +0200 Subject: [PATCH 09/19] fix order of storage transformers --- zarr/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zarr/core.py b/zarr/core.py index b434460a68..b6085f1fcc 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -286,7 +286,7 @@ def _load_metadata_nosync(self): storage_transformers = meta.get('storage_transformers', 
[]) if storage_transformers: transformed_store = self._chunk_store or self._store - for storage_transformer in storage_transformers: + for storage_transformer in storage_transformers[::-1]: transformed_store = storage_transformer._copy_for_array( self, transformed_store ) From e7b14b7ec07ab56c990365bee639add8f80c80b4 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 25 Aug 2022 14:12:19 +0200 Subject: [PATCH 10/19] minor test improvement --- zarr/tests/test_storage_v3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 98ef47b5b5..ffb0db74a9 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -533,14 +533,14 @@ def create_store(self, **kwargs): "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT ) sharding_transformer = ShardingStorageTransformer( - "indexed", chunks_per_shard=(2, ), + "indexed", chunks_per_shard=2, ) path = 'bla' init_array(inner_store, path=path, shape=1000, chunks=100, dimension_separator=".", storage_transformers=[dummy_transformer, sharding_transformer]) store = Array(store=inner_store, path=path).chunk_store - store.clear() + store.erase_prefix("/") return store def test_method_forwarding(self): From a52300c330af8281629ec902ec3e56546723ce1f Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 25 Aug 2022 15:35:46 +0200 Subject: [PATCH 11/19] minor test update --- zarr/tests/test_storage_v3.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index ffb0db74a9..f9c5ac4fbb 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -540,7 +540,8 @@ def create_store(self, **kwargs): dimension_separator=".", storage_transformers=[dummy_transformer, sharding_transformer]) store = Array(store=inner_store, path=path).chunk_store - store.erase_prefix("/") + store.erase_prefix("data/root/bla/") + 
store.clear() return store def test_method_forwarding(self): From a9604810528afc46ff483312140209f9901ea9b6 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 8 Sep 2022 15:55:53 +0200 Subject: [PATCH 12/19] apply PR feedback --- zarr/_storage/v3.py | 11 ++++------- zarr/core.py | 2 +- zarr/tests/test_storage_v3.py | 5 ++++- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/zarr/_storage/v3.py b/zarr/_storage/v3.py index 47bf813dc8..080a95debd 100644 --- a/zarr/_storage/v3.py +++ b/zarr/_storage/v3.py @@ -200,14 +200,11 @@ def get_partial_values(self, key_ranges): key = self._normalize_key(key) path = self.dir_path(key) try: - if range_start < 0: - if range_length is None: - result = self.fs.tail(path, size=-range_start) - else: - size = self.fs.size(path) - result = self.fs.read_block(path, size + range_start, range_length) + if range_start is None or range_length is None: + end = None else: - result = self.fs.read_block(path, range_start, range_length) + end = range_start + range_length + result = self.fs.cat_file(path, start=range_start, end=end) except self.map.missing_exceptions: result = None results.append(result) diff --git a/zarr/core.py b/zarr/core.py index b6085f1fcc..bec828f6c2 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1872,7 +1872,7 @@ def _process_chunk( # contiguous, so we can decompress directly from the chunk # into the destination array if self._compressor: - if isinstance(cdata, (PartialReadBuffer, UncompressedPartialReadBufferV3)): + if isinstance(cdata, PartialReadBuffer): cdata = cdata.read_full() self._compressor.decode(cdata, dest) else: diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index f9c5ac4fbb..599051e55a 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -99,7 +99,10 @@ class DummyStorageTransfomer(StorageTransformer): def __init__(self, _type, test_value) -> None: super().__init__(_type) - assert test_value == self.TEST_CONSTANT + if test_value != 
self.TEST_CONSTANT: + raise ValueError( + f"test_value must be {self.TEST_CONSTANT}, but is {test_value}" + ) self.test_value = test_value From 91f10ff00be70d7d52b150982298e11457d3f383 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 12 Dec 2022 14:24:12 +0100 Subject: [PATCH 13/19] call ensure_bytes in sharding transformer --- zarr/_storage/v3_storage_transformers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 72c3178e86..8de99fee88 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -2,6 +2,7 @@ import itertools from typing import NamedTuple, Tuple, Optional, Union, Iterator +from numcodecs.compat import ensure_bytes import numpy as np from zarr._storage.store import StorageTransformer, StoreV3, _rmdir_from_keys_v3 @@ -191,6 +192,7 @@ def __getitem__(self, key): return self.inner_store.__getitem__(key) def __setitem__(self, key, value): + value = ensure_bytes(value) if self._is_data_key(key): shard_key, chunk_subkey = self._key_to_shard(key) chunks_to_read = set(self._get_chunks_in_shard(shard_key)) From 73fb0a5797360f758b0423bc3c9f5bdf363fd5db Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 12 Dec 2022 15:30:45 +0100 Subject: [PATCH 14/19] minor fixes --- zarr/_storage/v3_storage_transformers.py | 2 +- zarr/tests/test_storage_v3.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index 8de99fee88..c46396e439 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -171,7 +171,7 @@ def _get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: def __getitem__(self, key): if self._is_data_key(key): if self.supports_efficient_get_partial_values(): - # Use the partial implementation, which fetches the index seperately + # Use the 
partial implementation, which fetches the index separately value = self.get_partial_values([(key, (0, None))])[0] if value is None: raise KeyError(key) diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 5a3fee0a30..f33a40e8d8 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -100,7 +100,7 @@ class DummyStorageTransfomer(StorageTransformer): def __init__(self, _type, test_value) -> None: super().__init__(_type) if test_value != self.TEST_CONSTANT: - raise ValueError( + raise ValueError( # pragma: no cover f"test_value must be {self.TEST_CONSTANT}, but is {test_value}" ) self.test_value = test_value From e1960a12b7fdbb48f605749328aa55ee400067fe Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 22 Dec 2022 13:47:28 +0100 Subject: [PATCH 15/19] adapt to supports_efficient_get_partial_values property --- zarr/_storage/v3.py | 1 + zarr/_storage/v3_storage_transformers.py | 8 ++++---- zarr/core.py | 4 ++-- zarr/tests/test_core.py | 4 ++-- zarr/util.py | 2 +- 5 files changed, 10 insertions(+), 9 deletions(-) diff --git a/zarr/_storage/v3.py b/zarr/_storage/v3.py index a1e19c55bd..9eb7f6bb5f 100644 --- a/zarr/_storage/v3.py +++ b/zarr/_storage/v3.py @@ -182,6 +182,7 @@ def rmdir(self, path=None): if self.fs.isdir(store_path): self.fs.rm(store_path, recursive=True) + @property def supports_efficient_get_partial_values(self): return True diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index c46396e439..dfd9211f43 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -170,7 +170,7 @@ def _get_chunks_in_shard(self, shard_key: str) -> Iterator[Tuple[int, ...]]: def __getitem__(self, key): if self._is_data_key(key): - if self.supports_efficient_get_partial_values(): + if self.supports_efficient_get_partial_values: # Use the partial implementation, which fetches the index separately value = 
self.get_partial_values([(key, (0, None))])[0] if value is None: @@ -199,7 +199,7 @@ def __setitem__(self, key, value): chunks_to_read.remove(chunk_subkey) new_content = {chunk_subkey: value} try: - if self.supports_efficient_get_partial_values(): + if self.supports_efficient_get_partial_values: index = self._get_index_from_store(shard_key) full_shard_value = None else: @@ -220,7 +220,7 @@ def __setitem__(self, key, value): # use get_partial_values if less than half of the available chunks must be read: # (This can be changed when set_partial_values can be used efficiently.) use_partial_get = ( - self.supports_efficient_get_partial_values() + self.supports_efficient_get_partial_values and len(valid_chunk_slices) < len(chunk_slices) / 2 ) @@ -299,7 +299,7 @@ def __len__(self): return sum(1 for _ in self.keys()) def get_partial_values(self, key_ranges): - if self.supports_efficient_get_partial_values(): + if self.supports_efficient_get_partial_values: transformed_key_ranges = [] cached_indices = {} none_indices = [] diff --git a/zarr/core.py b/zarr/core.py index d589ca439f..b9db6cb2c8 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -1275,7 +1275,7 @@ def _get_selection(self, indexer, out=None, fields=None): if ( not hasattr(self.chunk_store, "getitems") and not ( hasattr(self.chunk_store, "get_partial_values") and - self.chunk_store.supports_efficient_get_partial_values() + self.chunk_store.supports_efficient_get_partial_values ) ) or any(map(lambda x: x == 0, self.shape)): # sequentially get one key at a time from storage @@ -2041,7 +2041,7 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, and not fields and self.dtype != object and hasattr(self.chunk_store, "get_partial_values") - and self.chunk_store.supports_efficient_get_partial_values() + and self.chunk_store.supports_efficient_get_partial_values ): partial_read_decode = True cdatas = { diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index b4051e6b3b..677c3ed3eb 
100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -3338,7 +3338,7 @@ def test_nbytes_stored(self): def test_supports_efficient_get_set_partial_values(self): z = self.create_array(shape=100, chunks=10) - assert z.chunk_store.supports_efficient_get_partial_values() + assert z.chunk_store.supports_efficient_get_partial_values assert not z.chunk_store.supports_efficient_set_partial_values() def expected(self): @@ -3487,7 +3487,7 @@ def test_keys_inner_store(self): def test_supports_efficient_get_set_partial_values(self): z = self.create_array(shape=100, chunks=10) - assert not z.chunk_store.supports_efficient_get_partial_values() + assert not z.chunk_store.supports_efficient_get_partial_values assert not z.chunk_store.supports_efficient_set_partial_values() def expected(self): diff --git a/zarr/util.py b/zarr/util.py index daff8d5d84..5976b36d8d 100644 --- a/zarr/util.py +++ b/zarr/util.py @@ -641,7 +641,7 @@ def read_full(self): class UncompressedPartialReadBufferV3: def __init__(self, store_key, chunk_store, itemsize): - assert chunk_store.supports_efficient_get_partial_values() + assert chunk_store.supports_efficient_get_partial_values self.chunk_store = chunk_store self.store_key = store_key self.itemsize = itemsize From c1bc26d9b78620cbda52e700d4ee8182750a64ce Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 22 Dec 2022 14:09:04 +0100 Subject: [PATCH 16/19] add ZARR_V3_SHARDING flag for sharding usage --- .github/workflows/minimal.yml | 2 ++ .github/workflows/python-package.yml | 1 + .github/workflows/windows-testing.yml | 1 + zarr/_storage/v3_storage_transformers.py | 13 +++++++++++++ zarr/tests/test_core.py | 4 +++- zarr/tests/test_storage_v3.py | 3 ++- 6 files changed, 22 insertions(+), 2 deletions(-) diff --git a/.github/workflows/minimal.yml b/.github/workflows/minimal.yml index 2cde38e081..4de5aca273 100644 --- a/.github/workflows/minimal.yml +++ b/.github/workflows/minimal.yml @@ -24,6 +24,7 @@ jobs: shell: "bash -l {0}" env: 
ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate minimal python -m pip install . @@ -32,6 +33,7 @@ jobs: shell: "bash -l {0}" env: ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate minimal rm -rf fixture/ diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 872ce52343..cee2ca7aef 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -70,6 +70,7 @@ jobs: ZARR_TEST_MONGO: 1 ZARR_TEST_REDIS: 1 ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 run: | conda activate zarr-env mkdir ~/blob_emulator diff --git a/.github/workflows/windows-testing.yml b/.github/workflows/windows-testing.yml index ea1d0f64c9..2f8922b447 100644 --- a/.github/workflows/windows-testing.yml +++ b/.github/workflows/windows-testing.yml @@ -52,6 +52,7 @@ jobs: env: ZARR_TEST_ABS: 1 ZARR_V3_EXPERIMENTAL_API: 1 + ZARR_V3_SHARDING: 1 - name: Conda info shell: bash -l {0} run: conda info diff --git a/zarr/_storage/v3_storage_transformers.py b/zarr/_storage/v3_storage_transformers.py index dfd9211f43..3675d42c38 100644 --- a/zarr/_storage/v3_storage_transformers.py +++ b/zarr/_storage/v3_storage_transformers.py @@ -1,5 +1,6 @@ import functools import itertools +import os from typing import NamedTuple, Tuple, Optional, Union, Iterator from numcodecs.compat import ensure_bytes @@ -12,6 +13,17 @@ MAX_UINT_64 = 2 ** 64 - 1 +v3_sharding_available = os.environ.get('ZARR_V3_SHARDING', '0').lower() not in ['0', 'false'] + + +def assert_zarr_v3_sharding_available(): + if not v3_sharding_available: + raise NotImplementedError( + "Using V3 sharding is experimental and not yet finalized! 
To enable support, set:\n" + "ZARR_V3_SHARDING=1" + ) # pragma: no cover + + class _ShardIndex(NamedTuple): store: "ShardingStorageTransformer" # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) @@ -83,6 +95,7 @@ class ShardingStorageTransformer(StorageTransformer): # lgtm[py/missing-equals] valid_types = ["indexed"] def __init__(self, _type, chunks_per_shard) -> None: + assert_zarr_v3_sharding_available() super().__init__(_type) if isinstance(chunks_per_shard, int): chunks_per_shard = (chunks_per_shard, ) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 677c3ed3eb..397ebad513 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -20,7 +20,7 @@ from zarr._storage.store import ( v3_api_available, ) -from .._storage.v3_storage_transformers import ShardingStorageTransformer +from .._storage.v3_storage_transformers import ShardingStorageTransformer, v3_sharding_available from zarr.core import Array from zarr.errors import ArrayNotFoundError, ContainsGroupError from zarr.meta import json_loads @@ -3300,6 +3300,7 @@ def expected(self): @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") class TestArrayWithFSStoreV3PartialReadUncompressedSharded( TestArrayWithPathV3, TestArrayWithFSStorePartialRead ): @@ -3445,6 +3446,7 @@ def expected(self): @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") class TestArrayWithShardingStorageTransformerV3(TestArrayWithPathV3): @staticmethod diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index b52c14228b..09a98daabd 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -10,7 +10,7 @@ import zarr from zarr._storage.store import _get_hierarchy_metadata, v3_api_available, 
StorageTransformer -from zarr._storage.v3_storage_transformers import ShardingStorageTransformer +from zarr._storage.v3_storage_transformers import ShardingStorageTransformer, v3_sharding_available from zarr.core import Array from zarr.meta import _default_entry_point_metadata_v3 from zarr.storage import (atexit_rmglob, atexit_rmtree, data_root, @@ -528,6 +528,7 @@ def create_store(self, **kwargs): return store +@pytest.mark.skipif(not v3_sharding_available, reason="sharding is disabled") class TestStorageTransformerV3(TestMappingStoreV3): def create_store(self, **kwargs): From 6f5b35a15d97c33edbeeeddeac21a0d587c30221 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 22 Dec 2022 14:16:19 +0100 Subject: [PATCH 17/19] fix release notes --- docs/release.rst | 35 +++-------------------------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/docs/release.rst b/docs/release.rst index 23f866fcf9..d5f3441c7d 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -17,40 +17,11 @@ Unreleased * Fix bug that caused double counting of groups in ``groups()`` and ``group_keys()`` methods with V3 stores. By :user:`Ryan Abernathey ` :issue:`1228`. -* Improve Zarr V3 support, adding partial store read/write and storage transformers. - -.. _release_2.13.0: - -2.13.0 ------- -.. warning:: - Pre-release! Use `pip install --pre zarr` to evaluate this release. - -Major changes -~~~~~~~~~~~~~ - -* Remove support for Python 3.7 in concert with NumPy dependency. - By :user:`Davis Bennett `; :issue:`1067`. - -Bug fixes -~~~~~~~~~ - -* Fix bug in N5 storage that prevented arrays located in the root of the hierarchy from - bearing the `n5` keyword. Along with fixing this bug, new tests were added for N5 routines - that had previously been excluded from testing, and type annotations were added to the N5 codebase. - By :user:`Davis Bennett `; :issue:`1092`. - -* Fix bug in LRUEStoreCache in which the current size wasn't reset on invalidation. 
- By :user:`BGCMHou ` and :user:`Josh Moore ` :issue:`1076`, :issue:`1077`. - -Enhancements -~~~~~~~~~~~~ - -* **Improve Zarr V3 support, adding partial store read/write, storage transformers and sharding.** - Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): +* Improve Zarr V3 support, adding partial store read/write and storage transformers and sharding.** + Add features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): + * storage transformers * `get_partial_values` and `set_partial_values` * efficient `get_partial_values` implementation for `FSStoreV3` - * storage transformers interface * sharding storage transformer By :user:`Jonathan Striebel `; :issue:`1096`, :issue:`1111`. From 070c02cd9641de6b604ce30bd18da093bd164480 Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Thu, 22 Dec 2022 14:17:11 +0100 Subject: [PATCH 18/19] fix release notes --- docs/release.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release.rst b/docs/release.rst index d5f3441c7d..fe33a4d075 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -17,7 +17,7 @@ Unreleased * Fix bug that caused double counting of groups in ``groups()`` and ``group_keys()`` methods with V3 stores. By :user:`Ryan Abernathey ` :issue:`1228`. -* Improve Zarr V3 support, adding partial store read/write and storage transformers and sharding.** +* Improve Zarr V3 support, adding partial store read/write and storage transformers and sharding. 
Add features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): * storage transformers * `get_partial_values` and `set_partial_values` From 385b5d3635618e086eb4752f81c652379751a5ad Mon Sep 17 00:00:00 2001 From: Jonathan Striebel Date: Mon, 16 Jan 2023 17:37:33 +0100 Subject: [PATCH 19/19] add storage_transformers and get/set_partial_values (#1096) * add storage_transformers and get/set_partial_values * formatting * add docs and release notes * add test_core testcase * Update zarr/creation.py Co-authored-by: Gregory Lee * apply PR feedback * add comment that storage_transformers=None is the same as storage_transformers=[] * use empty tuple as default for storage_transformers * make mypy happy * better coverage, minor fix, adding rmdir * add missing rmdir to test * increase coverage * improve test coverage * fix TestArrayWithStorageTransformersV3 * Update zarr/creation.py Co-authored-by: Gregory Lee * pick generic storage transformer changes from #1111 * increase coverage * fix order of storage transformers * retrigger CI * minor fixes * make flake8 happy * apply PR feedback Co-authored-by: Gregory Lee Co-authored-by: Josh Moore --- docs/release.rst | 17 ++- zarr/_storage/store.py | 225 +++++++++++++++++++++++++++++++++- zarr/core.py | 26 +++- zarr/creation.py | 12 +- zarr/meta.py | 48 +++++++- zarr/storage.py | 9 +- zarr/tests/test_core.py | 34 ++++- zarr/tests/test_creation.py | 15 +++ zarr/tests/test_storage_v3.py | 125 ++++++++++++++++++- 9 files changed, 493 insertions(+), 18 deletions(-) diff --git a/docs/release.rst b/docs/release.rst index 817bdc4f37..f633aea7cc 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -6,14 +6,20 @@ Release notes # to document your changes. On releases it will be # re-indented so that it does not show up in the notes. - .. _unreleased: +.. _unreleased: - Unreleased - ---------- +Unreleased +---------- .. # .. warning:: # Pre-release! Use :command:`pip install --pre zarr` to evaluate this release. 
+* Improve Zarr V3 support, adding partial store read/write and storage transformers. + Add two features of the [v3 spec](https://zarr-specs.readthedocs.io/en/latest/core/v3.0.html): + * storage transformers + * `get_partial_values` and `set_partial_values` + By :user:`Jonathan Striebel `; :issue:`1096`. + .. _release_2.13.6: 2.13.6 @@ -44,7 +50,10 @@ Bug fixes Appreciation ~~~~~~~~~~~~~ -Special thanks to Outreachy participants for contributing to most of the maintenance PRs. Please read the blog post summarising the contribution phase and welcoming new Outreachy interns: https://zarr.dev/blog/welcoming-outreachy-2022-interns/ +Special thanks to Outreachy participants for contributing to most of the +maintenance PRs. Please read the blog post summarising the contribution phase +and welcoming new Outreachy interns: +https://zarr.dev/blog/welcoming-outreachy-2022-interns/ Enhancements diff --git a/zarr/_storage/store.py b/zarr/_storage/store.py index 9e265cf383..4d813b8e05 100644 --- a/zarr/_storage/store.py +++ b/zarr/_storage/store.py @@ -1,8 +1,10 @@ import abc import os +from collections import defaultdict from collections.abc import MutableMapping +from copy import copy from string import ascii_letters, digits -from typing import Any, List, Mapping, Optional, Union +from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Union from zarr.meta import Metadata2, Metadata3 from zarr.util import normalize_storage_path @@ -254,6 +256,82 @@ def __setitem__(self, key, value): def __getitem__(self, key): """Get a value.""" + @abc.abstractmethod + def rmdir(self, path=None): + """Remove a data path and all its subkeys and related metadata. + Expects a path without the data or meta root prefix.""" + + @property + def supports_efficient_get_partial_values(self): + return False + + def get_partial_values( + self, + key_ranges: Sequence[Tuple[str, Tuple[int, Optional[int]]]] + ) -> List[Union[bytes, memoryview, bytearray]]: + """Get multiple partial values. 
+ key_ranges can be an iterable of key, range pairs, + where a range specifies two integers range_start and range_length + as a tuple, (range_start, range_length). + range_length may be None to indicate to read until the end. + range_start may be negative to start reading range_start bytes + from the end of the file. + A key may occur multiple times with different ranges. + Inserts None for missing keys into the returned list.""" + results: List[Union[bytes, memoryview, bytearray]] = ( + [None] * len(key_ranges) # type: ignore[list-item] + ) + indexed_ranges_by_key: Dict[str, List[Tuple[int, Tuple[int, Optional[int]]]]] = ( + defaultdict(list) + ) + for i, (key, range_) in enumerate(key_ranges): + indexed_ranges_by_key[key].append((i, range_)) + for key, indexed_ranges in indexed_ranges_by_key.items(): + try: + value = self[key] + except KeyError: # pragma: no cover + continue + for i, (range_from, range_length) in indexed_ranges: + if range_length is None: + results[i] = value[range_from:] + else: + results[i] = value[range_from:range_from + range_length] + return results + + def supports_efficient_set_partial_values(self): + return False + + def set_partial_values(self, key_start_values): + """Set multiple partial values. + key_start_values can be an iterable of key, start and value triplets + as tuples, (key, start, value), where start defines the offset in bytes. + A key may occur multiple times with different starts and non-overlapping values. + Also, start may only be beyond the current value if other values fill the gap. 
+ start may be negative to start writing start bytes from the current + end of the file, ending the file with the new value.""" + unique_keys = set(next(zip(*key_start_values))) + values = {} + for key in unique_keys: + old_value = self.get(key) + values[key] = None if old_value is None else bytearray(old_value) + for key, start, value in key_start_values: + if values[key] is None: + assert start == 0 + values[key] = value + else: + if start > len(values[key]): # pragma: no cover + raise ValueError( + f"Cannot set value at start {start}, " + + f"since it is beyond the data at key {key}, " + + f"having length {len(values[key])}." + ) + if start < 0: + values[key][start:] = value + else: + values[key][start:start + len(value)] = value + for key, value in values.items(): + self[key] = value + def clear(self): """Remove all items from store.""" self.erase_prefix("/") @@ -303,6 +381,151 @@ def _ensure_store(store): ) +class StorageTransformer(MutableMapping, abc.ABC): + """Base class for storage transformers. The methods simply pass on the data as-is + and should be overwritten by sub-classes.""" + + _store_version = 3 + _metadata_class = Metadata3 + + def __init__(self, _type) -> None: + if _type not in self.valid_types: # pragma: no cover + raise ValueError( + f"Storage transformer cannot be initialized with type {_type}, " + + f"must be one of {list(self.valid_types)}." + ) + self.type = _type + self._inner_store = None + + def _copy_for_array(self, array, inner_store): + transformer_copy = copy(self) + transformer_copy._inner_store = inner_store + return transformer_copy + + @abc.abstractproperty + def extension_uri(self): + pass # pragma: no cover + + @abc.abstractproperty + def valid_types(self): + pass # pragma: no cover + + def get_config(self): + """Return a dictionary holding configuration parameters for this + storage transformer. All values must be compatible with JSON encoding.""" + # Override in sub-class if need special encoding of config values. 
+ # By default, assume all non-private members are configuration + # parameters except for type . + return { + k: v for k, v in self.__dict__.items() + if not k.startswith('_') and k != "type" + } + + @classmethod + def from_config(cls, _type, config): + """Instantiate storage transformer from a configuration object.""" + # override in sub-class if need special decoding of config values + + # by default, assume constructor accepts configuration parameters as + # keyword arguments without any special decoding + return cls(_type, **config) + + @property + def inner_store(self) -> Union["StorageTransformer", StoreV3]: + assert self._inner_store is not None, ( + "inner_store is not initialized, first get a copy via _copy_for_array." + ) + return self._inner_store + + # The following implementations are usually fine to keep as-is: + + def __eq__(self, other): + return ( + type(self) == type(other) and + self._inner_store == other._inner_store and + self.get_config() == other.get_config() + ) + + def erase(self, key): + self.__delitem__(key) + + def list(self): + return list(self.keys()) + + def list_dir(self, prefix): + return StoreV3.list_dir(self, prefix) + + def is_readable(self): + return self.inner_store.is_readable() + + def is_writeable(self): + return self.inner_store.is_writeable() + + def is_listable(self): + return self.inner_store.is_listable() + + def is_erasable(self): + return self.inner_store.is_erasable() + + def clear(self): + return self.inner_store.clear() + + def __enter__(self): + return self.inner_store.__enter__() + + def __exit__(self, exc_type, exc_value, traceback): + return self.inner_store.__exit__(exc_type, exc_value, traceback) + + def close(self) -> None: + return self.inner_store.close() + + # The following implementations might need to be re-implemented + # by subclasses implementing storage transformers: + + def rename(self, src_path: str, dst_path: str) -> None: + return self.inner_store.rename(src_path, dst_path) + + def 
list_prefix(self, prefix): + return self.inner_store.list_prefix(prefix) + + def erase_prefix(self, prefix): + return self.inner_store.erase_prefix(prefix) + + def rmdir(self, path=None): + return self.inner_store.rmdir(path) + + def __contains__(self, key): + return self.inner_store.__contains__(key) + + def __setitem__(self, key, value): + return self.inner_store.__setitem__(key, value) + + def __getitem__(self, key): + return self.inner_store.__getitem__(key) + + def __delitem__(self, key): + return self.inner_store.__delitem__(key) + + def __iter__(self): + return self.inner_store.__iter__() + + def __len__(self): + return self.inner_store.__len__() + + @property + def supports_efficient_get_partial_values(self): + return self.inner_store.supports_efficient_get_partial_values + + def get_partial_values(self, key_ranges): + return self.inner_store.get_partial_values(key_ranges) + + def supports_efficient_set_partial_values(self): + return self.inner_store.supports_efficient_set_partial_values() + + def set_partial_values(self, key_start_values): + return self.inner_store.set_partial_values(key_start_values) + + # allow MutableMapping for backwards compatibility StoreLike = Union[BaseStore, MutableMapping] diff --git a/zarr/core.py b/zarr/core.py index e5b2045160..5d37570831 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -189,6 +189,7 @@ def __init__( self._store = store self._chunk_store = chunk_store + self._transformed_chunk_store = None self._path = normalize_storage_path(path) if self._path: self._key_prefix = self._path + '/' @@ -292,6 +293,16 @@ def _load_metadata_nosync(self): filters = [get_codec(config) for config in filters] self._filters = filters + if self._version == 3: + storage_transformers = meta.get('storage_transformers', []) + if storage_transformers: + transformed_store = self._chunk_store or self._store + for storage_transformer in storage_transformers[::-1]: + transformed_store = storage_transformer._copy_for_array( + self, 
transformed_store + ) + self._transformed_chunk_store = transformed_store + def _refresh_metadata(self): if not self._cache_metadata: self._load_metadata() @@ -371,10 +382,12 @@ def read_only(self, value): @property def chunk_store(self): """A MutableMapping providing the underlying storage for array chunks.""" - if self._chunk_store is None: - return self._store - else: + if self._transformed_chunk_store is not None: + return self._transformed_chunk_store + elif self._chunk_store is not None: return self._chunk_store + else: + return self._store @property def shape(self): @@ -1800,7 +1813,7 @@ def _set_selection(self, indexer, value, fields=None): check_array_shape('value', value, sel_shape) # iterate over chunks in range - if not hasattr(self.store, "setitems") or self._synchronizer is not None \ + if not hasattr(self.chunk_store, "setitems") or self._synchronizer is not None \ or any(map(lambda x: x == 0, self.shape)): # iterative approach for chunk_coords, chunk_selection, out_selection in indexer: @@ -2229,7 +2242,10 @@ def _encode_chunk(self, chunk): cdata = chunk # ensure in-memory data is immutable and easy to compare - if isinstance(self.chunk_store, KVStore): + if ( + isinstance(self.chunk_store, KVStore) + or isinstance(self._chunk_store, KVStore) + ): cdata = ensure_bytes(cdata) return cdata diff --git a/zarr/creation.py b/zarr/creation.py index cc191e3734..a6fa8e44cc 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -22,7 +22,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', overwrite=False, path=None, chunk_store=None, filters=None, cache_metadata=True, cache_attrs=True, read_only=False, object_codec=None, dimension_separator=None, write_empty_chunks=True, - *, zarr_version=None, meta_array=None, **kwargs): + *, zarr_version=None, meta_array=None, storage_transformers=(), **kwargs): """Create an array. Parameters @@ -85,6 +85,14 @@ def create(shape, chunks=True, dtype=None, compressor='default', .. 
versionadded:: 2.11 + storage_transformers : sequence of StorageTransformers, optional + Setting storage transformers, changes the storage structure and behaviour + of data coming from the underlying store. The transformers are applied in the + order of the given sequence. Supplying an empty sequence is the same as omitting + the argument or setting it to None. May only be set when using zarr_version 3. + + .. versionadded:: 2.13 + zarr_version : {None, 2, 3}, optional The zarr protocol version of the created array. If None, it will be inferred from ``store`` or ``chunk_store`` if they are provided, @@ -170,7 +178,7 @@ def create(shape, chunks=True, dtype=None, compressor='default', init_array(store, shape=shape, chunks=chunks, dtype=dtype, compressor=compressor, fill_value=fill_value, order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator) + dimension_separator=dimension_separator, storage_transformers=storage_transformers) # instantiate array z = Array(store, path=path, chunk_store=chunk_store, synchronizer=synchronizer, diff --git a/zarr/meta.py b/zarr/meta.py index 77c55b9871..41a90101b5 100644 --- a/zarr/meta.py +++ b/zarr/meta.py @@ -9,7 +9,11 @@ from zarr.errors import MetadataError from zarr.util import json_dumps, json_loads -from typing import cast, Union, Any, List, Mapping as MappingType, Optional +from typing import cast, Union, Any, List, Mapping as MappingType, Optional, TYPE_CHECKING + +if TYPE_CHECKING: # pragma: no cover + from zarr._storage.store import StorageTransformer + ZARR_FORMAT = 2 ZARR_FORMAT_v3 = 3 @@ -459,6 +463,36 @@ def _decode_codec_metadata(cls, meta: Optional[Mapping]) -> Optional[Codec]: return codec + @classmethod + def _encode_storage_transformer_metadata( + cls, + storage_transformer: "StorageTransformer" + ) -> Optional[Mapping]: + return { + "extension": storage_transformer.extension_uri, + "type": storage_transformer.type, + 
"configuration": storage_transformer.get_config(), + } + + @classmethod + def _decode_storage_transformer_metadata(cls, meta: Mapping) -> "StorageTransformer": + from zarr.tests.test_storage_v3 import DummyStorageTransfomer + + # This might be changed to a proper registry in the future + KNOWN_STORAGE_TRANSFORMERS = [DummyStorageTransfomer] + + conf = meta.get('configuration', {}) + extension_uri = meta['extension'] + transformer_type = meta['type'] + + for StorageTransformerCls in KNOWN_STORAGE_TRANSFORMERS: + if StorageTransformerCls.extension_uri == extension_uri: + break + else: # pragma: no cover + raise NotImplementedError + + return StorageTransformerCls.from_config(transformer_type, conf) + @classmethod def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, Any]: meta = cls.parse_metadata(s) @@ -476,6 +510,10 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A # TODO: remove dimension_separator? compressor = cls._decode_codec_metadata(meta.get("compressor", None)) + storage_transformers = meta.get("storage_transformers", ()) + storage_transformers = [ + cls._decode_storage_transformer_metadata(i) for i in storage_transformers + ] extensions = meta.get("extensions", []) meta = dict( shape=tuple(meta["shape"]), @@ -493,6 +531,8 @@ def decode_array_metadata(cls, s: Union[MappingType, str]) -> MappingType[str, A # compressor field should be absent when there is no compression if compressor: meta['compressor'] = compressor + if storage_transformers: + meta['storage_transformers'] = storage_transformers except Exception as e: raise MetadataError("error decoding metadata: %s" % e) @@ -514,6 +554,10 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: object_codec = None compressor = cls._encode_codec_metadata(meta.get("compressor", None)) + storage_transformers = meta.get("storage_transformers", ()) + storage_transformers = [ + cls._encode_storage_transformer_metadata(i) for i in 
storage_transformers + ] extensions = meta.get("extensions", []) meta = dict( shape=meta["shape"] + sdshape, @@ -532,6 +576,8 @@ def encode_array_metadata(cls, meta: MappingType[str, Any]) -> bytes: meta["compressor"] = compressor if dimension_separator: meta["dimension_separator"] = dimension_separator + if storage_transformers: + meta["storage_transformers"] = storage_transformers return json_dumps(meta) diff --git a/zarr/storage.py b/zarr/storage.py index a2a8919d0b..db51cca947 100644 --- a/zarr/storage.py +++ b/zarr/storage.py @@ -311,6 +311,7 @@ def init_array( filters=None, object_codec=None, dimension_separator=None, + storage_transformers=(), ): """Initialize an array store with the given configuration. Note that this is a low-level function and there should be no need to call this directly from user code. @@ -438,7 +439,8 @@ def init_array( order=order, overwrite=overwrite, path=path, chunk_store=chunk_store, filters=filters, object_codec=object_codec, - dimension_separator=dimension_separator) + dimension_separator=dimension_separator, + storage_transformers=storage_transformers) def _init_array_metadata( @@ -455,6 +457,7 @@ def _init_array_metadata( filters=None, object_codec=None, dimension_separator=None, + storage_transformers=(), ): store_version = getattr(store, '_store_version', 2) @@ -576,6 +579,7 @@ def _init_array_metadata( if store_version < 3: meta.update(dict(chunks=chunks, dtype=dtype, order=order, filters=filters_config)) + assert not storage_transformers else: if dimension_separator is None: dimension_separator = "/" @@ -589,7 +593,8 @@ def _init_array_metadata( separator=dimension_separator), chunk_memory_layout=order, data_type=dtype, - attributes=attributes) + attributes=attributes, + storage_transformers=storage_transformers) ) key = _prefix_to_array_key(store, _path_to_prefix(path)) diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index e32026e662..ffacefb937 100644 --- a/zarr/tests/test_core.py +++ 
b/zarr/tests/test_core.py @@ -49,9 +49,11 @@ KVStoreV3, LMDBStoreV3, LRUStoreCacheV3, + RmdirV3, SQLiteStoreV3, StoreV3, ) +from zarr.tests.test_storage_v3 import DummyStorageTransfomer from zarr.util import buffer_size from zarr.tests.util import abs_container, skip_test_env_var, have_fsspec, mktemp @@ -3098,7 +3100,7 @@ def test_nbytes_stored(self): # Note: this custom mapping doesn't actually have all methods in the # v3 spec (e.g. erase), but they aren't needed here. -class CustomMappingV3(StoreV3): +class CustomMappingV3(RmdirV3, StoreV3): def __init__(self): self.inner = KVStoreV3(dict()) @@ -3359,6 +3361,36 @@ def expected(self): ] +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +class TestArrayWithStorageTransformersV3(TestArrayWithChunkStoreV3): + + @staticmethod + def create_array(array_path='arr1', read_only=False, **kwargs): + store = KVStoreV3(dict()) + # separate chunk store + chunk_store = KVStoreV3(dict()) + cache_metadata = kwargs.pop('cache_metadata', True) + cache_attrs = kwargs.pop('cache_attrs', True) + write_empty_chunks = kwargs.pop('write_empty_chunks', True) + dummy_storage_transformer = DummyStorageTransfomer( + "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT + ) + init_array(store, path=array_path, chunk_store=chunk_store, + storage_transformers=[dummy_storage_transformer], **kwargs) + return Array(store, path=array_path, read_only=read_only, + chunk_store=chunk_store, cache_metadata=cache_metadata, + cache_attrs=cache_attrs, write_empty_chunks=write_empty_chunks) + + def expected(self): + return [ + "3fb9a4f8233b09ad02067b6b7fc9fd5caa405c7d", + "89c8eb364beb84919fc9153d2c1ed2696274ec18", + "73307055c3aec095dd1232c38d793ef82a06bd97", + "6152c09255a5efa43b1a115546e35affa00c138c", + "2f8802fc391f67f713302e84fad4fd8f1366d6c2", + ] + + @pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") def test_array_mismatched_store_versions(): store_v3 = KVStoreV3(dict()) diff --git 
a/zarr/tests/test_creation.py b/zarr/tests/test_creation.py index 4c9c292734..b791bc3952 100644 --- a/zarr/tests/test_creation.py +++ b/zarr/tests/test_creation.py @@ -19,8 +19,10 @@ from zarr._storage.store import v3_api_available from zarr._storage.v3 import DirectoryStoreV3, KVStoreV3 from zarr.sync import ThreadSynchronizer +from zarr.tests.test_storage_v3 import DummyStorageTransfomer from zarr.tests.util import mktemp, have_fsspec + _VERSIONS = ((None, 2, 3) if v3_api_available else (None, 2)) _VERSIONS2 = ((2, 3) if v3_api_available else (2, )) @@ -747,3 +749,16 @@ def test_create_read_only(zarr_version, at_root): def test_json_dumps_chunks_numpy_dtype(): z = zeros((10,), chunks=(np.int64(2),)) assert np.all(z[...] == 0) + + +@pytest.mark.skipif(not v3_api_available, reason="V3 is disabled") +@pytest.mark.parametrize('at_root', [False, True]) +def test_create_with_storage_transformers(at_root): + kwargs = _init_creation_kwargs(zarr_version=3, at_root=at_root) + transformer = DummyStorageTransfomer( + "dummy_type", + test_value=DummyStorageTransfomer.TEST_CONSTANT + ) + z = create(1000000000, chunks=True, storage_transformers=[transformer], **kwargs) + assert isinstance(z.chunk_store, DummyStorageTransfomer) + assert z.chunk_store.test_value == DummyStorageTransfomer.TEST_CONSTANT diff --git a/zarr/tests/test_storage_v3.py b/zarr/tests/test_storage_v3.py index 4f6215135c..9f18c89361 100644 --- a/zarr/tests/test_storage_v3.py +++ b/zarr/tests/test_storage_v3.py @@ -1,6 +1,7 @@ import array import atexit import copy +import inspect import os import tempfile @@ -8,7 +9,7 @@ import pytest import zarr -from zarr._storage.store import _get_hierarchy_metadata, v3_api_available +from zarr._storage.store import _get_hierarchy_metadata, v3_api_available, StorageTransformer from zarr.meta import _default_entry_point_metadata_v3 from zarr.storage import (atexit_rmglob, atexit_rmtree, data_root, default_compressor, getsize, init_array, meta_root, @@ -88,6 +89,18 @@ def 
keys(self):
         """keys"""


+class DummyStorageTransfomer(StorageTransformer):
+    TEST_CONSTANT = "test1234"
+
+    extension_uri = "https://purl.org/zarr/spec/storage_transformers/dummy/1.0"
+    valid_types = ["dummy_type"]
+
+    def __init__(self, _type, test_value) -> None:
+        super().__init__(_type)
+        assert test_value == self.TEST_CONSTANT
+        self.test_value = test_value
+
+
 def test_ensure_store_v3():
     class InvalidStore:
         pass
@@ -190,8 +203,11 @@ def test_init_array(self, dimension_separator_fixture_v3):

         store = self.create_store()
         path = 'arr1'
+        transformer = DummyStorageTransfomer(
+            "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT
+        )
         init_array(store, path=path, shape=1000, chunks=100,
-                   dimension_separator=pass_dim_sep)
+                   dimension_separator=pass_dim_sep, storage_transformers=[transformer])

         # check metadata
         mkey = meta_root + path + '.array.json'
@@ -204,6 +220,9 @@ def test_init_array(self, dimension_separator_fixture_v3):
         assert meta['fill_value'] is None
         # Missing MUST be assumed to be "/"
         assert meta['chunk_grid']['separator'] is want_dim_sep
+        assert len(meta["storage_transformers"]) == 1
+        assert isinstance(meta["storage_transformers"][0], DummyStorageTransfomer)
+        assert meta["storage_transformers"][0].test_value == DummyStorageTransfomer.TEST_CONSTANT
         store.close()

     def test_list_prefix(self):
@@ -235,6 +254,69 @@ def test_rename_nonexisting(self):
             with pytest.raises(NotImplementedError):
                 store.rename('a', 'b')

+    def test_get_partial_values(self):
+        """Check get_partial_values for positive, negative and open-ended ranges."""
+        store = self.create_store()
+        assert store.supports_efficient_get_partial_values() in [True, False]
+        store[data_root + 'foo'] = b'abcdefg'
+        store[data_root + 'baz'] = b'z'
+        assert [b'a'] == store.get_partial_values(
+            [
+                (data_root + 'foo', (0, 1))
+            ]
+        )
+        assert [
+            b'd', b'b', b'z', b'abc', b'defg', b'defg', b'g', b'ef'
+        ] == store.get_partial_values(
+            [
+                (data_root + 'foo', (3, 1)),
+                (data_root + 'foo', (1, 1)),
+                (data_root + 'baz', (0, 1)),
+                (data_root + 'foo', (0, 3)),
+                (data_root + 'foo', (3, 4)),
+                (data_root + 'foo', (3, None)),
+                (data_root + 'foo', (-1, None)),
+                (data_root + 'foo', (-3, 2)),
+            ]
+        )
+
+    def test_set_partial_values(self):
+        """Check set_partial_values for overwrites, appends and negative starts."""
+        store = self.create_store()
+        assert store.supports_efficient_set_partial_values() in [True, False]
+        store[data_root + 'foo'] = b'abcdefg'
+        store.set_partial_values(
+            [
+                (data_root + 'foo', 0, b'hey')
+            ]
+        )
+        assert store[data_root + 'foo'] == b'heydefg'
+
+        store.set_partial_values(
+            [
+                (data_root + 'baz', 0, b'z')
+            ]
+        )
+        assert store[data_root + 'baz'] == b'z'
+        store.set_partial_values(
+            [
+                (data_root + 'foo', 1, b'oo'),
+                (data_root + 'baz', 1, b'zzz'),
+                (data_root + 'baz', 4, b'aaaa'),
+                (data_root + 'foo', 6, b'done'),
+            ]
+        )
+        assert store[data_root + 'foo'] == b'hoodefdone'
+        assert store[data_root + 'baz'] == b'zzzzaaaa'
+        store.set_partial_values(
+            [
+                (data_root + 'foo', -2, b'NE'),
+                (data_root + 'baz', -5, b'q'),
+            ]
+        )
+        assert store[data_root + 'foo'] == b'hoodefdoNE'
+        assert store[data_root + 'baz'] == b'zzzq'
+


 class TestMappingStoreV3(StoreV3Tests):

@@ -443,6 +525,33 @@ def create_store(self, **kwargs):
         return store


+class TestStorageTransformerV3(TestMappingStoreV3):
+    """Run the mapping-store tests through a DummyStorageTransfomer wrapper."""
+
+    def create_store(self, **kwargs):
+        inner_store = super().create_store(**kwargs)
+        storage_transformer = DummyStorageTransfomer(
+            "dummy_type", test_value=DummyStorageTransfomer.TEST_CONSTANT
+        )
+        return storage_transformer._copy_for_array(None, inner_store)
+
+    def test_method_forwarding(self):
+        """Check that listing and capability flags mirror the inner store."""
+        store = self.create_store()
+        assert store.list() == store.inner_store.list()
+        assert store.list_dir(data_root) == store.inner_store.list_dir(data_root)
+
+        assert store.is_readable()
+        assert store.is_writeable()
+        assert store.is_listable()
+        store.inner_store._readable = False
+        store.inner_store._writeable = False
+        store.inner_store._listable = False
+        assert not store.is_readable()
+        assert not store.is_writeable()
+        assert not store.is_listable()
+
+
 class TestLRUStoreCacheV3(_TestLRUStoreCache, StoreV3Tests):

     CountingClass = CountingDictV3
@@ -535,3 +644,21 @@ def test_top_level_imports():
             assert hasattr(zarr, store_name)  # pragma: no cover
         else:
             assert not hasattr(zarr, store_name)  # pragma: no cover
+
+
+def _get_public_and_dunder_methods(some_class):
+    """Return the names of all public and double-underscore functions of some_class."""
+    return {
+        name for name, _ in inspect.getmembers(some_class, predicate=inspect.isfunction)
+        if not name.startswith("_") or name.startswith("__")
+    }
+
+
+def test_storage_transformer_interface():
+    """StorageTransformer must expose the same methods as StoreV3 (plus get_config)."""
+    store_v3_methods = _get_public_and_dunder_methods(StoreV3)
+    store_v3_methods.discard("__init__")
+    storage_transformer_methods = _get_public_and_dunder_methods(StorageTransformer)
+    storage_transformer_methods.discard("__init__")
+    storage_transformer_methods.discard("get_config")
+    assert storage_transformer_methods == store_v3_methods