Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions crates/iceberg/src/formats/arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! `impl DataBatch for RecordBatch` — the shipped default in-memory representation.
//!
//! This is the implementation that engines get for free when they use Arrow.
//! Engines that need a different in-memory representation implement `DataBatch`
//! for their own type and pass the TCK Layer 1 suite.

use std::any::Any;

use arrow_array::RecordBatch;

use super::traits::{ColumnMetrics, DataBatch};
use crate::expr::BoundPredicate;
use crate::spec::{Datum, Schema};
use crate::Result;

impl DataBatch for RecordBatch {
fn num_rows(&self) -> usize {
self.num_rows()
}

fn field_ids(&self) -> &[i32] {
// Field IDs are stored per-column in Arrow schema metadata.
// A production implementation caches them; this stub returns empty.
&[]
}

fn project(&self, _field_ids: &[i32]) -> Result<Self> {
// Production: use RecordBatchProjector to select columns by field ID.
todo!("Wire RecordBatchProjector")
}

fn filter(&self, _predicate: &BoundPredicate) -> Result<Self> {
// Production: use PredicateConverter to evaluate against Arrow arrays.
todo!("Wire PredicateConverter")
}

fn column_metrics(&self, _field_id: i32) -> Option<ColumnMetrics> {
// Production: compute min/max/null/NaN from the Arrow array.
todo!("Wire column metrics computation")
}

fn inject_constants(&self, _constants: &[(i32, Datum)]) -> Result<Self> {
// Production: append constant-value columns to the batch.
todo!("Wire constant injection")
}

fn evolve_schema(&self, _source: &Schema, _target: &Schema) -> Result<Self> {
// Production: cast columns for type widening, add null columns for new fields.
todo!("Wire schema evolution")
}

fn as_any(&self) -> &dyn Any {
self
}
}
28 changes: 28 additions & 0 deletions crates/iceberg/src/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! File Format API for Apache Iceberg.
//!
//! This module provides the trait-based abstraction for reading and writing
//! Iceberg data files across multiple file formats and in-memory representations.

mod traits;
mod registry;
mod arrow;

pub use traits::*;
pub use registry::*;
142 changes: 142 additions & 0 deletions crates/iceberg/src/formats/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Format registry for runtime format resolution.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use crate::spec::DataFileFormat;
use crate::{Error, ErrorKind, Result};

use super::{DataBatch, FormatReader, FormatWriter};

/// Maps `(DataFileFormat, TypeId)` pairs to format reader/writer implementations.
///
/// The two-dimensional key matches the Java `FormatModelRegistry` pattern where
/// the key is `(FileFormat, Class<?>)`. The first dimension is the file format
/// (determined at runtime from manifest metadata). The second dimension is the
/// batch type the caller wants to produce or consume (determined at compile time
/// by the engine integration).
///
/// This design supports the pattern where multiple engines register their own
/// optimized readers for the same format. Spark registers a Parquet reader that
/// produces `ColumnarBatch`. Flink registers one that produces `RowData`. In
/// Rust, DataFusion registers a reader producing `RecordBatch`. A future Comet
/// integration could register one producing its own batch type.
///
/// Callers specify the batch type they expect via the generic parameter on
/// `reader::<B>(format)` and `writer::<B>(format)`. They never name the
/// concrete reader/writer implementation.
pub struct FormatRegistry {
readers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatReader>>,
writers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatWriter>>,
}

impl FormatRegistry {
/// Create an empty registry.
pub fn new() -> Self {
Self {
readers: HashMap::new(),
writers: HashMap::new(),
}
}

/// Register a format reader for a specific batch type.
///
/// Panics if a reader for the same (format, batch type) is already registered.
pub fn register_reader<B: DataBatch>(&mut self, reader: Arc<dyn FormatReader>) {
let key = (reader.format(), TypeId::of::<B>());
if self.readers.contains_key(&key) {
panic!(
"FormatReader already registered for {:?} with batch type {:?}",
key.0, key.1
);
}
self.readers.insert(key, reader);
}

/// Register a format writer for a specific batch type.
///
/// Panics if a writer for the same (format, batch type) is already registered.
pub fn register_writer<B: DataBatch>(&mut self, writer: Arc<dyn FormatWriter>) {
let key = (writer.format(), TypeId::of::<B>());
if self.writers.contains_key(&key) {
panic!(
"FormatWriter already registered for {:?} with batch type {:?}",
key.0, key.1
);
}
self.writers.insert(key, writer);
}

/// Get the reader for a format that produces batch type `B`.
///
/// The generic parameter `B` is the batch type the caller wants to consume.
/// The registry returns the reader registered for `(format, TypeId::of::<B>())`.
pub fn reader<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatReader> {
let key = (format, TypeId::of::<B>());
self.readers.get(&key).map(|r| r.as_ref()).ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
"No reader registered for {format:?} producing batch type {:?}",
TypeId::of::<B>()
),
)
})
}

/// Get the writer for a format that accepts batch type `B`.
///
/// The generic parameter `B` is the batch type the caller will feed to the writer.
/// The registry returns the writer registered for `(format, TypeId::of::<B>())`.
pub fn writer<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatWriter> {
let key = (format, TypeId::of::<B>());
self.writers.get(&key).map(|w| w.as_ref()).ok_or_else(|| {
Error::new(
ErrorKind::FeatureUnsupported,
format!(
"No writer registered for {format:?} accepting batch type {:?}",
TypeId::of::<B>()
),
)
})
}
}

impl Default for FormatRegistry {
fn default() -> Self {
// Format registrations will be added here as implementations land.
// #[cfg(feature = "format-parquet")]
// {
// let mut registry = Self::new();
// let model = Arc::new(ParquetArrowModel::new());
// registry.register_reader::<RecordBatch>(model.clone());
// registry.register_writer::<RecordBatch>(model);
// return registry;
// }
Self::new()
}
}

/// Process-wide default registry, initialized on first access.
pub fn default_format_registry() -> &'static FormatRegistry {
static REGISTRY: OnceLock<FormatRegistry> = OnceLock::new();
REGISTRY.get_or_init(FormatRegistry::default)
}
Loading
Loading