Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2759,7 +2759,7 @@ mod tests {
Encoding::BYTE_STREAM_SPLIT,
],
DataType::Float32 | DataType::Float64 => {
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT]
vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT, Encoding::ALP]
}
_ => vec![Encoding::PLAIN],
};
Expand Down Expand Up @@ -3000,6 +3000,28 @@ mod tests {
required_and_optional::<Float64Array, _>((0..SMALL_SIZE).map(|i| i as f64));
}

// "Decimal-like" floating-point values: short decimal fractions mixed with
// non-finite values (NaN, negative infinity, and positive infinity, the latter
// appearing twice in a row). Shared by the f32 and f64 single-column tests below.
const DECIMAL_LIKE_VALUES: [f32; 9] = [
1.23,
4.56,
5.56,
f32::NAN,
f32::NEG_INFINITY,
f32::INFINITY,
f32::INFINITY,
7.89,
0.12,
];
#[test]
fn f32_decimal_like_single_column() {
    // Hand the shared fixture to `required_and_optional` as owned `f32` values.
    let values = DECIMAL_LIKE_VALUES.iter().copied();
    required_and_optional::<Float32Array, _>(values);
}

#[test]
fn f64_decimal_like_single_column() {
    // Widen each `f32` fixture value to `f64`; `f64::from` is the lossless
    // conversion and yields the same values as an `as f64` cast.
    let values = DECIMAL_LIKE_VALUES.iter().copied().map(f64::from);
    required_and_optional::<Float64Array, _>(values);
}

// The timestamp array types don't implement From<Vec<T>> because they need the timezone
// argument, and they also don't support building from a Vec<Option<T>>, so call
// one_column_roundtrip manually instead of calling required_and_optional for these tests.
Expand Down
1 change: 1 addition & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ pub(crate) mod private {
+ Send
+ HeapSize
+ crate::encodings::decoding::private::GetDecoder
+ crate::encodings::encoding::private::GetEncoder
+ crate::file::statistics::private::MakeStatistics
{
const PHYSICAL_TYPE: Type;
Expand Down
68 changes: 60 additions & 8 deletions parquet/src/encodings/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub(crate) struct AlpHeader {
}

impl AlpHeader {
/// Size of the serialized header, in bytes.
pub(crate) const SERIALIZED_SIZE: usize = 7;

/// Returns the header's `num_elements` field converted to `usize`.
pub(crate) fn num_elements_usize(&self) -> usize {
self.num_elements as usize
}
Expand Down Expand Up @@ -81,6 +84,16 @@ impl AlpHeader {
0
}
}

/// Encodes the header fields into the provided byte slice.
pub(crate) fn serialize(&self, dst: &mut [u8]) {
dst[0..3].copy_from_slice(&[
ALP_COMPRESSION_MODE,
ALP_INTEGER_ENCODING_FOR_BIT_PACK,
self.log_vector_size,
]);
dst[3..7].copy_from_slice(&self.num_elements.to_le_bytes());
}
}

/// Per-vector ALP metadata (4 bytes), equivalent to C++ `AlpEncodedVectorInfo`.
Expand All @@ -102,7 +115,22 @@ pub(crate) struct AlpInfo {
}

impl AlpInfo {
pub(crate) const STORED_SIZE: usize = 4;
pub(crate) const SERIALIZED_SIZE: usize = 4;

/// Builds an `AlpInfo` from its three fields, stored as given.
pub(crate) fn new(exponent: u8, factor: u8, num_exceptions: u16) -> Self {
Self {
exponent,
factor,
num_exceptions,
}
}

/// Encodes the header fields into the provided byte slice.
pub(crate) fn serialize(&self, dst: &mut [u8]) {
dst[0] = self.exponent;
dst[1] = self.factor;
dst[2..4].copy_from_slice(&self.num_exceptions.to_le_bytes());
}
}

/// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for `f64`).
Expand All @@ -119,14 +147,27 @@ impl AlpInfo {
/// +----+----+----+----+-----------+
/// ```
#[derive(Debug, Clone, Copy)]
pub(crate) struct ForInfo<Exact: AlpExact> {
pub(crate) frame_of_reference: Exact,
pub(crate) struct ForInfo<T: AlpExact> {
pub(crate) frame_of_reference: T,
pub(crate) bit_width: u8,
}

impl<Exact: AlpExact> ForInfo<Exact> {
pub(crate) fn stored_size() -> usize {
Exact::WIDTH + 1
impl<T: AlpExact> ForInfo<T> {
pub(crate) fn serialized_size() -> usize {
T::WIDTH + 1
}

/// Builds a `ForInfo` from a frame-of-reference value and a bit width, stored as given.
pub(crate) fn new(frame_of_reference: T, bit_width: u8) -> Self {
Self {
frame_of_reference,
bit_width,
}
}

/// Writes this FOR metadata into `dst`: the frame of reference as `T::WIDTH`
/// little-endian bytes, followed by the one-byte bit width.
///
/// # Panics
/// Panics if `dst` is shorter than `T::WIDTH + 1` bytes.
pub(crate) fn serialize(&self, dst: &mut [u8]) {
    // Split so the reference value gets exactly `T::WIDTH` bytes and the bit
    // width lands in the byte immediately after it.
    let (reference_bytes, tail) = dst.split_at_mut(T::WIDTH);
    self.frame_of_reference.write_le_bytes(reference_bytes);
    tail[0] = self.bit_width;
}

pub(crate) fn get_bit_packed_size(&self, num_elements: u16) -> usize {
Expand All @@ -137,7 +178,7 @@ impl<Exact: AlpExact> ForInfo<Exact> {
let bit_packed_size = self.get_bit_packed_size(num_elements);
bit_packed_size
+ num_exceptions as usize * std::mem::size_of::<u16>()
+ num_exceptions as usize * Exact::WIDTH
+ num_exceptions as usize * T::WIDTH
}
}

Expand All @@ -151,10 +192,13 @@ impl<Exact: AlpExact> ForInfo<Exact> {
/// - FOR stores non-negative deltas optimized for bitpacking.
/// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage.
/// - Signed interpretation is applied later during decimal reconstruction.
pub(crate) trait AlpExact: Copy + std::fmt::Debug + FromBitpacked {
pub(crate) trait AlpExact: Default + Copy + std::fmt::Debug + FromBitpacked {
const WIDTH: usize;
type Signed: Copy;
fn from_le_slice(slice: &[u8]) -> Self;
/// Write `Self` as little-endian bytes into `dst`.
/// `dst` must be at least `Self::WIDTH` bytes long.
fn write_le_bytes(self, dst: &mut [u8]);
fn zero() -> Self;
fn wrapping_add(self, rhs: Self) -> Self;
fn reinterpret_as_signed(self) -> Self::Signed;
Expand All @@ -168,6 +212,10 @@ impl AlpExact for u32 {
u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]])
}

/// Writes `self` in little-endian byte order into the first four bytes of `dst`.
///
/// Bytes of `dst` beyond the fourth are left untouched, so callers may pass a
/// buffer that is longer than the value, as the trait contract ("at least
/// `Self::WIDTH` bytes") allows.
///
/// # Panics
/// Panics if `dst` is shorter than four bytes.
fn write_le_bytes(self, dst: &mut [u8]) {
    let bytes = self.to_le_bytes();
    // `copy_from_slice` panics unless both slices have the same length, so
    // narrow `dst` to exactly the encoded width first.
    dst[..bytes.len()].copy_from_slice(&bytes);
}

fn zero() -> Self {
0
}
Expand All @@ -191,6 +239,10 @@ impl AlpExact for u64 {
])
}

/// Writes `self` in little-endian byte order into the first eight bytes of `dst`.
///
/// Bytes of `dst` beyond the eighth are left untouched, so callers may pass a
/// buffer that is longer than the value, as the trait contract ("at least
/// `Self::WIDTH` bytes") allows.
///
/// # Panics
/// Panics if `dst` is shorter than eight bytes.
fn write_le_bytes(self, dst: &mut [u8]) {
    let bytes = self.to_le_bytes();
    // `copy_from_slice` panics unless both slices have the same length, so
    // narrow `dst` to exactly the encoded width first.
    dst[..bytes.len()].copy_from_slice(&bytes);
}

fn zero() -> Self {
0
}
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/encodings/decoding/alp_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ struct AlpEncodedVectorView<Exact: AlpExact> {

impl<Exact: AlpExact> AlpEncodedVectorView<Exact> {
fn expected_stored_size(&self) -> usize {
AlpInfo::STORED_SIZE
+ ForInfo::<Exact>::stored_size()
AlpInfo::SERIALIZED_SIZE
+ ForInfo::<Exact>::serialized_size()
+ self
.for_info
.get_data_stored_size(self.num_elements, self.alp_info.num_exceptions)
Expand Down Expand Up @@ -239,7 +239,7 @@ fn parse_vector_view<Exact: AlpExact>(
) -> Result<AlpEncodedVectorView<Exact>> {
let vector_bytes = &body[vector_start..vector_end];

let metadata_size = AlpInfo::STORED_SIZE + ForInfo::<Exact>::stored_size();
let metadata_size = AlpInfo::SERIALIZED_SIZE + ForInfo::<Exact>::serialized_size();
if vector_bytes.len() < metadata_size {
return Err(general_err!(
"Invalid ALP page: vector metadata too short, expected at least {} bytes, got {}",
Expand Down Expand Up @@ -284,7 +284,7 @@ fn parse_vector_view<Exact: AlpExact>(
));
}

let for_start = AlpInfo::STORED_SIZE;
let for_start = AlpInfo::SERIALIZED_SIZE;
let for_end = for_start + Exact::WIDTH;
let frame_of_reference = Exact::from_le_slice(&vector_bytes[for_start..for_end]);
let bit_width = vector_bytes[for_end];
Expand Down
Loading
Loading