Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn encoded_meta() -> Vec<u8> {
distinct_count: None,
max_value: Some(vec![rng.random(); 8]),
min_value: Some(vec![rng.random(); 8]),
nan_count: None,
is_max_value_exact: Some(true),
is_min_value_exact: Some(true),
};
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl FallbackEncoder {
encoding,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
})
}
Expand Down Expand Up @@ -409,6 +410,7 @@ impl DictEncoder {
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
}
}
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
pub encoding: Encoding,
pub min_value: Option<T>,
pub max_value: Option<T>,
pub nan_count: Option<u64>,
pub variable_length_bytes: Option<i64>,
}

Expand Down Expand Up @@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
max_value: Option<T::T>,
nan_count: Option<u64>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
}
Expand All @@ -148,6 +150,17 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
// Count NaN values for floating point types
if matches!(T::T::PHYSICAL_TYPE, Type::FLOAT | Type::DOUBLE)
|| (T::T::PHYSICAL_TYPE == Type::FIXED_LEN_BYTE_ARRAY
&& self.descr.logical_type() == Some(LogicalType::Float16))
{
let nan_count = slice.iter().filter(|v| is_nan(&self.descr, *v)).count() as u64;
if nan_count > 0 {
*self.nan_count.get_or_insert(0) += nan_count;
}
}

if let Some((min, max)) = self.min_max(slice, None) {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
Expand Down Expand Up @@ -210,6 +223,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
bloom_filter,
min_value: None,
max_value: None,
nan_count: None,
variable_length_bytes: None,
})
}
Expand Down Expand Up @@ -304,6 +318,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
num_values: std::mem::take(&mut self.num_values),
min_value: self.min_value.take(),
max_value: self.max_value.take(),
nan_count: self.nan_count.take(),
variable_length_bytes: self.variable_length_bytes.take(),
})
}
Expand Down
128 changes: 123 additions & 5 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ struct ColumnMetrics<T: Default> {
min_column_value: Option<T>,
max_column_value: Option<T>,
num_column_nulls: u64,
num_column_nans: Option<u64>,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
repetition_level_histogram: Option<LevelHistogram>,
Expand Down Expand Up @@ -1003,6 +1004,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

if let Some(nan_count) = values_data.nan_count {
*self.column_metrics.num_column_nans.get_or_insert(0) += nan_count;
}

let page_statistics = match (values_data.min_value, values_data.max_value) {
(Some(min), Some(max)) => {
// Update chunk level statistics
Expand All @@ -1016,7 +1021,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None,
Some(self.page_metrics.num_page_nulls),
false,
),
)
.with_nan_count(values_data.nan_count),
)
}
_ => None,
Expand Down Expand Up @@ -1190,6 +1196,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Some(self.column_metrics.num_column_nulls),
false,
)
.with_nan_count(self.column_metrics.num_column_nans)
.with_backwards_compatible_min_max(backwards_compatible_min_max)
.into();

Expand Down Expand Up @@ -2649,6 +2656,7 @@ mod tests {
stats.max_opt().unwrap(),
&ByteArray::from(f16::ONE + f16::ONE)
);
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
Expand All @@ -2665,6 +2673,7 @@ mod tests {
stats.max_opt().unwrap(),
&ByteArray::from(f16::ONE + f16::ONE)
);
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
Expand All @@ -2678,6 +2687,7 @@ mod tests {
assert!(stats.min_bytes_opt().is_none());
assert!(stats.max_bytes_opt().is_none());
assert!(stats.is_min_max_backwards_compatible());
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
Expand Down Expand Up @@ -2736,24 +2746,26 @@ mod tests {
fn test_float_statistics_nan_middle() {
let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = stats {
if let Statistics::Float(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &1.0);
assert_eq!(stats.max_opt().unwrap(), &2.0);
} else {
panic!("expecting Statistics::Float");
}
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
fn test_float_statistics_nan_start() {
let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = stats {
if let Statistics::Float(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &1.0);
assert_eq!(stats.max_opt().unwrap(), &2.0);
} else {
panic!("expecting Statistics::Float");
}
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
Expand All @@ -2763,6 +2775,7 @@ mod tests {
assert!(stats.max_bytes_opt().is_none());
assert!(stats.is_min_max_backwards_compatible());
assert!(matches!(stats, Statistics::Float(_)));
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
Expand Down Expand Up @@ -2823,24 +2836,26 @@ mod tests {
fn test_double_statistics_nan_middle() {
let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = stats {
if let Statistics::Double(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &1.0);
assert_eq!(stats.max_opt().unwrap(), &2.0);
} else {
panic!("expecting Statistics::Double");
}
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
fn test_double_statistics_nan_start() {
let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = stats {
if let Statistics::Double(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &1.0);
assert_eq!(stats.max_opt().unwrap(), &2.0);
} else {
panic!("expecting Statistics::Double");
}
assert_eq!(stats.nan_count_opt(), Some(1));
}

#[test]
Expand All @@ -2850,6 +2865,7 @@ mod tests {
assert!(stats.max_bytes_opt().is_none());
assert!(matches!(stats, Statistics::Double(_)));
assert!(stats.is_min_max_backwards_compatible());
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
Expand Down Expand Up @@ -2906,6 +2922,108 @@ mod tests {
}
}

#[test]
fn test_float_statistics_infinity_with_nan() {
// Test column with Infinity and NaN values
let stats =
statistics_roundtrip::<FloatType>(&[1.0, f32::INFINITY, f32::NAN, 2.0, f32::NAN]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &1.0);
assert_eq!(stats.max_opt().unwrap(), &f32::INFINITY);
} else {
panic!("expecting Statistics::Float");
}
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
fn test_float_statistics_neg_infinity_with_nan() {
// Test column with -Infinity and NaN values
let stats = statistics_roundtrip::<FloatType>(&[
f32::NEG_INFINITY,
-1.0,
f32::NAN,
0.0,
f32::NAN,
1.0,
]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &f32::NEG_INFINITY);
assert_eq!(stats.max_opt().unwrap(), &1.0);
} else {
panic!("expecting Statistics::Float");
}
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
fn test_float_statistics_both_infinities_with_nan() {
// Test column with both +Infinity, -Infinity and NaN values
let stats = statistics_roundtrip::<FloatType>(&[
f32::NEG_INFINITY,
f32::NAN,
0.0,
f32::INFINITY,
f32::NAN,
]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Float(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &f32::NEG_INFINITY);
assert_eq!(stats.max_opt().unwrap(), &f32::INFINITY);
} else {
panic!("expecting Statistics::Float");
}
assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
fn test_double_statistics_infinity_with_nan() {
// Test with f64 (double) type
let stats = statistics_roundtrip::<DoubleType>(&[
1.0,
f64::INFINITY,
f64::NAN,
f64::NEG_INFINITY,
f64::NAN,
2.0,
]);
assert!(stats.is_min_max_backwards_compatible());
if let Statistics::Double(stats) = &stats {
assert_eq!(stats.min_opt().unwrap(), &f64::NEG_INFINITY);
assert_eq!(stats.max_opt().unwrap(), &f64::INFINITY);
} else {
panic!("expecting Statistics::Double");
}

assert_eq!(stats.nan_count_opt(), Some(2));
}

#[test]
fn test_float16_statistics_infinity_with_nan() {
// Test Float16 with Infinity and NaN
let input = [
f16::ONE,
f16::INFINITY,
f16::NAN,
f16::NEG_INFINITY,
f16::NAN,
]
.into_iter()
.map(|s| ByteArray::from(s).into())
.collect::<Vec<_>>();

let stats = float16_statistics_roundtrip(&input);
assert!(stats.is_min_max_backwards_compatible());
assert_eq!(
stats.min_opt().unwrap(),
&ByteArray::from(f16::NEG_INFINITY)
);
assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::INFINITY));
assert_eq!(stats.nan_count_opt(), Some(2));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the spec also talks about the case for all nans, so that would also be a good case to check

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review, I will polish the tests side.


#[test]
fn test_compare_greater_byte_array_decimals() {
assert!(!compare_greater_byte_array_decimals(&[], &[],),);
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1968,9 +1968,9 @@ mod tests {
.build();

#[cfg(not(feature = "encryption"))]
let base_expected_size = 2312;
let base_expected_size = 2376;
#[cfg(feature = "encryption")]
let base_expected_size = 2648;
let base_expected_size = 2712;

assert_eq!(parquet_meta.memory_size(), base_expected_size);

Expand Down Expand Up @@ -1998,9 +1998,9 @@ mod tests {
.build();

#[cfg(not(feature = "encryption"))]
let bigger_expected_size = 2816;
let bigger_expected_size = 2880;
#[cfg(feature = "encryption")]
let bigger_expected_size = 3152;
let bigger_expected_size = 3216;

// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
Expand Down
Loading
Loading