diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index c120d603d9b..fb0b1e7599f 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -43,12 +43,11 @@ import pandas as pd import numpy as np -import logging +from arcticdb.log import version as log from arcticdb.version_store._normalization import normalize_metadata from arcticdb.version_store.admin_tools import AdminTools import arcticdb_ext as _ae -logger = logging.getLogger(__name__) AsOf = Union[int, str, datetime.datetime, _PreloadedIndexQuery] @@ -989,6 +988,7 @@ def stage( or ``sort_and_finalize_staged_data`` to specify which data to finalize. """ + log.debug("Staging data for symbol={}", symbol) if not self._allowed_input_type(data): raise ArcticUnsupportedDataTypeException( @@ -1112,6 +1112,7 @@ def write( >>> w = adb.WritePayload("symbol", df, metadata={'the': 'metadata'}) >>> lib.write(*w, staged=True) """ + log.debug("Writing symbol={}, staged={}, prune_previous_versions={}", symbol, staged, prune_previous_versions) is_recursive_normalizers_enabled = self._nvs._is_recursive_normalizers_enabled( **{"recursive_normalizers": recursive_normalizers} ) @@ -1201,6 +1202,7 @@ def write_pickle( -------- write: For more detailed documentation. """ + log.debug("Writing pickled data for symbol={}, staged={}", symbol, staged) return self._nvs.write( symbol=symbol, data=data, @@ -1295,6 +1297,7 @@ def write_batch( >>> items[0].symbol, items[1].symbol ('symbol_1', 'symbol_2') """ + log.debug("Writing batch of {} symbols", len(payloads)) self._raise_if_duplicate_symbols_in_batch(payloads) self._raise_if_unsupported_type_in_write_batch(payloads) @@ -1343,6 +1346,7 @@ def write_pickle_batch( write: For more detailed documentation. write_pickle: For information on the implications of providing data that needs to be pickled. """ + log.debug("Writing pickled batch of {} symbols", len(payloads)) self._raise_if_duplicate_symbols_in_batch(payloads) return self._nvs._batch_write_internal( @@ -1439,6 +1443,7 @@ def append( 2018-01-05 5 2018-01-06 6 """ + log.debug("Appending data to symbol={}", symbol) if not self._allowed_input_type(data): raise ArcticUnsupportedDataTypeException( @@ -1490,6 +1495,7 @@ def append_batch( ArcticUnsupportedDataTypeException If data that is not of NormalizableType appears in any of the payloads. """ + log.debug("Appending batch of {} symbols", len(append_payloads)) self._raise_if_duplicate_symbols_in_batch(append_payloads) self._raise_if_unsupported_type_in_write_batch(append_payloads) @@ -1612,6 +1618,7 @@ def update( 2024-01-11 00:00:00.000000 2024-02-01 00:00:00.000000001 1 b'test' 1738599073268493107 5975110026983744452 84 2 1 2 200009 200031 """ + log.debug("Updating symbol={}, upsert={}, date_range={}", symbol, upsert, date_range) if not self._allowed_input_type(data): raise ArcticUnsupportedDataTypeException( @@ -1693,6 +1700,7 @@ def update_batch( 2024-01-01 10 2024-01-02 11 """ + log.debug("Updating batch of {} symbols, upsert={}", len(update_payloads), upsert) self._raise_if_duplicate_symbols_in_batch(update_payloads) self._raise_if_unsupported_type_in_write_batch(update_payloads) @@ -1722,6 +1730,7 @@ def delete_staged_data(self, symbol: str) -> None: write Documentation on the ``staged`` parameter explains the concept of staged data in more detail. """ + log.debug("Deleting staged data for symbol={}", symbol) self._nvs.remove_incomplete(symbol) def finalize_staged_data( @@ -1845,6 +1854,7 @@ def finalize_staged_data( 2000-01-03 3 2000-01-04 4 """ + log.debug("Finalizing staged data for symbol={}, mode={}", symbol, mode) mode = Library._normalize_staged_data_mode(mode) return self._nvs.compact_incomplete( @@ -1965,6 +1975,7 @@ def sort_and_finalize_staged_data( 2024-01-03 3 2024-01-04 4 """ + log.debug("Sorting and finalizing staged data for symbol={}, mode={}", symbol, mode) mode = Library._normalize_staged_data_mode(mode) compaction_result = self._nvs.version_store.sort_merge( symbol, @@ -2005,6 +2016,7 @@ def get_staged_symbols(self) -> List[str]: write Documentation on the ``staged`` parameter explains the concept of staged data in more detail. """ + log.debug("Getting staged symbols") return self._nvs.list_symbols_with_incomplete_data() def read( @@ -2116,6 +2128,7 @@ def read( ---- column: [[5,6,7]] """ + log.debug("Reading symbol={}, as_of={}, date_range={}, columns={}, lazy={}", symbol, as_of, date_range, columns, lazy) if lazy: return LazyDataFrame( self, @@ -2233,6 +2246,7 @@ def read_batch( -------- read """ + log.debug("Reading batch of {} symbols, lazy={}", len(symbols), lazy) symbol_strings = [] as_ofs = [] date_ranges = [] @@ -2422,6 +2436,7 @@ def read_batch_and_join( 2025-01-01 00:00:00 1 2025-01-02 00:00:00 2 """ + log.debug("Reading and joining batch of {} symbols", len(symbols)) symbol_strings = [] as_ofs = [] date_ranges = [] @@ -2487,6 +2502,7 @@ def read_metadata(self, symbol: str, as_of: Optional[AsOf] = None) -> VersionedI Structure containing metadata and version number of the affected symbol in the store. The data attribute will be None. """ + log.debug("Reading metadata for symbol={}, as_of={}", symbol, as_of) return self._nvs.read_metadata(symbol, as_of, iterate_snapshots_if_tombstoned=False) def read_metadata_batch(self, symbols: List[Union[str, ReadInfoRequest]]) -> List[Union[VersionedItem, DataError]]: @@ -2512,6 +2528,7 @@ def read_metadata_batch(self, symbols: List[Union[str, ReadInfoRequest]]) -> Lis -------- read_metadata """ + log.debug("Reading metadata batch of {} symbols", len(symbols)) symbol_strings, as_ofs = self.parse_list_of_symbols(symbols) include_errors_and_none_meta = True @@ -2549,6 +2566,7 @@ def write_metadata( VersionedItem Structure containing metadata and version number of the affected symbol in the store. """ + log.debug("Writing metadata for symbol={}", symbol) return self._nvs.write_metadata(symbol, metadata, prune_previous_version=prune_previous_versions) def write_metadata_batch( @@ -2599,6 +2617,7 @@ def write_metadata_batch( >>> lib.read_metadata("symbol_2") {'the': 'metadata_2'} """ + log.debug("Writing metadata batch of {} symbols", len(write_metadata_payloads)) self._raise_if_duplicate_symbols_in_batch(write_metadata_payloads) throw_on_error = False @@ -2651,6 +2670,7 @@ def snapshot( If a symbol or the version of symbol specified in versions does not exist or has been deleted in the library, or, the library has no symbol. """ + log.debug("Creating snapshot={}", snapshot_name) # We deliberately check the snapshot name only with the v2 API to avoid disruption to legacy users on the v1 API self._nvs.version_store.verify_snapshot(snapshot_name) self._nvs.snapshot(snap_name=snapshot_name, metadata=metadata, skip_symbols=skip_symbols, versions=versions) @@ -2677,6 +2697,7 @@ def delete(self, symbol: str, versions: Optional[Union[int, Iterable[int]]] = No versions Version or versions of symbol to delete. If ``None`` then all versions will be deleted. """ + log.debug("Deleting symbol={}, versions={}", symbol, versions) if versions is None: self._nvs.delete(symbol) return @@ -2703,6 +2724,7 @@ def delete_batch(self, delete_requests: List[Union[str, DeleteRequest]]) -> List List of DataError objects, one for each symbol that was not deleted due to an error. If the symbol was already deleted, there will be no error, just a warning. """ + log.debug("Deleting batch of {} symbols", len(delete_requests)) symbols = [] versions = [] @@ -2731,6 +2753,7 @@ def prune_previous_versions(self, symbol) -> None: symbol : `str` Symbol name to prune. """ + log.debug("Pruning previous versions for symbol={}", symbol) self._nvs.prune_previous_versions(symbol) def delete_data_in_range( @@ -2765,6 +2788,7 @@ def delete_data_in_range( 2018-01-03 7 2018-01-04 8 """ + log.debug("Deleting data in range for symbol={}, date_range={}", symbol, date_range) if date_range is None: raise ArcticInvalidApiUsageException("date_range must be given but was None") self._nvs.delete(symbol, date_range=date_range, prune_previous_version=prune_previous_versions) @@ -2784,6 +2808,7 @@ def delete_snapshot(self, snapshot_name: str) -> None: Exception If the named snapshot does not exist. """ + log.debug("Deleting snapshot={}", snapshot_name) return self._nvs.delete_snapshot(snapshot_name) def list_symbols(self, snapshot_name: Optional[str] = None, regex: Optional[str] = None) -> List[str]: @@ -2804,6 +2829,7 @@ def list_symbols(self, snapshot_name: Optional[str] = None, regex: Optional[str] List[str] Symbols in the library. """ + log.debug("Listing symbols, snapshot_name={}, regex={}", snapshot_name, regex) return self._nvs.list_symbols(snapshot=snapshot_name, regex=regex) def has_symbol(self, symbol: str, as_of: Optional[AsOf] = None) -> bool: @@ -2838,6 +2864,7 @@ def has_symbol(self, symbol: str, as_of: Optional[AsOf] = None) -> bool: >>> "another_symbol" in lib False """ + log.debug("Checking if symbol={} exists, as_of={}", symbol, as_of) return self._nvs.has_symbol(symbol, as_of=as_of) def list_snapshots(self, load_metadata: Optional[bool] = True) -> Union[List[str], Dict[str, Any]]: @@ -2855,6 +2882,7 @@ def list_snapshots(self, load_metadata: Optional[bool] = True) -> Union[List[str Snapshots in the library. Returns a list of snapshot names if load_metadata is False, otherwise returns a dictionary where keys are snapshot names and values are metadata associated with that snapshot. """ + log.debug("Listing snapshots, load_metadata={}", load_metadata) result = self._nvs.list_snapshots(load_metadata) return result if load_metadata else list(result.keys()) @@ -2903,6 +2931,7 @@ def list_versions( >>> versions["symbol", 1].snapshots ["my_snap"] """ + log.debug("Listing versions for symbol={}, latest_only={}", symbol, latest_only) versions = self._nvs.list_versions( symbol=symbol, snapshot=snapshot, @@ -2953,6 +2982,7 @@ def head( If lazy is False, VersionedItem object that contains a .data and .metadata element. If lazy is True, a LazyDataFrame object on which further querying can be performed prior to collect. """ + log.debug("Reading head of symbol={}, n={}, as_of={}", symbol, n, as_of) if lazy: q = QueryBuilder().head(n) return LazyDataFrame( @@ -3019,6 +3049,7 @@ def tail( If lazy is False, VersionedItem object that contains a .data and .metadata element. If lazy is True, a LazyDataFrame object on which further querying can be performed prior to collect. """ + log.debug("Reading tail of symbol={}, n={}, as_of={}", symbol, n, as_of) if lazy: q = QueryBuilder().tail(n) return LazyDataFrame( @@ -3087,6 +3118,7 @@ def get_description(self, symbol: str, as_of: Optional[AsOf] = None) -> SymbolDe SymbolDescription For documentation on each field. """ + log.debug("Getting description for symbol={}, as_of={}", symbol, as_of) info = self._nvs.get_info( symbol, as_of, @@ -3145,6 +3177,7 @@ def get_description_batch( SymbolDescription For documentation on each field. """ + log.debug("Getting description batch of {} symbols", len(symbols)) symbol_strings, as_ofs = self.parse_list_of_symbols(symbols) throw_on_error = False @@ -3165,6 +3198,7 @@ def reload_symbol_list(self) -> None: This can take a long time on large libraries or certain S3 implementations, and once started, it cannot be safely interrupted. If the call is interrupted somehow (exception/process killed), please call this again ASAP. """ + log.debug("Reloading symbol list") self._nvs.version_store.reload_symbol_list() def compact_symbol_list(self) -> None: @@ -3183,6 +3217,7 @@ def compact_symbol_list(self) -> None: InternalException Storage lock required to compact the symbol list could not be acquired """ + log.debug("Compacting symbol list") return self._nvs.compact_symbol_list() def compact_data_experimental( @@ -3249,6 +3284,7 @@ def compact_data_experimental( >>> len(lib_tool.read_index("sym")) 1 """ + log.debug("Compacting data for symbol={}, rows_per_segment={}", symbol, rows_per_segment) return self._nvs.compact_data_experimental(symbol, rows_per_segment, prune_previous_versions) def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool: @@ -3273,6 +3309,7 @@ def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) ------- bool """ + log.debug("Checking fragmentation for symbol={}", symbol) return self._nvs.is_symbol_fragmented(symbol, segment_size) def defragment_symbol_data( @@ -3340,6 +3377,7 @@ def defragment_symbol_data( Config map setting - SymbolDataCompact.SegmentCount will be replaced by a library setting in the future. This API will allow overriding the setting as well. """ + log.debug("Defragmenting symbol={}, segment_size={}", symbol, segment_size) return self._nvs.defragment_symbol_data(symbol, segment_size, prune_previous_versions) def merge_experimental( @@ -3431,6 +3469,7 @@ def merge_experimental( 1970-01-01 00:00:00.000000002 100 1970-01-01 00:00:00.000000003 3 """ + log.debug("Merging symbol={} with strategy={}", symbol, strategy) return self._nvs.merge_experimental( symbol=symbol, source=source,