Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions crates/iceberg/src/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! File Format API for Apache Iceberg.
//!
//! This module provides the trait-based abstraction for reading and writing
//! Iceberg data files across multiple file formats and in-memory representations.

// Private submodules of the File Format API: the trait definitions, the
// runtime registry, and the Parquet implementation.
mod traits;
mod registry;
mod parquet;

// Only the `traits` and `registry` items are re-exported from this module.
// NOTE(review): nothing from `parquet` is re-exported here, so its types are
// not reachable from outside this module — confirm whether that is intended.
pub use traits::*;
pub use registry::*;
129 changes: 129 additions & 0 deletions crates/iceberg/src/formats/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet format implementation wrapping the existing `ArrowReader` and `ParquetWriter`.
//!
//! `ParquetArrowModel` implements both [`FormatReader`] and [`FormatWriter`] for
//! Parquet files producing/consuming Arrow `RecordBatch`. It is intended to be the
//! default format-batch pairing registered in `FormatRegistry::default()`; that
//! registration is not wired up yet, so the default registry is currently empty.
//!
//! This implementation is meant to wrap the existing reader and writer code rather
//! than replace it: the existing types (`ArrowReader`, `ParquetWriter`) remain
//! unchanged, and `ParquetArrowModel` will delegate to them internally. The
//! method bodies in this file are still `todo!()` placeholders.

#![allow(dead_code)]

use futures::future::BoxFuture;

use super::traits::{
DataBatch, FormatFileWriter, FormatReader, FormatWriter, ReadOptions, ReadResult, WriteOptions,
WriterResult,
};
use crate::io::{InputFile, OutputFile};
use crate::spec::DataFileFormat;
use crate::Result;

/// Arrow-backed Parquet format model.
///
/// A stateless unit type that acts as both the [`FormatReader`] and the
/// [`FormatWriter`] for Parquet data files exchanged as Arrow `RecordBatch`
/// values. It is meant to be stored in a `FormatRegistry` under the key
/// `(DataFileFormat::Parquet, TypeId::of::<RecordBatch>())`.
///
/// # Examples
///
/// Registering one shared instance for both directions (the import path
/// assumes the type is re-exported from `iceberg::formats`):
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use arrow_array::RecordBatch;
/// use iceberg::formats::{FormatRegistry, ParquetArrowModel};
///
/// let mut registry = FormatRegistry::new();
/// let model = Arc::new(ParquetArrowModel);
/// registry.register_reader::<RecordBatch>(model.clone());
/// registry.register_writer::<RecordBatch>(model);
/// ```
#[derive(Clone, Debug)]
pub struct ParquetArrowModel;

/// Read side of the Parquet/Arrow model.
///
/// NOTE(review): `read` is still a `todo!()` placeholder; only `format` is
/// functional at this point.
impl FormatReader for ParquetArrowModel {
/// Reports the file format this reader handles: always `DataFileFormat::Parquet`.
fn format(&self) -> DataFileFormat {
DataFileFormat::Parquet
}

/// Placeholder for reading a Parquet file into a `ReadResult`.
///
/// Both `_input` and `_options` are currently unused (hence the leading
/// underscores).
///
/// # Panics
///
/// Always panics via `todo!`. The panic is raised when `read` itself is
/// called, not when the returned future is polled, because no future is
/// ever constructed.
fn read(
&self,
_input: InputFile,
_options: ReadOptions,
) -> BoxFuture<'static, Result<ReadResult>> {
// Planned implementation (not written yet): delegate to the existing
// ArrowReader pipeline.
//
// 1. Open the Parquet file via ArrowFileReader
// 2. Apply the projection mask from options.schema
// 3. Evaluate row group statistics against options.predicate
// 4. Return a ReadResult with:
// - stream: the RecordBatch stream from ParquetRecordBatchStream
// - residual_predicate: the predicate (or portion of it) that
// row-group statistics could not fully resolve
todo!("Wrap ArrowReader")
}
}

/// Write side of the Parquet/Arrow model.
///
/// NOTE(review): `write` is still a `todo!()` placeholder; only `format` is
/// functional at this point.
impl FormatWriter for ParquetArrowModel {
/// Reports the file format this writer handles: always `DataFileFormat::Parquet`.
fn format(&self) -> DataFileFormat {
DataFileFormat::Parquet
}

/// Placeholder for opening a per-file writer on `_output`.
///
/// Both `_output` and `_options` are currently unused (hence the leading
/// underscores).
///
/// # Panics
///
/// Always panics via `todo!`. The panic is raised when `write` itself is
/// called, not when the returned future is polled, because no future is
/// ever constructed.
fn write(
&self,
_output: OutputFile,
_options: WriteOptions,
) -> BoxFuture<'static, Result<Box<dyn FormatFileWriter>>> {
// Planned implementation (not written yet): delegate to the existing
// ParquetWriter.
//
// 1. Convert options.schema to an Arrow schema
// 2. Build WriterProperties from options.properties
// 3. Create an AsyncArrowWriter via the existing ParquetWriter path
// 4. Return a ParquetFormatFileWriter that wraps the writer
todo!("Wrap ParquetWriter")
}
}

/// The per-file writer that `ParquetArrowModel::write` is intended to return.
///
/// Planned behavior (not implemented yet): wrap the existing `ParquetWriter`
/// and delegate `write_batch` and `close` to it. On `close`, extract Parquet
/// metadata and build a `DataFileBuilder` with column sizes, value counts,
/// null counts, min/max bounds, and split offsets.
///
/// At present the struct carries no state at all.
pub struct ParquetFormatFileWriter {
// Zero-sized placeholder field; the real implementation is expected to hold
// an AsyncArrowWriter or the existing ParquetWriter here.
_private: (),
}

/// `FormatFileWriter` implementation for the Parquet per-file writer.
///
/// NOTE(review): both methods are `todo!()` placeholders and panic when called.
impl FormatFileWriter for ParquetFormatFileWriter {
/// Placeholder for appending one batch to the file.
///
/// `_batch` is currently unused.
///
/// # Panics
///
/// Always panics via `todo!`, at call time rather than when the returned
/// future is polled.
fn write_batch(&mut self, _batch: &dyn DataBatch) -> BoxFuture<'_, Result<()>> {
// Planned implementation (not written yet):
// 1. Downcast batch to &RecordBatch via DataBatch::as_any()
// 2. Write to the underlying AsyncArrowWriter
todo!("Downcast and write")
}

/// Placeholder for finishing the file and reporting the written data files.
///
/// Consumes the boxed writer.
///
/// # Panics
///
/// Always panics via `todo!`, at call time rather than when the returned
/// future is polled.
fn close(self: Box<Self>) -> BoxFuture<'static, Result<WriterResult>> {
// Planned implementation (not written yet):
// 1. Close the AsyncArrowWriter, get Parquet FileMetaData
// 2. Extract metrics via parquet_to_data_file_builder()
// 3. Return WriterResult { data_files: vec![builder] }
todo!("Close and extract metrics")
}
}
142 changes: 142 additions & 0 deletions crates/iceberg/src/formats/registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Format registry for runtime format resolution.

use std::any::TypeId;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use crate::spec::DataFileFormat;
use crate::{Error, ErrorKind, Result};

use super::{DataBatch, FormatReader, FormatWriter};

/// Maps `(DataFileFormat, TypeId)` pairs to format reader/writer implementations.
///
/// The two-dimensional key matches the Java `FormatModelRegistry` pattern where
/// the key is `(FileFormat, Class<?>)`. The first dimension is the file format
/// (determined at runtime from manifest metadata). The second dimension is the
/// batch type the caller wants to produce or consume (determined at compile time
/// by the engine integration).
///
/// This design supports the pattern where multiple engines register their own
/// optimized readers for the same format. Spark registers a Parquet reader that
/// produces `ColumnarBatch`. Flink registers one that produces `RowData`. In
/// Rust, DataFusion registers a reader producing `RecordBatch`. A future Comet
/// integration could register one producing its own batch type.
///
/// Callers specify the batch type they expect via the generic parameter on
/// `reader::<B>(format)` and `writer::<B>(format)`. They never name the
/// concrete reader/writer implementation.
///
/// NOTE(review): the batch type is only part of the lookup key. Registration
/// accepts any `Arc<dyn FormatReader>` / `Arc<dyn FormatWriter>` alongside an
/// independently chosen `B`, so nothing here checks that the implementation
/// really produces or accepts `B`.
pub struct FormatRegistry {
// Reader implementations keyed by (file format, TypeId of the batch type).
readers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatReader>>,
// Writer implementations keyed by (file format, TypeId of the batch type).
writers: HashMap<(DataFileFormat, TypeId), Arc<dyn FormatWriter>>,
}

impl FormatRegistry {
    /// Creates a registry with no readers or writers registered.
    pub fn new() -> Self {
        Self {
            readers: HashMap::new(),
            writers: HashMap::new(),
        }
    }

    /// Registers `reader` as the implementation that produces batch type `B`
    /// for the file format reported by `reader.format()`.
    ///
    /// # Panics
    ///
    /// Panics if a reader is already registered for the same
    /// `(format, batch type)` pair.
    pub fn register_reader<B: DataBatch>(&mut self, reader: Arc<dyn FormatReader>) {
        use std::collections::hash_map::Entry;

        let key = (reader.format(), TypeId::of::<B>());
        // The entry API performs the duplicate check and the insertion with a
        // single hash lookup.
        match self.readers.entry(key) {
            Entry::Occupied(existing) => panic!(
                // `type_name` is used instead of the `TypeId` because the
                // latter only prints an opaque hash.
                "FormatReader already registered for {:?} with batch type {}",
                existing.key().0,
                std::any::type_name::<B>()
            ),
            Entry::Vacant(slot) => {
                slot.insert(reader);
            }
        }
    }

    /// Registers `writer` as the implementation that accepts batch type `B`
    /// for the file format reported by `writer.format()`.
    ///
    /// # Panics
    ///
    /// Panics if a writer is already registered for the same
    /// `(format, batch type)` pair.
    pub fn register_writer<B: DataBatch>(&mut self, writer: Arc<dyn FormatWriter>) {
        use std::collections::hash_map::Entry;

        let key = (writer.format(), TypeId::of::<B>());
        // The entry API performs the duplicate check and the insertion with a
        // single hash lookup.
        match self.writers.entry(key) {
            Entry::Occupied(existing) => panic!(
                // `type_name` is used instead of the `TypeId` because the
                // latter only prints an opaque hash.
                "FormatWriter already registered for {:?} with batch type {}",
                existing.key().0,
                std::any::type_name::<B>()
            ),
            Entry::Vacant(slot) => {
                slot.insert(writer);
            }
        }
    }

    /// Looks up the reader for `format` that produces batch type `B`.
    ///
    /// The generic parameter `B` is the batch type the caller wants to consume;
    /// the lookup key is `(format, TypeId::of::<B>())`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::FeatureUnsupported`] error when no reader has
    /// been registered for that pair.
    pub fn reader<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatReader> {
        let key = (format, TypeId::of::<B>());
        self.readers.get(&key).map(|r| r.as_ref()).ok_or_else(|| {
            Error::new(
                ErrorKind::FeatureUnsupported,
                format!(
                    "No reader registered for {format:?} producing batch type {}",
                    std::any::type_name::<B>()
                ),
            )
        })
    }

    /// Looks up the writer for `format` that accepts batch type `B`.
    ///
    /// The generic parameter `B` is the batch type the caller will feed to the
    /// writer; the lookup key is `(format, TypeId::of::<B>())`.
    ///
    /// # Errors
    ///
    /// Returns an [`ErrorKind::FeatureUnsupported`] error when no writer has
    /// been registered for that pair.
    pub fn writer<B: DataBatch>(&self, format: DataFileFormat) -> Result<&dyn FormatWriter> {
        let key = (format, TypeId::of::<B>());
        self.writers.get(&key).map(|w| w.as_ref()).ok_or_else(|| {
            Error::new(
                ErrorKind::FeatureUnsupported,
                format!(
                    "No writer registered for {format:?} accepting batch type {}",
                    std::any::type_name::<B>()
                ),
            )
        })
    }
}

impl Default for FormatRegistry {
/// Returns an empty registry, identical to what `FormatRegistry::new()` builds.
///
/// NOTE(review): no format is registered by default yet; the Parquet
/// registration sketched below is still commented out.
fn default() -> Self {
// Format registrations will be added here as implementations land.
// Planned shape, gated on a `format-parquet` feature (`ParquetArrowModel`
// is a unit struct, so it is constructed without a `new()` call):
//
// #[cfg(feature = "format-parquet")]
// {
// let mut registry = Self::new();
// let model = Arc::new(ParquetArrowModel);
// registry.register_reader::<RecordBatch>(model.clone());
// registry.register_writer::<RecordBatch>(model);
// return registry;
// }
Self::new()
}
}

/// Process-wide default registry, initialized on first access.
pub fn default_format_registry() -> &'static FormatRegistry {
static REGISTRY: OnceLock<FormatRegistry> = OnceLock::new();
REGISTRY.get_or_init(FormatRegistry::default)
}
Loading
Loading