From ae3f34c4efe626c97723211a2f37f3759ff920bd Mon Sep 17 00:00:00 2001 From: Kurtis Wright Date: Wed, 20 May 2026 16:24:58 +0000 Subject: [PATCH 1/2] feat(formats): add File Format API traits and registry --- crates/iceberg/src/formats/mod.rs | 27 + crates/iceberg/src/formats/registry.rs | 142 ++++++ crates/iceberg/src/formats/traits.rs | 479 ++++++++++++++++++ crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/spec/manifest/data_file.rs | 2 +- 5 files changed, 650 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/formats/mod.rs create mode 100644 crates/iceberg/src/formats/registry.rs create mode 100644 crates/iceberg/src/formats/traits.rs diff --git a/crates/iceberg/src/formats/mod.rs b/crates/iceberg/src/formats/mod.rs new file mode 100644 index 0000000000..3a3c0389a8 --- /dev/null +++ b/crates/iceberg/src/formats/mod.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! File Format API for Apache Iceberg. +//! +//! This module provides the trait-based abstraction for reading and writing +//! Iceberg data files across multiple file formats and in-memory representations. + +mod traits; +mod registry; + +pub use traits::*; +pub use registry::*; diff --git a/crates/iceberg/src/formats/registry.rs b/crates/iceberg/src/formats/registry.rs new file mode 100644 index 0000000000..bebb174c1a --- /dev/null +++ b/crates/iceberg/src/formats/registry.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Format registry for runtime format resolution. + +use std::any::TypeId; +use std::collections::HashMap; +use std::sync::{Arc, OnceLock}; + +use crate::spec::DataFileFormat; +use crate::{Error, ErrorKind, Result}; + +use super::{DataBatch, FormatReader, FormatWriter}; + +/// Maps `(DataFileFormat, TypeId)` pairs to format reader/writer implementations. +/// +/// The two-dimensional key matches the Java `FormatModelRegistry` pattern where +/// the key is `(FileFormat, Class)`. The first dimension is the file format +/// (determined at runtime from manifest metadata). The second dimension is the +/// batch type the caller wants to produce or consume (determined at compile time +/// by the engine integration). +/// +/// This design supports the pattern where multiple engines register their own +/// optimized readers for the same format. Spark registers a Parquet reader that +/// produces `ColumnarBatch`. Flink registers one that produces `RowData`. In +/// Rust, DataFusion registers a reader producing `RecordBatch`. A future Comet +/// integration could register one producing its own batch type. +/// +/// Callers specify the batch type they expect via the generic parameter on +/// `reader::(format)` and `writer::(format)`. They never name the +/// concrete reader/writer implementation. +pub struct FormatRegistry { + readers: HashMap<(DataFileFormat, TypeId), Arc>, + writers: HashMap<(DataFileFormat, TypeId), Arc>, +} + +impl FormatRegistry { + /// Create an empty registry. + pub fn new() -> Self { + Self { + readers: HashMap::new(), + writers: HashMap::new(), + } + } + + /// Register a format reader for a specific batch type. + /// + /// Panics if a reader for the same (format, batch type) is already registered. + pub fn register_reader(&mut self, reader: Arc) { + let key = (reader.format(), TypeId::of::()); + if self.readers.contains_key(&key) { + panic!( + "FormatReader already registered for {:?} with batch type {:?}", + key.0, key.1 + ); + } + self.readers.insert(key, reader); + } + + /// Register a format writer for a specific batch type. + /// + /// Panics if a writer for the same (format, batch type) is already registered. + pub fn register_writer(&mut self, writer: Arc) { + let key = (writer.format(), TypeId::of::()); + if self.writers.contains_key(&key) { + panic!( + "FormatWriter already registered for {:?} with batch type {:?}", + key.0, key.1 + ); + } + self.writers.insert(key, writer); + } + + /// Get the reader for a format that produces batch type `B`. + /// + /// The generic parameter `B` is the batch type the caller wants to consume. + /// The registry returns the reader registered for `(format, TypeId::of::())`. + pub fn reader(&self, format: DataFileFormat) -> Result<&dyn FormatReader> { + let key = (format, TypeId::of::()); + self.readers.get(&key).map(|r| r.as_ref()).ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "No reader registered for {format:?} producing batch type {:?}", + TypeId::of::() + ), + ) + }) + } + + /// Get the writer for a format that accepts batch type `B`. + /// + /// The generic parameter `B` is the batch type the caller will feed to the writer. + /// The registry returns the writer registered for `(format, TypeId::of::())`. + pub fn writer(&self, format: DataFileFormat) -> Result<&dyn FormatWriter> { + let key = (format, TypeId::of::()); + self.writers.get(&key).map(|w| w.as_ref()).ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + format!( + "No writer registered for {format:?} accepting batch type {:?}", + TypeId::of::() + ), + ) + }) + } +} + +impl Default for FormatRegistry { + fn default() -> Self { + // Format registrations will be added here as implementations land. + // #[cfg(feature = "format-parquet")] + // { + // let mut registry = Self::new(); + // let model = Arc::new(ParquetArrowModel::new()); + // registry.register_reader::(model.clone()); + // registry.register_writer::(model); + // return registry; + // } + Self::new() + } +} + +/// Process-wide default registry, initialized on first access. +pub fn default_format_registry() -> &'static FormatRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY.get_or_init(FormatRegistry::default) +} diff --git a/crates/iceberg/src/formats/traits.rs b/crates/iceberg/src/formats/traits.rs new file mode 100644 index 0000000000..b67ac49366 --- /dev/null +++ b/crates/iceberg/src/formats/traits.rs @@ -0,0 +1,479 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Core trait definitions for the File Format API. + +use std::any::Any; +use std::collections::HashMap; + +use futures::future::BoxFuture; +use futures::stream::BoxStream; + +use crate::expr::BoundPredicate; +use crate::io::{InputFile, OutputFile}; +use crate::spec::{DataFileBuilder, DataFileFormat, Datum, Schema}; +use crate::Result; + +// --------------------------------------------------------------------------- +// DataBatch +// --------------------------------------------------------------------------- + +/// The contract for any in-memory data representation that the Iceberg kernel +/// can process. +/// +/// Implementing this trait is the cost of entry for bringing a new in-memory +/// format. Implementations have full freedom to fuse operations internally +/// for performance. Correctness is validated by the TCK (Layer 1). +/// +/// The `iceberg` crate ships `impl DataBatch for RecordBatch` as the default. +pub trait DataBatch: Send + 'static { + /// Number of rows in this batch. + fn num_rows(&self) -> usize; + + /// The Iceberg field IDs present in this batch, in column order. + fn field_ids(&self) -> &[i32]; + + /// Project to a subset of columns by Iceberg field ID. + fn project(&self, field_ids: &[i32]) -> Result + where + Self: Sized; + + /// Evaluate a filter predicate, returning only matching rows. + /// + /// Implementations may use any strategy: vectorized array ops, visitor + /// pattern, fused I/O closures, etc. The kernel does NOT dictate + /// evaluation strategy. + fn filter(&self, predicate: &BoundPredicate) -> Result + where + Self: Sized; + + /// Compute column-level metrics for the given field. + /// + /// Returns `None` if the field is not present in this batch. + fn column_metrics(&self, field_id: i32) -> Result>; + + /// Inject constant values for metadata columns (`_file`, `_pos`, `_partition`). + fn inject_constants(&self, constants: &[(i32, Datum)]) -> Result + where + Self: Sized; + + /// Apply schema evolution: add columns with defaults, widen types, rename. + /// + /// The `source` schema describes what this batch currently contains. + /// The `target` schema describes what the caller needs. + fn evolve_schema(&self, source: &Schema, target: &Schema) -> Result + where + Self: Sized; + + /// Downcast support for runtime type resolution. + fn as_any(&self) -> &dyn Any; +} + +// --------------------------------------------------------------------------- +// Supporting types +// --------------------------------------------------------------------------- + +/// A stream of batches produced by a format reader. +pub type DataStream = BoxStream<'static, Result>>; + +/// The result of a format read operation. +/// +/// Contains the batch stream and communicates what the reader handled +/// from the [`ReadOptions`]. The kernel uses `residual_predicate` to +/// determine whether it needs to apply row-level filtering via +/// [`DataBatch::filter`]. +#[non_exhaustive] +pub struct ReadResult { + /// The stream of batches with requested columns decoded. + pub stream: DataStream, + /// The portion of the predicate the reader could NOT evaluate. + /// + /// `None` means the reader fully handled the predicate — all rows + /// in the stream match. `Some(predicate)` means the kernel must + /// apply residual filtering using [`DataBatch::filter`]. + pub residual_predicate: Option, +} + +/// Column-level metrics produced by a writer or computed from a batch. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ColumnMetrics { + /// Number of values in the column (including nulls). + pub value_count: u64, + /// Number of null values. + pub null_count: u64, + /// Number of NaN values (for float and double columns). + pub nan_count: Option, + /// Lower bound value as a typed Datum. + pub lower_bound: Option, + /// Upper bound value as a typed Datum. + pub upper_bound: Option, + /// Column size in bytes. + pub column_size: Option, +} + +impl ColumnMetrics { + /// Create metrics with only counts populated. + pub fn counts(value_count: u64, null_count: u64) -> Self { + Self { + value_count, + null_count, + nan_count: None, + lower_bound: None, + upper_bound: None, + column_size: None, + } + } +} + +/// The content type of a file being written. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +#[non_exhaustive] +pub enum FileContent { + /// Data file containing table rows. + #[default] + Data, + /// Equality delete file. + EqualityDeletes, + /// Position delete file. + PositionDeletes, +} + +/// Configuration for metrics collection during writes. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct MetricsConfig { + /// Per-column metrics mode overrides keyed by column name. + pub column_modes: HashMap, + /// Default mode for columns without an explicit override. + pub default_mode: MetricsMode, +} + +/// The metrics collection mode for a column. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum MetricsMode { + /// Do not collect any metrics. + None, + /// Collect only value counts and null counts. + Counts, + /// Collect full metrics including bounds. + Full, + /// Collect truncated bounds at the given length. + Truncate(u32), +} + +// --------------------------------------------------------------------------- +// ReadOptions / WriteOptions +// --------------------------------------------------------------------------- + +/// Configuration for a format read operation. +/// +/// Passed to [`FormatReader::read`]. Format implementations apply what they +/// can (best-effort pushdown) and ignore unsupported options. +/// +/// # Engine schema / variant shredding +/// +/// This struct intentionally does NOT carry an `engine_schema` parameter. +/// Java's `ReadBuilder.engineProjection(S)` serves two purposes: engine- +/// requested type widening (e.g., read an `int` column as `long`) and +/// variant shredding layout hints. Both are deferred from Phase 1 because: +/// +/// 1. **Variant shredding** depends on the variant type landing first +/// ([#2188](https://github.com/apache/iceberg-rust/pull/2188)). When it +/// does, shredding can be modeled as a format-agnostic Iceberg-level +/// concept (which paths to shred and what types to extract) rather than +/// an opaque engine-specific blob. +/// +/// 2. **Engine type widening** ("give me `long` even though the schema says +/// `int`") can be expressed as a typed `HashMap` +/// mapping field IDs to desired output types — no type erasure needed. +/// +/// 3. **A type-erased `Box` field would allow hidden coupling.** +/// A caller going through the registry could pass a format-specific +/// schema object that only one format understands, silently breaking +/// when a different format is registered. By deferring this field, we +/// force the eventual design to be format-agnostic and compiler-checked. +/// +/// When these features land, `#[non_exhaustive]` allows adding fields +/// without a breaking change. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct ReadOptions { + /// Iceberg projection schema. + pub schema: Option, + /// Filter predicate for pushdown. + pub predicate: Option, + /// Byte range start for split reading. + pub split_start: Option, + /// Byte range length for split reading. + pub split_length: Option, + /// Case sensitivity for predicate evaluation. Defaults to true. + pub case_sensitive: bool, + /// Number of rows per batch. + pub batch_size: Option, + /// Format-specific configuration. Unknown keys are ignored. + pub properties: HashMap, + /// Constant values for metadata columns not stored in the data file. + pub id_to_constant: HashMap, +} + +/// Configuration for a format write operation. +/// +/// Passed to [`FormatWriter::write`]. Format implementations apply what they +/// can and ignore unsupported options. +/// +/// # Engine schema +/// +/// Like [`ReadOptions`], this struct intentionally omits an `engine_schema` +/// parameter. See the [`ReadOptions`] doc comment for the full rationale: +/// a type-erased field would allow hidden coupling between callers and +/// specific format implementations through the generic registry path. +/// When engine type narrowing is needed, it will be expressed as typed, +/// format-agnostic fields that the compiler can check. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct WriteOptions { + /// Iceberg schema for the file being written. + pub schema: Option, + /// Format-specific configuration. Unknown keys are ignored. + pub properties: HashMap, + /// File-level metadata (for example, delete-type, equality field IDs). + pub metadata: HashMap, + /// Content type: data, equality deletes, or position deletes. + pub content: FileContent, + /// Metrics collection configuration. + pub metrics_config: Option, + /// Allow overwriting an existing file. + pub overwrite: bool, + /// File encryption key. + pub encryption_key: Option>, + /// AAD prefix for encryption. + pub aad_prefix: Option>, +} + +// --------------------------------------------------------------------------- +// FormatReader / FormatWriter +// --------------------------------------------------------------------------- + +/// A format that can read data files. +/// +/// This trait is dyn-compatible, enabling the [`FormatRegistry`](super::FormatRegistry) +/// to store readers without callers knowing the concrete type. +/// Follows the same pattern as [`Storage`](crate::io::Storage) in the IO layer. +/// +/// # Contract +/// +/// The reader MUST: +/// - Decode only the columns in [`ReadOptions::schema`] (projection is an I/O concern). +/// +/// The reader MAY: +/// - Skip chunks that cannot match [`ReadOptions::predicate`] using format-level statistics. +/// - Support byte-range splits via [`ReadOptions::split_start`] / [`ReadOptions::split_length`]. +/// +/// The reader MUST NOT handle: +/// - Residual row-level filtering (kernel responsibility) +/// - Schema evolution (kernel responsibility) +/// - Constant injection (kernel responsibility) +/// - Delete application (kernel responsibility) +/// - Name mapping / field ID resolution (kernel responsibility) +pub trait FormatReader: Send + Sync + 'static { + /// Which file format this reader handles. + fn format(&self) -> DataFileFormat; + + /// Read the given input file, returning a [`ReadResult`] containing the + /// batch stream and the residual predicate the reader could not evaluate. + fn read( + &self, + input: InputFile, + options: ReadOptions, + ) -> BoxFuture<'static, Result>; +} + +/// A format that can write data files. +/// +/// This trait is dyn-compatible, enabling the [`FormatRegistry`](super::FormatRegistry) +/// to store writers without callers knowing the concrete type. +/// +/// # Contract +/// +/// The writer MUST: +/// - Encode batches into the file format. +/// - Collect file-level metrics per [`WriteOptions::metrics_config`]. +/// - Handle format-specific encoding concerns (e.g., variant shredding layout for Parquet). +/// +/// The writer MUST NOT handle: +/// - Partitioning (caller sends pre-partitioned batches) +/// - Sorting (caller sends pre-sorted batches) +/// - Transaction management (kernel handles commit) +pub trait FormatWriter: Send + Sync + 'static { + /// Which file format this writer handles. + fn format(&self) -> DataFileFormat; + + /// Write batches to the given output file. + fn write( + &self, + output: OutputFile, + options: WriteOptions, + ) -> BoxFuture<'static, Result>>; +} + +// --------------------------------------------------------------------------- +// FormatFileWriter +// --------------------------------------------------------------------------- + +/// A writer that accepts batches and produces file metadata on close. +/// +/// Implementations downcast `&dyn DataBatch` to their expected concrete +/// batch type via [`DataBatch::as_any`]. A type mismatch is a runtime error. +pub trait FormatFileWriter: Send { + /// Write a batch of data. + fn write_batch(&mut self, batch: &dyn DataBatch) -> BoxFuture<'_, Result<()>>; + + /// Close the writer and return file metadata for the manifest. + fn close(self: Box) -> BoxFuture<'static, Result>; +} + +/// The result of closing a format writer. +#[non_exhaustive] +pub struct WriterResult { + /// Data file builders representing the files written. + pub data_files: Vec, +} + +// --------------------------------------------------------------------------- +// Future extensions (design examples) +// --------------------------------------------------------------------------- + +/// Variant shredding configuration — format-agnostic. +/// +/// This type is NOT yet wired into [`ReadOptions`] or [`WriteOptions`]. +/// It exists to demonstrate how variant shredding will be expressed when +/// the variant type ([#2188](https://github.com/apache/iceberg-rust/pull/2188)) +/// lands. The key property: this is an Iceberg-level concept, not a +/// format-specific one. Any format that supports shredding interprets +/// the same struct identically. +/// +/// When ready, `ReadOptions` gains: +/// ```rust,ignore +/// pub variant_shredding: Vec, +/// ``` +#[derive(Debug, Clone)] +pub struct VariantShredding { + /// The variant column's Iceberg field ID. + pub variant_field_id: i32, + /// Paths to extract as physical columns. + pub shredded_paths: Vec, +} + +/// A single path within a variant to shred into a physical column. +#[derive(Debug, Clone)] +pub struct ShredPath { + /// Dotted path into the variant (e.g., `"name"`, `"address.city"`). + pub path: String, + /// The Iceberg primitive type to extract this path as. + pub data_type: crate::spec::PrimitiveType, +} + +/// Engine-requested type overrides for specific columns. +/// +/// This type is NOT yet wired into [`ReadOptions`]. It exists to demonstrate +/// how engine type widening will be expressed. The use case: an engine wants +/// a column read as `long` even though the Iceberg schema defines it as `int`. +/// This is distinct from schema evolution (where the TABLE schema changed) — +/// here the engine simply prefers a wider type for its own processing. +/// +/// When ready, `ReadOptions` gains: +/// ```rust,ignore +/// pub type_overrides: HashMap, +/// ``` +/// +/// Format implementations that support widening during read (Parquet, ORC) +/// apply these overrides. Formats that don't support it ignore them. +pub type TypeOverrides = HashMap; + +#[cfg(test)] +mod tests { + use super::*; + + /// Demonstrates how variant shredding will integrate with ReadOptions + /// when the variant type lands. The shredding request is format-agnostic: + /// it uses Iceberg field IDs and primitive types, not Arrow schemas or + /// Parquet MessageTypes. + #[test] + fn example_variant_shredding_config() { + let _shredding = VariantShredding { + variant_field_id: 7, + shredded_paths: vec![ + ShredPath { + path: "name".to_string(), + data_type: crate::spec::PrimitiveType::String, + }, + ShredPath { + path: "age".to_string(), + data_type: crate::spec::PrimitiveType::Int, + }, + ShredPath { + path: "address.city".to_string(), + data_type: crate::spec::PrimitiveType::String, + }, + ], + }; + + // When wired into ReadOptions, usage looks like: + // + // let options = ReadOptions { + // schema: Some(projected_schema), + // variant_shredding: vec![shredding], + // ..Default::default() + // }; + // + // Any format that supports shredding (Parquet, ORC) reads the + // shredded columns directly. Formats that don't (Avro) ignore + // the field and return the full binary variant. + } + + /// Demonstrates how engine type widening will integrate with ReadOptions. + /// The engine requests specific columns be widened during read, expressed + /// as a typed HashMap — no Box, no format-specific coupling. + #[test] + fn example_type_overrides_config() { + let mut overrides: TypeOverrides = HashMap::new(); + + // Engine wants field 3 (Iceberg schema says `int`) read as `long` + overrides.insert(3, crate::spec::PrimitiveType::Long); + + // Engine wants field 9 (Iceberg schema says `float`) read as `double` + overrides.insert(9, crate::spec::PrimitiveType::Double); + + // When wired into ReadOptions, usage looks like: + // + // let options = ReadOptions { + // schema: Some(projected_schema), + // type_overrides: overrides, + // ..Default::default() + // }; + // + // Formats that support widening during read (Parquet, ORC) apply + // the overrides at decode time — no post-read conversion needed. + // Formats that don't support it ignore the field; the kernel can + // apply widening post-read via DataBatch::evolve_schema if needed. + + assert_eq!(overrides.len(), 2); + assert_eq!(overrides[&3], crate::spec::PrimitiveType::Long); + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 4e346460f5..fef11bda61 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -95,6 +95,7 @@ pub use runtime::{Runtime, RuntimeHandle}; pub mod arrow; pub(crate) mod delete_file_index; pub mod encryption; +pub mod formats; pub mod test_utils; pub mod writer; diff --git a/crates/iceberg/src/spec/manifest/data_file.rs b/crates/iceberg/src/spec/manifest/data_file.rs index 77bd046f8a..6809ff5c03 100644 --- a/crates/iceberg/src/spec/manifest/data_file.rs +++ b/crates/iceberg/src/spec/manifest/data_file.rs @@ -373,7 +373,7 @@ impl TryFrom for DataContentType { } /// Format of this data. -#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash, SerializeDisplay, DeserializeFromStr)] pub enum DataFileFormat { /// Avro file format: Avro, From 4d8743387737c69f6742a09a308e7cc1cea835b6 Mon Sep 17 00:00:00 2001 From: Kurtis Wright Date: Wed, 20 May 2026 17:59:13 +0000 Subject: [PATCH 2/2] feat(formats): add TCK Layer 1 harness for DataBatch conformance --- crates/iceberg/src/formats/mod.rs | 1 + crates/iceberg/src/formats/tck.rs | 141 +++++++++++++++++++++++++++ crates/iceberg/src/formats/traits.rs | 2 +- 3 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/formats/tck.rs diff --git a/crates/iceberg/src/formats/mod.rs b/crates/iceberg/src/formats/mod.rs index 3a3c0389a8..f4270d0a0e 100644 --- a/crates/iceberg/src/formats/mod.rs +++ b/crates/iceberg/src/formats/mod.rs @@ -22,6 +22,7 @@ mod traits; mod registry; +pub mod tck; pub use traits::*; pub use registry::*; diff --git a/crates/iceberg/src/formats/tck.rs b/crates/iceberg/src/formats/tck.rs new file mode 100644 index 0000000000..4821cc794f --- /dev/null +++ b/crates/iceberg/src/formats/tck.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Technology Compatibility Kit (TCK) — Layer 1: DataBatch conformance. +//! +//! This module defines the test harness that any `DataBatch` implementation +//! must pass. A new batch type adds one fixture implementation to enter the +//! entire TCK matrix. +//! +//! # Structure +//! +//! ```text +//! Layer 3: Integration (format x batch x scenario) +//! Layer 2: Format (one format, known-good batch type) +//! Layer 1: DataBatch (one batch type, no format involvement) ← this module +//! ``` +//! +//! When a Layer 3 test fails, Layer 1 and Layer 2 results identify whether +//! the bug is in the batch implementation, the format implementation, or +//! their interaction. +//! +//! # Adding a new DataBatch implementation +//! +//! 1. Implement `DataBatch` for your type. +//! 2. Implement `DataBatchTestFixture` for a fixture that constructs test data. +//! 3. Call the TCK test functions with your fixture. +//! 4. All tests must pass before merging. + +use super::DataBatch; + +/// Trait for constructing test fixtures for a specific `DataBatch` implementation. +/// +/// Each `DataBatch` implementation provides a fixture that knows how to +/// build test data in its native format. This keeps the TCK test logic +/// generic while each implementation constructs its own batch type. +pub trait DataBatchTestFixture { + /// The concrete `DataBatch` type under test. + type Batch: DataBatch; + + /// A batch with a single Int32 column (field ID 1), non-null. + fn int32_batch(&self, num_rows: usize) -> Self::Batch; + + /// A batch with a nullable String column (field ID 2), alternating null/non-null. + fn nullable_string_batch(&self, num_rows: usize) -> Self::Batch; + + /// A batch with Int32 (field ID 1) and String (field ID 2) columns. + fn two_column_batch(&self, num_rows: usize) -> Self::Batch; + + /// An empty batch (zero rows) with the same schema as `two_column_batch`. + fn empty_batch(&self) -> Self::Batch; +} + +// --------------------------------------------------------------------------- +// TCK Layer 1 test scenarios +// +// Each function tests one aspect of the DataBatch contract. These are called +// by concrete test modules (one per DataBatch implementation). +// --------------------------------------------------------------------------- + +/// Verify num_rows returns the correct count. +pub fn tck_num_rows(fixture: &F) { + let batch = fixture.int32_batch(10); + assert_eq!(batch.num_rows(), 10); +} + +/// Verify num_rows is zero for an empty batch. +pub fn tck_num_rows_empty(fixture: &F) { + let batch = fixture.empty_batch(); + assert_eq!(batch.num_rows(), 0); +} + +/// Verify column_metrics returns correct null count for nullable columns. +pub fn tck_column_metrics_null_count(fixture: &F) { + let batch = fixture.nullable_string_batch(6); + let metrics = batch.column_metrics(2); + assert!(metrics.is_some(), "field ID 2 should be present"); + let m = metrics.unwrap(); + assert_eq!(m.value_count, 6); + assert_eq!(m.null_count, 3); +} + +/// Verify column_metrics returns None for a field ID not in the batch. +pub fn tck_column_metrics_missing_field(fixture: &F) { + let batch = fixture.int32_batch(5); + assert!(batch.column_metrics(99).is_none()); +} + +/// Verify as_any allows downcasting to the concrete batch type. +pub fn tck_as_any_downcast(fixture: &F) { + let batch = fixture.int32_batch(3); + let any_ref = batch.as_any(); + assert!( + any_ref.downcast_ref::().is_some(), + "as_any must allow downcasting to the concrete batch type" + ); +} + +/// Verify project returns an error for a non-existent field ID. +pub fn tck_project_nonexistent_field(fixture: &F) { + let batch = fixture.two_column_batch(5); + let result = batch.project(&[99]); + assert!(result.is_err(), "projecting a non-existent field ID must error"); +} + +// --------------------------------------------------------------------------- +// Future TCK scenarios (stubbed) +// --------------------------------------------------------------------------- + +/// Verify filter with a simple equality predicate. +pub fn tck_filter_equality(_fixture: &F) { + todo!("Requires constructing a BoundPredicate for testing") +} + +/// Verify inject_constants adds constant columns. +pub fn tck_inject_constants(_fixture: &F) { + todo!("Requires Datum construction for each primitive type") +} + +/// Verify evolve_schema handles type promotion (int -> long). +pub fn tck_evolve_schema_promotion(_fixture: &F) { + todo!("Requires constructing source/target Schema pairs") +} + +/// Verify evolve_schema handles adding a new nullable column. +pub fn tck_evolve_schema_add_column(_fixture: &F) { + todo!("Requires constructing source/target Schema pairs") +} diff --git a/crates/iceberg/src/formats/traits.rs b/crates/iceberg/src/formats/traits.rs index b67ac49366..6d538bfa39 100644 --- a/crates/iceberg/src/formats/traits.rs +++ b/crates/iceberg/src/formats/traits.rs @@ -64,7 +64,7 @@ pub trait DataBatch: Send + 'static { /// Compute column-level metrics for the given field. /// /// Returns `None` if the field is not present in this batch. - fn column_metrics(&self, field_id: i32) -> Result>; + fn column_metrics(&self, field_id: i32) -> Option; /// Inject constant values for metadata columns (`_file`, `_pos`, `_partition`). fn inject_constants(&self, constants: &[(i32, Datum)]) -> Result