diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 9116d2a55279..9449322e39da 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -38,35 +41,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). + private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. 
Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. + this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. 
+ this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } - static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + return indexes; + } + + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); private SparkReadBuilder() {} @@ -110,7 +174,6 @@ public VortexValueReader primitive(Type.PrimitiveType icebergType, Field prim } static class StructReader implements VortexValueReader { - private final 
List> fields; private StructReader(List> fields) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = 
vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a column that is not backed by a batch column: either a constant + // from idToConstant or a column that is missing from the batch. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 9116d2a55279..9449322e39da 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -38,35 +41,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). 
+ private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } - static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + return indexes; + } + + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); private SparkReadBuilder() {} @@ -110,7 +174,6 @@ public VortexValueReader primitive(Type.PrimitiveType icebergType, Field prim } static class StructReader implements VortexValueReader { - private final List> fields; private StructReader(List> fields) { diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import 
org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a column that is not backed by a batch column: either a constant + // from idToConstant or a column that is missing from the batch. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? 
Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java index 63c4e0edc62e..9449322e39da 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVortexReader.java @@ -18,15 +18,18 @@ */ package org.apache.iceberg.spark.data; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.vortex.GenericVortexReaders; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -37,33 +40,96 @@ /** Read Vortex as Spark {@link InternalRow}. */ public class SparkVortexReader implements VortexRowReader { - private final List> fieldReaders; + + // Parallel arrays indexed by position in the expected (projected) schema. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). 
+ private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; public SparkVortexReader( Schema readSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { - List fields = fileArrowSchema.getFields(); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + List expected = readSchema.columns(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size()); + this.readers = new VortexValueReader[expected.size()]; + this.columnNames = new String[expected.size()]; + for (int i = 0; i < expected.size(); i++) { - Type icebergType = expected.get(i).type(); - Field arrowField = fields.get(i); - this.fieldReaders.add( - VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE)); + Types.NestedField field = expected.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // Field is neither a constant nor present in the data file; fill with null. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = + VortexSchemaWithTypeVisitor.visit( + field.type(), arrowField, SparkReadBuilder.INSTANCE); + this.columnNames[i] = arrowField.getName(); + } + } } } @Override public InternalRow read(VectorSchemaRoot batch, int row) { - GenericInternalRow result = new GenericInternalRow(fieldReaders.size()); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - result.update(i, reader.read(batch.getVector(i), row)); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + + GenericInternalRow result = new GenericInternalRow(readers.length); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + result.update(i, readers[i].read(vector, row)); } return result; } + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } + + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } + } + + return indexes; + } + static class SparkReadBuilder extends VortexSchemaWithTypeVisitor> { static final SparkReadBuilder INSTANCE = new SparkReadBuilder(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java index e7045540cb68..1535509f53f1 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkVortexReaders.java @@ -18,14 +18,15 @@ */ package org.apache.iceberg.spark.data.vectorized; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexBatchReader; import org.apache.spark.sql.vectorized.ArrowColumnVector; import 
org.apache.spark.sql.vectorized.ColumnVector; @@ -38,54 +39,72 @@ public static VortexBatchReader buildReader( Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema, Map idToConstant) { - return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant); + return new ConstantAwareBatchReader(icebergSchema, idToConstant); } - static final class SchemaCachingBatchReader implements VortexBatchReader { - private final Schema readerSchema; + static final class ConstantAwareBatchReader implements VortexBatchReader { + private final List columns; private final Map idToConstant; - private final List schemaMapping; - SchemaCachingBatchReader( - Schema readerSchema, - org.apache.arrow.vector.types.pojo.Schema vortexSchema, - Map idToConstant) { - this.readerSchema = readerSchema; - this.idToConstant = idToConstant; - this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema); + // Resolves expected column position -> Arrow batch column index, computed by name from the + // first batch. -1 marks a column that is not backed by a batch column: either a constant + // from idToConstant or a column that is missing from the batch. Vortex returns only the + // projected (non-constant, file-resident) columns, so the batch is not positionally aligned + // with the reader schema. + private int[] batchColumnIndex; + + ConstantAwareBatchReader(Schema readerSchema, Map idToConstant) { + this.columns = readerSchema.columns(); + this.idToConstant = idToConstant == null ? 
Collections.emptyMap() : idToConstant; } @Override public ColumnarBatch read(VectorSchemaRoot batch) { int rowCount = batch.getRowCount(); - Map vectors = Maps.newHashMap(); + List fieldVectors = batch.getFieldVectors(); + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(fieldVectors); + } - for (Map.Entry entry : idToConstant.entrySet()) { - Integer fieldId = entry.getKey(); - Object constant = entry.getValue(); - if (MetadataColumns.isMetadataColumn(fieldId)) { - continue; + // Build columns in reader-schema order so they line up with Spark's expected output schema. + ColumnVector[] vectors = new ColumnVector[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + int columnIndex = batchColumnIndex[i]; + if (columnIndex >= 0) { + vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex)); + } else if (idToConstant.containsKey(field.fieldId())) { + vectors[i] = + new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId())); + } else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) { + vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false); + } else { + // Column is neither a constant nor present in the data file; surface nulls. 
+ vectors[i] = new ConstantColumnVector(field.type(), rowCount, null); } - - vectors.put( - fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant)); } - List fieldVectors = batch.getFieldVectors(); + return new ColumnarBatch(vectors, rowCount); + } + + private int[] resolveColumns(List fieldVectors) { + Map nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size()); for (int i = 0; i < fieldVectors.size(); i++) { - int fieldId = schemaMapping.get(i); - vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i))); + nameToIndex.put(fieldVectors.get(i).getField().getName(), i); } - return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount); - } + int[] indexes = new int[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Types.NestedField field = columns.get(i); + if (idToConstant.containsKey(field.fieldId())) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(field.name()); + indexes[i] = index == null ? -1 : index; + } + } - // Mapping from Arrow Schema field index to Iceberg Field ID. 
- static List vortexSchemaMapping( - Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) { - return vortexSchema.getFields().stream() - .map(field -> icebergSchema.findField(field.getName()).fieldId()) - .collect(Collectors.toList()); + return indexes; } } } diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index 73c2d929ee91..b67ca38ecc75 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -27,10 +27,12 @@ import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.vortex.VortexRowReader; @@ -40,21 +42,57 @@ public class GenericVortexReader implements VortexRowReader { private final Types.StructType structType; - private final List> fieldReaders; + + // Parallel arrays indexed by position in the expected (projected) struct. A field is read either + // from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant + // ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader). + private final VortexValueReader[] readers; + private final String[] columnNames; + + // Resolves expected field position -> Arrow batch column index. 
Vortex only returns the projected + // (non-constant, file-resident) columns, so this mapping is computed by name from the first batch + // rather than assuming the batch is positionally aligned with the expected schema. -1 marks a + // constant field that is not backed by a batch column. + private int[] batchColumnIndex; private GenericVortexReader( Schema expectedSchema, org.apache.arrow.vector.types.pojo.Schema fileArrowSchema, Map idToConstant) { this.structType = expectedSchema.asStruct(); - GenericReadBuilder builder = new GenericReadBuilder(idToConstant); + Map constants = idToConstant == null ? Collections.emptyMap() : idToConstant; + List fileFields = fileArrowSchema.getFields(); + Map arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size()); + for (Field field : fileFields) { + arrowFieldsByName.put(field.getName(), field); + } + + GenericReadBuilder builder = new GenericReadBuilder(); List expectedFields = structType.fields(); - this.fieldReaders = Lists.newArrayListWithExpectedSize(expectedFields.size()); + this.readers = new VortexValueReader[expectedFields.size()]; + this.columnNames = new String[expectedFields.size()]; + for (int i = 0; i < expectedFields.size(); i++) { - Type icebergType = expectedFields.get(i).type(); - Field arrowField = fileFields.get(i); - this.fieldReaders.add(VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, builder)); + Types.NestedField field = expectedFields.get(i); + int id = field.fieldId(); + if (constants.containsKey(id)) { + // Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied + // through idToConstant instead of being stored in the data file. 
+ this.readers[i] = GenericVortexReaders.constants(constants.get(id)); + } else if (id == MetadataColumns.IS_DELETED.fieldId()) { + this.readers[i] = GenericVortexReaders.constants(false); + } else { + Field arrowField = arrowFieldsByName.get(field.name()); + if (arrowField == null) { + // The expected field is neither a constant nor present in the data file (for example an + // unsupplied metadata column). Fill it with null rather than reading a missing column. + this.readers[i] = GenericVortexReaders.constants(null); + } else { + this.readers[i] = VortexSchemaWithTypeVisitor.visit(field.type(), arrowField, builder); + this.columnNames[i] = arrowField.getName(); + } + } } } @@ -72,24 +110,44 @@ public static VortexRowReader buildReader( @Override public Record read(VectorSchemaRoot batch, int row) { + if (batchColumnIndex == null) { + this.batchColumnIndex = resolveColumns(batch); + } + GenericRecord record = GenericRecord.create(structType); - for (int i = 0; i < fieldReaders.size(); i++) { - VortexValueReader reader = fieldReaders.get(i); - FieldVector vector = batch.getVector(i); - record.set(i, reader.read(vector, row)); + for (int i = 0; i < readers.length; i++) { + int columnIndex = batchColumnIndex[i]; + FieldVector vector = columnIndex < 0 ? 
null : batch.getVector(columnIndex); + record.set(i, readers[i].read(vector, row)); } return record; } - @SuppressWarnings("UnusedVariable") - static class GenericReadBuilder extends VortexSchemaWithTypeVisitor> { - // TODO(aduffy): implement constant readers to fill in identity partition values - private final Map idToConstant; + private int[] resolveColumns(VectorSchemaRoot batch) { + List vectors = batch.getFieldVectors(); + Map nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size()); + for (int i = 0; i < vectors.size(); i++) { + nameToIndex.put(vectors.get(i).getField().getName(), i); + } - GenericReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; + int[] indexes = new int[columnNames.length]; + for (int i = 0; i < columnNames.length; i++) { + if (columnNames[i] == null) { + indexes[i] = -1; + } else { + Integer index = nameToIndex.get(columnNames[i]); + Preconditions.checkState( + index != null, "Vortex batch is missing projected column: %s", columnNames[i]); + indexes[i] = index; + } } + return indexes; + } + + static class GenericReadBuilder extends VortexSchemaWithTypeVisitor> { + GenericReadBuilder() {} + @Override public VortexValueReader struct( Types.StructType iStruct, List fields, List> children) { diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java index e10374314763..206c69ccae80 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java @@ -116,6 +116,17 @@ public static VortexValueReader> list(VortexValueReader elementRe return new ListReader<>(elementReader); } + /** + * Returns a reader that always produces {@code constant}, ignoring the Arrow vector and row. + * + *

Used to inject identity-partition values and metadata columns (for example {@code _file} or + * {@code _spec_id}) that are supplied through {@code idToConstant} rather than being read from + * the data file. + */ + public static VortexValueReader constants(C constant) { + return new ConstantReader<>(constant); + } + private static class StructReader implements VortexValueReader { private final Types.StructType schema; private final List> readers; @@ -153,6 +164,24 @@ public List readNonNull(FieldVector vector, int row) { } } + private static class ConstantReader implements VortexValueReader { + private final C constant; + + private ConstantReader(C constant) { + this.constant = constant; + } + + @Override + public C read(FieldVector vector, int row) { + return constant; + } + + @Override + public C readNonNull(FieldVector vector, int row) { + return constant; + } + } + private static class BooleanReader implements VortexValueReader { static final BooleanReader INSTANCE = new BooleanReader(); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java index b847980e3ddf..c02713ff8c85 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexFormatModel.java @@ -24,15 +24,16 @@ import dev.vortex.jni.NativeRuntime; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.TableProperties; import 
org.apache.iceberg.encryption.EncryptedOutputFile; @@ -46,6 +47,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -352,18 +354,23 @@ public CloseableIterable build() { readerFunction.read(schema, fileSchema, engineSchema, idToConstant); } - org.apache.iceberg.Schema readSchema = schema; - if (idToConstant != null) { - List readerFields = - schema.columns().stream() - .filter(field -> !idToConstant.containsKey(field.fieldId())) - .collect(Collectors.toList()); - readSchema = new org.apache.iceberg.Schema(readerFields); + // Compute the columns to scan from the data file. Constants (identity partition values and + // metadata columns such as _file, _spec_id and _partition) come from idToConstant, and + // _is_deleted is synthesized by the reader, so none of those are projected from the file. + // _pos is also excluded and currently resolves to null: Vortex exposes row positions through + // a `row_idx` scan expression that the Java bindings (<= 0.73.0) do not yet surface. + Map constants = idToConstant == null ? 
Collections.emptyMap() : idToConstant; + List projection = Lists.newArrayList(); + for (Types.NestedField field : schema.columns()) { + if (!constants.containsKey(field.fieldId()) + && !MetadataColumns.isMetadataColumn(field.name())) { + projection.add(field.name()); + } } return new VortexIterable<>( inputFile, - readSchema, + projection, filterPredicate, rowRange, readerFunc, diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java index 16c73ededd41..ba051eaaea0e 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexIterable.java @@ -42,8 +42,6 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,14 +60,14 @@ public class VortexIterable extends CloseableGroup implements CloseableIterab VortexIterable( InputFile inputFile, - Schema icebergSchema, + List projection, Optional filterPredicate, long[] rowRange, Function> readerFunction, Function> batchReaderFunction, int workerThreads) { this.inputFile = inputFile; - this.projection = Lists.transform(icebergSchema.columns(), Types.NestedField::name); + this.projection = projection; this.filterPredicate = filterPredicate; this.rowRange = rowRange; this.rowReaderFunc = readerFunction; diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java new file mode 100644 index 000000000000..3a7ee1cf030f --- /dev/null +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexConstantReaders.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.vortex; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.Files; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.vortex.GenericVortexReader; +import org.apache.iceberg.data.vortex.GenericVortexWriter; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** Exercises {@code idToConstant} injection in {@link GenericVortexReader}. 
*/ +public class TestVortexConstantReaders { + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + required(2, "category", Types.StringType.get()), + required(3, "measurement", Types.DoubleType.get())); + + @TempDir private Path temp; + + @Test + public void testIdentityPartitionConstantOverridesFileColumn() throws IOException { + // The physical "category" value differs from the partition constant. When supplied through + // idToConstant the reader must surface the constant, not whatever is stored in the file. + List written = + Lists.newArrayList(record(0L, "file-value", 1.5d), record(1L, "file-value", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + Map idToConstant = ImmutableMap.of(2, "partition-value"); + + List rows = read(outputFile, SCHEMA, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + assertThat(rows.get(i).getField("category")).isEqualTo("partition-value"); + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + } + } + + @Test + public void testMetadataColumnConstantsInjected() throws IOException { + List written = Lists.newArrayList(record(0L, "a", 1.5d), record(1L, "b", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + String filePath = "s3://bucket/path/data.vortex"; + int specId = 7; + + // Projection mixes file columns with metadata columns that are not stored in the file at all. 
+ Schema projection = + new Schema( + SCHEMA.findField("id"), + SCHEMA.findField("measurement"), + MetadataColumns.FILE_PATH, + MetadataColumns.SPEC_ID); + + Map idToConstant = + ImmutableMap.of( + MetadataColumns.FILE_PATH.fieldId(), filePath, + MetadataColumns.SPEC_ID.fieldId(), specId); + + List rows = read(outputFile, projection, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + assertThat(rows.get(i).getField(MetadataColumns.FILE_PATH.name())).isEqualTo(filePath); + assertThat(rows.get(i).getField(MetadataColumns.SPEC_ID.name())).isEqualTo(specId); + } + } + + @Test + public void testReorderedProjectionWithConstant() throws IOException { + // A projection whose order differs from the file's physical column order, with a constant + // interleaved, must still resolve each column by name. 
+ List written = + Lists.newArrayList(record(0L, "file-value", 1.5d), record(1L, "file-value", 2.5d)); + OutputFile outputFile = writeRecords(SCHEMA, written); + + Schema projection = + new Schema( + SCHEMA.findField("measurement"), SCHEMA.findField("category"), SCHEMA.findField("id")); + + Map idToConstant = ImmutableMap.of(2, "partition-value"); + + List rows = read(outputFile, projection, idToConstant); + + assertThat(rows).hasSize(2); + for (int i = 0; i < rows.size(); i++) { + assertThat(rows.get(i).getField("measurement")) + .isEqualTo(written.get(i).getField("measurement")); + assertThat(rows.get(i).getField("category")).isEqualTo("partition-value"); + assertThat(rows.get(i).getField("id")).isEqualTo((long) i); + } + } + + private static Record record(long id, String category, double measurement) { + GenericRecord record = GenericRecord.create(SCHEMA); + record.setField("id", id); + record.setField("category", category); + record.setField("measurement", measurement); + return record; + } + + private OutputFile writeRecords(Schema schema, List records) throws IOException { + OutputFile outputFile = + Files.localOutput(temp.resolve("test-" + System.nanoTime() + ".vortex").toFile()); + try (FileAppender appender = + formatModel() + .writeBuilder(EncryptedFiles.plainAsEncryptedOutput(outputFile)) + .schema(schema) + .content(FileContent.DATA) + .build()) { + appender.addAll(records); + } + + return outputFile; + } + + private List read(OutputFile outputFile, Schema projection, Map idToConstant) + throws IOException { + try (CloseableIterable reader = + formatModel() + .readBuilder(outputFile.toInputFile()) + .project(projection) + .idToConstant(idToConstant) + .build()) { + return Lists.newArrayList(reader); + } + } + + private static VortexFormatModel> formatModel() { + return VortexFormatModel.create( + Record.class, + StructType.class, + (icebergSchema, fileSchema, engineSchema) -> GenericVortexWriter.buildWriter(icebergSchema), + 
(VortexFormatModel.ReaderFunction) GenericVortexReader::buildReader); + } +}