From e99b5d0f1fca477026ae9a8ab717b93ea58b4d1a Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 29 May 2026 15:09:05 +0100 Subject: [PATCH 1/3] Add support for constant column readers --- .../iceberg/io/TestFileWriterFactory.java | 3 - .../iceberg/io/TestPartitioningWriters.java | 6 - .../iceberg/spark/data/SparkVortexReader.java | 121 +++++++++--- .../VectorizedSparkVortexReaders.java | 83 ++++---- .../iceberg/spark/data/SparkVortexReader.java | 121 +++++++++--- .../VectorizedSparkVortexReaders.java | 83 ++++---- .../iceberg/spark/data/SparkVortexReader.java | 120 ++++++++++-- .../VectorizedSparkVortexReaders.java | 83 ++++---- .../data/vortex/GenericVortexReader.java | 94 +++++++-- .../data/vortex/GenericVortexReaders.java | 54 +++++- .../data/vortex/GenericVortexWriter.java | 21 ++ .../iceberg/vortex/VortexFormatModel.java | 25 ++- .../apache/iceberg/vortex/VortexIterable.java | 27 ++- .../vortex/VortexSchemaWithTypeVisitor.java | 37 +++- .../vortex/TestGenericReadProjection.java | 62 ++---- .../vortex/TestVortexConstantReaders.java | 179 ++++++++++++++++++ 16 files changed, 862 insertions(+), 257 deletions(-) create mode 100644 vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java index 73b21dcd2b9c..3853da724ee9 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java +++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java @@ -165,9 +165,6 @@ public void testEqualityDeleteWriter() throws IOException { @TestTemplate public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException { assumeThat(partitioned).isFalse(); - // Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader - // does not yet inject (no constant readers — see GenericVortexReader TODO). 
- assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX); List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java index 597d32684b50..8751096112e9 100644 --- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java +++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java @@ -179,9 +179,6 @@ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException { @TestTemplate public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException { - // Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader - // does not yet inject (no constant readers — see GenericVortexReader TODO). - assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX); List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); FileWriterFactory writerFactory = @@ -247,9 +244,6 @@ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException @TestTemplate public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException { - // Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader - // does not yet inject (no constant readers — see GenericVortexReader TODO). 
- assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX); List equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId()); Schema equalityDeleteRowSchema = table.schema().select("id"); FileWriterFactory writerFactory = diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 9116d2a55279..bf5c97964716 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -38,35 +41,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). 
+ private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } - static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + + return indexes; + } + + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); private SparkReadBuilder() {} @@ -74,7 +138,7 @@ private SparkReadBuilder() {} @Override public VortexValueReader struct( Types.StructType schema, List fields, List> children) { - return new StructReader(children); + return new StructReader(fields, children); } @Override @@ -110,22 +174,33 @@ public VortexValueReader primitive(Type.PrimitiveType icebergType, Field prim } static class StructReader implements VortexValueReader { + // File column name backing each expected field, or null when the field is absent from the file. + private final String[] childNames; + private final List> fieldReaders; - private final List> fields; - - private StructReader(List> fields) { - this.fields = fields; + private StructReader(List fields, List> fieldReaders) { + this.fieldReaders = fieldReaders; + this.childNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + this.childNames[i] = field == null ? 
null : field.getName(); + } } @Override public InternalRow readNonNull(FieldVector vector, int row) { org.apache.arrow.vector.complex.StructVector struct = (org.apache.arrow.vector.complex.StructVector) vector; - GenericInternalRow result = new GenericInternalRow(fields.size()); - for (int i = 0; i < fields.size(); i++) { - VortexValueReader fieldReader = fields.get(i); - FieldVector child = (FieldVector) struct.getChildByOrdinal(i); - result.update(i, fieldReader.read(child, row)); + GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); + for (int i = 0; i < fieldReaders.size(); i++) { + VortexValueReader fieldReader = fieldReaders.get(i); + if (fieldReader == null) { + // Expected field is not present in the file struct; project it as null. + result.update(i, null); + } else { + FieldVector child = (FieldVector) struct.getChild(childNames[i]); + result.update(i, fieldReader.read(child, row)); + } } return result; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import 
org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a constant column not backed by a batch column. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with + // the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? 
Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 9116d2a55279..bf5c97964716 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -38,35 +41,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). 
+ private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } - static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + + return indexes; + } + + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); private SparkReadBuilder() {} @@ -74,7 +138,7 @@ private SparkReadBuilder() {} @Override public VortexValueReader struct( Types.StructType schema, List fields, List> children) { - return new StructReader(children); + return new StructReader(fields, children); } @Override @@ -110,22 +174,33 @@ public VortexValueReader primitive(Type.PrimitiveType icebergType, Field prim } static class StructReader implements VortexValueReader { + // File column name backing each expected field, or null when the field is absent from the file. + private final String[] childNames; + private final List> fieldReaders; - private final List> fields; - - private StructReader(List> fields) { - this.fields = fields; + private StructReader(List fields, List> fieldReaders) { + this.fieldReaders = fieldReaders; + this.childNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + this.childNames[i] = field == null ? 
null : field.getName(); + } } @Override public InternalRow readNonNull(FieldVector vector, int row) { org.apache.arrow.vector.complex.StructVector struct = (org.apache.arrow.vector.complex.StructVector) vector; - GenericInternalRow result = new GenericInternalRow(fields.size()); - for (int i = 0; i < fields.size(); i++) { - VortexValueReader fieldReader = fields.get(i); - FieldVector child = (FieldVector) struct.getChildByOrdinal(i); - result.update(i, fieldReader.read(child, row)); + GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); + for (int i = 0; i < fieldReaders.size(); i++) { + VortexValueReader fieldReader = fieldReaders.get(i); + if (fieldReader == null) { + // Expected field is not present in the file struct; project it as null. + result.update(i, null); + } else { + FieldVector child = (FieldVector) struct.getChild(childNames[i]); + result.update(i, fieldReader.read(child, row)); + } } return result; } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import 
org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a constant column not backed by a batch column. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with + // the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? 
Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 63c4e0edc62e..bf5c97964716 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -37,33 +40,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). 
+ private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + + return indexes; + } + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); @@ -72,7 +138,7 @@ private SparkReadBuilder() {} @Override public VortexValueReader struct( Types.StructType schema, List fields, List> children) { - return new StructReader(children); + return new StructReader(fields, children); } @Override @@ -108,21 +174,33 @@ public VortexValueReader primitive(Type.PrimitiveType icebergType, Field prim } static class StructReader implements VortexValueReader { - private final List> fields; + // File column name backing each expected field, or null when the field is absent from the file. + private final String[] childNames; + private final List> fieldReaders; - private StructReader(List> fields) { - this.fields = fields; + private StructReader(List fields, List> fieldReaders) { + this.fieldReaders = fieldReaders; + this.childNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + this.childNames[i] = field == null ? 
null : field.getName(); + } } @Override public InternalRow readNonNull(FieldVector vector, int row) { org.apache.arrow.vector.complex.StructVector struct = (org.apache.arrow.vector.complex.StructVector) vector; - GenericInternalRow result = new GenericInternalRow(fields.size()); - for (int i = 0; i < fields.size(); i++) { - VortexValueReader fieldReader = fields.get(i); - FieldVector child = (FieldVector) struct.getChildByOrdinal(i); - result.update(i, fieldReader.read(child, row)); + GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); + for (int i = 0; i < fieldReaders.size(); i++) { + VortexValueReader fieldReader = fieldReaders.get(i); + if (fieldReader == null) { + // Expected field is not present in the file struct; project it as null. + result.update(i, null); + } else { + FieldVector child = (FieldVector) struct.getChild(childNames[i]); + result.update(i, fieldReader.read(child, row)); + } } return result; } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import 
org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a constant column not backed by a batch column. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with + // the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? 
Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index b58ad77b2759..16ca29ff4053 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -27,10 +27,12 @@ import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -40,21 +42,57 @@ public class GenericVortexReader implements VortexRowReader { private final Types.StructType structType; - private final List> fieldReaders; + + // Parallel arrays indexed by position in the expected (projected) struct. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). + private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. 
Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; private GenericVortexReader( Schema expectedSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { this.structType = expectedSchema.asStruct(); - GenericReadBuilder builder = new GenericReadBuilder(idToConstant); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + + GenericReadBuilder builder = new GenericReadBuilder(); List expectedFields = structType.fields(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expectedFields.size()); + this.readers = new VortexValueReader[expectedFields.size()]; + this.columnNames = new String[expectedFields.size()]; + for (int i = 0; i < expectedFields.size(); i++) { - Type icebergType = expectedFields.get(i).type(); - Field arrowField = fileFields.get(i); - this.fieldReaders.add(VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, builder)); + Types.NestedField field = expectedFields.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // The expected field is neither a constant nor present in the data file (for example an + // unsupplied metadata column). Fill it with null rather than reading a missing column. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = VortexSchemaWithTypeVisitor.visit(field.type(), arrowField, builder); + this.columnNames[i] = arrowField.getName(); + } + } } } @@ -72,28 +110,48 @@ public static VortexRowReader buildReader( @Override public Record read(VectorSchemaRoot batch, int row) { + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + GenericRecord record = GenericRecord.create(structType); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - FieldVector vector = batch.getVector(i); - record.set(i, reader.read(vector, row)); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + record.set(i, readers[i].read(vector, row)); } return record; } - @SuppressWarnings("UnusedVariable") - static class GenericReadBuilder extends VortexSchemaWithTypeVisitor> { - // TODO(aduffy): implement constant readers to fill in identity partition values - private final Map idToConstant; + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } - GenericReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } } + return indexes; + } + + static class GenericReadBuilder extends VortexSchemaWithTypeVisitor> { + GenericReadBuilder() {} + @Override public VortexValueReader struct( Types.StructType iStruct, List fields, List> children) { - return GenericVortexReaders.struct(iStruct, children); + return GenericVortexReaders.struct(iStruct, fields, children); } @Override diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java index 4525f514d7e5..26df6752eec7 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java @@ -47,6 +47,7 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.Field; import 
org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Types; @@ -110,21 +111,40 @@ public static VortexValueReader timestampTz(String timeZone, boo } public static VortexValueReader struct( - Types.StructType schema, List> readers) { - return new StructReader(schema, readers); + Types.StructType schema, List fields, List> readers) { + return new StructReader(schema, fields, readers); } public static VortexValueReader> list(VortexValueReader elementReader) { return new ListReader<>(elementReader); } + /** + * Returns a reader that always produces {@code constant}, ignoring the Arrow vector and row. + * + *

<p>Used to inject identity-partition values and metadata columns (for example {@code _file} or + * {@code _spec_id}) that are supplied through {@code idToConstant} rather than being read from + * the data file. + */ + public static VortexValueReader constants(C constant) { + return new ConstantReader<>(constant); + } + private static class StructReader implements VortexValueReader { private final Types.StructType schema; + // File column name backing each expected field, or null when the field is absent from the file. + private final String[] childNames; private final List> readers; - private StructReader(Types.StructType schema, List> readers) { + private StructReader( + Types.StructType schema, List fields, List> readers) { this.schema = schema; this.readers = readers; + this.childNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + this.childNames[i] = field == null ? null : field.getName(); + } } @Override @@ -133,9 +153,13 @@ public Record readNonNull(FieldVector vector, int row) { GenericRecord record = GenericRecord.create(schema); for (int i = 0; i < readers.size(); i++) { VortexValueReader reader = readers.get(i); - FieldVector child = (FieldVector) struct.getChildByOrdinal(i); - Object value = reader.read(child, row); - record.set(i, value); + if (reader == null) { + // Expected field is not present in the file struct; project it as null. 
+ record.set(i, null); + } else { + FieldVector child = (FieldVector) struct.getChild(childNames[i]); + record.set(i, reader.read(child, row)); + } } return record; } @@ -160,6 +184,24 @@ public List readNonNull(FieldVector vector, int row) { } } + private static class ConstantReader implements VortexValueReader { + private final C constant; + + private ConstantReader(C constant) { + this.constant = constant; + } + + @Override + public C read(FieldVector vector, int row) { + return constant; + } + + @Override + public C readNonNull(FieldVector vector, int row) { + return constant; + } + } + private static class BooleanReader implements VortexValueReader { static final BooleanReader INSTANCE = new BooleanReader(); diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index 677870ad597f..6d9346b960a5 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -50,6 +50,7 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; @@ -205,6 +206,26 @@ private static void writeValue( } listVector.endValue(rowIndex, elements.size()); break; + case STRUCT: + Types.StructType structType = (Types.StructType) type; + StructVector structVector = (StructVector) vector; + Record structValue = (Record) value; + List structFields = structType.fields(); + for (int i = 0; i < structFields.size(); i++) { + Types.NestedField structField = structFields.get(i); + // Bind each Iceberg child to the Arrow child of the same name; the Arrow struct is built + // from the write schema, so names line up even if 
ordinals were to drift. + FieldVector childVector = (FieldVector) structVector.getChild(structField.name()); + Object childValue = structValue.get(i); + if (childValue == null) { + childVector.setNull(rowIndex); + } else { + writeValue(childVector, structField.type(), childValue, rowIndex); + } + } + // Mark the struct slot itself as non-null for this row. + structVector.setIndexDefined(rowIndex); + break; default: throw new UnsupportedOperationException( "Unsupported Iceberg type for Vortex write: " + type); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java index 58c3f5b94fb5..822a915132f2 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java @@ -23,15 +23,16 @@ import dev.vortex.jni.NativeRuntime; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.TableProperties; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -45,6 +46,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -355,18 +357,23 @@ public CloseableIterable build() { readerFunction.read(schema, fileSchema, engineSchema, idToConstant); } - 
org.apache.iceberg.Schema readSchema = schema; - if (idToConstant != null) { - List readerFields = - schema.columns().stream() - .filter(field -> !idToConstant.containsKey(field.fieldId())) - .collect(Collectors.toList()); - readSchema = new org.apache.iceberg.Schema(readerFields); + // Compute the columns to scan from the data file. Constants (identity partition values and + // metadata columns such as _file, _spec_id and _partition) come from idToConstant, and + // _is_deleted is synthesized by the reader, so none of those are projected from the file. + // _pos is also excluded and currently resolves to null: Vortex exposes row positions through + // a `row_idx` scan expression that the Java bindings (<= 0.73.0) do not yet surface. + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + List projection = Lists.newArrayList(); + for (Types.NestedField field : schema.columns()) { + if (!constants.containsKey(field.fieldId()) + && !MetadataColumns.isMetadataColumn(field.name())) { + projection.add(field.name()); + } } return new VortexIterable<>( inputFile, - readSchema, + projection, filterPredicate, rowRange, readerFunc, diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java index d0d680b983b0..c52c6e0c54c6 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java @@ -30,9 +30,11 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableGroup; @@ -41,7 +43,7 @@ import 
org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,14 +62,14 @@ public class VortexIterable extends CloseableGroup implements CloseableIterab VortexIterable( InputFile inputFile, - Schema icebergSchema, + List projection, Optional filterPredicate, long[] rowRange, Function> readerFunction, Function> batchReaderFunction, int workerThreads) { this.inputFile = inputFile; - this.projection = Lists.transform(icebergSchema.columns(), Types.NestedField::name); + this.projection = projection; this.filterPredicate = filterPredicate; this.rowRange = rowRange; this.rowReaderFunc = readerFunction; @@ -103,7 +105,24 @@ public CloseableIterator iterator() { return ConvertFilterToVortex.convert(icebergFileSchema, icebergExpression); }); - String[] projectionNames = projection.toArray(new String[0]); + // Vortex resolves projected columns by name and errors on any name not in the file. Drop + // requested columns the file does not contain (e.g. fields added after the file was written) so + // the reader fills them with null/constants instead of crashing the scan. Binding is by name: + // Vortex stores no Iceberg field ids (its Java bindings drop Arrow field/schema metadata), so a + // column renamed since write time cannot be rebound to its old physical column here. 
+ Set fileColumns = Sets.newHashSetWithExpectedSize(fileArrowSchema.getFields().size()); + for (Field field : fileArrowSchema.getFields()) { + fileColumns.add(field.getName()); + } + + List presentProjection = Lists.newArrayListWithExpectedSize(projection.size()); + for (String name : projection) { + if (fileColumns.contains(name)) { + presentProjection.add(name); + } + } + + String[] projectionNames = presentProjection.toArray(new String[0]); dev.vortex.api.Expression scanProjection = dev.vortex.api.Expression.select(projectionNames, dev.vortex.api.Expression.root()); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 8bd72d55896e..887d7b4ed831 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -19,10 +19,12 @@ package org.apache.iceberg.vortex; import java.util.List; +import java.util.Map; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -62,12 +64,35 @@ public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor T visitStruct( Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { - List results = Lists.newArrayListWithExpectedSize(fields.size()); - for (int fieldId = 0; fieldId < fields.size(); fieldId++) { - Field field = fields.get(fieldId); - Types.NestedField iField = struct != null ? struct.field(fieldId) : null; - results.add(visit(iField != null ? 
iField.type() : null, field, visitor)); + if (struct == null) { + // No expected Iceberg type to bind to (a file-only column). Walk children positionally; the + // resulting reader is discarded by callers that pass a null target type. + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Field field : fields) { + results.add(visit(null, field, visitor)); + } + return visitor.struct(null, fields, results); } - return visitor.struct(struct, fields, results); + + // Arrow/Vortex schemas carry no Iceberg field ids, so expected struct fields are bound to file + // columns by name (the top-level reader resolves columns the same way). Driving the walk from + // the expected fields lets a projection reorder, drop, or add struct fields relative to the + // physical file layout. The returned fields/children are aligned to the expected fields, with a + // null entry wherever the file does not contain the expected field. + Map fileFieldsByName = Maps.newHashMapWithExpectedSize(fields.size()); + for (Field field : fields) { + fileFieldsByName.put(field.getName(), field); + } + + List expectedFields = struct.fields(); + List matchedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); + List results = Lists.newArrayListWithExpectedSize(expectedFields.size()); + for (Types.NestedField expectedField : expectedFields) { + Field fileField = fileFieldsByName.get(expectedField.name()); + matchedFields.add(fileField); + results.add(fileField == null ? 
null : visit(expectedField.type(), fileField, visitor)); + } + + return visitor.struct(struct, matchedFields, results); } } diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericReadProjection.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericReadProjection.java index ee707ab1f16a..a4cfc020487e 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericReadProjection.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericReadProjection.java @@ -65,67 +65,45 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema } } - // GenericVortexReader and VortexSchemaWithTypeVisitor pair expected Iceberg fields with file - // Arrow fields positionally. Any read schema that does not list columns in the same order as the - // file fails. Renamed fields additionally need field-ID-based matching (Arrow schemas do not - // carry Iceberg field ids), and missing-field defaults need null fill-in support in the reader. - // The tests below stay declared so they re-enable automatically once those gaps are filled. - - @Test - @Override - @Disabled("Vortex reader binds expected fields to file fields by position; reordered read fails") - public void testReorderedFullProjection() {} - - @Test - @Override - @Disabled( - "Vortex reader binds expected fields to file fields by position; column-subset read fails") - public void testBasicProjection() {} + // Projection binds columns by name (see GenericVortexReader and VortexSchemaWithTypeVisitor) and + // the scan drops columns absent from the file (VortexIterable), so reordered, subset, missing, + // nested-struct, and list projections all work. The tests left disabled below each hit a + // *renamed* column (testListOfStructsProjection only in its trailing y->z sub-case): rebinding a + // renamed column to its old physical column requires Iceberg field ids stored in the file. 
Vortex + // drops Arrow field and schema metadata on write (verified empirically), so there is no field-id + // channel to persist and name-based binding cannot recover a rename. Re-enable if/when Vortex + // preserves field metadata or otherwise exposes field ids. @Test @Override @Disabled( - "Vortex reader binds expected fields to file fields by position; column-subset read fails") - public void testSpecialCharacterProjection() {} - - @Test - @Override - @Disabled( - "Vortex projection asks the file for the read schema's column names; renames are not " - + "resolved by field id") + "Rename resolution needs Iceberg field ids in the file, but Vortex drops Arrow metadata, so " + + "a renamed column cannot be bound to its old physical column by name.") public void testRename() {} @Test @Override @Disabled( - "Vortex projection asks the file for the read schema's column names; renames are not " - + "resolved by field id") + "Rename resolution needs Iceberg field ids in the file, but Vortex drops Arrow metadata, so " + + "a renamed column cannot be bound to its old physical column by name.") public void testRenamedAddedField() {} @Test @Override @Disabled( - "Vortex projection asks the file for the read schema's column names; missing fields are not " - + "filled with nulls/defaults") - public void testReorderedProjection() {} - - @Test - @Override - @Disabled("VortexSchemaWithTypeVisitor walks nested struct children positionally") - public void testNestedStructProjection() {} + "List-of-structs projection works by name, but the trailing y->z rename sub-case needs " + + "Iceberg field ids the Vortex file does not carry, so the renamed element field reads " + + "null.") + public void testListOfStructsProjection() {} private static void assumeSupported(Schema schema) { - // LIST round-trip works in the generic writer/reader (see TestGenericVortex), but projection - // breaks on lists for the same positional-field-matching reason that disables the other - // projection 
tests above. Skip until projection is rewritten to match by name/field-id. + // Lists and structs project by name now; maps and fixed stay out of these projection scenarios + // because they have no Vortex reader yet. assumeThat( TypeUtil.find( schema, - type -> - type.typeId() == Type.TypeID.LIST - || type.typeId() == Type.TypeID.MAP - || type.typeId() == Type.TypeID.FIXED)) - .as("Vortex does not yet support lists, maps, or fixed in projection scenarios") + type -> type.typeId() == Type.TypeID.MAP || type.typeId() == Type.TypeID.FIXED)) + .as("Vortex does not yet support maps or fixed in projection scenarios") .isNull(); } diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java new file mode 100644 index 000000000000..3a7ee1cf030f --- /dev/null +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.vortex; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.vortex.GenericVortexReader; +import org.apache.iceberg.data.vortex.GenericVortexWriter; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** Exercises {@code idToConstant} injection in {@link GenericVortexReader}. */ +public class TestVortexConstantReaders { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + required(2, "category", Types.StringType.get()), + required(3, "measurement", Types.DoubleType.get())); + + @TempDir private Path temp; + + @Test + public void testIdentityPartitionConstantOverridesFileColumn() throws IOException { + // The physical "category" value differs from the partition constant. When supplied through + // idToConstant the reader must surface the constant, not whatever is stored in the file. 
+ List written = + Lists.newArrayList(record(0L, "file-value", 1.5d), record(1L, "file-value", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + Map idToConstant = ImmutableMap.of(2, "partition-value"); + + List rows = read(outputFile, SCHEMA, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + assertThat(rows.get(i).getField("category")).isEqualTo("partition-value"); + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + } + } + + @Test + public void testMetadataColumnConstantsInjected() throws IOException { + List written = Lists.newArrayList(record(0L, "a", 1.5d), record(1L, "b", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + String filePath = "s3://bucket/path/data.vortex"; + int specId = 7; + + // Projection mixes file columns with metadata columns that are not stored in the file at all. + Schema projection = + new Schema( + SCHEMA.findField("id"), + SCHEMA.findField("measurement"), + MetadataColumns.FILE_PATH, + MetadataColumns.SPEC_ID); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.FILE_PATH.fieldId(), filePath, + MetadataColumns.SPEC_ID.fieldId(), specId); + + List rows = read(outputFile, projection, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + assertThat(rows.get(i).getField(MetadataColumns.FILE_PATH.name())).isEqualTo(filePath); + assertThat(rows.get(i).getField(MetadataColumns.SPEC_ID.name())).isEqualTo(specId); + } + } + + @Test + public void testReorderedProjectionWithConstant() throws IOException { + // A projection whose order differs from the file's physical column order, with a constant + // interleaved, must still resolve each column 
by name. + List written = + Lists.newArrayList(record(0L, "file-value", 1.5d), record(1L, "file-value", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + Schema projection = + new Schema( + SCHEMA.findField("measurement"), SCHEMA.findField("category"), SCHEMA.findField("id")); + + Map idToConstant = ImmutableMap.of(2, "partition-value"); + + List rows = read(outputFile, projection, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + assertThat(rows.get(i).getField("category")).isEqualTo("partition-value"); + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + } + } + + private static Record record(long id, String category, double measurement) { + GenericRecord record = GenericRecord.create(SCHEMA); + record.setField("id", id); + record.setField("category", category); + record.setField("measurement", measurement); + return record; + } + + private OutputFile writeRecords(Schema schema, List records) throws IOException { + OutputFile outputFile = + Files.localOutput(temp.resolve("test-" + System.nanoTime() + ".vortex").toFile()); + try (FileAppender appender = + formatModel() + .writeBuilder(EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .schema(schema) + .content(FileContent.DATA) + .build()) { + appender.addAll(records); + } + + return outputFile; + } + + private List read(OutputFile outputFile, Schema projection, Map idToConstant) + throws IOException { + try (CloseableIterable reader = + formatModel() + .readBuilder(outputFile.toInputFile()) + .project(projection) + .idToConstant(idToConstant) + .build()) { + return Lists.newArrayList(reader); + } + } + + private static VortexFormatModel> formatModel() { + return VortexFormatModel.create( + Record.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> GenericVortexWriter.buildWriter(icebergSchema), + 
(VortexFormatModel.ReaderFunction) GenericVortexReader::buildReader); + } +} From 5e1fc1ac6f1956864e3780a54852cc284e1a0dd4 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 5 Jun 2026 00:59:52 +0100 Subject: [PATCH 2/3] slightly better --- .../iceberg/vortex/VortexFormatModel.java | 16 +-- .../apache/iceberg/vortex/VortexIterable.java | 21 ++-- .../vortex/VortexSchemaWithTypeVisitor.java | 106 +++++++++--------- 3 files changed, 67 insertions(+), 76 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java index 822a915132f2..2654f0882cc5 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java @@ -46,7 +46,6 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -363,13 +362,14 @@ public CloseableIterable build() { // _pos is also excluded and currently resolves to null: Vortex exposes row positions through // a `row_idx` scan expression that the Java bindings (<= 0.73.0) do not yet surface. Map constants = idToConstant == null ? 
Collections.emptyMap() : idToConstant; - List projection = Lists.newArrayList(); - for (Types.NestedField field : schema.columns()) { - if (!constants.containsKey(field.fieldId()) - && !MetadataColumns.isMetadataColumn(field.name())) { - projection.add(field.name()); - } - } + List projection = + schema.columns().stream() + .filter( + field -> + !constants.containsKey(field.fieldId()) + && !MetadataColumns.isMetadataColumn(field.name())) + .map(Types.NestedField::name) + .toList(); return new VortexIterable<>( inputFile, diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java index c52c6e0c54c6..909acbb4e1d8 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.Field; @@ -42,8 +43,6 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,19 +109,13 @@ public CloseableIterator iterator() { // the reader fills them with null/constants instead of crashing the scan. Binding is by name: // Vortex stores no Iceberg field ids (its Java bindings drop Arrow field/schema metadata), so a // column renamed since write time cannot be rebound to its old physical column here. 
- Set fileColumns = Sets.newHashSetWithExpectedSize(fileArrowSchema.getFields().size()); - for (Field field : fileArrowSchema.getFields()) { - fileColumns.add(field.getName()); - } - - List presentProjection = Lists.newArrayListWithExpectedSize(projection.size()); - for (String name : projection) { - if (fileColumns.contains(name)) { - presentProjection.add(name); - } - } + Set fileColumns = + fileArrowSchema.getFields().stream() + .map(Field::getName) + .collect(Collectors.toUnmodifiableSet()); - String[] projectionNames = presentProjection.toArray(new String[0]); + String[] projectionNames = + projection.stream().filter(fileColumns::contains).toArray(String[]::new); dev.vortex.api.Expression scanProjection = dev.vortex.api.Expression.select(projectionNames, dev.vortex.api.Expression.root()); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 887d7b4ed831..14b9b18af601 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -20,11 +20,12 @@ import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -33,66 +34,63 @@ * that visitors can build readers that bind a target Iceberg shape to the file's columns. 
*/ public abstract class VortexSchemaWithTypeVisitor { - public abstract T struct(Types.StructType iStruct, List fields, List children); + public abstract T struct(Types.StructType iStruct, List fields, List children); - public abstract T list(Types.ListType iList, Field listField, T element); + public abstract T list(Types.ListType iList, Field listField, T element); - public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); + public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); - public static T visit( - Schema expectedSchema, - org.apache.arrow.vector.types.pojo.Schema fileSchema, - VortexSchemaWithTypeVisitor visitor) { - return visitStruct(expectedSchema.asStruct(), fileSchema.getFields(), visitor); - } - - public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { - ArrowType arrowType = field.getType(); - if (arrowType instanceof ArrowType.Struct) { - return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor); - } else if (arrowType instanceof ArrowType.List - || arrowType instanceof ArrowType.LargeList - || arrowType instanceof ArrowType.FixedSizeList) { - Types.ListType list = iType != null ? iType.asListType() : null; - Field element = field.getChildren().get(0); - return visitor.list( - list, field, visit(list != null ? list.elementType() : null, element, visitor)); - } else { - return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, field); + public static T visit( + Schema expectedSchema, + org.apache.arrow.vector.types.pojo.Schema fileSchema, + VortexSchemaWithTypeVisitor visitor) { + return visitStruct(expectedSchema.asStruct(), fileSchema.getFields(), visitor); } - } - private static T visitStruct( - Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { - if (struct == null) { - // No expected Iceberg type to bind to (a file-only column). 
Walk children positionally; the - // resulting reader is discarded by callers that pass a null target type. - List results = Lists.newArrayListWithExpectedSize(fields.size()); - for (Field field : fields) { - results.add(visit(null, field, visitor)); - } - return visitor.struct(null, fields, results); + public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { + ArrowType arrowType = field.getType(); + if (arrowType instanceof ArrowType.Struct) { + return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor); + } else if (arrowType instanceof ArrowType.List + || arrowType instanceof ArrowType.LargeList + || arrowType instanceof ArrowType.FixedSizeList) { + Types.ListType list = iType != null ? iType.asListType() : null; + Field element = field.getChildren().get(0); + return visitor.list( + list, field, visit(list != null ? list.elementType() : null, element, visitor)); + } else { + return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, field); + } } - // Arrow/Vortex schemas carry no Iceberg field ids, so expected struct fields are bound to file - // columns by name (the top-level reader resolves columns the same way). Driving the walk from - // the expected fields lets a projection reorder, drop, or add struct fields relative to the - // physical file layout. The returned fields/children are aligned to the expected fields, with a - // null entry wherever the file does not contain the expected field. - Map fileFieldsByName = Maps.newHashMapWithExpectedSize(fields.size()); - for (Field field : fields) { - fileFieldsByName.put(field.getName(), field); - } + private static T visitStruct( + Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { + if (struct == null) { + // No expected Iceberg type to bind to (a file-only column). Walk children positionally; the + // resulting reader is discarded by callers that pass a null target type. 
+ List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Field field : fields) { + results.add(visit(null, field, visitor)); + } + return visitor.struct(null, fields, results); + } - List expectedFields = struct.fields(); - List matchedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List results = Lists.newArrayListWithExpectedSize(expectedFields.size()); - for (Types.NestedField expectedField : expectedFields) { - Field fileField = fileFieldsByName.get(expectedField.name()); - matchedFields.add(fileField); - results.add(fileField == null ? null : visit(expectedField.type(), fileField, visitor)); - } + // Arrow/Vortex schemas carry no Iceberg field ids, so expected struct fields are bound to file + // columns by name (the top-level reader resolves columns the same way). Driving the walk from + // the expected fields lets a projection reorder, drop, or add struct fields relative to the + // physical file layout. The returned fields/children are aligned to the expected fields, with a + // null entry wherever the file does not contain the expected field. + Map fileFieldsByName = fields.stream().collect(Collectors.toUnmodifiableMap(Field::getName, Function.identity())); - return visitor.struct(struct, matchedFields, results); - } + List expectedFields = struct.fields(); + List matchedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); + List results = Lists.newArrayListWithExpectedSize(expectedFields.size()); + for (Types.NestedField expectedField : expectedFields) { + Field fileField = fileFieldsByName.get(expectedField.name()); + matchedFields.add(fileField); + results.add(fileField == null ? 
null : visit(expectedField.type(), fileField, visitor)); + } + + return visitor.struct(struct, matchedFields, results); + } } From 5c436000bb610cc52e4c24873fa6c477dcb64467 Mon Sep 17 00:00:00 2001 From: Robert Kruszewski Date: Fri, 5 Jun 2026 01:12:30 +0100 Subject: [PATCH 3/3] format --- .../vortex/VortexSchemaWithTypeVisitor.java | 101 +++++++++--------- 1 file changed, 51 insertions(+), 50 deletions(-) diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 14b9b18af601..590557e4e8c7 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -34,63 +34,64 @@ * that visitors can build readers that bind a target Iceberg shape to the file's columns. */ public abstract class VortexSchemaWithTypeVisitor { - public abstract T struct(Types.StructType iStruct, List fields, List children); + public abstract T struct(Types.StructType iStruct, List fields, List children); - public abstract T list(Types.ListType iList, Field listField, T element); + public abstract T list(Types.ListType iList, Field listField, T element); - public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); + public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); - public static T visit( - Schema expectedSchema, - org.apache.arrow.vector.types.pojo.Schema fileSchema, - VortexSchemaWithTypeVisitor visitor) { - return visitStruct(expectedSchema.asStruct(), fileSchema.getFields(), visitor); - } + public static T visit( + Schema expectedSchema, + org.apache.arrow.vector.types.pojo.Schema fileSchema, + VortexSchemaWithTypeVisitor visitor) { + return visitStruct(expectedSchema.asStruct(), fileSchema.getFields(), visitor); + } - public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { - 
ArrowType arrowType = field.getType(); - if (arrowType instanceof ArrowType.Struct) { - return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor); - } else if (arrowType instanceof ArrowType.List - || arrowType instanceof ArrowType.LargeList - || arrowType instanceof ArrowType.FixedSizeList) { - Types.ListType list = iType != null ? iType.asListType() : null; - Field element = field.getChildren().get(0); - return visitor.list( - list, field, visit(list != null ? list.elementType() : null, element, visitor)); - } else { - return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, field); - } + public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { + ArrowType arrowType = field.getType(); + if (arrowType instanceof ArrowType.Struct) { + return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor); + } else if (arrowType instanceof ArrowType.List + || arrowType instanceof ArrowType.LargeList + || arrowType instanceof ArrowType.FixedSizeList) { + Types.ListType list = iType != null ? iType.asListType() : null; + Field element = field.getChildren().get(0); + return visitor.list( + list, field, visit(list != null ? list.elementType() : null, element, visitor)); + } else { + return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, field); } + } - private static T visitStruct( - Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { - if (struct == null) { - // No expected Iceberg type to bind to (a file-only column). Walk children positionally; the - // resulting reader is discarded by callers that pass a null target type. 
- List results = Lists.newArrayListWithExpectedSize(fields.size()); - for (Field field : fields) { - results.add(visit(null, field, visitor)); - } - return visitor.struct(null, fields, results); - } - - // Arrow/Vortex schemas carry no Iceberg field ids, so expected struct fields are bound to file - // columns by name (the top-level reader resolves columns the same way). Driving the walk from - // the expected fields lets a projection reorder, drop, or add struct fields relative to the - // physical file layout. The returned fields/children are aligned to the expected fields, with a - // null entry wherever the file does not contain the expected field. - Map fileFieldsByName = fields.stream().collect(Collectors.toUnmodifiableMap(Field::getName, Function.identity())); + private static T visitStruct( + Types.StructType struct, List fields, VortexSchemaWithTypeVisitor visitor) { + if (struct == null) { + // No expected Iceberg type to bind to (a file-only column). Walk children positionally; the + // resulting reader is discarded by callers that pass a null target type. + List results = Lists.newArrayListWithExpectedSize(fields.size()); + for (Field field : fields) { + results.add(visit(null, field, visitor)); + } + return visitor.struct(null, fields, results); + } - List expectedFields = struct.fields(); - List matchedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); - List results = Lists.newArrayListWithExpectedSize(expectedFields.size()); - for (Types.NestedField expectedField : expectedFields) { - Field fileField = fileFieldsByName.get(expectedField.name()); - matchedFields.add(fileField); - results.add(fileField == null ? null : visit(expectedField.type(), fileField, visitor)); - } + // Arrow/Vortex schemas carry no Iceberg field ids, so expected struct fields are bound to file + // columns by name (the top-level reader resolves columns the same way). 
Driving the walk from + // the expected fields lets a projection reorder, drop, or add struct fields relative to the + // physical file layout. The returned fields/children are aligned to the expected fields, with a + // null entry wherever the file does not contain the expected field. + Map fileFieldsByName = + fields.stream().collect(Collectors.toUnmodifiableMap(Field::getName, Function.identity())); - return visitor.struct(struct, matchedFields, results); + List expectedFields = struct.fields(); + List matchedFields = Lists.newArrayListWithExpectedSize(expectedFields.size()); + List results = Lists.newArrayListWithExpectedSize(expectedFields.size()); + for (Types.NestedField expectedField : expectedFields) { + Field fileField = fileFieldsByName.get(expectedField.name()); + matchedFields.add(fileField); + results.add(fileField == null ? null : visit(expectedField.type(), fileField, visitor)); } + + return visitor.struct(struct, matchedFields, results); + } }