Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cpp/arcticdb/pipeline/column_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ std::string type_to_operator_string(ColumnStatTypeInternal type) {
return "v1_MIN";
case ColumnStatTypeInternal::MAX_V1:
return "v1_MAX";
case ColumnStatTypeInternal::NAN_COUNT_V1:
return "v1_NAN_COUNT";
case ColumnStatTypeInternal::NULL_COUNT_V1:
return "v1_NULL_COUNT";
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type requested");
}
Expand Down Expand Up @@ -160,6 +164,8 @@ ColumnStats::ColumnStats(
switch (entry.type()) {
case MIN_V1:
case MAX_V1:
case NAN_COUNT_V1:
case NULL_COUNT_V1:
external_type = ColumnStatType::MINMAX;
break;
case UNKNOWN:
Expand Down Expand Up @@ -209,7 +215,10 @@ namespace {
std::unordered_set<ColumnStatTypeInternal> external_to_internal(ColumnStatType type) {
switch (type) {
case ColumnStatType::MINMAX:
return {ColumnStatTypeInternal::MIN_V1, ColumnStatTypeInternal::MAX_V1};
return {ColumnStatTypeInternal::MIN_V1,
ColumnStatTypeInternal::MAX_V1,
ColumnStatTypeInternal::NAN_COUNT_V1,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

external_to_internal(MINMAX) now unconditionally returns 4 internal stat types. drop() calls this to construct the list of column names to remove (v1_MIN, v1_MAX, v1_NAN_COUNT, v1_NULL_COUNT). For column stats segments that were written by an older client (only v1_MIN and v1_MAX columns exist), dropping will produce names for columns that aren't in the segment.

Please verify what the downstream consumer of dropped_names does when asked to drop a non-existent column — if it raises, this is a forward-compatibility break that needs handling; if it silently ignores, please add a test that creates column stats with the old format and then drops them with the new client.

ColumnStatTypeInternal::NULL_COUNT_V1};
default:
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown column stat type");
}
Expand Down Expand Up @@ -294,7 +303,13 @@ std::optional<Clause> ColumnStats::clause() const {
),
ColumnName(
to_segment_column_name(name_and_stat_types.mangled_name, ColumnStatTypeInternal::MAX_V1)
)
),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NAN_COUNT_V1
)),
ColumnName(to_segment_column_name(
name_and_stat_types.mangled_name, ColumnStatTypeInternal::NULL_COUNT_V1
))
));
break;
default:
Expand Down
34 changes: 30 additions & 4 deletions cpp/arcticdb/processing/unsorted_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
using type_info = ScalarTypeInfo<decltype(col_tag)>;
using RawType = typename type_info::RawType;
if constexpr (!is_sequence_type(type_info::data_type)) {
// null_count_ tracks rows that are genuinely absent (sparse-map gaps from Arrow
// validity bitmaps). nan_count_ tracks in-band sentinel values found while iterating
// the dense values (NaN for floats, NaT for time types) - see the for_each below.
if (input_column.column_->is_sparse()) {
const auto sparse_gap_count = input_column.column_->last_row() + 1 - input_column.column_->row_count();
if (sparse_gap_count > 0) {
null_count_ += static_cast<uint64_t>(sparse_gap_count);
}
}
auto is_nat_or_nan = []([[maybe_unused]] RawType v) {
if constexpr (is_floating_point_type(type_info::data_type)) {
return std::isnan(v);
Expand All @@ -46,9 +55,12 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {
[[maybe_unused]] bool any_nan{false};
arcticdb::for_each<typename type_info::TDT>(*input_column.column_, [&](auto value) {
const auto& curr = static_cast<RawType>(value);
if constexpr (is_floating_point_type(type_info::data_type) || is_time_type(type_info::data_type)) {
// Skip NaN/NaT as they don't generate a stable ordering
// In-band sentinel (NaN for floats, NaT for time types) - count it and skip the
// min/max update so those reflect only real values.
if constexpr (is_floating_point_type(type_info::data_type) ||
is_time_type(type_info::data_type)) {
if (is_nat_or_nan(curr)) {
++nan_count_;
any_nan = true;
return;
}
Expand Down Expand Up @@ -78,8 +90,8 @@ void MinMaxAggregatorData::aggregate(const ColumnWithStrings& input_column) {

SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& output_column_names) const {
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
output_column_names.size() == 2,
"Expected 2 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size() == 4,
"Expected 4 output column names in MinMaxAggregatorData::finalize, but got {}",
output_column_names.size()
);
SegmentInMemory seg;
Expand All @@ -93,16 +105,30 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& ou
auto max_col = std::make_shared<Column>(make_scalar_type(max_->data_type()), Sparsity::PERMITTED);
max_col->push_back<RawType>(max_->get<RawType>());

auto nan_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
nan_count_col->push_back<uint64_t>(nan_count_);

auto null_count_col = std::make_shared<Column>(make_scalar_type(DataType::UINT64), Sparsity::PERMITTED);
null_count_col->push_back<uint64_t>(null_count_);

auto& entry_list = (*header.mutable_stats_by_column())[data_col_offset_];
auto* min_entry = entry_list.add_entries();
min_entry->set_stats_seg_offset(0);
min_entry->set_type(arcticc::pb2::column_stats_pb2::MIN_V1);
auto* max_entry = entry_list.add_entries();
max_entry->set_stats_seg_offset(1);
max_entry->set_type(arcticc::pb2::column_stats_pb2::MAX_V1);
auto* nan_entry = entry_list.add_entries();
nan_entry->set_stats_seg_offset(2);
nan_entry->set_type(arcticc::pb2::column_stats_pb2::NAN_COUNT_V1);
auto* null_entry = entry_list.add_entries();
null_entry->set_stats_seg_offset(3);
null_entry->set_type(arcticc::pb2::column_stats_pb2::NULL_COUNT_V1);

seg.add_column(scalar_field(min_col->type().data_type(), output_column_names[0].value), min_col);
seg.add_column(scalar_field(max_col->type().data_type(), output_column_names[1].value), max_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[2].value), nan_count_col);
seg.add_column(scalar_field(DataType::UINT64, output_column_names[3].value), null_count_col);
});
}

Expand Down
16 changes: 13 additions & 3 deletions cpp/arcticdb/processing/unsorted_aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,33 @@ class MinMaxAggregatorData {
private:
std::optional<Value> min_;
std::optional<Value> max_;
uint64_t nan_count_{0};
uint64_t null_count_{0};
size_t data_col_offset_;
};

class MinMaxAggregator {
public:
explicit MinMaxAggregator(
ColumnName column_name, size_t data_col_offset, ColumnName output_column_name_min,
ColumnName output_column_name_max
ColumnName output_column_name_max, ColumnName output_column_name_nan_count,
ColumnName output_column_name_null_count
) :
column_name_(std::move(column_name)),
data_col_offset_(data_col_offset),
output_column_name_min_(std::move(output_column_name_min)),
output_column_name_max_(std::move(output_column_name_max)) {}
output_column_name_max_(std::move(output_column_name_max)),
output_column_name_nan_count_(std::move(output_column_name_nan_count)),
output_column_name_null_count_(std::move(output_column_name_null_count)) {}

ARCTICDB_MOVE_COPY_DEFAULT(MinMaxAggregator)

[[nodiscard]] ColumnName get_input_column_name() const { return column_name_; }
[[nodiscard]] std::vector<ColumnName> get_output_column_names() const {
return {output_column_name_min_, output_column_name_max_};
return {output_column_name_min_,
output_column_name_max_,
output_column_name_nan_count_,
output_column_name_null_count_};
}
[[nodiscard]] MinMaxAggregatorData get_aggregator_data() const { return MinMaxAggregatorData(data_col_offset_); }

Expand All @@ -51,6 +59,8 @@ class MinMaxAggregator {
size_t data_col_offset_;
ColumnName output_column_name_min_;
ColumnName output_column_name_max_;
ColumnName output_column_name_nan_count_;
ColumnName output_column_name_null_count_;
};

class AggregatorDataBase {
Expand Down
2 changes: 2 additions & 0 deletions cpp/proto/arcticc/pb2/column_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ enum ColumnStatsType {
// misinterpret the new statistics format.
MIN_V1 = 1;
MAX_V1 = 2;
NAN_COUNT_V1 = 3;
NULL_COUNT_V1 = 4;
}

message StatEntry {
Expand Down
Loading
Loading