Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions crates/iceberg/src/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! File Format API for Apache Iceberg.
//!
//! This module provides the trait-based abstraction for reading and writing
//! Iceberg data files across multiple file formats and in-memory representations.

mod traits;
mod registry;
pub mod tck;

pub use traits::*;
pub use registry::*;
142 changes: 142 additions & 0 deletions crates/iceberg/src/formats/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Format registry for runtime format resolution.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use crate::spec::DataFileFormat;
use crate::{Error, ErrorKind, Result};

use super::{DataBatch, FormatReader, FormatWriter};

/// Maps `(DataFileFormat, TypeId)` pairs to format reader/writer implementations.
///
/// The two-dimensional key matches the Java `FormatModelRegistry` pattern where
/// the key is `(FileFormat, Class<?>)`. The first dimension is the file format
/// (determined at runtime from manifest metadata). The second dimension is the
/// batch type the caller wants to produce or consume (determined at compile time
/// by the engine integration).
///
/// This design supports the pattern where multiple engines register their own
/// optimized readers for the same format. Spark registers a Parquet reader that
/// produces `ColumnarBatch`. Flink registers one that produces `RowData`. In
/// Rust, DataFusion registers a reader producing `RecordBatch`. A future Comet
/// integration could register one producing its own batch type.
///
/// Callers specify the batch type they expect via the generic parameter on
/// `reader::<B>(format)` and `writer::<B>(format)`. They never name the
/// concrete reader/writer implementation.
pub struct FormatRegistry {
/// Registered readers, keyed by the format the reader reports and the
/// `TypeId` of the batch type it was registered for.
readers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatReader>>,
/// Registered writers, keyed by the format the writer reports and the
/// `TypeId` of the batch type it was registered for.
writers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatWriter>>,
}

impl FormatRegistry {
    /// Create an empty registry with no readers or writers registered.
    pub fn new() -> Self {
        Self {
            readers: HashMap::new(),
            writers: HashMap::new(),
        }
    }

    /// Register a format reader for batch type `B`.
    ///
    /// The reader is stored under the key `(reader.format(), TypeId::of::<B>())`.
    ///
    /// # Panics
    ///
    /// Panics if a reader for the same (format, batch type) is already registered.
    /// The existing registration is left untouched in that case.
    pub fn register_reader<B: DataBatch>(&mut self, reader: Arc<dyn FormatReader>) {
        let format = reader.format();
        // The entry API checks and inserts with a single hash lookup.
        match self.readers.entry((format, TypeId::of::<B>())) {
            std::collections::hash_map::Entry::Occupied(_) => panic!(
                "FormatReader already registered for {format:?} with batch type {}",
                // `TypeId`'s Debug output is an opaque hash; the type name is readable.
                std::any::type_name::<B>()
            ),
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(reader);
            }
        }
    }

    /// Register a format writer for batch type `B`.
    ///
    /// The writer is stored under the key `(writer.format(), TypeId::of::<B>())`.
    ///
    /// # Panics
    ///
    /// Panics if a writer for the same (format, batch type) is already registered.
    /// The existing registration is left untouched in that case.
    pub fn register_writer<B: DataBatch>(&mut self, writer: Arc<dyn FormatWriter>) {
        let format = writer.format();
        // The entry API checks and inserts with a single hash lookup.
        match self.writers.entry((format, TypeId::of::<B>())) {
            std::collections::hash_map::Entry::Occupied(_) => panic!(
                "FormatWriter already registered for {format:?} with batch type {}",
                // `TypeId`'s Debug output is an opaque hash; the type name is readable.
                std::any::type_name::<B>()
            ),
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(writer);
            }
        }
    }

    /// Get the reader for a format that produces batch type `B`.
    ///
    /// The generic parameter `B` is the batch type the caller wants to consume.
    /// The registry returns the reader registered for `(format, TypeId::of::<B>())`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::FeatureUnsupported`] error when no reader has been
    /// registered for that pair. The message names the format and the batch type
    /// (via `std::any::type_name`, which is meant for diagnostics only).
    pub fn reader<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatReader> {
        self.readers
            .get(&(format, TypeId::of::<B>()))
            .map(|r| r.as_ref())
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::FeatureUnsupported,
                    format!(
                        "No reader registered for {format:?} producing batch type {}",
                        std::any::type_name::<B>()
                    ),
                )
            })
    }

    /// Get the writer for a format that accepts batch type `B`.
    ///
    /// The generic parameter `B` is the batch type the caller will feed to the writer.
    /// The registry returns the writer registered for `(format, TypeId::of::<B>())`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::FeatureUnsupported`] error when no writer has been
    /// registered for that pair. The message names the format and the batch type
    /// (via `std::any::type_name`, which is meant for diagnostics only).
    pub fn writer<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatWriter> {
        self.writers
            .get(&(format, TypeId::of::<B>()))
            .map(|w| w.as_ref())
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::FeatureUnsupported,
                    format!(
                        "No writer registered for {format:?} accepting batch type {}",
                        std::any::type_name::<B>()
                    ),
                )
            })
    }
}

impl Default for FormatRegistry {
fn default() -> Self {
// Format registrations will be added here as implementations land.
// #[cfg(feature = "format-parquet")]
// {
// let mut registry = Self::new();
// let model = Arc::new(ParquetArrowModel::new());
// registry.register_reader::<RecordBatch>(model.clone());
// registry.register_writer::<RecordBatch>(model);
// return registry;
// }
Self::new()
}
}

/// Process-wide default registry, initialized on first access.
pub fn default_format_registry() -> &'static FormatRegistry {
static REGISTRY: OnceLock<FormatRegistry> = OnceLock::new();
REGISTRY.get_or_init(FormatRegistry::default)
}
141 changes: 141 additions & 0 deletions crates/iceberg/src/formats/tck.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Technology Compatibility Kit (TCK) — Layer 1: DataBatch conformance.
//!
//! This module defines the test harness that any `DataBatch` implementation
//! must pass. A new batch type adds one fixture implementation to enter the
//! entire TCK matrix.
//!
//! # Structure
//!
//! ```text
//! Layer 3: Integration (format x batch x scenario)
//! Layer 2: Format (one format, known-good batch type)
//! Layer 1: DataBatch (one batch type, no format involvement) ← this module
//! ```
//!
//! When a Layer 3 test fails, Layer 1 and Layer 2 results identify whether
//! the bug is in the batch implementation, the format implementation, or
//! their interaction.
//!
//! # Adding a new DataBatch implementation
//!
//! 1. Implement `DataBatch` for your type.
//! 2. Implement `DataBatchTestFixture` for a fixture that constructs test data.
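//! 3. Call the implemented TCK test functions with your fixture. The functions
//!    under "Future TCK scenarios (stubbed)" are `todo!()` placeholders that
//!    panic when called, so leave them out until they are implemented.
//! 4. All implemented tests must pass before merging.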

use super::DataBatch;

/// Trait for constructing test fixtures for a specific `DataBatch` implementation.
///
/// Each `DataBatch` implementation provides a fixture that knows how to
/// build test data in its native format. This keeps the TCK test logic
/// generic while each implementation constructs its own batch type.
///
/// The `tck_*` functions in this module take a fixture by reference and call
/// these constructors to obtain the batches they assert against.
pub trait DataBatchTestFixture {
/// The concrete `DataBatch` type under test.
type Batch: DataBatch;

/// A batch with a single Int32 column (field ID 1), non-null.
///
/// `num_rows` is the number of rows to build. `tck_num_rows` requests 10 rows
/// and expects `num_rows()` on the returned batch to report 10.
fn int32_batch(&self, num_rows: usize) -> Self::Batch;

/// A batch with a nullable String column (field ID 2), alternating null/non-null.
///
/// `tck_column_metrics_null_count` requests 6 rows and expects
/// `column_metrics(2)` to report a value count of 6 and a null count of 3.
/// NOTE(review): whether the first row is null is not specified here; the TCK
/// only exercises an even row count, where either choice yields the same counts.
fn nullable_string_batch(&self, num_rows: usize) -> Self::Batch;

/// A batch with Int32 (field ID 1) and String (field ID 2) columns.
///
/// `tck_project_nonexistent_field` builds a 5-row batch with this method and
/// projects a field ID that is not one of these two columns.
fn two_column_batch(&self, num_rows: usize) -> Self::Batch;

/// An empty batch (zero rows) with the same schema as `two_column_batch`.
///
/// `tck_num_rows_empty` expects `num_rows()` on the returned batch to be 0.
fn empty_batch(&self) -> Self::Batch;
}

// ---------------------------------------------------------------------------
// TCK Layer 1 test scenarios
//
// Each function tests one aspect of the DataBatch contract. These are called
// by concrete test modules (one per DataBatch implementation).
// ---------------------------------------------------------------------------

/// Checks that `num_rows` reports the row count the fixture was asked to build.
///
/// Requests a 10-row Int32 batch from `fixture` and panics (via `assert_eq!`)
/// if the batch reports any other number of rows.
pub fn tck_num_rows<F: DataBatchTestFixture>(fixture: &F) {
    let reported_rows = fixture.int32_batch(10).num_rows();
    assert_eq!(reported_rows, 10);
}

/// Checks that an empty batch reports zero rows.
///
/// Requests the fixture's empty batch and panics (via `assert_eq!`) if
/// `num_rows` returns anything other than 0.
pub fn tck_num_rows_empty<F: DataBatchTestFixture>(fixture: &F) {
    let reported_rows = fixture.empty_batch().num_rows();
    assert_eq!(reported_rows, 0);
}

/// Checks the value and null counts that `column_metrics` reports for a nullable column.
///
/// Uses a 6-row nullable String batch (field ID 2). Metrics for field ID 2 must
/// be present, with a value count of 6 and a null count of 3; the value count
/// therefore covers null rows as well.
pub fn tck_column_metrics_null_count<F: DataBatchTestFixture>(fixture: &F) {
    let batch = fixture.nullable_string_batch(6);
    let string_metrics = batch
        .column_metrics(2)
        .expect("field ID 2 should be present");
    assert_eq!(string_metrics.value_count, 6);
    assert_eq!(string_metrics.null_count, 3);
}

/// Verify column_metrics returns None for a field ID not in the batch.
///
/// Builds a 5-row Int32 batch (which only carries field ID 1) and asks for the
/// metrics of field ID 99. Panics with a descriptive message if anything other
/// than `None` comes back.
pub fn tck_column_metrics_missing_field<F: DataBatchTestFixture>(fixture: &F) {
    let batch = fixture.int32_batch(5);
    assert!(
        batch.column_metrics(99).is_none(),
        "column_metrics must return None for a field ID that is not in the batch"
    );
}

/// Checks that `as_any` exposes the batch as its own concrete type.
///
/// Builds a 3-row Int32 batch and attempts to downcast the value returned by
/// `as_any` back to `F::Batch`; panics if the downcast fails.
pub fn tck_as_any_downcast<F: DataBatchTestFixture>(fixture: &F) {
    let batch = fixture.int32_batch(3);
    let recovered = batch.as_any().downcast_ref::<F::Batch>();
    if recovered.is_none() {
        panic!("as_any must allow downcasting to the concrete batch type");
    }
}

/// Checks that projecting an unknown field ID is rejected.
///
/// Builds a 5-row two-column batch (field IDs 1 and 2) and projects field ID 99;
/// panics unless `project` returns an error.
pub fn tck_project_nonexistent_field<F: DataBatchTestFixture>(fixture: &F) {
    let batch = fixture.two_column_batch(5);
    let projection = batch.project(&[99]);
    if !projection.is_err() {
        panic!("projecting a non-existent field ID must error");
    }
}

// ---------------------------------------------------------------------------
// Future TCK scenarios (stubbed)
// ---------------------------------------------------------------------------

/// Verify filter with a simple equality predicate.
///
/// Not implemented yet: the body is a `todo!()` placeholder, so calling this
/// function always panics. `_fixture` is currently unused.
pub fn tck_filter_equality<F: DataBatchTestFixture>(_fixture: &F) {
// Blocked on a way to build a BoundPredicate inside the test harness.
todo!("Requires constructing a BoundPredicate for testing")
}

/// Verify inject_constants adds constant columns.
///
/// Not implemented yet: the body is a `todo!()` placeholder, so calling this
/// function always panics. `_fixture` is currently unused.
pub fn tck_inject_constants<F: DataBatchTestFixture>(_fixture: &F) {
// Blocked on Datum construction for each primitive type.
todo!("Requires Datum construction for each primitive type")
}

/// Verify evolve_schema handles type promotion (int -> long).
///
/// Not implemented yet: the body is a `todo!()` placeholder, so calling this
/// function always panics. `_fixture` is currently unused.
pub fn tck_evolve_schema_promotion<F: DataBatchTestFixture>(_fixture: &F) {
// Blocked on building source/target Schema pairs for the test.
todo!("Requires constructing source/target Schema pairs")
}

/// Verify evolve_schema handles adding a new nullable column.
///
/// Not implemented yet: the body is a `todo!()` placeholder, so calling this
/// function always panics. `_fixture` is currently unused.
pub fn tck_evolve_schema_add_column<F: DataBatchTestFixture>(_fixture: &F) {
// Blocked on building source/target Schema pairs for the test.
todo!("Requires constructing source/target Schema pairs")
}
Loading
Loading