diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 641b81257a91..b1e748ac74c7 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -2759,7 +2759,7 @@ mod tests { Encoding::BYTE_STREAM_SPLIT, ], DataType::Float32 | DataType::Float64 => { - vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT] + vec![Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT, Encoding::ALP] } _ => vec![Encoding::PLAIN], }; @@ -3000,6 +3000,28 @@ mod tests { required_and_optional::((0..SMALL_SIZE).map(|i| i as f64)); } + // "Decimal Like" floating point values + const DECIMAL_LIKE_VALUES: [f32; 9] = [ + 1.23, + 4.56, + 5.56, + f32::NAN, + f32::NEG_INFINITY, + f32::INFINITY, + f32::INFINITY, + 7.89, + 0.12, + ]; + #[test] + fn f32_decimal_like_single_column() { + required_and_optional::(DECIMAL_LIKE_VALUES.iter().copied()); + } + + #[test] + fn f64_decimal_like_single_column() { + required_and_optional::(DECIMAL_LIKE_VALUES.iter().map(|f| *f as f64)); + } + // The timestamp array types don't implement From> because they need the timezone // argument, and they also doesn't support building from a Vec>, so call // one_column_roundtrip manually instead of calling required_and_optional for these tests. diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index d8c7b9201389..52e9c9e0d9df 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -702,6 +702,7 @@ pub(crate) mod private { + Send + HeapSize + crate::encodings::decoding::private::GetDecoder + + crate::encodings::encoding::private::GetEncoder + crate::file::statistics::private::MakeStatistics { const PHYSICAL_TYPE: Type; diff --git a/parquet/src/encodings/alp.rs b/parquet/src/encodings/alp.rs index 9d06cdf7b999..15f38a32d393 100644 --- a/parquet/src/encodings/alp.rs +++ b/parquet/src/encodings/alp.rs @@ -53,6 +53,9 @@ pub(crate) struct AlpHeader { } impl AlpHeader { + /// Size of the header. in bytes + pub(crate) const SERIALIZED_SIZE: usize = 7; + pub(crate) fn num_elements_usize(&self) -> usize { self.num_elements as usize } @@ -81,6 +84,16 @@ impl AlpHeader { 0 } } + + /// Encodes the header fields into the provided byte slice. + pub(crate) fn serialize(&self, dst: &mut [u8]) { + dst[0..3].copy_from_slice(&[ + ALP_COMPRESSION_MODE, + ALP_INTEGER_ENCODING_FOR_BIT_PACK, + self.log_vector_size, + ]); + dst[3..7].copy_from_slice(&self.num_elements.to_le_bytes()); + } } /// Per-vector ALP metadata (4 bytes), equivalent to C++ `AlpEncodedVectorInfo`. @@ -102,7 +115,22 @@ pub(crate) struct AlpInfo { } impl AlpInfo { - pub(crate) const STORED_SIZE: usize = 4; + pub(crate) const SERIALIZED_SIZE: usize = 4; + + pub(crate) fn new(exponent: u8, factor: u8, num_exceptions: u16) -> Self { + Self { + exponent, + factor, + num_exceptions, + } + } + + /// Encodes the header fields into the provided byte slice. + pub(crate) fn serialize(&self, dst: &mut [u8]) { + dst[0] = self.exponent; + dst[1] = self.factor; + dst[2..4].copy_from_slice(&self.num_exceptions.to_le_bytes()); + } } /// Per-vector FOR metadata for exact integer type (`u32` for `f32`, `u64` for `f64`). @@ -119,14 +147,27 @@ impl AlpInfo { /// +----+----+----+----+-----------+ /// ``` #[derive(Debug, Clone, Copy)] -pub(crate) struct ForInfo { - pub(crate) frame_of_reference: Exact, +pub(crate) struct ForInfo { + pub(crate) frame_of_reference: T, pub(crate) bit_width: u8, } -impl ForInfo { - pub(crate) fn stored_size() -> usize { - Exact::WIDTH + 1 +impl ForInfo { + pub(crate) fn serialized_size() -> usize { + T::WIDTH + 1 + } + + pub(crate) fn new(frame_of_reference: T, bit_width: u8) -> Self { + Self { + frame_of_reference, + bit_width, + } + } + + pub(crate) fn serialize(&self, dst: &mut [u8]) { + let for_len = T::WIDTH; + self.frame_of_reference.write_le_bytes(&mut dst[0..for_len]); + dst[for_len] = self.bit_width; } pub(crate) fn get_bit_packed_size(&self, num_elements: u16) -> usize { @@ -137,7 +178,7 @@ impl ForInfo { let bit_packed_size = self.get_bit_packed_size(num_elements); bit_packed_size + num_exceptions as usize * std::mem::size_of::() - + num_exceptions as usize * Exact::WIDTH + + num_exceptions as usize * T::WIDTH } } @@ -151,10 +192,13 @@ impl ForInfo { /// - FOR stores non-negative deltas optimized for bitpacking. /// - Unsigned arithmetic avoids signed-overflow edge cases in FOR stage. /// - Signed interpretation is applied later during decimal reconstruction. -pub(crate) trait AlpExact: Copy + std::fmt::Debug + FromBitpacked { +pub(crate) trait AlpExact: Default + Copy + std::fmt::Debug + FromBitpacked { const WIDTH: usize; type Signed: Copy; fn from_le_slice(slice: &[u8]) -> Self; + /// Write `Self` as little-endian bytes into `dst`. + /// `dst` must be at least `Self::WIDTH` bytes long. + fn write_le_bytes(self, dst: &mut [u8]); fn zero() -> Self; fn wrapping_add(self, rhs: Self) -> Self; fn reinterpret_as_signed(self) -> Self::Signed; @@ -168,6 +212,10 @@ impl AlpExact for u32 { u32::from_le_bytes([slice[0], slice[1], slice[2], slice[3]]) } + fn write_le_bytes(self, dst: &mut [u8]) { + dst.copy_from_slice(&self.to_le_bytes()); + } + fn zero() -> Self { 0 } @@ -191,6 +239,10 @@ impl AlpExact for u64 { ]) } + fn write_le_bytes(self, dst: &mut [u8]) { + dst.copy_from_slice(&self.to_le_bytes()); + } + fn zero() -> Self { 0 } diff --git a/parquet/src/encodings/decoding/alp_decoder.rs b/parquet/src/encodings/decoding/alp_decoder.rs index b2c91bfe939d..a20d0d89e3ad 100644 --- a/parquet/src/encodings/decoding/alp_decoder.rs +++ b/parquet/src/encodings/decoding/alp_decoder.rs @@ -47,8 +47,8 @@ struct AlpEncodedVectorView { impl AlpEncodedVectorView { fn expected_stored_size(&self) -> usize { - AlpInfo::STORED_SIZE - + ForInfo::::stored_size() + AlpInfo::SERIALIZED_SIZE + + ForInfo::::serialized_size() + self .for_info .get_data_stored_size(self.num_elements, self.alp_info.num_exceptions) @@ -239,7 +239,7 @@ fn parse_vector_view( ) -> Result> { let vector_bytes = &body[vector_start..vector_end]; - let metadata_size = AlpInfo::STORED_SIZE + ForInfo::::stored_size(); + let metadata_size = AlpInfo::SERIALIZED_SIZE + ForInfo::::serialized_size(); if vector_bytes.len() < metadata_size { return Err(general_err!( "Invalid ALP page: vector metadata too short, expected at least {} bytes, got {}", @@ -284,7 +284,7 @@ fn parse_vector_view( )); } - let for_start = AlpInfo::STORED_SIZE; + let for_start = AlpInfo::SERIALIZED_SIZE; let for_end = for_start + Exact::WIDTH; let frame_of_reference = Exact::from_le_slice(&vector_bytes[for_start..for_end]); let bit_width = vector_bytes[for_end]; diff --git a/parquet/src/encodings/encoding/alp_encoder/mod.rs b/parquet/src/encodings/encoding/alp_encoder/mod.rs new file mode 100644 index 000000000000..50f78c83873c --- /dev/null +++ b/parquet/src/encodings/encoding/alp_encoder/mod.rs @@ -0,0 +1,345 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! ALP (Adaptive Lossless floating-Point) encoder. +//! +//! Based on the draft Parquet spec: + +mod vector; + +use crate::basic::Encoding; +use crate::data_type::DataType; +use crate::encodings::alp::{AlpFloat, AlpHeader}; +use crate::errors::{ParquetError, Result}; +use bytes::Bytes; +use std::fmt::Formatter; + +use super::Encoder; +use vector::{InProgressVector, VectorFinishResult, VectorPutResult, EncodingParams}; + +/// Vector size in bits +const ALP_LOG_VECTOR_SIZE: u8 = 10; +const ALP_VECTOR_SIZE: usize = 1 << ALP_LOG_VECTOR_SIZE; +const ALP_COMPRESSION_MODE: u8 = 0; +const ALP_INTEGER_ENCODING_FOR_BIT_PACK: u8 = 0; + +/// ALP encoder for `f32` / `f64` columns. +/// +pub(crate) struct AlpEncoder +where + T::T: AlpFloat, +{ + /// In progress buffer. + /// + /// The final ALP page is incrementally constructed in-place in this buffer. + /// + /// Page format: + /// + /// ```text + /// +-------------+-----------------------------+--------------------------------------+ + /// | Header | Offset Array | Vector Data | + /// | (7 bytes) | (num_vectors * 4 bytes) | (variable) | + /// +-------------+------+------+-----+---------+----------+----------+-----+----------+ + /// | Page Header | off0 | off1 | ... | off N-1 | Vector 0 | Vector 1 | ... | Vec N-1 | + /// | (7 bytes) | (4B) | (4B) | | (4B) |(variable)|(variable)| |(variable)| + /// +-------------+------+------+-----+---------+----------+----------+-----+----------+ + /// ``` + /// + buffer: Vec, + /// Currently in progress vector + vector_state: VectorState, + /// Total number of values encoded, NOT including the currently in progress vector + count: usize, +} + +/// State machine that tracks the currently in progress vector being built +#[derive(Default, Debug)] +enum VectorState { + /// Default value, temporarily left in place during state transition + #[default] + Placeholder, + /// No vector in progress + None(Scratch), + /// Vector started, + InProgress(InProgressVector), +} + +/// Buffers to reuse for next vector +struct Scratch { + /// Encoding parameters (maybe not known until we see a sample of the data) + encoding_params: Option, + exception_positions: Vec, + exception_values: Vec, +} + +impl std::fmt::Debug for Scratch { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Scratch") + .field( + "exception_positions", + &format!( + "Vec with capacity {}", + self.exception_positions.capacity() + ), + ) + .field( + "exception_values", + &format!("Vec with capacity {}", self.exception_values.capacity()), + ) + .finish() + } +} + +impl AlpEncoder +where + T::T: AlpFloat, +{ + /// Create a new encoder with + pub(crate) fn new() -> Self { + let expected_num_values = 1024; + // TODO allocate with expected size + // Leave space for an header + let mut buffer = vec![0; AlpHeader::SERIALIZED_SIZE]; + + let scratch = Scratch { + encoding_params: None, + exception_positions: Vec::with_capacity(expected_num_values), + exception_values: Vec::with_capacity(expected_num_values), + }; + + Self { + buffer, + vector_state: VectorState::None(scratch), + count: 0, + } + } +} + +impl Encoder for AlpEncoder +where + T::T: AlpFloat, +{ + /// Write as many values from `values` as possible into the underlying vector. + /// Will + fn put(&mut self, mut values: &[T::T]) -> Result<()> { + let mut done = false; + loop { + // leave Default value in self.vector_state temporarily + let current_state = std::mem::take(&mut self.vector_state); + self.vector_state = match current_state { + VectorState::Placeholder => { + return Err(general_err!( + "Internal Error: ALP encoder called after error" + )); + } + // begin a new vector + VectorState::None(scratch) => { + let in_progress = InProgressVector::new(&mut self.buffer, scratch); + // Will encode on the next loop through + VectorState::InProgress(in_progress) + } + VectorState::InProgress(mut in_progress) => { + match in_progress.put(&mut self.buffer, values)? { + // Consumed enough to complete the vector + VectorPutResult::Finished { + encoded_len, + finish_result, + } => { + let VectorFinishResult { + vector_len: vector_size, + scratch, + } = finish_result; + self.count += vector_size; + values = &values[encoded_len..]; + VectorState::None(scratch) + } + // Consumed all input + VectorPutResult::StillInProgress(in_progress) => { + done = true; + VectorState::InProgress(in_progress) + } + } + } + }; + if done { + return Ok(()); + } + } + } + + fn encoding(&self) -> Encoding { + Encoding::ALP + } + + fn estimated_data_encoded_size(&self) -> usize { + // TODO add estimated data size of in progress vector + self.buffer.len() + } + + fn estimated_memory_size(&self) -> usize { + // TODO add data size of in progess vector + self.buffer.capacity() + } + + fn flush_buffer(&mut self) -> Result { + // finish up the last vector if needed + let current_state = std::mem::take(&mut self.vector_state); + self.vector_state = if let VectorState::InProgress(in_progress) = current_state { + let VectorFinishResult { + vector_len, + scratch, + } = in_progress.finish(&mut self.buffer)?; + self.count += vector_len; + VectorState::None(scratch) + } else { + current_state + }; + + // update page header in place now that we know the final value count + let value_count: i32 = self.count.try_into().map_err(|_| { + general_err!("ALP can encode at most i32::MAX values, got {}", self.count) + })?; + + let header = AlpHeader { + compression_mode: ALP_COMPRESSION_MODE, + integer_encoding: ALP_INTEGER_ENCODING_FOR_BIT_PACK, + //log_vector_size: ALP_LOG_VECTOR_SIZE, + log_vector_size: 0, // TODO support something more + num_elements: value_count, + }; + + header.serialize(&mut self.buffer); + + // reset internal fields for next time + self.count = 0; + let mut buffer = Vec::with_capacity(self.buffer.capacity()); + std::mem::swap(&mut buffer, &mut self.buffer); + + Ok(Bytes::from(buffer)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::basic::Type as PhysicalType; + use crate::data_type::{DoubleType, FloatType}; + use crate::encodings::decoding::get_decoder; + use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType}; + use std::sync::Arc; + + fn col_desc(t: PhysicalType) -> ColumnDescPtr { + let ty = SchemaType::primitive_type_builder("c", t) + .with_length(0) + .build() + .unwrap(); + Arc::new(ColumnDescriptor::new( + Arc::new(ty), + 0, + 0, + ColumnPath::new(vec![]), + )) + } + + /// Compare floats by bit pattern so NaN, ±0.0, and ±Inf are distinguished. + trait BitsAsU64 { + fn bits(&self) -> u64; + } + impl BitsAsU64 for f32 { + fn bits(&self) -> u64 { + self.to_bits() as u64 + } + } + impl BitsAsU64 for f64 { + fn bits(&self) -> u64 { + self.to_bits() + } + } + + fn check_roundtrip(values: &[T::T]) + where + T::T: AlpFloat + BitsAsU64 + std::fmt::Debug, + { + let descr = col_desc(T::get_physical_type()); + let mut encoder = AlpEncoder::::new(); + encoder.put(values).unwrap(); + let bytes = encoder.flush_buffer().unwrap(); + + let mut decoder = get_decoder::(descr, Encoding::ALP).unwrap(); + decoder.set_data(bytes, values.len()).unwrap(); + let mut out = vec![T::T::default(); values.len()]; + let read = decoder.get(&mut out).unwrap(); + assert_eq!(read, values.len()); + + for (i, (got, want)) in out.iter().zip(values).enumerate() { + assert_eq!( + got.bits(), + want.bits(), + "bit mismatch at index {i}: got={got:?}, want={want:?}" + ); + } + } + + #[test] + fn alp_encoder_reports_alp_encoding() { + let encoder = AlpEncoder::::new(); + assert_eq!(encoder.encoding(), Encoding::ALP); + let encoder = AlpEncoder::::new(); + assert_eq!(encoder.encoding(), Encoding::ALP); + } + + #[test] + fn alp_encoder_roundtrip_f32() { + check_roundtrip::(&[ + 1.23, + 4.56, + 7.89, + 0.12, + f32::NAN, + -0.0, + f32::INFINITY, + f32::NEG_INFINITY, + ]); + } + + #[test] + fn alp_encoder_roundtrip_f64() { + check_roundtrip::(&[ + 1.23, + 4.56, + 7.89, + 0.12, + f64::NAN, + -0.0, + f64::INFINITY, + f64::NEG_INFINITY, + ]); + } + + #[test] + fn alp_encoder_roundtrip_multi_vector() { + // 1024 + 58: one full vector plus a partial trailing vector. + let values: Vec = (0..1082).map(|i| i as f32 * 0.1).collect(); + check_roundtrip::(&values); + } + + #[test] + fn alp_encoder_roundtrip_empty() { + check_roundtrip::(&[]); + check_roundtrip::(&[]); + } +} diff --git a/parquet/src/encodings/encoding/alp_encoder/vector.rs b/parquet/src/encodings/encoding/alp_encoder/vector.rs new file mode 100644 index 000000000000..c3dd545b80b4 --- /dev/null +++ b/parquet/src/encodings/encoding/alp_encoder/vector.rs @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::errors::{ParquetError, Result}; +use crate::encodings::alp::{AlpFloat, AlpInfo, ForInfo}; +use crate::encodings::encoding::alp_encoder::{ALP_VECTOR_SIZE, Scratch}; +use std::mem; + +pub(super) enum VectorPutResult { + /// All values were encoded, and the vector is still in progress (not full) + StillInProgress(InProgressVector), + /// After `encoded` values from `values` Vector were written, the Vector was + /// full and fully written to the buffer. + Finished { + /// Number of values from the input that were encoded + encoded_len: usize, + /// Vector information + finish_result: VectorFinishResult, + }, +} + +/// result of finishing a Vector +#[derive(Debug)] +pub(super) struct VectorFinishResult { + /// Total number of values encoded in the output vector (can be fewer than + /// the target vector size for the last vector) + pub(super) vector_len: usize, + /// Returned scratch buffers/encoding parameters + pub(super) scratch: Scratch, +} + + +/// Accumulates data for the Vector currently being encoded, buffering if necessary. +/// +/// ```text +/// +-------------------+-----------------+-------------------+---------------------+-------------------+ +/// | AlpInfo | ForInfo | PackedValues | ExceptionPositions | ExceptionValues | +/// | (4 bytes) | (5B or 9B) | (variable) | (variable) | (variable) | +/// +-------------------+-----------------+-------------------+---------------------+-------------------+ +/// ``` +#[derive(Debug)] +pub(super) struct InProgressVector { + /// Start position of the vector in the page (points to AlpInfo) + start_pos: usize, + /// Number of values in the vector so far + vector_len: usize, + /// Encoding parameters (maybe not known until we see a sample of the data) + encoding_params: Option, + /// positions of values in exception_values in original vector. u16 so + /// we can copy them directly to the output. + exception_positions: Vec, + /// TODO can avoid this copy when we have an entire vector. + /// values that could not be encoded + /// or are being accumulated before the page is done + /// Since we don't know how many exceptions there will be we buffer them here until the end of the vector, then write them all at once to the output buffer. + exception_values: Vec, +} + +impl InProgressVector { + /// Creates a new in progress vector, writing space for the eventual header + /// to buffer + /// + /// Uses buffers from scratch and optional pre-known encoding paramaters + pub(super) fn new(buffer: &mut Vec, + scratch: Scratch) -> InProgressVector { + let Scratch { + encoding_params, + mut exception_positions, + mut exception_values, + } = scratch; + exception_positions.clear(); + exception_values.clear(); + + let start_pos = buffer.len(); + // reserve space for the header. ForInfo is `frame_of_reference` + // (4 bytes for f32 / 8 bytes for f64 — same as `T::get_type_size()`) + // plus a 1-byte `bit_width`. + let header_len = AlpInfo::SERIALIZED_SIZE + ForInfo::::serialized_size(); + buffer.resize(buffer.len() + header_len, 0); + + InProgressVector { + start_pos, + vector_len: 0, + encoding_params, + exception_positions, + exception_values, + } + } + + /// Encode as many values as possible, writing directly to `dst` when possible + /// + /// Returns [`VectorPutResult`] which distinguishes between the vector being + /// complete or still have space. + pub(super) fn put( + mut self, + dst: &mut Vec, + values: &[F], + ) -> Result> { + + + // Phase 1: Determine encoding parameters from first batch if needed + let encoding_params = self.encoding_params + .take() + // TODO: handle case when a small first batch is pushed (maybe buffer) + .unwrap_or_else(|| EncodingParams::from_sample(values)); + + // If we can encode an entire vector do so + let space_left = ALP_VECTOR_SIZE - self.vector_len; + let num_to_encode = values.len().min(ALP_VECTOR_SIZE).min(space_left); + + // TODO: actually encode that many values (TODO) + // for now just treat them all as exceptions + self.exception_values.extend(&values[0..num_to_encode]); + + // TODO check overflow + let vector_len = self.vector_len as u16; + self.exception_positions.extend(vector_len..vector_len + num_to_encode as u16); + + // Update counters + self.vector_len += num_to_encode; + self.encoding_params = Some(encoding_params); + if self.vector_len < ALP_VECTOR_SIZE { + Ok(VectorPutResult::StillInProgress(self)) + } else { + Ok(VectorPutResult::Finished { + encoded_len: num_to_encode, + finish_result: self.finish(dst)? + }) + } + } + + /// Finalize this vector and write the remaining values to the `dst` buffer + pub(super) fn finish( + self, + dst: &mut Vec, + ) -> Result> { + let Self{ start_pos, vector_len, encoding_params, exception_positions, exception_values } = self; + + // If we had no encoding parameters, no values were written (zero length vector) + let Some(encoding_params) = encoding_params else { + return Err(general_err!("Internal error: ALP Vector finished with no values written")); + }; + + // Output is like this (starting at start_pos) (see diagram on InProgressVector) + // AlpInfo + // ForInfo + // PackedValues (already written) + // ExceptionPositions (write from exception_positions) + // ExceptionValues (write from exception_values) + + let num_exceptions = exception_values.len(); + let num_exceptions: u16 = num_exceptions.try_into() + .map_err(|_| general_err!("More than u16::MAX exceptions in ALP Vector: {num_exceptions}"))?; + + // TODO move ALPInfo and FOR creation into the encoding params + let alp_info = AlpInfo::new(encoding_params.exponent, encoding_params.factor, num_exceptions); + alp_info.serialize(&mut dst[start_pos..]); + let frame_of_reference = Default::default(); + let bit_width = 0; // TODO actually compute bitwidth (from encoding params) + let for_info = ForInfo::::new(frame_of_reference, bit_width); + for_info.serialize(&mut dst[start_pos + AlpInfo::SERIALIZED_SIZE..]); + + // ExceptionPositions (all uint16) + // TODO make this faster + dst.extend(exception_positions.iter().flat_map(|pos| pos.to_le_bytes())); + // ExceptionValues (all T) + //dst.extend(exception_values.iter().flat_map(|val| val.to_le_bytes())); + // temp just write zeros (the traits are getting messy) + let num_bytes = exception_values.len() * mem::size_of::(); + dst.extend(std::iter::repeat(0).take(num_bytes)); + + Ok(VectorFinishResult { + vector_len, + scratch: Scratch { + encoding_params: Some(encoding_params), + exception_positions, + exception_values, + }, + }) + } +} + + +/// Encoding Parameters +#[derive(Debug)] +pub(super) struct EncodingParams { + exponent: u8, + factor: u8, +} + +impl EncodingParams { + /// Create encoding parameters from a sample of the data. + /// + /// Algorithm: TODO + fn from_sample(values: &[F]) -> EncodingParams { + // TEMP hard code + let exponent = 0; + let factor = 5; + EncodingParams { + exponent, + factor, + } + } +} \ No newline at end of file diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index eeabcf4ba5ce..c695eb7acc92 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -27,13 +27,72 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{BitWriter, num_required_bits}; +use alp_encoder::AlpEncoder; use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder}; use bytes::Bytes; pub use dict_encoder::DictEncoder; +mod alp_encoder; mod byte_stream_split_encoder; mod dict_encoder; +pub(crate) mod private { + use super::*; + use crate::data_type::{ByteArray, FixedLenByteArray, Int96}; + + /// A trait that allows getting an [`Encoder`] implementation for a + /// [`DataType`] with the corresponding [`ParquetValueType`], similarly to + /// [`crate::encodings::decoding::private::GetDecoder`]. + /// + /// This is necessary to support [`Encoder`] implementations that may not be + /// applicable for all [`DataType`] and by extension all + /// [`ParquetValueType`] — e.g. ALP, which only applies to `f32`/`f64`. + pub(crate) trait GetEncoder { + /// Construct an ALP encoder for `T`. Default implementation rejects the + /// encoding; `f32` and `f64` override to return a real encoder. + fn get_encoder>( + encoding: Encoding, + descr: &ColumnDescPtr, + ) -> Result>> + where + Self: Sized, + { + get_encoder_default(encoding, descr) + } + } + + impl GetEncoder for bool {} + impl GetEncoder for i32 {} + impl GetEncoder for i64 {} + impl GetEncoder for Int96 {} + impl GetEncoder for ByteArray {} + impl GetEncoder for FixedLenByteArray {} + + impl GetEncoder for f32 { + fn get_encoder>( + encoding: Encoding, + descr: &ColumnDescPtr, + ) -> Result>> { + match encoding { + Encoding::ALP => Ok(Box::new(AlpEncoder::::new())), + _ => get_encoder_default(encoding, descr), + } + } + } + + impl GetEncoder for f64 { + fn get_encoder>( + encoding: Encoding, + descr: &ColumnDescPtr, + ) -> Result>> { + match encoding { + Encoding::ALP => Ok(Box::new(AlpEncoder::::new())), + _ => get_encoder_default(encoding, descr), + } + } + } +} + // ---------------------------------------------------------------------- // Encoders @@ -83,6 +142,14 @@ pub trait Encoder: Send { pub fn get_encoder( encoding: Encoding, descr: &ColumnDescPtr, +) -> Result>> { + use self::private::GetEncoder; + T::T::get_encoder(encoding, descr) +} + +fn get_encoder_default( + encoding: Encoding, + descr: &ColumnDescPtr, ) -> Result>> { let encoder: Box> = match encoding { Encoding::PLAIN => Box::new(PlainEncoder::new()), @@ -101,7 +168,9 @@ pub fn get_encoder( )), _ => Box::new(ByteStreamSplitEncoder::new()), }, - e => return Err(nyi_err!("Encoding {} is not supported", e)), + Encoding::ALP => return Err(general_err!("ALP Encoding only supported for FLOAT and DOUBLE")), + #[expect(deprecated)] + e @ Encoding::BIT_PACKED => return Err(nyi_err!("Encoding {} is not supported", e)), }; Ok(encoder) }