Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b33aec4
Core, Data: File Format API interfaces
pvary Apr 11, 2025
5b79dbd
Renjie's comments
pvary Apr 29, 2025
a1daced
registerObjectModel exception handling fix
pvary Apr 30, 2025
79d7703
Removing the need for data.AppenderBuilder
pvary May 6, 2025
8404aa7
Fixed Renjie findings
pvary May 7, 2025
a00540d
Cosmentic changes
pvary May 15, 2025
2a4816a
Steven's comments
pvary May 19, 2025
ba16b59
ObjectModelRegistry->FileAccessFactory, and AppenederBuilder->WriteBu…
pvary May 20, 2025
9976bfb
Review comments by Steven and Russell (some javadoc, and a few method…
pvary May 21, 2025
62ea041
Added generic for the input/output of the reader/writer - like 'FileA…
pvary May 22, 2025
22ca3d7
New classes for implementation allows for absolutely new interfaces
pvary May 26, 2025
598f46d
Default methods for setting multiple config/meta values
pvary May 28, 2025
5ce49dc
Return FileReader instead of CloseableIterable from the ReadBuilder
pvary Jun 16, 2025
46a1742
Revert "Return FileReader instead of CloseableIterable from the ReadB…
pvary Jun 24, 2025
026e5e9
Ryan's comments
pvary Jun 24, 2025
3149874
Move interface classes to core
pvary Jun 25, 2025
4659adf
Rename FileAccessFactory to ObjectModelFactory
pvary Jun 25, 2025
40ec8bb
Ryan's next round of comments
pvary Jun 27, 2025
f20bb4e
Separate input conversion from witers
pvary Jul 22, 2025
7e91a40
Eduard's comments
pvary Jul 24, 2025
a957c47
Fixing parameter names
pvary Jul 24, 2025
8ce2f2e
Ryan's comments
pvary Aug 6, 2025
2b1b10b
Remove builder parameter from data file writers
pvary Aug 6, 2025
26e03b7
Remove parametrization as much as possible
pvary Aug 13, 2025
ef41daa
ContentFileWriteBuilder needs a generic parameter
pvary Aug 14, 2025
acb2254
Revert transformers, and used engine specific type setting for writer…
pvary Aug 25, 2025
3efd188
Steven's and Russel's comments
pvary Sep 11, 2025
e5611f4
Move to writeBuilder/positionDeleteWriteBuilder solution, and depreca…
pvary Sep 15, 2025
3a6a5ed
Use a specific FormatModel to write PositionDeletes
pvary Sep 24, 2025
fad5e07
Changes discussed on the sync
pvary Oct 2, 2025
790e282
Steven's comments
pvary Oct 8, 2025
224070e
Remove the FormatModelRegistry.writeBuilder method
pvary Oct 13, 2025
0c439c1
Eduard's comment
pvary Oct 17, 2025
ddcf866
Addressing Amogh's, Steven's and Gabor's comments
pvary Oct 20, 2025
2fa0c5f
Removing ReadBuilder.outputSchema per Steven's and Renjie's comments
pvary Oct 21, 2025
8f58a9e
Aihua's comments
pvary Oct 26, 2025
b22a91a
Move back to parametrized writers
pvary Nov 5, 2025
8085a4b
Encryption handling
pvary Nov 5, 2025
af4f8a3
Revert back to OutputFile in the FormatModel
pvary Nov 8, 2025
c17c9ca
Synchronized register method, and only provide writeBuilder, location…
pvary Nov 9, 2025
7c175ea
constantValues -> idToConstant
pvary Nov 9, 2025
7ed56a5
Remove new from the API parameter names
pvary Nov 9, 2025
69461a2
Move to EncryptedOutputFile from OutputFile FormatModel.writeBuilder
pvary Nov 10, 2025
fdbb6c0
Ryan's comments
pvary Nov 11, 2025
4ae6ba0
Fix typo in comment
pvary Nov 20, 2025
fe743e1
Revert back to prevent re-registering models
pvary Nov 20, 2025
b66e891
Ryan's comments
pvary Nov 21, 2025
d450ec6
Steven's comment to change the log message for failed registration
pvary Nov 21, 2025
564ba8c
Create a marker class for the Comet reader and a few extra nits
pvary Dec 9, 2025
17f030f
Added back the ReadBuilder.outputSchema method as the attirbute is us…
pvary Jan 19, 2026
74ca9f7
Changes from #12298 based on Ryan's comments
pvary Feb 5, 2026
6cf720a
Last fixes
pvary Feb 6, 2026
7f6cb33
Javadoc comment fixes
pvary Feb 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 15 additions & 42 deletions core/src/main/java/org/apache/iceberg/io/AppenderBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,22 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;

/**
* Interface which should be implemented by the data file format implementations. The {@link
* AppenderBuilder} will be parametrized based on the user provided configuration and finally the
* {@link AppenderBuilder#build(AppenderBuilder.WriteMode)} method is used to generate the appender
* for the specific writer use-cases. The following input should be handled by the appender in the
* specific modes:
* Interface which is implemented by the data file format implementations. The {@link ObjectModel}
* provides the {@link AppenderBuilder} for the given parameters:
*
* <ul>
* <li>The appender's engine specific input type
* <ul>
* <li>{@link AppenderBuilder.WriteMode#DATA_WRITER}
* <li>{@link AppenderBuilder.WriteMode#EQUALITY_DELETE_WRITER}
* </ul>
* <li>{@link org.apache.iceberg.deletes.PositionDelete} where the type of the row is the
* appender's engine specific input type
* <ul>
* <li>{@link AppenderBuilder.WriteMode#POSITION_DELETE_WRITER}
* <li>{@link AppenderBuilder.WriteMode#POSITION_DELETE_WITH_ROW_WRITER}
* </ul>
* <li>file format
* <li>engine specific object model
* <li>{@link ObjectModel.WriteMode}
* </ul>
*
* The {@link AppenderBuilder} is used to write data to the target files.
*
* @param <B> type returned by builder API to allow chained calls
* @param <E> the engine specific schema of the input data
*/
Expand All @@ -60,6 +52,11 @@ public interface AppenderBuilder<B extends AppenderBuilder<B, E>, E> {
*/
B set(String property, String value);

default B set(Map<String, String> properties) {
properties.forEach(this::set);
return (B) this;
}

/**
* Set a file metadata property in the created file.
*
Expand Down Expand Up @@ -107,30 +104,6 @@ default B aadPrefix(ByteBuffer aadPrefix) {
*/
B engineSchema(E newEngineSchema);
Comment thread
pvary marked this conversation as resolved.
Outdated

/**
* Builds the {@link FileAppender} for the configured {@link WriteMode}. Could change several
* use-case specific configurations, like:
*
* <ul>
* <li>Mode specific writer context (typically different for data and delete files).
* <li>Writer functions to accept data rows, or {@link
* org.apache.iceberg.deletes.PositionDelete}s
* </ul>
*/
<D> FileAppender<D> build(WriteMode mode) throws IOException;

/**
* Writer modes. Based on the mode {@link #build(WriteMode)} could alter the appender
* configuration when creating the {@link FileAppender}.
*/
enum WriteMode {
/** Mode for writing data files. */
DATA_WRITER,
/** Mode for writing equality delete files. */
EQUALITY_DELETE_WRITER,
/** Mode for writing position delete files. */
POSITION_DELETE_WRITER,
/** Mode for writing position delete files with row data. */
POSITION_DELETE_WITH_ROW_WRITER
}
/** Finalizes the configuration and builds the {@link FileAppender}. */
<D> FileAppender<D> build() throws IOException;
}
31 changes: 29 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/ObjectModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,27 @@ public interface ObjectModel<E> {

/**
* The appender builder for the output file which writes the data in the specified file format and
* accepts the records defined by this object model.
* accepts the records defined by this object model. The 'mode' parameter defines the input type
* for the specific writer use-cases. The appender should handle the following input in the
* specific modes:
*
* <ul>
* <li>The appender's engine specific input type
* <ul>
* <li>{@link WriteMode#DATA_WRITER}
* <li>{@link WriteMode#EQUALITY_DELETE_WRITER}
* </ul>
* <li>{@link org.apache.iceberg.deletes.PositionDelete} where the type of the row is the
* appender's engine specific input type when the 'mode' is {@link
* WriteMode#POSITION_DELETE_WRITER}
* </ul>
*
* @param outputFile to write to
* @param mode for the appender
* @return the appender builder
* @param <B> The type of the appender builder
*/
<B extends AppenderBuilder<B, E>> B appenderBuilder(OutputFile outputFile);
<B extends AppenderBuilder<B, E>> B appenderBuilder(OutputFile outputFile, WriteMode mode);
Comment thread
stevenzwu marked this conversation as resolved.
Outdated

/**
* The reader builder for the input file which reads the data from the specified file format and
Expand All @@ -76,4 +90,17 @@ public interface ObjectModel<E> {
* @param <B> The type of the reader builder
*/
<B extends ReadBuilder<B>> B readBuilder(InputFile inputFile);
Comment thread
stevenzwu marked this conversation as resolved.
Outdated

/**
* Writer modes. Based on the mode the object model could alter the appender configuration when
* creating the {@link FileAppender}.
*/
enum WriteMode {
/** Mode for writing data files. */
DATA_WRITER,
/** Mode for writing equality delete files. */
EQUALITY_DELETE_WRITER,
/** Mode for writing position delete files. */
POSITION_DELETE_WRITER,
}
}
43 changes: 0 additions & 43 deletions data/src/main/java/org/apache/iceberg/data/AppenderBuilder.java

This file was deleted.

38 changes: 24 additions & 14 deletions data/src/main/java/org/apache/iceberg/data/ObjectModelRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.iceberg.data;

import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.encryption.EncryptedOutputFile;
Expand All @@ -36,10 +36,10 @@

/**
* Registry which provides the available {@link ReadBuilder}s and writer builders ({@link
* org.apache.iceberg.data.AppenderBuilder}, {@link DataWriterBuilder}, {@link
* EqualityDeleteWriterBuilder}, {@link PositionDeleteWriterBuilder}). Based on the `file format`
* and the requested `object model name` the registry returns the correct reader and writer
* builders. These builders could be used to generate the readers and writers.
* AppenderBuilder}, {@link DataWriterBuilder}, {@link EqualityDeleteWriterBuilder}, {@link
Comment thread
pvary marked this conversation as resolved.
Outdated
* PositionDeleteWriterBuilder}). Based on the `file format` and the requested `object model name`
* the registry returns the correct reader and writer builders. These builders could be used to
* generate the readers and writers.
*
* <p>The available {@link ObjectModel}s are registered by the {@link
* #registerObjectModel(ObjectModel)} method. These {@link ObjectModel}s will be used to create the
Expand All @@ -49,9 +49,13 @@
public final class ObjectModelRegistry {
Comment thread
stevenzwu marked this conversation as resolved.
Outdated
private static final Logger LOG = LoggerFactory.getLogger(ObjectModelRegistry.class);
// The list of classes which are used for registering the reader and writer builders
private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of();
private static final List<String> CLASSES_TO_REGISTER =
ImmutableList.of(
"org.apache.iceberg.arrow.vectorized.ArrowReader",
"org.apache.iceberg.flink.data.FlinkObjectModels",
"org.apache.iceberg.spark.source.SparkObjectModels");
Comment thread
pvary marked this conversation as resolved.
Outdated

private static final ConcurrentMap<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap();
private static final Map<Key, ObjectModel<?>> OBJECT_MODELS = Maps.newConcurrentMap();
Comment thread
pvary marked this conversation as resolved.
Outdated

/**
* Registers a new object model.
Expand Down Expand Up @@ -116,9 +120,10 @@ public static ReadBuilder<?> readBuilder(
* @param <E> type for the engine specific schema expected by the appender
* @return {@link ReadBuilder} for building the actual reader
*/
public static <E> org.apache.iceberg.data.AppenderBuilder<?, E> appenderBuilder(
public static <E> AppenderBuilder<?, E> appenderBuilder(
FileFormat format, String objectModelName, EncryptedOutputFile outputFile) {
return writerFor(format, objectModelName, outputFile);
return ((ObjectModel<E>) OBJECT_MODELS.get(new Key(format, objectModelName)))
.appenderBuilder(outputFile.encryptingOutputFile(), ObjectModel.WriteMode.DATA_WRITER);
}

/**
Expand All @@ -133,7 +138,7 @@ public static <E> org.apache.iceberg.data.AppenderBuilder<?, E> appenderBuilder(
*/
public static <E> DataWriterBuilder<?, E> writerBuilder(
FileFormat format, String objectModelName, EncryptedOutputFile outputFile) {
return writerFor(format, objectModelName, outputFile);
return writerFor(format, objectModelName, outputFile, ObjectModel.WriteMode.DATA_WRITER);
}

/**
Expand All @@ -148,7 +153,8 @@ public static <E> DataWriterBuilder<?, E> writerBuilder(
*/
public static <E> EqualityDeleteWriterBuilder<?, E> equalityDeleteWriterBuilder(
FileFormat format, String objectModelName, EncryptedOutputFile outputFile) {
return writerFor(format, objectModelName, outputFile);
return writerFor(
format, objectModelName, outputFile, ObjectModel.WriteMode.EQUALITY_DELETE_WRITER);
}

/**
Expand All @@ -164,15 +170,19 @@ public static <E> EqualityDeleteWriterBuilder<?, E> equalityDeleteWriterBuilder(
*/
public static <E> PositionDeleteWriterBuilder<?, E> positionDeleteWriterBuilder(
FileFormat format, String objectModelName, EncryptedOutputFile outputFile) {
return writerFor(format, objectModelName, outputFile);
return writerFor(
format, objectModelName, outputFile, ObjectModel.WriteMode.POSITION_DELETE_WRITER);
}

@SuppressWarnings("unchecked")
private static <B extends AppenderBuilder<B, E>, E> WriteBuilder<?, ?, E> writerFor(
FileFormat format, String objectModelName, EncryptedOutputFile outputFile) {
FileFormat format,
String objectModelName,
EncryptedOutputFile outputFile,
ObjectModel.WriteMode mode) {
return new WriteBuilder<>(
((ObjectModel<E>) OBJECT_MODELS.get(new Key(format, objectModelName)))
.<B>appenderBuilder(outputFile.encryptingOutputFile()),
.<B>appenderBuilder(outputFile.encryptingOutputFile(), mode),
outputFile.encryptingOutputFile().location(),
format);
}
Expand Down
27 changes: 6 additions & 21 deletions data/src/main/java/org/apache/iceberg/data/WriteBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,15 @@
*
* The builder wraps the file format specific {@link AppenderBuilder}. To allow further engine and
* file format specific configuration changes for the given writer the {@link
* AppenderBuilder#build(AppenderBuilder.WriteMode)} method is called with the correct parameter to
* create the appender used internally to provide the required functionality.
* AppenderBuilder#build()} method is called to create the appender used internally to provide the
* required functionality.
*
* @param <A> type of the appender
* @param <E> engine specific schema of the input records used for appender initialization
*/
@SuppressWarnings("unchecked")
class WriteBuilder<B extends WriteBuilder<B, A, E>, A extends AppenderBuilder<A, E>, E>
implements org.apache.iceberg.data.AppenderBuilder<B, E>,
DataWriterBuilder<B, E>,
implements DataWriterBuilder<B, E>,
EqualityDeleteWriterBuilder<B, E>,
PositionDeleteWriterBuilder<B, E> {
private final AppenderBuilder<A, E> appenderBuilder;
Expand Down Expand Up @@ -184,11 +183,6 @@ public B withSortOrder(SortOrder newSortOrder) {
return (B) this;
}

@Override
public <D> FileAppender<D> appender() throws IOException {
return appenderBuilder.build(AppenderBuilder.WriteMode.DATA_WRITER);
}

@Override
public <D> DataWriter<D> dataWriter() throws IOException {
Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
Expand All @@ -197,13 +191,7 @@ public <D> DataWriter<D> dataWriter() throws IOException {
"Partition must not be null when creating data writer for partitioned spec");

return new DataWriter<>(
appenderBuilder.build(AppenderBuilder.WriteMode.DATA_WRITER),
format,
location,
spec,
partition,
keyMetadata,
sortOrder);
appenderBuilder.build(), format, location, spec, partition, keyMetadata, sortOrder);
}

@Override
Expand All @@ -227,7 +215,7 @@ public <D> EqualityDeleteWriter<D> equalityDeleteWriter() throws IOException {
IntStream.of(equalityFieldIds)
.mapToObj(Objects::toString)
.collect(Collectors.joining(", ")))
.build(AppenderBuilder.WriteMode.EQUALITY_DELETE_WRITER),
.build(),
format,
location,
spec,
Expand All @@ -251,10 +239,7 @@ public <D> PositionDeleteWriter<D> positionDeleteWriter() throws IOException {
appenderBuilder
.meta("delete-type", "position")
.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema))
.build(
rowSchema != null
? AppenderBuilder.WriteMode.POSITION_DELETE_WITH_ROW_WRITER
: AppenderBuilder.WriteMode.POSITION_DELETE_WRITER),
.build(),
format,
location,
spec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ interface WriterBuilderBase<B extends WriterBuilderBase<B, E>, E> {
B schema(Schema newSchema);

/**
* Sets the engine specific schema for the input. Used by the {@link
* AppenderBuilder#build(AppenderBuilder.WriteMode)} to configure the engine specific converters.
* Sets the engine specific schema for the input. Used by the {@link AppenderBuilder#build()} to
* configure the engine specific converters.
*/
B engineSchema(E engineSchema);

Expand Down
Loading