diff --git a/crates/iceberg/src/formats/mod.rs b/crates/iceberg/src/formats/mod.rs
new file mode 100644
index 0000000000..e223b3e7e4
--- /dev/null
+++ b/crates/iceberg/src/formats/mod.rs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File Format API for Apache Iceberg.
+//!
+//! Trait-based abstractions for reading and writing Iceberg data files,
+//! independent of the on-disk file format and the in-memory batch type.
+
+mod parquet;
+mod registry;
+mod traits;
+
+pub use registry::*;
+pub use traits::*;
diff --git a/crates/iceberg/src/formats/parquet.rs b/crates/iceberg/src/formats/parquet.rs
new file mode 100644
index 0000000000..6d69c2e1f8
--- /dev/null
+++ b/crates/iceberg/src/formats/parquet.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parquet format skeleton for the existing `ArrowReader` and `ParquetWriter`.
+//!
+//! `ParquetArrowModel` implements both [`FormatReader`] and [`FormatWriter`] for
+//! Parquet files producing/consuming Arrow `RecordBatch`. It is intended to become
+//! the default format-batch pairing registered by `FormatRegistry::default()`.
+//!
+//! The implementation is meant to wrap the existing reader and writer code rather
+//! than replace it: `ArrowReader` and `ParquetWriter` stay unchanged. NOTE: every
+//! `read`/`write`/`write_batch`/`close` body below is still `todo!()` and panics.
+
+#![allow(dead_code)]
+
+use futures::future::BoxFuture;
+
+use super::traits::{
+    DataBatch, FormatFileWriter, FormatReader, FormatWriter, ReadOptions, ReadResult, WriteOptions,
+    WriterResult,
+};
+use crate::io::{InputFile, OutputFile};
+use crate::spec::DataFileFormat;
+use crate::Result;
+
+/// Parquet format implementation producing Arrow `RecordBatch`.
+///
+/// Intended to be registered in the `FormatRegistry` as the reader and writer for
+/// `(DataFileFormat::Parquet, TypeId::of::<RecordBatch>())`.
+///
+/// # Example registration
+///
+/// ```rust,ignore
+/// use std::sync::Arc;
+/// use arrow_array::RecordBatch;
+/// use iceberg::formats::{FormatRegistry, ParquetArrowModel};
+///
+/// let mut registry = FormatRegistry::new();
+/// let model = Arc::new(ParquetArrowModel);
+/// registry.register_reader::<RecordBatch>(model.clone());
+/// registry.register_writer::<RecordBatch>(model);
+/// ```
+#[derive(Debug, Clone, Copy, Default)]
+pub struct ParquetArrowModel;
+
+impl FormatReader for ParquetArrowModel {
+    fn format(&self) -> DataFileFormat {
+        DataFileFormat::Parquet
+    }
+
+    fn read(
+        &self,
+        _input: InputFile,
+        _options: ReadOptions,
+    ) -> BoxFuture<'static, Result<ReadResult>> {
+        // Production: delegates to the existing ArrowReader pipeline.
+        //
+        // 1. Opens the Parquet file via ArrowFileReader
+        // 2. Applies projection mask from options.schema
+        // 3. Evaluates row group statistics against options.predicate
+        // 4. Returns ReadResult with:
+        //    - stream: the RecordBatch stream from ParquetRecordBatchStream
+        //    - residual_predicate: the predicate (or portion of it) that
+        //      row-group statistics could not fully resolve
+        todo!("Wrap ArrowReader")
+    }
+}
+
+impl FormatWriter for ParquetArrowModel {
+    fn format(&self) -> DataFileFormat {
+        DataFileFormat::Parquet
+    }
+
+    fn write(
+        &self,
+        _output: OutputFile,
+        _options: WriteOptions,
+    ) -> BoxFuture<'static, Result<Box<dyn FormatFileWriter>>> {
+        // Production: delegates to the existing ParquetWriter.
+        //
+        // 1. Converts options.schema to Arrow schema
+        // 2. Builds WriterProperties from options.properties
+        // 3. Creates AsyncArrowWriter via the existing ParquetWriter path
+        // 4. Returns a ParquetFormatFileWriter that wraps the writer
+        todo!("Wrap ParquetWriter")
+    }
+}
+
+/// The per-file writer to be returned by `ParquetArrowModel::write`.
+///
+/// Planned to wrap the existing `ParquetWriter` and delegate `write_batch` and
+/// `close` to it. On `close`, it should extract Parquet metadata and build a
+/// `DataFileBuilder` with column sizes, value counts, null counts,
+/// min/max bounds, and split offsets. Currently a placeholder with no state.
+pub struct ParquetFormatFileWriter {
+    // Production: holds an AsyncArrowWriter or the existing ParquetWriter.
+    _private: (),
+}
+
+impl FormatFileWriter for ParquetFormatFileWriter {
+    fn write_batch(&mut self, _batch: &dyn DataBatch) -> BoxFuture<'_, Result<()>> {
+        // Production:
+        // 1. Downcast batch to &RecordBatch via DataBatch::as_any()
+        // 2. Write to the underlying AsyncArrowWriter
+        todo!("Downcast and write")
+    }
+
+    fn close(self: Box<Self>) -> BoxFuture<'static, Result<WriterResult>> {
+        // Production:
+        // 1. Close the AsyncArrowWriter, get Parquet FileMetaData
+        // 2. Extract metrics via parquet_to_data_file_builder()
+        // 3. Return WriterResult { data_files: vec![builder] }
+        todo!("Close and extract metrics")
+    }
+}
diff --git a/crates/iceberg/src/formats/registry.rs b/crates/iceberg/src/formats/registry.rs
new file mode 100644
index 0000000000..bebb174c1a
--- /dev/null
+++ b/crates/iceberg/src/formats/registry.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Format registry for runtime format resolution.
+
+use std::any::{type_name, TypeId};
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::sync::{Arc, OnceLock};
+
+use crate::spec::DataFileFormat;
+use crate::{Error, ErrorKind, Result};
+
+use super::{DataBatch, FormatReader, FormatWriter};
+
+/// Maps `(DataFileFormat, TypeId)` pairs to format reader/writer implementations.
+///
+/// The two-dimensional key matches the Java `FormatModelRegistry` pattern where
+/// the key is `(FileFormat, Class)`. The first dimension is the file format
+/// (determined at runtime from manifest metadata). The second dimension is the
+/// batch type the caller wants to produce or consume (determined at compile time
+/// by the engine integration).
+///
+/// This design supports the pattern where multiple engines register their own
+/// optimized readers for the same format. Spark registers a Parquet reader that
+/// produces `ColumnarBatch`. Flink registers one that produces `RowData`. In
+/// Rust, DataFusion registers a reader producing `RecordBatch`. A future Comet
+/// integration could register one producing its own batch type.
+///
+/// Callers specify the batch type they expect via the generic parameter on
+/// `reader::<B>(format)` and `writer::<B>(format)`. They never name the
+/// concrete reader/writer implementation.
+pub struct FormatRegistry {
+    readers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatReader>>,
+    writers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatWriter>>,
+}
+
+impl FormatRegistry {
+    /// Create an empty registry.
+    pub fn new() -> Self {
+        Self {
+            readers: HashMap::new(),
+            writers: HashMap::new(),
+        }
+    }
+
+    /// Register a format reader for batch type `B`.
+    ///
+    /// The key is `(reader.format(), TypeId::of::<B>())`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a reader for the same (format, batch type) is already registered.
+    pub fn register_reader<B: DataBatch>(&mut self, reader: Arc<dyn FormatReader>) {
+        let format = reader.format();
+        // Single lookup: the entry tells us whether the slot is taken and lets us fill it.
+        match self.readers.entry((format, TypeId::of::<B>())) {
+            Entry::Occupied(_) => panic!(
+                "FormatReader already registered for {:?} with batch type {}",
+                format,
+                type_name::<B>()
+            ),
+            Entry::Vacant(slot) => {
+                slot.insert(reader);
+            }
+        }
+    }
+
+    /// Register a format writer for batch type `B`.
+    ///
+    /// The key is `(writer.format(), TypeId::of::<B>())`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a writer for the same (format, batch type) is already registered.
+    pub fn register_writer<B: DataBatch>(&mut self, writer: Arc<dyn FormatWriter>) {
+        let format = writer.format();
+        // Single lookup: the entry tells us whether the slot is taken and lets us fill it.
+        match self.writers.entry((format, TypeId::of::<B>())) {
+            Entry::Occupied(_) => panic!(
+                "FormatWriter already registered for {:?} with batch type {}",
+                format,
+                type_name::<B>()
+            ),
+            Entry::Vacant(slot) => {
+                slot.insert(writer);
+            }
+        }
+    }
+
+    /// Get the reader for a format that produces batch type `B`.
+    ///
+    /// The generic parameter `B` is the batch type the caller wants to consume.
+    /// The registry returns the reader registered for `(format, TypeId::of::<B>())`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::FeatureUnsupported` when no reader is registered for that key.
+    pub fn reader<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatReader> {
+        let key = (format, TypeId::of::<B>());
+        self.readers.get(&key).map(|r| r.as_ref()).ok_or_else(|| {
+            Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!(
+                    "No reader registered for {format:?} producing batch type {}",
+                    type_name::<B>()
+                ),
+            )
+        })
+    }
+
+    /// Get the writer for a format that accepts batch type `B`.
+    ///
+    /// The generic parameter `B` is the batch type the caller will feed to the writer.
+    /// The registry returns the writer registered for `(format, TypeId::of::<B>())`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::FeatureUnsupported` when no writer is registered for that key.
+    pub fn writer<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatWriter> {
+        let key = (format, TypeId::of::<B>());
+        self.writers.get(&key).map(|w| w.as_ref()).ok_or_else(|| {
+            Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!(
+                    "No writer registered for {format:?} accepting batch type {}",
+                    type_name::<B>()
+                ),
+            )
+        })
+    }
+}
+
+impl Default for FormatRegistry {
+    /// Currently identical to [`FormatRegistry::new`]: no formats are registered yet.
+    fn default() -> Self {
+        // Format registrations will be added here as implementations land.
+        // #[cfg(feature = "format-parquet")]
+        // {
+        //     let mut registry = Self::new();
+        //     let model = Arc::new(ParquetArrowModel);
+        //     registry.register_reader::<RecordBatch>(model.clone());
+        //     registry.register_writer::<RecordBatch>(model);
+        //     return registry;
+        // }
+        Self::new()
+    }
+}
+
+/// Process-wide default registry, initialized on first access.
+///
+/// NOTE(review): the returned reference is shared and immutable while
+/// `register_reader` / `register_writer` take `&mut self`, so nothing can be
+/// registered into this instance after initialization. Engines that need their
+/// own readers must build and pass their own `FormatRegistry` — confirm this is
+/// the intended design.
+pub fn default_format_registry() -> &'static FormatRegistry {
+    static REGISTRY: OnceLock<FormatRegistry> = OnceLock::new();
+    REGISTRY.get_or_init(FormatRegistry::default)
+}
diff --git a/crates/iceberg/src/formats/traits.rs b/crates/iceberg/src/formats/traits.rs
new file mode 100644
index 0000000000..b67ac49366
--- /dev/null
+++ b/crates/iceberg/src/formats/traits.rs
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Core trait definitions for the File Format API.
+
+use std::any::Any;
+use std::collections::HashMap;
+use std::fmt;
+
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+use crate::expr::BoundPredicate;
+use crate::io::{InputFile, OutputFile};
+use crate::spec::{DataFileBuilder, DataFileFormat, Datum, Schema};
+use crate::Result;
+
+// NOTE(review): generic arguments in this file were restored from context.
+// Integer widths and map key/value types marked "TODO confirm" are assumptions.
+
+// ---------------------------------------------------------------------------
+// DataBatch
+// ---------------------------------------------------------------------------
+
+/// The contract for any in-memory data representation that the Iceberg kernel
+/// can process.
+///
+/// Implementing this trait is the cost of entry for bringing a new in-memory
+/// format. Implementations have full freedom to fuse operations internally
+/// for performance. Correctness is validated by the TCK (Layer 1).
+///
+/// The `iceberg` crate ships `impl DataBatch for RecordBatch` as the default.
+///
+/// NOTE(review): `project`, `filter`, `inject_constants` and `evolve_schema`
+/// require `Self: Sized`, so they cannot be called through the
+/// `Box<dyn DataBatch>` items yielded by [`DataStream`]; callers must first
+/// downcast via [`DataBatch::as_any`]. Confirm this is intended.
+pub trait DataBatch: Send + 'static {
+    /// Number of rows in this batch.
+    fn num_rows(&self) -> usize;
+
+    /// The Iceberg field IDs present in this batch, in column order.
+    fn field_ids(&self) -> &[i32];
+
+    /// Project to a subset of columns by Iceberg field ID.
+    fn project(&self, field_ids: &[i32]) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Evaluate a filter predicate, returning only matching rows.
+    ///
+    /// Implementations may use any strategy: vectorized array ops, visitor
+    /// pattern, fused I/O closures, etc. The kernel does NOT dictate
+    /// evaluation strategy.
+    fn filter(&self, predicate: &BoundPredicate) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Compute column-level metrics for the given field.
+    ///
+    /// Returns `None` if the field is not present in this batch.
+    fn column_metrics(&self, field_id: i32) -> Result<Option<ColumnMetrics>>;
+
+    /// Inject constant values for metadata columns (`_file`, `_pos`, `_partition`).
+    fn inject_constants(&self, constants: &[(i32, Datum)]) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Apply schema evolution: add columns with defaults, widen types, rename.
+    ///
+    /// The `source` schema describes what this batch currently contains.
+    /// The `target` schema describes what the caller needs.
+    fn evolve_schema(&self, source: &Schema, target: &Schema) -> Result<Self>
+    where
+        Self: Sized;
+
+    /// Downcast support for runtime type resolution.
+    fn as_any(&self) -> &dyn Any;
+}
+
+// ---------------------------------------------------------------------------
+// Supporting types
+// ---------------------------------------------------------------------------
+
+/// A stream of batches produced by a format reader.
+pub type DataStream = BoxStream<'static, Result<Box<dyn DataBatch>>>;
+
+/// The result of a format read operation.
+///
+/// Contains the batch stream and communicates what the reader handled
+/// from the [`ReadOptions`]. The kernel uses `residual_predicate` to
+/// determine whether it needs to apply row-level filtering via
+/// [`DataBatch::filter`].
+#[non_exhaustive]
+pub struct ReadResult {
+    /// The stream of batches with requested columns decoded.
+    pub stream: DataStream,
+    /// The portion of the predicate the reader could NOT evaluate.
+    ///
+    /// `None` means the reader fully handled the predicate — all rows
+    /// in the stream match. `Some(predicate)` means the kernel must
+    /// apply residual filtering using [`DataBatch::filter`].
+    pub residual_predicate: Option<BoundPredicate>,
+}
+
+impl ReadResult {
+    /// Build a read result from a batch stream and the residual predicate.
+    ///
+    /// The struct is `#[non_exhaustive]`, so reader implementations outside
+    /// this crate cannot use a struct literal and need this constructor.
+    pub fn new(stream: DataStream, residual_predicate: Option<BoundPredicate>) -> Self {
+        Self {
+            stream,
+            residual_predicate,
+        }
+    }
+}
+
+/// Column-level metrics produced by a writer or computed from a batch.
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub struct ColumnMetrics {
+    /// Number of values in the column (including nulls).
+    pub value_count: u64,
+    /// Number of null values.
+    pub null_count: u64,
+    /// Number of NaN values (for float and double columns).
+    pub nan_count: Option<u64>, // TODO confirm width
+    /// Lower bound value as a typed Datum.
+    pub lower_bound: Option<Datum>,
+    /// Upper bound value as a typed Datum.
+    pub upper_bound: Option<Datum>,
+    /// Column size in bytes.
+    pub column_size: Option<u64>, // TODO confirm width
+}
+
+impl ColumnMetrics {
+    /// Create metrics with only counts populated.
+    pub fn counts(value_count: u64, null_count: u64) -> Self {
+        Self {
+            value_count,
+            null_count,
+            nan_count: None,
+            lower_bound: None,
+            upper_bound: None,
+            column_size: None,
+        }
+    }
+}
+
+/// The content type of a file being written.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+#[non_exhaustive]
+pub enum FileContent {
+    /// Data file containing table rows.
+    #[default]
+    Data,
+    /// Equality delete file.
+    EqualityDeletes,
+    /// Position delete file.
+    PositionDeletes,
+}
+
+/// Configuration for metrics collection during writes.
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub struct MetricsConfig {
+    /// Per-column metrics mode overrides keyed by column name.
+    pub column_modes: HashMap<String, MetricsMode>,
+    /// Default mode for columns without an explicit override.
+    pub default_mode: MetricsMode,
+}
+
+impl MetricsConfig {
+    /// Create a configuration with the given default mode and no per-column overrides.
+    ///
+    /// The struct is `#[non_exhaustive]`, so code outside this crate needs
+    /// this constructor to build one.
+    pub fn new(default_mode: MetricsMode) -> Self {
+        Self {
+            column_modes: HashMap::new(),
+            default_mode,
+        }
+    }
+}
+
+/// The metrics collection mode for a column.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[non_exhaustive]
+pub enum MetricsMode {
+    /// Do not collect any metrics.
+    None,
+    /// Collect only value counts and null counts.
+    Counts,
+    /// Collect full metrics including bounds.
+    Full,
+    /// Collect truncated bounds at the given length.
+    Truncate(u32),
+}
+
+// ---------------------------------------------------------------------------
+// ReadOptions / WriteOptions
+// ---------------------------------------------------------------------------
+
+/// Configuration for a format read operation.
+///
+/// Passed to [`FormatReader::read`]. Format implementations apply what they
+/// can (best-effort pushdown) and ignore unsupported options.
+///
+/// # Engine schema / variant shredding
+///
+/// This struct intentionally does NOT carry an `engine_schema` parameter.
+/// Java's `ReadBuilder.engineProjection(S)` serves two purposes: engine-
+/// requested type widening (e.g., read an `int` column as `long`) and
+/// variant shredding layout hints. Both are deferred from Phase 1 because:
+///
+/// 1. **Variant shredding** depends on the variant type landing first
+///    ([#2188](https://github.com/apache/iceberg-rust/pull/2188)). When it
+///    does, shredding can be modeled as a format-agnostic Iceberg-level
+///    concept (which paths to shred and what types to extract) rather than
+///    an opaque engine-specific blob.
+///
+/// 2. **Engine type widening** ("give me `long` even though the schema says
+///    `int`") can be expressed as a typed `HashMap<i32, PrimitiveType>`
+///    mapping field IDs to desired output types — no type erasure needed.
+///
+/// 3. **A type-erased `Box<dyn Any>` field would allow hidden coupling.**
+///    A caller going through the registry could pass a format-specific
+///    schema object that only one format understands, silently breaking
+///    when a different format is registered. By deferring this field, we
+///    force the eventual design to be format-agnostic and compiler-checked.
+///
+/// When these features land, `#[non_exhaustive]` allows adding fields
+/// without a breaking change.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct ReadOptions {
+    /// Iceberg projection schema.
+    pub schema: Option<Schema>,
+    /// Filter predicate for pushdown.
+    pub predicate: Option<BoundPredicate>,
+    /// Byte range start for split reading.
+    pub split_start: Option<u64>, // TODO confirm width
+    /// Byte range length for split reading.
+    pub split_length: Option<u64>, // TODO confirm width
+    /// Case sensitivity for predicate evaluation. Defaults to true.
+    pub case_sensitive: bool,
+    /// Number of rows per batch.
+    pub batch_size: Option<usize>, // TODO confirm width
+    /// Format-specific configuration. Unknown keys are ignored.
+    pub properties: HashMap<String, String>,
+    /// Constant values for metadata columns not stored in the data file.
+    pub id_to_constant: HashMap<i32, Datum>,
+}
+
+impl Default for ReadOptions {
+    /// All options unset, with `case_sensitive` set to `true`.
+    ///
+    /// Written by hand because a derived `Default` would set `case_sensitive`
+    /// to `false`, contradicting the documented default.
+    fn default() -> Self {
+        Self {
+            schema: None,
+            predicate: None,
+            split_start: None,
+            split_length: None,
+            case_sensitive: true,
+            batch_size: None,
+            properties: HashMap::new(),
+            id_to_constant: HashMap::new(),
+        }
+    }
+}
+
+/// Configuration for a format write operation.
+///
+/// Passed to [`FormatWriter::write`]. Format implementations apply what they
+/// can and ignore unsupported options.
+///
+/// # Engine schema
+///
+/// Like [`ReadOptions`], this struct intentionally omits an `engine_schema`
+/// parameter. See the [`ReadOptions`] doc comment for the full rationale:
+/// a type-erased field would allow hidden coupling between callers and
+/// specific format implementations through the generic registry path.
+/// When engine type narrowing is needed, it will be expressed as typed,
+/// format-agnostic fields that the compiler can check.
+///
+/// The `Debug` output redacts `encryption_key` so key bytes never reach logs.
+#[derive(Default)]
+#[non_exhaustive]
+pub struct WriteOptions {
+    /// Iceberg schema for the file being written.
+    pub schema: Option<Schema>,
+    /// Format-specific configuration. Unknown keys are ignored.
+    pub properties: HashMap<String, String>,
+    /// File-level metadata (for example, delete-type, equality field IDs).
+    pub metadata: HashMap<String, String>,
+    /// Content type: data, equality deletes, or position deletes.
+    pub content: FileContent,
+    /// Metrics collection configuration.
+    pub metrics_config: Option<MetricsConfig>,
+    /// Allow overwriting an existing file.
+    pub overwrite: bool,
+    /// File encryption key.
+    pub encryption_key: Option<Vec<u8>>,
+    /// AAD prefix for encryption.
+    pub aad_prefix: Option<Vec<u8>>,
+}
+
+impl fmt::Debug for WriteOptions {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // Only reveal whether a key is present, never its bytes.
+        let redacted_key = self.encryption_key.as_ref().map(|_| "<redacted>");
+        f.debug_struct("WriteOptions")
+            .field("schema", &self.schema)
+            .field("properties", &self.properties)
+            .field("metadata", &self.metadata)
+            .field("content", &self.content)
+            .field("metrics_config", &self.metrics_config)
+            .field("overwrite", &self.overwrite)
+            .field("encryption_key", &redacted_key)
+            .field("aad_prefix", &self.aad_prefix)
+            .finish()
+    }
+}
+
+// ---------------------------------------------------------------------------
+// FormatReader / FormatWriter
+// ---------------------------------------------------------------------------
+
+/// A format that can read data files.
+///
+/// This trait is dyn-compatible, enabling the [`FormatRegistry`](super::FormatRegistry)
+/// to store readers without callers knowing the concrete type.
+/// Follows the same pattern as [`Storage`](crate::io::Storage) in the IO layer.
+///
+/// # Contract
+///
+/// The reader MUST:
+/// - Decode only the columns in [`ReadOptions::schema`] (projection is an I/O concern).
+///
+/// The reader MAY:
+/// - Skip chunks that cannot match [`ReadOptions::predicate`] using format-level statistics.
+/// - Support byte-range splits via [`ReadOptions::split_start`] / [`ReadOptions::split_length`].
+///
+/// The reader MUST NOT handle:
+/// - Residual row-level filtering (kernel responsibility)
+/// - Schema evolution (kernel responsibility)
+/// - Constant injection (kernel responsibility)
+/// - Delete application (kernel responsibility)
+/// - Name mapping / field ID resolution (kernel responsibility)
+pub trait FormatReader: Send + Sync + 'static {
+    /// Which file format this reader handles.
+    fn format(&self) -> DataFileFormat;
+
+    /// Read the given input file, returning a [`ReadResult`] containing the
+    /// batch stream and the residual predicate the reader could not evaluate.
+    fn read(
+        &self,
+        input: InputFile,
+        options: ReadOptions,
+    ) -> BoxFuture<'static, Result<ReadResult>>;
+}
+
+/// A format that can write data files.
+///
+/// This trait is dyn-compatible, enabling the [`FormatRegistry`](super::FormatRegistry)
+/// to store writers without callers knowing the concrete type.
+///
+/// # Contract
+///
+/// The writer MUST:
+/// - Encode batches into the file format.
+/// - Collect file-level metrics per [`WriteOptions::metrics_config`].
+/// - Handle format-specific encoding concerns (e.g., variant shredding layout for Parquet).
+///
+/// The writer MUST NOT handle:
+/// - Partitioning (caller sends pre-partitioned batches)
+/// - Sorting (caller sends pre-sorted batches)
+/// - Transaction management (kernel handles commit)
+pub trait FormatWriter: Send + Sync + 'static {
+    /// Which file format this writer handles.
+    fn format(&self) -> DataFileFormat;
+
+    /// Write batches to the given output file.
+    fn write(
+        &self,
+        output: OutputFile,
+        options: WriteOptions,
+    ) -> BoxFuture<'static, Result<Box<dyn FormatFileWriter>>>;
+}
+
+// ---------------------------------------------------------------------------
+// FormatFileWriter
+// ---------------------------------------------------------------------------
+
+/// A writer that accepts batches and produces file metadata on close.
+///
+/// Implementations downcast `&dyn DataBatch` to their expected concrete
+/// batch type via [`DataBatch::as_any`]. A type mismatch is a runtime error.
+pub trait FormatFileWriter: Send {
+    /// Write a batch of data.
+    fn write_batch(&mut self, batch: &dyn DataBatch) -> BoxFuture<'_, Result<()>>;
+
+    /// Close the writer and return file metadata for the manifest.
+    fn close(self: Box<Self>) -> BoxFuture<'static, Result<WriterResult>>;
+}
+
+/// The result of closing a format writer.
+#[non_exhaustive]
+pub struct WriterResult {
+    /// Data file builders representing the files written.
+    pub data_files: Vec<DataFileBuilder>,
+}
+
+impl WriterResult {
+    /// Build a writer result from the data file builders produced on close.
+    ///
+    /// The struct is `#[non_exhaustive]`, so writer implementations outside
+    /// this crate cannot use a struct literal and need this constructor.
+    pub fn new(data_files: Vec<DataFileBuilder>) -> Self {
+        Self { data_files }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Future extensions (design examples)
+// ---------------------------------------------------------------------------
+
+/// Variant shredding configuration — format-agnostic.
+///
+/// This type is NOT yet wired into [`ReadOptions`] or [`WriteOptions`].
+/// It exists to demonstrate how variant shredding will be expressed when
+/// the variant type ([#2188](https://github.com/apache/iceberg-rust/pull/2188))
+/// lands. The key property: this is an Iceberg-level concept, not a
+/// format-specific one. Any format that supports shredding interprets
+/// the same struct identically.
+///
+/// When ready, `ReadOptions` gains:
+/// ```rust,ignore
+/// pub variant_shredding: Vec<VariantShredding>,
+/// ```
+#[derive(Debug, Clone)]
+pub struct VariantShredding {
+    /// The variant column's Iceberg field ID.
+    pub variant_field_id: i32,
+    /// Paths to extract as physical columns.
+    pub shredded_paths: Vec<ShredPath>,
+}
+
+/// A single path within a variant to shred into a physical column.
+#[derive(Debug, Clone)]
+pub struct ShredPath {
+    /// Dotted path into the variant (e.g., `"name"`, `"address.city"`).
+    pub path: String,
+    /// The Iceberg primitive type to extract this path as.
+    pub data_type: crate::spec::PrimitiveType,
+}
+
+/// Engine-requested type overrides for specific columns.
+///
+/// This type is NOT yet wired into [`ReadOptions`]. It exists to demonstrate
+/// how engine type widening will be expressed. The use case: an engine wants
+/// a column read as `long` even though the Iceberg schema defines it as `int`.
+/// This is distinct from schema evolution (where the TABLE schema changed) —
+/// here the engine simply prefers a wider type for its own processing.
+///
+/// When ready, `ReadOptions` gains:
+/// ```rust,ignore
+/// pub type_overrides: HashMap<i32, PrimitiveType>,
+/// ```
+///
+/// Format implementations that support widening during read (Parquet, ORC)
+/// apply these overrides. Formats that don't support it ignore them.
+pub type TypeOverrides = HashMap<i32, crate::spec::PrimitiveType>;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// The documented default for `case_sensitive` is `true`.
+    #[test]
+    fn read_options_default_is_case_sensitive() {
+        assert!(ReadOptions::default().case_sensitive);
+    }
+
+    /// Key material must never appear in `Debug` output.
+    #[test]
+    fn write_options_debug_redacts_encryption_key() {
+        let options = WriteOptions {
+            encryption_key: Some(vec![1, 2, 3]),
+            ..Default::default()
+        };
+        let rendered = format!("{options:?}");
+        assert!(rendered.contains("<redacted>"));
+        assert!(!rendered.contains("[1, 2, 3]"));
+    }
+
+    /// Demonstrates how variant shredding will integrate with ReadOptions
+    /// when the variant type lands. The shredding request is format-agnostic:
+    /// it uses Iceberg field IDs and primitive types, not Arrow schemas or
+    /// Parquet MessageTypes.
+    #[test]
+    fn example_variant_shredding_config() {
+        let _shredding = VariantShredding {
+            variant_field_id: 7,
+            shredded_paths: vec![
+                ShredPath {
+                    path: "name".to_string(),
+                    data_type: crate::spec::PrimitiveType::String,
+                },
+                ShredPath {
+                    path: "age".to_string(),
+                    data_type: crate::spec::PrimitiveType::Int,
+                },
+                ShredPath {
+                    path: "address.city".to_string(),
+                    data_type: crate::spec::PrimitiveType::String,
+                },
+            ],
+        };
+
+        // When wired into ReadOptions, usage looks like:
+        //
+        // let options = ReadOptions {
+        //     schema: Some(projected_schema),
+        //     variant_shredding: vec![shredding],
+        //     ..Default::default()
+        // };
+        //
+        // Any format that supports shredding (Parquet, ORC) reads the
+        // shredded columns directly. Formats that don't (Avro) ignore
+        // the field and return the full binary variant.
+    }
+
+    /// Demonstrates how engine type widening will integrate with ReadOptions.
+    /// The engine requests specific columns be widened during read, expressed
+    /// as a typed HashMap — no Box<dyn Any>, no format-specific coupling.
+    #[test]
+    fn example_type_overrides_config() {
+        let mut overrides: TypeOverrides = HashMap::new();
+
+        // Engine wants field 3 (Iceberg schema says `int`) read as `long`
+        overrides.insert(3, crate::spec::PrimitiveType::Long);
+
+        // Engine wants field 9 (Iceberg schema says `float`) read as `double`
+        overrides.insert(9, crate::spec::PrimitiveType::Double);
+
+        // When wired into ReadOptions, usage looks like:
+        //
+        // let options = ReadOptions {
+        //     schema: Some(projected_schema),
+        //     type_overrides: overrides,
+        //     ..Default::default()
+        // };
+        //
+        // Formats that support widening during read (Parquet, ORC) apply
+        // the overrides at decode time — no post-read conversion needed.
+        // Formats that don't support it ignore the field; the kernel can
+        // apply widening post-read via DataBatch::evolve_schema if needed.
+
+        assert_eq!(overrides.len(), 2);
+        assert_eq!(overrides[&3], crate::spec::PrimitiveType::Long);
+    }
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 4e346460f5..fef11bda61 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -95,6 +95,7 @@ pub use runtime::{Runtime, RuntimeHandle};
 pub mod arrow;
 pub(crate) mod delete_file_index;
 pub mod encryption;
+pub mod formats;
 pub mod test_utils;
 pub mod writer;
 
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs
index 77bd046f8a..6809ff5c03 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -373,7 +373,7 @@ impl TryFrom<i32> for DataContentType {
 }
 
 /// Format of this data.
-#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, SerializeDisplay, DeserializeFromStr)]
 pub enum DataFileFormat {
     /// Avro file format:
     Avro,