Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl FallbackEncoder {
encoding,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
})
}
Expand Down Expand Up @@ -411,6 +412,7 @@ impl DictEncoder {
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
nan_count: None,
variable_length_bytes,
}
}
Expand Down
115 changes: 113 additions & 2 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ fn get_fsb_array_slice(
#[cfg(test)]
mod tests {
use super::*;
use std::cmp::Ordering;
use std::collections::HashMap;

use std::fs::File;
Expand Down Expand Up @@ -2918,10 +2919,120 @@ mod tests {
for column in row_group.columns() {
assert!(column.offset_index_offset().is_some());
assert!(column.offset_index_length().is_some());
assert!(column.column_index_offset().is_none());
assert!(column.column_index_length().is_none());
assert!(column.column_index_offset().is_some());
assert!(column.column_index_length().is_some());
}
}
assert!(file_meta_data.column_index().is_some());
if let Some(col_indexes) = file_meta_data.column_index() {
for rg_idx in col_indexes {
for idx in rg_idx {
assert!(idx.nan_counts().is_some());
let float_idx = match idx {
ColumnIndexMetaData::DOUBLE(idx) => idx,
_ => panic!("expected double statistics"),
};
for i in 0..idx.num_pages() as usize {
assert_eq!(float_idx.nan_count(i), Some(10));
assert_eq!(
f64::NAN.total_cmp(float_idx.min_value(i).unwrap()),
Ordering::Equal
);
assert_eq!(
f64::NAN.total_cmp(float_idx.max_value(i).unwrap()),
Ordering::Equal
);
}
}
}
}
}

#[test]
fn check_page_offset_index_with_mixed_nan() {
let schema = Arc::new(Schema::new(vec![Field::new(
"col",
DataType::Float64,
true,
)]));

let mut out = Vec::with_capacity(1024);
let props = WriterProperties::builder()
.set_data_page_row_count_limit(10)
.build();
let mut writer = ArrowWriter::try_new(&mut out, schema.clone(), Some(props))
.expect("Unable to write file");

// write a page of all NaN (since batch min and max are NaN, global min/max are NaN)
let values = Arc::new(Float64Array::from(vec![f64::NAN; 10]));
let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
writer.write(&batch).unwrap();

// write a page of all -NaN (batch min/max is -NaN, should update global min to -NaN)
let values = Arc::new(Float64Array::from(vec![-f64::NAN; 10]));
let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
writer.write(&batch).unwrap();

// write a page of all 0 (non-NaN should override global min/max, now 0/0)
let values = Arc::new(Float64Array::from(vec![0_f64; 10]));
let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
writer.write(&batch).unwrap();

// write a mixed page (should now have min -1, max 1)
let values = Arc::new(Float64Array::from(vec![
-1.0,
0.0,
f64::NAN,
-f64::NAN,
1.0,
]));
let batch = RecordBatch::try_new(schema.clone(), vec![values]).unwrap();
writer.write(&batch).unwrap();

let file_meta_data = writer.close().unwrap();

// check the column chunk stats are correct
let col_stats = file_meta_data
.row_group(0)
.column(0)
.statistics()
.expect("missing column chunk statistics");

assert_eq!(col_stats.nan_count_opt(), Some(22));
assert_eq!(col_stats.min_bytes_opt(), Some((-1.0f64).as_bytes()));
assert_eq!(col_stats.max_bytes_opt(), Some(1.0f64.as_bytes()));

assert!(file_meta_data.column_index().is_some());
let col_idx = &file_meta_data.column_index().as_ref().unwrap()[0][0];
assert_eq!(col_idx.num_pages(), 4);

// test each page
let float_idx = match col_idx {
ColumnIndexMetaData::DOUBLE(idx) => idx,
_ => panic!("expected double statistics"),
};

assert_eq!(float_idx.nan_counts, Some(vec![10, 10, 0, 2]));
assert_eq!(
f64::NAN.total_cmp(float_idx.min_value(0).unwrap()),
Ordering::Equal
);
assert_eq!(
f64::NAN.total_cmp(float_idx.max_value(0).unwrap()),
Ordering::Equal
);
assert_eq!(
(-f64::NAN).total_cmp(float_idx.min_value(1).unwrap()),
Ordering::Equal
);
assert_eq!(
(-f64::NAN).total_cmp(float_idx.max_value(1).unwrap()),
Ordering::Equal
);
assert_eq!(float_idx.min_value(2), Some(&0.0));
assert_eq!(float_idx.max_value(2), Some(&0.0));
assert_eq!(float_idx.min_value(3), Some(&-1.0));
assert_eq!(float_idx.max_value(3), Some(&1.0));
}

#[test]
Expand Down
Loading
Loading