diff --git a/cpp/arcticdb/pipeline/column_stats.cpp b/cpp/arcticdb/pipeline/column_stats.cpp index 2c9047bf25..7fbea6f852 100644 --- a/cpp/arcticdb/pipeline/column_stats.cpp +++ b/cpp/arcticdb/pipeline/column_stats.cpp @@ -90,6 +90,51 @@ SegmentInMemory merge_column_stats_segments(const std::vector& return merged; } +SegmentInMemory merge_column_stats_with_range_replacement( + const SegmentInMemory& old_segment, SegmentInMemory new_segment, const entity::TimestampRange& range_replaced +) { + const size_t old_row_count = old_segment.row_count(); + std::vector to_merge; + to_merge.reserve(3); + + size_t first_in_range = old_row_count; + size_t last_in_range = 0; + bool found_overlap = false; + + for (size_t row = 0; row < old_row_count; ++row) { + auto start_opt = old_segment.scalar_at(row, start_index_column_offset); + auto end_opt = old_segment.scalar_at(row, end_index_column_offset); + + util::check( + start_opt.has_value() && end_opt.has_value(), + "Missing start/end index in old column stats segment at row {}", + row + ); + + entity::TimestampRange row_range{*start_opt, *end_opt}; + if (entity::intersects(row_range, range_replaced)) { + if (!found_overlap) { + first_in_range = row; + found_overlap = true; + } + last_in_range = row; + } + } + + if (!found_overlap) { + to_merge.emplace_back(old_segment.clone()); + } else { + if (first_in_range > 0) { + to_merge.emplace_back(old_segment.truncate(0, first_in_range, false)); + } + if (last_in_range + 1 < old_row_count) { + to_merge.emplace_back(old_segment.truncate(last_in_range + 1, old_row_count, false)); + } + } + to_merge.emplace_back(std::move(new_segment)); + return merge_column_stats_segments(to_merge); +} + std::string type_to_operator_string(ColumnStatTypeInternal type) { switch (type) { case ColumnStatTypeInternal::MIN_V1: diff --git a/cpp/arcticdb/pipeline/column_stats.hpp b/cpp/arcticdb/pipeline/column_stats.hpp index a15949c432..ac94aadbb9 100644 --- a/cpp/arcticdb/pipeline/column_stats.hpp +++ b/cpp/arcticdb/pipeline/column_stats.hpp @@ -15,6 +15,14 @@ namespace arcticdb { SegmentInMemory merge_column_stats_segments(const std::vector& segments); +// Builds a merged column-stats segment that keeps every row of `old_segment` whose +// [start_index, end_index] lies fully outside `range_replaced`, and inserts the rows from +// `new_segment` (covering the in-range row-slices). Both segments must share the same +// stat-column schema. Used by the range-restricted RMW path in create_column_stats_impl. +SegmentInMemory merge_column_stats_with_range_replacement( + const SegmentInMemory& old_segment, SegmentInMemory new_segment, const entity::TimestampRange& range_replaced +); + // User facing types - eg users are only allowed to create min and max together, not one or the other enum class ColumnStatType { MINMAX }; // Total universe of column stats we support - min and max are treated separately here diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 1a1b9de91e..6f49eab871 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -172,16 +172,17 @@ folly::Future LocalVersionedEngine::delete_unreferenced_pruned_inde } void LocalVersionedEngine::create_column_stats_internal( - const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadOptions& read_options + const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadQuery& read_query, + const ReadOptions& read_options ) { ARCTICDB_RUNTIME_SAMPLE(CreateColumnStatsInternal, 0) ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: create_column_stats"); - create_column_stats_impl(store(), versioned_item, column_stats, read_options); + create_column_stats_impl(store(), versioned_item, column_stats, read_query, read_options); } void LocalVersionedEngine::create_column_stats_version_internal( const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query, - const ReadOptions& read_options + const ReadQuery& read_query, const ReadOptions& read_options ) { auto versioned_item = get_version_to_read(stream_id, version_query); missing_data::check( @@ -189,7 +190,7 @@ void LocalVersionedEngine::create_column_stats_version_internal( "create_column_stats_version_internal: version not found for stream '{}'", stream_id ); - create_column_stats_internal(versioned_item.value(), column_stats, read_options); + create_column_stats_internal(versioned_item.value(), column_stats, read_query, read_options); } void LocalVersionedEngine::drop_column_stats_internal( diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index ed103ec13b..abe5407769 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -242,12 +242,13 @@ class LocalVersionedEngine : public VersionedEngine { ); void create_column_stats_internal( - const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadOptions& read_options + const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadQuery& read_query, + const ReadOptions& read_options ); void create_column_stats_version_internal( const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query, - const ReadOptions& read_options + const ReadQuery& read_query, const ReadOptions& read_options ); void drop_column_stats_internal( diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 816901e60a..420e2ab8c8 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -1950,9 +1951,11 @@ AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key) { void create_column_stats_impl( const std::shared_ptr& store, const VersionedItem& versioned_item, ColumnStats& column_stats, - const ReadOptions& read_options + const ReadQuery& caller_read_query, const ReadOptions& read_options ) { using namespace arcticdb::pipelines; + const bool is_filtered = !std::holds_alternative(caller_read_query.row_filter) || + caller_read_query.row_range.has_value(); auto column_stats_key = index_key_to_column_stats_key(versioned_item.key_); auto index_future = store->read(versioned_item.key_); @@ -2000,7 +2003,7 @@ void create_column_stats_impl( old_segment = std::move(column_stats_try).value(); } - if (old_segment) { + if (old_segment && !is_filtered) { arcticc::pb2::column_stats_pb2::ColumnStatsHeader old_header; bool unpacked = old_segment->metadata()->UnpackTo(&old_header); util::check( @@ -2026,29 +2029,57 @@ void create_column_stats_impl( auto pipeline_context = std::make_shared(); pipeline_context->stream_id_ = versioned_item.key_.id(); auto read_query = std::make_shared(std::vector{std::make_shared(std::move(*clause))}); + read_query->row_filter = caller_read_query.row_filter; + read_query->row_range = caller_read_query.row_range; read_indexed_keys_to_pipeline(pipeline_context, *read_query, read_options, index_info); - auto segs = read_process_and_collect(store, pipeline_context, read_query, read_options).get(); - schema::check( - !segs.empty(), "Cannot create column stats for nonexistent columns" - ); + auto segments = read_process_and_collect(store, pipeline_context, read_query, read_options).get(); + if (segments.empty()) { + if (is_filtered) { + // No row-slices overlap the caller's range — nothing to recompute. + log::version().warn("create_column_stats: no row-slices overlap the requested range; nothing to do"); + return; + } + schema::raise("Cannot create column stats for nonexistent columns"); + } - std::vector segments_in_memory; - for (auto& seg : segs) { - segments_in_memory.emplace_back(seg.release_segment(store)); + std::vector col_stats_segments_in_memory; + for (auto& segment : segments) { + col_stats_segments_in_memory.emplace_back(segment.release_segment(store)); } - SegmentInMemory new_segment = merge_column_stats_segments(segments_in_memory); - util::check(new_segment.metadata(), "new_segment should always have metadata"); - new_segment.descriptor().set_id(versioned_item.key_.id()); + + SegmentInMemory new_col_stats_segment = merge_column_stats_segments(col_stats_segments_in_memory); + util::check(new_col_stats_segment.metadata(), "new_col_stats_segment should always have metadata"); + new_col_stats_segment.descriptor().set_id(versioned_item.key_.id()); storage::UpdateOpts update_opts; update_opts.upsert_ = true; + if (!old_segment.has_value()) { - store->update(column_stats_key, std::move(new_segment), update_opts).get(); + store->update(column_stats_key, std::move(new_col_stats_segment), update_opts).get(); + } else if (is_filtered) { + // Range-restricted RMW: keep out-of-range rows from old_segment, replace in-range rows with new_col_stats_segment. + auto range_start = std::numeric_limits::max(); + auto range_end = std::numeric_limits::min(); + + for (size_t row = 0; row < new_col_stats_segment.row_count(); ++row) { + auto s = new_col_stats_segment.scalar_at(row, start_index_column_offset); + auto e = new_col_stats_segment.scalar_at(row, end_index_column_offset); + util::check(s.has_value() && e.has_value(), "Missing start/end index in new column stats segment"); + range_start = std::min(range_start, *s); + range_end = std::max(range_end, *e); + } + + SegmentInMemory merged = merge_column_stats_with_range_replacement( + *old_segment, std::move(new_col_stats_segment), entity::TimestampRange{range_start, range_end} + ); + + merged.descriptor().set_id(versioned_item.key_.id()); + store->update(column_stats_key, std::move(merged), update_opts).get(); } else { // Check that the start and end index columns match internal::check( - new_segment.column(0) == old_segment->column(0) && new_segment.column(1) == old_segment->column(1), + new_col_stats_segment.column(0) == old_segment->column(0) && new_col_stats_segment.column(1) == old_segment->column(1), "Cannot create column stats, existing column stats row-groups do not match" ); // Merge the ColumnStatsHeader metadata from old and new segments @@ -2058,14 +2089,14 @@ void create_column_stats_impl( log::version().warn( "Found existing Column Stats key without metadata? Just creating new stats... {}", column_stats_key ); - store->update(column_stats_key, std::move(new_segment), update_opts).get(); + store->update(column_stats_key, std::move(new_col_stats_segment), update_opts).get(); return; } bool unpacked = old_metadata->UnpackTo(&old_header); util::check(unpacked, "Could not unpack column stats metadata from the old header?"); validate_column_stats_header_version(old_header); arcticc::pb2::column_stats_pb2::ColumnStatsHeader new_header; - unpacked = new_segment.metadata()->UnpackTo(&new_header); + unpacked = new_col_stats_segment.metadata()->UnpackTo(&new_header); util::check(unpacked, "Could not unpack column stats metadata from the new header?"); auto next_offset = old_segment->descriptor().field_count(); for (const auto& [data_col_offset, entry_list] : new_header.stats_by_column()) { @@ -2077,7 +2108,7 @@ void create_column_stats_impl( } } // Add new stat columns to the old segment - old_segment->concatenate(std::move(new_segment)); + old_segment->concatenate(std::move(new_col_stats_segment)); google::protobuf::Any any; any.PackFrom(old_header); old_segment->reset_metadata(); diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index de9cd7c50f..d401d51628 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -79,7 +79,7 @@ AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key); void create_column_stats_impl( const std::shared_ptr& store, const VersionedItem& versioned_item, ColumnStats& column_stats, - const ReadOptions& read_options + const ReadQuery& read_query, const ReadOptions& read_options ); void drop_column_stats_impl( diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index 78941001b1..a59cde3b61 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -825,11 +825,12 @@ VersionedItem PythonVersionStore::write_metadata( } void PythonVersionStore::create_column_stats_version( - const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query + const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query, + const ReadQuery& read_query ) { ReadOptions read_options; read_options.set_dynamic_schema(cfg().write_options().dynamic_schema()); - create_column_stats_version_internal(stream_id, column_stats, version_query, read_options); + create_column_stats_version_internal(stream_id, column_stats, version_query, read_query, read_options); } void PythonVersionStore::drop_column_stats_version( diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index 50b2ffea51..f3dfb1b2da 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -105,7 +105,8 @@ class PythonVersionStore : public LocalVersionedEngine { VersionedItem write_metadata(const StreamId& stream_id, const py::object& user_meta, bool prune_previous_versions); void create_column_stats_version( - const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query + const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query, + const ReadQuery& read_query ); void drop_column_stats_version( diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index db97101d4b..20babc2c0a 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -1266,14 +1266,27 @@ def _batch_update_internal( _log_warning_on_writing_empty_dataframe(data_vector[idx], symbols[idx]) return result - def create_column_stats( - self, symbol: str, column_stats: Dict[str, Set[str]], as_of: Optional[VersionQueryInput] = None + def create_column_stats_experimental( + self, + symbol: str, + column_stats: Optional[Dict[str, Set[str]]] = None, + as_of: Optional[VersionQueryInput] = None, + date_range: Optional[DateRangeInput] = None, + row_range: Optional[Tuple[int, int]] = None, ) -> None: """ Calculates the specified column statistics for each row-slice for the given symbol. In the future, these statistics will be used by `QueryBuilder` filtering operations to reduce the number of data segments read out of storage. + When `column_stats` is omitted, MINMAX stats are built for every non-index column whose dtype is numeric + (uint/int/float) or a UTC nanosecond timestamp. Any pre-existing stats are merged with the newly computed + ones (read-modify-write). + + When `date_range` or `row_range` is provided, stats are only (re)computed for the row-slices overlapping that + range. Existing stats for row-slices that fall fully outside the range are kept; in-range slice stats are + replaced by the freshly computed values. `date_range` and `row_range` are mutually exclusive. + Parameters ---------- symbol: `str` @@ -1285,14 +1298,53 @@ def create_column_stats( "MINMAX" : store the minimum and maximum value for the column in each row-slice as_of : `Optional[VersionQueryInput]`, default=None See documentation of `read` method for more details. + date_range: `Optional[DateRangeInput]`, default=None + Restrict computation to row-slices overlapping this date range. Mutually exclusive with `row_range`. + Only supported on timestamp-indexed symbols. + row_range: `Optional[Tuple[int, int]]`, default=None + Restrict computation to row-slices overlapping the given (start, end) row range. Mutually exclusive + with `date_range`. Returns ------- None """ - column_stats = self._get_column_stats(column_stats) + check( + date_range is None or row_range is None, + "create_column_stats_experimental: date_range and row_range are mutually exclusive", + ) + + # if no column stats specified in the function call, fallback to columns stats over all columns + if column_stats is None: + column_stats = self._get_eligible_column_stats_spec(symbol, as_of) + if not column_stats: + return + + column_stats = self._convert_to_native_column_stats(column_stats) version_query = self._get_version_query(as_of) - self.version_store.create_column_stats_version(symbol, column_stats, version_query) + read_query = _PythonVersionStoreReadQuery() + + if date_range is not None: + read_query.row_filter = _normalize_dt_range(date_range) + + if row_range is not None: + read_query.row_range = _SignedRowRange(row_range[0], row_range[1]) + + self.version_store.create_column_stats_version(symbol, column_stats, version_query, read_query) + + def _get_eligible_column_stats_spec( + self, symbol: str, as_of: Optional[VersionQueryInput] + ) -> Dict[str, Set[str]]: + info = self.get_info(symbol, version=as_of) + numeric_value_types = { + TypeDescriptor.ValueType.UINT, + TypeDescriptor.ValueType.INT, + TypeDescriptor.ValueType.FLOAT, + TypeDescriptor.ValueType.NANOSECONDS_UTC, + } + columns = info["col_names"]["columns"] + dtypes = info["dtype"] + return {str(col): {"MINMAX"} for col, dtype in zip(columns, dtypes) if dtype.value_type in numeric_value_types} def drop_column_stats( self, symbol: str, column_stats: Optional[Dict[str, Set[str]]] = None, as_of: Optional[VersionQueryInput] = None