Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions cpp/arcticdb/pipeline/column_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,51 @@ SegmentInMemory merge_column_stats_segments(const std::vector<SegmentInMemory>&
return merged;
}

SegmentInMemory merge_column_stats_with_range_replacement(
const SegmentInMemory& old_segment, SegmentInMemory new_segment, const entity::TimestampRange& range_replaced
) {
const size_t old_row_count = old_segment.row_count();
std::vector<SegmentInMemory> to_merge;
to_merge.reserve(3);

size_t first_in_range = old_row_count;
size_t last_in_range = 0;
bool found_overlap = false;

for (size_t row = 0; row < old_row_count; ++row) {
auto start_opt = old_segment.scalar_at<entity::timestamp>(row, start_index_column_offset);
auto end_opt = old_segment.scalar_at<entity::timestamp>(row, end_index_column_offset);

util::check(
start_opt.has_value() && end_opt.has_value(),
"Missing start/end index in old column stats segment at row {}",
row
);

entity::TimestampRange row_range{*start_opt, *end_opt};
if (entity::intersects(row_range, range_replaced)) {
if (!found_overlap) {
first_in_range = row;
found_overlap = true;
}
last_in_range = row;
}
}

if (!found_overlap) {
to_merge.emplace_back(old_segment.clone());
} else {
if (first_in_range > 0) {
to_merge.emplace_back(old_segment.truncate(0, first_in_range, false));
}
if (last_in_range + 1 < old_row_count) {
to_merge.emplace_back(old_segment.truncate(last_in_range + 1, old_row_count, false));
}
}
to_merge.emplace_back(std::move(new_segment));
return merge_column_stats_segments(to_merge);
Comment on lines +93 to +135

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two correctness concerns with the range-restricted RMW merge:

  1. Schema mismatch silently corrupts stats. In the is_filtered path the existing-stats drop optimization is skipped (version_core.cpp:2006), so new_segment holds stats for whatever column_stats was requested. If that set differs from the old segment's stat columns (e.g. user passes an explicit column_stats subset, or the eligible-column set changed between calls), merge_column_stats_segments unions columns by name: in-range rows will lack the old-only stat columns and out-of-range rows will lack the new-only columns, producing a sparse/inconsistent stats segment. The doc comment states "Both segments must share the same stat-column schema" but nothing validates or enforces it. Either assert the schemas match (like the non-filtered branch does at version_core.cpp:2081) or handle the differing-column case explicitly.

  2. Closed-interval intersects vs exclusive end_index. entity::intersects is closed on both ends (left.first <= right.second && left.second >= right.first). If the column-stats end_index is stored one-past-the-end (exclusive, as index keys are), an old row whose end_index equals range_replaced.first will be falsely treated as overlapping and dropped. Please confirm the end-index convention here and adjust the comparison if it is exclusive.

}

std::string type_to_operator_string(ColumnStatTypeInternal type) {
switch (type) {
case ColumnStatTypeInternal::MIN_V1:
Expand Down
8 changes: 8 additions & 0 deletions cpp/arcticdb/pipeline/column_stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ namespace arcticdb {

SegmentInMemory merge_column_stats_segments(const std::vector<SegmentInMemory>& segments);

// Builds a merged column-stats segment that keeps every row of `old_segment` whose
// [start_index, end_index] lies fully outside `range_replaced`, and inserts the rows from
// `new_segment` (covering the in-range row-slices). Both segments must share the same
// stat-column schema. Used by the range-restricted RMW path in create_column_stats_impl.
SegmentInMemory merge_column_stats_with_range_replacement(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think let's just leave this merging with the old stats stuff out, at least for now, as it's a bit complicated and I'm not certain that we will need it.

const SegmentInMemory& old_segment, SegmentInMemory new_segment, const entity::TimestampRange& range_replaced
);

// User facing types - eg users are only allowed to create min and max together, not one or the other
enum class ColumnStatType { MINMAX };
// Total universe of column stats we support - min and max are treated separately here
Expand Down
9 changes: 5 additions & 4 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,24 +172,25 @@ folly::Future<folly::Unit> LocalVersionedEngine::delete_unreferenced_pruned_inde
}

void LocalVersionedEngine::create_column_stats_internal(
const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadOptions& read_options
const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadQuery& read_query,
const ReadOptions& read_options
) {
ARCTICDB_RUNTIME_SAMPLE(CreateColumnStatsInternal, 0)
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: create_column_stats");
create_column_stats_impl(store(), versioned_item, column_stats, read_options);
create_column_stats_impl(store(), versioned_item, column_stats, read_query, read_options);
}

void LocalVersionedEngine::create_column_stats_version_internal(
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query,
const ReadOptions& read_options
const ReadQuery& read_query, const ReadOptions& read_options
) {
auto versioned_item = get_version_to_read(stream_id, version_query);
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
versioned_item.has_value(),
"create_column_stats_version_internal: version not found for stream '{}'",
stream_id
);
create_column_stats_internal(versioned_item.value(), column_stats, read_options);
create_column_stats_internal(versioned_item.value(), column_stats, read_query, read_options);
}

void LocalVersionedEngine::drop_column_stats_internal(
Expand Down
5 changes: 3 additions & 2 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,13 @@ class LocalVersionedEngine : public VersionedEngine {
);

void create_column_stats_internal(
const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadOptions& read_options
const VersionedItem& versioned_item, ColumnStats& column_stats, const ReadQuery& read_query,
const ReadOptions& read_options
);

void create_column_stats_version_internal(
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query,
const ReadOptions& read_options
const ReadQuery& read_query, const ReadOptions& read_options
);

void drop_column_stats_internal(
Expand Down
65 changes: 48 additions & 17 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <arcticdb/stream/index.hpp>
#include <arcticdb/pipeline/query.hpp>
#include <arcticdb/pipeline/read_pipeline.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
#include <arcticdb/pipeline/column_stats_filter.hpp>
#include <arcticdb/async/task_scheduler.hpp>
#include <arcticdb/async/tasks.hpp>
Expand Down Expand Up @@ -1950,9 +1951,11 @@ AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key) {

void create_column_stats_impl(
const std::shared_ptr<Store>& store, const VersionedItem& versioned_item, ColumnStats& column_stats,
const ReadOptions& read_options
const ReadQuery& caller_read_query, const ReadOptions& read_options
) {
using namespace arcticdb::pipelines;
const bool is_filtered = !std::holds_alternative<std::monostate>(caller_read_query.row_filter) ||
caller_read_query.row_range.has_value();
auto column_stats_key = index_key_to_column_stats_key(versioned_item.key_);
auto index_future = store->read(versioned_item.key_);

Expand Down Expand Up @@ -2000,7 +2003,7 @@ void create_column_stats_impl(
old_segment = std::move(column_stats_try).value();
}

if (old_segment) {
if (old_segment && !is_filtered) {
arcticc::pb2::column_stats_pb2::ColumnStatsHeader old_header;
bool unpacked = old_segment->metadata()->UnpackTo(&old_header);
util::check(
Expand All @@ -2026,29 +2029,57 @@ void create_column_stats_impl(
auto pipeline_context = std::make_shared<PipelineContext>();
pipeline_context->stream_id_ = versioned_item.key_.id();
auto read_query = std::make_shared<ReadQuery>(std::vector{std::make_shared<Clause>(std::move(*clause))});
read_query->row_filter = caller_read_query.row_filter;
read_query->row_range = caller_read_query.row_range;
read_indexed_keys_to_pipeline(pipeline_context, *read_query, read_options, index_info);

auto segs = read_process_and_collect(store, pipeline_context, read_query, read_options).get();
schema::check<ErrorCode::E_COLUMN_DOESNT_EXIST>(
!segs.empty(), "Cannot create column stats for nonexistent columns"
);
auto segments = read_process_and_collect(store, pipeline_context, read_query, read_options).get();
if (segments.empty()) {
if (is_filtered) {
// No row-slices overlap the caller's range — nothing to recompute.
log::version().warn("create_column_stats: no row-slices overlap the requested range; nothing to do");
return;
}
schema::raise<ErrorCode::E_COLUMN_DOESNT_EXIST>("Cannot create column stats for nonexistent columns");
}

std::vector<SegmentInMemory> segments_in_memory;
for (auto& seg : segs) {
segments_in_memory.emplace_back(seg.release_segment(store));
std::vector<SegmentInMemory> col_stats_segments_in_memory;
for (auto& segment : segments) {
col_stats_segments_in_memory.emplace_back(segment.release_segment(store));
}
SegmentInMemory new_segment = merge_column_stats_segments(segments_in_memory);
util::check(new_segment.metadata(), "new_segment should always have metadata");
new_segment.descriptor().set_id(versioned_item.key_.id());

SegmentInMemory new_col_stats_segment = merge_column_stats_segments(col_stats_segments_in_memory);
util::check(new_col_stats_segment.metadata(), "new_col_stats_segment should always have metadata");
new_col_stats_segment.descriptor().set_id(versioned_item.key_.id());

storage::UpdateOpts update_opts;
update_opts.upsert_ = true;

if (!old_segment.has_value()) {
store->update(column_stats_key, std::move(new_segment), update_opts).get();
store->update(column_stats_key, std::move(new_col_stats_segment), update_opts).get();
} else if (is_filtered) {
// Range-restricted RMW: keep out-of-range rows from old_segment, replace in-range rows with new_col_stats_segment.
auto range_start = std::numeric_limits<entity::timestamp>::max();
auto range_end = std::numeric_limits<entity::timestamp>::min();

for (size_t row = 0; row < new_col_stats_segment.row_count(); ++row) {
auto s = new_col_stats_segment.scalar_at<entity::timestamp>(row, start_index_column_offset);
auto e = new_col_stats_segment.scalar_at<entity::timestamp>(row, end_index_column_offset);
util::check(s.has_value() && e.has_value(), "Missing start/end index in new column stats segment");
range_start = std::min(range_start, *s);
range_end = std::max(range_end, *e);
}

SegmentInMemory merged = merge_column_stats_with_range_replacement(
*old_segment, std::move(new_col_stats_segment), entity::TimestampRange{range_start, range_end}
);

merged.descriptor().set_id(versioned_item.key_.id());
store->update(column_stats_key, std::move(merged), update_opts).get();
} else {
// Check that the start and end index columns match
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
new_segment.column(0) == old_segment->column(0) && new_segment.column(1) == old_segment->column(1),
new_col_stats_segment.column(0) == old_segment->column(0) && new_col_stats_segment.column(1) == old_segment->column(1),
"Cannot create column stats, existing column stats row-groups do not match"
);
// Merge the ColumnStatsHeader metadata from old and new segments
Expand All @@ -2058,14 +2089,14 @@ void create_column_stats_impl(
log::version().warn(
"Found existing Column Stats key without metadata? Just creating new stats... {}", column_stats_key
);
store->update(column_stats_key, std::move(new_segment), update_opts).get();
store->update(column_stats_key, std::move(new_col_stats_segment), update_opts).get();
return;
}
bool unpacked = old_metadata->UnpackTo(&old_header);
util::check(unpacked, "Could not unpack column stats metadata from the old header?");
validate_column_stats_header_version(old_header);
arcticc::pb2::column_stats_pb2::ColumnStatsHeader new_header;
unpacked = new_segment.metadata()->UnpackTo(&new_header);
unpacked = new_col_stats_segment.metadata()->UnpackTo(&new_header);
util::check(unpacked, "Could not unpack column stats metadata from the new header?");
auto next_offset = old_segment->descriptor().field_count();
for (const auto& [data_col_offset, entry_list] : new_header.stats_by_column()) {
Expand All @@ -2077,7 +2108,7 @@ void create_column_stats_impl(
}
}
// Add new stat columns to the old segment
old_segment->concatenate(std::move(new_segment));
old_segment->concatenate(std::move(new_col_stats_segment));
google::protobuf::Any any;
any.PackFrom(old_header);
old_segment->reset_metadata();
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/version_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ AtomKey index_key_to_column_stats_key(const IndexTypeKey& index_key);

void create_column_stats_impl(
const std::shared_ptr<Store>& store, const VersionedItem& versioned_item, ColumnStats& column_stats,
const ReadOptions& read_options
const ReadQuery& read_query, const ReadOptions& read_options
);

void drop_column_stats_impl(
Expand Down
5 changes: 3 additions & 2 deletions cpp/arcticdb/version/version_store_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,11 +825,12 @@ VersionedItem PythonVersionStore::write_metadata(
}

void PythonVersionStore::create_column_stats_version(
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query,
const ReadQuery& read_query
) {
ReadOptions read_options;
read_options.set_dynamic_schema(cfg().write_options().dynamic_schema());
create_column_stats_version_internal(stream_id, column_stats, version_query, read_options);
create_column_stats_version_internal(stream_id, column_stats, version_query, read_query, read_options);
}

void PythonVersionStore::drop_column_stats_version(
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/version/version_store_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class PythonVersionStore : public LocalVersionedEngine {
VersionedItem write_metadata(const StreamId& stream_id, const py::object& user_meta, bool prune_previous_versions);

void create_column_stats_version(
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query
const StreamId& stream_id, ColumnStats& column_stats, const VersionQuery& version_query,
const ReadQuery& read_query
);

void drop_column_stats_version(
Expand Down
60 changes: 56 additions & 4 deletions python/arcticdb/version_store/_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,14 +1266,27 @@ def _batch_update_internal(
_log_warning_on_writing_empty_dataframe(data_vector[idx], symbols[idx])
return result

def create_column_stats(
self, symbol: str, column_stats: Dict[str, Set[str]], as_of: Optional[VersionQueryInput] = None
def create_column_stats_experimental(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming the public NativeVersionStore.create_column_stats to create_column_stats_experimental is a breaking API change with no deprecation cycle. Existing callers break immediately — e.g. python/benchmarks/column_stats.py still calls lib._nvs.create_column_stats(...) in 4 places. Per the deprecation protocol (PR_REVIEW_GUIDELINES §2), keep create_column_stats as a working alias (or thin wrapper) and update all in-repo callers, rather than removing the name outright.

self,
symbol: str,
column_stats: Optional[Dict[str, Set[str]]] = None,
as_of: Optional[VersionQueryInput] = None,
date_range: Optional[DateRangeInput] = None,
row_range: Optional[Tuple[int, int]] = None,
) -> None:
"""
Calculates the specified column statistics for each row-slice for the given symbol. In the future, these
statistics will be used by `QueryBuilder` filtering operations to reduce the number of data segments read out
of storage.

When `column_stats` is omitted, MINMAX stats are built for every non-index column whose dtype is numeric
(uint/int/float) or a UTC nanosecond timestamp. Any pre-existing stats are merged with the newly computed
ones (read-modify-write).

When `date_range` or `row_range` is provided, stats are only (re)computed for the row-slices overlapping that
range. Existing stats for row-slices that fall fully outside the range are kept; in-range slice stats are
replaced by the freshly computed values. `date_range` and `row_range` are mutually exclusive.

Parameters
----------
symbol: `str`
Expand All @@ -1285,14 +1298,53 @@ def create_column_stats(
"MINMAX" : store the minimum and maximum value for the column in each row-slice
as_of : `Optional[VersionQueryInput]`, default=None
See documentation of `read` method for more details.
date_range: `Optional[DateRangeInput]`, default=None
Restrict computation to row-slices overlapping this date range. Mutually exclusive with `row_range`.
Only supported on timestamp-indexed symbols.
row_range: `Optional[Tuple[int, int]]`, default=None
Restrict computation to row-slices overlapping the given (start, end) row range. Mutually exclusive
with `date_range`.

Returns
-------
None
"""
column_stats = self._get_column_stats(column_stats)
check(
date_range is None or row_range is None,
"create_column_stats_experimental: date_range and row_range are mutually exclusive",
)

# if no column stats specified in the function call, fallback to columns stats over all columns
if column_stats is None:
column_stats = self._get_eligible_column_stats_spec(symbol, as_of)
if not column_stats:
return

column_stats = self._convert_to_native_column_stats(column_stats)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._convert_to_native_column_stats(...) does not exist anywhere in the codebase — the only definition is _get_column_stats (line 2439). Every call to create_column_stats_experimental will raise AttributeError before doing any work, so the feature is completely non-functional. This also confirms there is no test exercising this path (a single test would have caught it). Use the existing helper:

Suggested change
column_stats = self._convert_to_native_column_stats(column_stats)
column_stats = self._get_column_stats(column_stats)

version_query = self._get_version_query(as_of)
self.version_store.create_column_stats_version(symbol, column_stats, version_query)
read_query = _PythonVersionStoreReadQuery()

if date_range is not None:
read_query.row_filter = _normalize_dt_range(date_range)

if row_range is not None:
read_query.row_range = _SignedRowRange(row_range[0], row_range[1])

self.version_store.create_column_stats_version(symbol, column_stats, version_query, read_query)

def _get_eligible_column_stats_spec(
self, symbol: str, as_of: Optional[VersionQueryInput]
) -> Dict[str, Set[str]]:
info = self.get_info(symbol, version=as_of)
numeric_value_types = {
TypeDescriptor.ValueType.UINT,
TypeDescriptor.ValueType.INT,
TypeDescriptor.ValueType.FLOAT,
TypeDescriptor.ValueType.NANOSECONDS_UTC,
}
columns = info["col_names"]["columns"]
dtypes = info["dtype"]
return {str(col): {"MINMAX"} for col, dtype in zip(columns, dtypes) if dtype.value_type in numeric_value_types}

def drop_column_stats(
self, symbol: str, column_stats: Optional[Dict[str, Set[str]]] = None, as_of: Optional[VersionQueryInput] = None
Expand Down
Loading