Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ private FormatModelRegistry() {}
ImmutableList.of(
"org.apache.iceberg.data.GenericFormatModels",
"org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
"org.apache.iceberg.flink.data.FlinkFormatModels");
"org.apache.iceberg.flink.data.FlinkFormatModels",
"org.apache.iceberg.spark.source.SparkFormatModels");

// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -167,6 +169,21 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
}
}

/**
 * Benchmarks reading the Parquet data file through the reader built by {@link
 * FormatModelRegistry} for {@link InternalRow}, projecting {@code SCHEMA}, and passes every row
 * to the blackhole.
 *
 * @param blackhole sink that consumes each row produced by the reader
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the row
 *     iterable)
 */
@Benchmark
@Threads(1)
public void readUsingRegistryReader(Blackhole blackhole) throws IOException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to test the direct method vs the registry method? I would expect this to replace the current readUsingIcebergReaderUnsafe implementation since this is the same reader implementation. We should make sure that there is not a regression by running these benchmarks (for which it would be fine to leave this method here) but I don't want to accumulate essentially dead code testing the same thing.

  try (CloseableIterable<InternalRow> rows =
      FormatModelRegistry.readBuilder(
              FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
          .project(SCHEMA)
          .build()) {

    // parameter is named "blackhole" to match the sibling benchmark methods
    for (InternalRow row : rows) {
      blackhole.consume(row);
    }
  }
}

@Benchmark
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
Expand Down Expand Up @@ -226,4 +243,19 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc
}
}
}

/**
 * Benchmarks a projected read of the Parquet data file through the reader built by {@link
 * FormatModelRegistry} for {@link InternalRow}. Only {@code PROJECTED_SCHEMA} is requested, and
 * each row is passed to the blackhole.
 *
 * @param blackhole sink that consumes each row produced by the reader
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the row
 *     iterable)
 */
@Benchmark
@Threads(1)
public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException {
  try (CloseableIterable<InternalRow> projectedRows =
      FormatModelRegistry.readBuilder(
              FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
          .project(PROJECTED_SCHEMA)
          .build()) {
    // plain for-each kept on purpose so the measured loop matches the sibling benchmarks
    for (InternalRow projectedRow : projectedRows) {
      blackhole.consume(projectedRow);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
Expand Down Expand Up @@ -165,6 +167,21 @@ public void readUsingSparkReader(Blackhole blackhole) throws IOException {
}
}

/**
 * Benchmarks reading the Parquet data file through the reader built by {@link
 * FormatModelRegistry} for {@link InternalRow}, projecting {@code SCHEMA}, and passes every row
 * to the blackhole.
 *
 * @param blackhole sink that consumes each row produced by the reader
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the row
 *     iterable)
 */
@Benchmark
@Threads(1)
public void readUsingRegistryReader(Blackhole blackhole) throws IOException {
  try (CloseableIterable<InternalRow> registryRows =
      FormatModelRegistry.readBuilder(
              FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
          .project(SCHEMA)
          .build()) {
    // plain for-each kept on purpose so the measured loop matches the sibling benchmarks
    for (InternalRow registryRow : registryRows) {
      blackhole.consume(registryRow);
    }
  }
}

@Benchmark
@Threads(1)
public void readWithProjectionUsingIcebergReader(Blackhole blackhole) throws IOException {
Expand Down Expand Up @@ -224,4 +241,19 @@ public void readWithProjectionUsingSparkReader(Blackhole blackhole) throws IOExc
}
}
}

/**
 * Benchmarks a projected read of the Parquet data file through the reader built by {@link
 * FormatModelRegistry} for {@link InternalRow}. Only {@code PROJECTED_SCHEMA} is requested, and
 * each row is passed to the blackhole.
 *
 * @param blackhole sink that consumes each row produced by the reader
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the row
 *     iterable)
 */
@Benchmark
@Threads(1)
public void readWithProjectionUsingRegistryReader(Blackhole blackhole) throws IOException {
  try (CloseableIterable<InternalRow> projectedRows =
      FormatModelRegistry.readBuilder(
              FileFormat.PARQUET, InternalRow.class, Files.localInput(dataFile))
          .project(PROJECTED_SCHEMA)
          .build()) {
    // plain for-each kept on purpose so the measured loop matches the sibling benchmarks
    for (InternalRow projectedRow : projectedRows) {
      blackhole.consume(projectedRow);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
Expand Down Expand Up @@ -121,10 +126,28 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were failing with Spark 4.1, but it's probably not worth creating a new PR for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay with this since it isn't production code. It's unlikely that this is going to cause problems cherry-picking.

.schema(SCHEMA)
.build()) {

writer.addAll(rows);
}
}

/**
 * Benchmarks writing {@code rows} to {@code dataFile} as Parquet through the {@link DataWriter}
 * built by {@link FormatModelRegistry} for {@link InternalRow}, using an unpartitioned spec.
 *
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the
 *     writer)
 */
@Benchmark
@Threads(1)
public void writeUsingRegistryWriter() throws IOException {
Comment thread
pvary marked this conversation as resolved.
Outdated
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
// the builder is handed the local output file wrapped via plainAsEncryptedOutput
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
// engine-side schema is derived from the same SCHEMA via SparkSchemaUtil
.engineSchema(SparkSchemaUtil.convert(SCHEMA))
.spec(PartitionSpec.unpartitioned())
.build()) {

// all rows are handed to the writer in a single call
writer.write(rows);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@

import java.io.File;
import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.spark.SparkSchemaUtil;
Expand Down Expand Up @@ -121,10 +126,28 @@ public void writeUsingSparkWriter() throws IOException {
.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.variant.annotateLogicalType.enabled", "false")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were failing with Spark 4.1, but it's probably not worth creating a new PR for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we file an issue to track the underlying Spark 4.1 test failure, so we can fix the root cause later?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both "true" and "false" are OK. The issue was that the config was not set.

.schema(SCHEMA)
.build()) {

writer.addAll(rows);
}
}

/**
 * Benchmarks writing {@code rows} to {@code dataFile} as Parquet through the {@link DataWriter}
 * built by {@link FormatModelRegistry} for {@link InternalRow}, using an unpartitioned spec.
 *
 * @throws IOException propagated from the try-with-resources block (e.g. when closing the
 *     writer)
 */
@Benchmark
@Threads(1)
public void writeUsingRegistryWriter() throws IOException {
try (DataWriter<InternalRow> writer =
FormatModelRegistry.dataWriteBuilder(
FileFormat.PARQUET,
InternalRow.class,
// the builder is handed the local output file wrapped via plainAsEncryptedOutput
EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(dataFile)))
.schema(SCHEMA)
// engine-side schema is derived from the same SCHEMA via SparkSchemaUtil
.engineSchema(SparkSchemaUtil.convert(SCHEMA))
Comment thread
pvary marked this conversation as resolved.
Outdated
.spec(PartitionSpec.unpartitioned())
.build()) {

// all rows are handed to the writer in a single call
writer.write(rows);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileIO;
Expand Down Expand Up @@ -681,6 +680,13 @@ public CloseableIterable<Record> reader(
return positionDeletesReader(inputFile, format, spec);
}

/**
 * Creates a position delete writer for the given output file when no row schema is supplied.
 *
 * <p>Delegates to {@code positionDeletesWriter}, passing {@code null} as the row schema.
 *
 * @throws IOException if {@code positionDeletesWriter} throws it
 */
@Override
public PositionDeleteWriter<Record> writer(
Comment thread
pvary marked this conversation as resolved.
Outdated
OutputFile outputFile, FileFormat format, PartitionSpec spec, StructLike partition)
throws IOException {
// the trailing null is the rowSchema argument of positionDeletesWriter
return positionDeletesWriter(outputFile, format, spec, partition, null);
}

@Override
public PositionDeleteWriter<Record> writer(
OutputFile outputFile,
Expand Down Expand Up @@ -719,32 +725,10 @@ private ForeachFunction<DeleteFile> rewritePositionDelete(

/**
 * Opens a position delete file as a closeable iterable of generic {@link Record}s.
 *
 * <p>The reader is obtained from {@link FormatModelRegistry} for the requested format instead of
 * a per-format switch. It projects the position-delete read schema that {@link DeleteSchemaUtil}
 * derives from the spec's schema and enables container reuse.
 *
 * @param inputFile the delete file to read
 * @param format the file format of {@code inputFile}
 * @param spec the partition spec whose schema is used to derive the delete read schema
 * @return a closeable iterable over the records of the delete file
 */
private static CloseableIterable<Record> positionDeletesReader(
    InputFile inputFile, FileFormat format, PartitionSpec spec) {
  // NOTE(review): an unsupported format is now reported by the registry rather than by an
  // explicit UnsupportedOperationException here; confirm callers do not rely on that type.
  return FormatModelRegistry.readBuilder(format, Record.class, inputFile)
      .project(DeleteSchemaUtil.posDeleteReadSchema(spec.schema()))
      .reuseContainers()
      .build();
}

private static PositionDeleteWriter<Record> positionDeletesWriter(
Expand All @@ -754,30 +738,37 @@ private static PositionDeleteWriter<Record> positionDeletesWriter(
StructLike partition,
Schema rowSchema)
throws IOException {
switch (format) {
case AVRO:
return Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET:
return Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC:
return ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default:
throw new UnsupportedOperationException("Unsupported file format: " + format);
if (rowSchema == null) {
return FormatModelRegistry.<Record>positionDeleteWriteBuilder(
format, EncryptedFiles.plainAsEncryptedOutput(outputFile))
.partition(partition)
.spec(spec)
.build();
} else {
return switch (format) {
case AVRO ->
Avro.writeDeletes(outputFile)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case PARQUET ->
Parquet.writeDeletes(outputFile)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
case ORC ->
ORC.writeDeletes(outputFile)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.rowSchema(rowSchema)
.withSpec(spec)
.buildPositionWriter();
default -> throw new UnsupportedOperationException("Unsupported file format: " + format);
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,7 +77,7 @@ public static ColumnarBatchReader buildReader(
return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
}

public static CometColumnarBatchReader buildCometReader(
public static VectorizedReader<ColumnarBatch> buildCometReader(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
Expand All @@ -88,6 +90,13 @@ public static CometColumnarBatchReader buildCometReader(
readers -> new CometColumnarBatchReader(readers, expectedSchema)));
}

/**
 * {@link ColumnarBatch} subtype used to identify Comet readers. It declares no fields or methods
 * of its own beyond the constructor.
 */
public static class CometColumnarBatch extends ColumnarBatch {
  /**
   * Creates the batch by passing the given column vectors straight to the {@link ColumnarBatch}
   * constructor.
   *
   * @param vectors the column vectors backing this batch
   */
  public CometColumnarBatch(ColumnVector[] vectors) {
    super(vectors);
  }
}

// enables unsafe memory access to avoid costly checks to see if index is within bounds
// as long as it is not configured explicitly (see BoundsChecking in Arrow)
private static void enableUnsafeMemoryAccess() {
Expand Down
Loading
Loading