Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions velox/dwio/parquet/writer/arrow/Metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ static std::shared_ptr<Statistics> MakeTypedColumnStats(
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count,
metadata.statistics.distinct_count,
/*nan_count=*/0,
metadata.statistics.__isset.max_value ||
metadata.statistics.__isset.min_value,
metadata.statistics.__isset.null_count,
metadata.statistics.__isset.distinct_count);
metadata.statistics.__isset.distinct_count,
/*has_nan_count=*/false);
}
// Default behavior
return MakeStatistics<DType>(
Expand All @@ -114,9 +116,11 @@ static std::shared_ptr<Statistics> MakeTypedColumnStats(
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count,
metadata.statistics.distinct_count,
/*nan_count=*/0,
metadata.statistics.__isset.max || metadata.statistics.__isset.min,
metadata.statistics.__isset.null_count,
metadata.statistics.__isset.distinct_count);
metadata.statistics.__isset.distinct_count,
/*has_nan_count=*/false);
}

std::shared_ptr<Statistics> MakeColumnStats(
Expand Down Expand Up @@ -1015,6 +1019,22 @@ class FileMetaData::FileMetaDataImpl {
file_decryptor_ = file_decryptor;
}

// Installs the NaN totals computed by the builder (called during Finish).
// The map holds, per field ID, the total across all row groups as
// (nan_count, has_nan_count).
void setNaNCounts(
    std::unordered_map<int32_t, std::pair<int64_t, bool>> nan_counts) {
  // Exchange contents with the by-value parameter; whatever was stored before
  // is released when the parameter goes out of scope.
  field_nan_counts_.swap(nan_counts);
}

// Looks up the total NaN count stored for `fieldId` across all row groups.
// Returns (nan_count, has_nan_count); a field ID with no stored entry yields
// (0, false).
std::pair<int64_t, bool> getNaNCount(int32_t fieldId) const {
  const auto entry = field_nan_counts_.find(fieldId);
  const bool known = entry != field_nan_counts_.end();
  return known ? entry->second : std::pair<int64_t, bool>{0, false};
}

private:
friend FileMetaDataBuilder;
uint32_t metadata_len_ = 0;
Expand All @@ -1024,6 +1044,9 @@ class FileMetaData::FileMetaDataImpl {
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
const ReaderProperties properties_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
// Total NaN counts per field ID across all row groups: field_id ->
// (nan_count, has_nan_count).
std::unordered_map<int32_t, std::pair<int64_t, bool>> field_nan_counts_;

void InitSchema() {
if (metadata_->schema.empty()) {
Expand Down Expand Up @@ -1200,6 +1223,10 @@ std::shared_ptr<FileMetaData> FileMetaData::Subset(
return impl_->Subset(row_groups);
}

// Public accessor for the per-field NaN total; forwards to the implementation
// object and returns its (nan_count, has_nan_count) pair unchanged.
std::pair<int64_t, bool> FileMetaData::getNaNCount(int32_t fieldId) const {
  const std::pair<int64_t, bool> countAndFlag = impl_->getNaNCount(fieldId);
  return countAndFlag;
}

void FileMetaData::WriteTo(
::arrow::io::OutputStream* dst,
const std::shared_ptr<Encryptor>& encryptor) const {
Expand Down Expand Up @@ -1715,6 +1742,19 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
// column metadata
// Records the encoded statistics for this column chunk.
// The Thrift form of `val` is stored in the chunk metadata. The NaN count is
// kept in builder members instead, since it is not written to the parquet
// file.
void SetStatistics(const EncodedStatistics& val) {
  column_chunk_->meta_data.__set_statistics(ToThrift(val));
  // Mirror `val` unconditionally: the Thrift statistics above are replaced on
  // every call, so a NaN count from an earlier call must not outlive them.
  has_nan_count_ = val.has_nan_count;
  nan_count_ = val.has_nan_count ? val.nan_count : 0;
}

int64_t nan_count() const {
return nan_count_;
}

bool has_nan_count() const {
return has_nan_count_;
}

void Finish(
Expand Down Expand Up @@ -1883,6 +1923,9 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
owned_column_chunk_;
const std::shared_ptr<WriterProperties> properties_;
const ColumnDescriptor* column_;
// NaN count is stored separately since it's not written to the parquet file.
int64_t nan_count_ = 0;
bool has_nan_count_ = false;
};

std::unique_ptr<ColumnChunkMetaDataBuilder> ColumnChunkMetaDataBuilder::Make(
Expand Down Expand Up @@ -1970,6 +2013,14 @@ int64_t ColumnChunkMetaDataBuilder::total_compressed_size() const {
return impl_->total_compressed_size();
}

// Forwards to the implementation object's NaN count for this column chunk.
int64_t ColumnChunkMetaDataBuilder::nan_count() const {
  const int64_t count = impl_->nan_count();
  return count;
}

// Forwards to the implementation object's flag telling whether a NaN count
// was recorded for this column chunk.
bool ColumnChunkMetaDataBuilder::has_nan_count() const {
  const bool recorded = impl_->has_nan_count();
  return recorded;
}

class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
public:
explicit RowGroupMetaDataBuilderImpl(
Expand Down Expand Up @@ -2062,6 +2113,16 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
return row_group_->num_rows;
}

// Collects the NaN statistics of every column chunk builder in this row
// group.
// Returns a map of field_id -> (nan_count, has_nan_count). Every column
// contributes an entry; a column without a recorded NaN count leaves its
// entry at (0, false) unless another column with the same field ID has one.
std::unordered_map<int32_t, std::pair<int64_t, bool>> nan_counts() const {
  std::unordered_map<int32_t, std::pair<int64_t, bool>> result;
  for (const auto& builder : column_builders_) {
    const int32_t field_id = builder->descr()->schema_node()->field_id();
    // operator[] value-initializes a missing entry to (0, false).
    auto& entry = result[field_id];
    // Merge instead of assigning: if two column chunks report the same field
    // ID, a plain assignment would let the later one silently discard the
    // earlier one's count.
    // NOTE(review): presumably IDs only collide when the schema carries no
    // explicit field IDs - confirm whether such columns should be skipped.
    if (builder->has_nan_count()) {
      entry.first += builder->nan_count();
      entry.second = true;
    }
  }
  return result;
}

private:
void InitializeColumns(int ncols) {
row_group_->columns.resize(ncols);
Expand Down Expand Up @@ -2119,6 +2180,11 @@ void RowGroupMetaDataBuilder::Finish(
impl_->Finish(total_bytes_written, row_group_ordinal);
}

// Forwards to the implementation object and returns its map of
// field_id -> (nan_count, has_nan_count) for the current row group.
std::unordered_map<int32_t, std::pair<int64_t, bool>>
RowGroupMetaDataBuilder::nan_counts() const {
  auto countsByFieldId = impl_->nan_counts();
  return countsByFieldId;
}

// file metadata
class FileMetaDataBuilder::FileMetaDataBuilderImpl {
public:
Expand All @@ -2138,6 +2204,9 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
}

RowGroupMetaDataBuilder* AppendRowGroup() {
// Accumulate NaN counts from the previous row group before creating a new
// one.
accumulateNaNCountsFromCurrentRowGroup();
row_groups_.emplace_back();
current_row_group_builder_ = RowGroupMetaDataBuilder::Make(
properties_, schema_, &row_groups_.back());
Expand Down Expand Up @@ -2182,6 +2251,9 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {

std::unique_ptr<FileMetaData> Finish(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
// Accumulate NaN counts from the last row group.
accumulateNaNCountsFromCurrentRowGroup();

int64_t total_rows = 0;
for (auto row_group : row_groups_) {
total_rows += row_group.num_rows;
Expand Down Expand Up @@ -2259,6 +2331,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
file_meta_data->impl_->metadata_ = std::move(metadata_);
file_meta_data->impl_->InitSchema();
file_meta_data->impl_->InitKeyValueMetadata();
// Pass total NaN counts per field ID to FileMetaData.
file_meta_data->impl_->setNaNCounts(std::move(field_nan_counts_));
return file_meta_data;
}

Expand Down Expand Up @@ -2290,12 +2364,31 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
crypto_metadata_;

private:
// Folds the current row group's NaN counts into the file-level totals, keyed
// by field ID. Does nothing when there is no current row group builder.
// Entries whose has_nan_count flag is false are ignored and create no total.
void accumulateNaNCountsFromCurrentRowGroup() {
  if (current_row_group_builder_ == nullptr) {
    return;
  }
  const auto rowGroupCounts = current_row_group_builder_->nan_counts();
  for (const auto& perField : rowGroupCounts) {
    const auto& countAndFlag = perField.second;
    if (!countAndFlag.second) {
      continue;
    }
    // One lookup per field: operator[] creates a (0, false) total on first
    // use, which is then updated through the reference.
    auto& total = field_nan_counts_[perField.first];
    total.first += countAndFlag.first;
    total.second = true;
  }
}

const std::shared_ptr<WriterProperties> properties_;
std::vector<facebook::velox::parquet::thrift::RowGroup> row_groups_;

std::unique_ptr<RowGroupMetaDataBuilder> current_row_group_builder_;
const SchemaDescriptor* schema_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
// Total NaN counts per field ID across all row groups: field_id ->
// (nan_count, has_nan_count).
std::unordered_map<int32_t, std::pair<int64_t, bool>> field_nan_counts_;
};

std::unique_ptr<FileMetaDataBuilder> FileMetaDataBuilder::Make(
Expand Down
18 changes: 18 additions & 0 deletions velox/dwio/parquet/writer/arrow/Metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -416,6 +417,12 @@ class PARQUET_EXPORT FileMetaData {
std::shared_ptr<FileMetaData> Subset(
const std::vector<int>& row_groups) const;

/// \brief Get total NaN count for a specific field ID across all row groups.
///
/// NaN counts are collected during writing but not written to the parquet
/// file.
///
/// \param fieldId field ID to look up.
/// \return a pair of (nan_count, has_nan_count); (0, false) when no NaN count
/// is stored for `fieldId`.
std::pair<int64_t, bool> getNaNCount(int32_t fieldId) const;

private:
friend FileMetaDataBuilder;
friend class SerializedFile;
Expand Down Expand Up @@ -486,6 +493,13 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
const ColumnDescriptor* descr() const;

int64_t total_compressed_size() const;

// NaN count accessors - NaN counts are collected during writing but not
// written to the parquet file.
// Returns the NaN count recorded for this column chunk; 0 if none has been
// recorded.
int64_t nan_count() const;

// Returns whether a NaN count has been recorded for this column chunk.
bool has_nan_count() const;

// commit the metadata

void Finish(
Expand Down Expand Up @@ -537,6 +551,10 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {

void set_num_rows(int64_t num_rows);

// Get NaN counts for all columns in current row group.
// Returns a map of field_id -> (nan_count, has_nan_count). The map is built
// on each call and returned by value; has_nan_count is false for a column
// that recorded no NaN count.
std::unordered_map<int32_t, std::pair<int64_t, bool>> nan_counts() const;

// commit the metadata
void Finish(int64_t total_bytes_written, int16_t row_group_ordinal = -1);

Expand Down
Loading
Loading