Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.iceberg.spark.data;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.vortex.GenericVortexReaders;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.vortex.VortexRowReader;
Expand All @@ -38,35 +41,96 @@
/** Read Vortex as Spark {@link InternalRow}. */
public class SparkVortexReader implements VortexRowReader<InternalRow> {

private final List<VortexValueReader<?>> fieldReaders;
// Parallel arrays indexed by position in the expected (projected) schema. A field is read either
// from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant
// ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader).
private final VortexValueReader<?>[] readers;
private final String[] columnNames;

// Resolves expected field position -> Arrow batch column index. Vortex only returns the projected
// (non-constant, file-resident) columns, so this mapping is computed by name from the first batch
// rather than assuming the batch is positionally aligned with the expected schema. -1 marks a
// constant field that is not backed by a batch column.
private int[] batchColumnIndex;

public SparkVortexReader(
Schema readSchema,
org.apache.arrow.vector.types.pojo.Schema fileArrowSchema,
Map<Integer, ?> idToConstant) {
List<Field> fields = fileArrowSchema.getFields();
Map<Integer, ?> constants = idToConstant == null ? Collections.emptyMap() : idToConstant;

List<Field> fileFields = fileArrowSchema.getFields();
Map<String, Field> arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size());
for (Field field : fileFields) {
arrowFieldsByName.put(field.getName(), field);
}

List<Types.NestedField> expected = readSchema.columns();
this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size());
this.readers = new VortexValueReader<?>[expected.size()];
this.columnNames = new String[expected.size()];

for (int i = 0; i < expected.size(); i++) {
Type icebergType = expected.get(i).type();
Field arrowField = fields.get(i);
this.fieldReaders.add(
VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE));
Types.NestedField field = expected.get(i);
int id = field.fieldId();
if (constants.containsKey(id)) {
// Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied
// through idToConstant instead of being stored in the data file.
this.readers[i] = GenericVortexReaders.constants(constants.get(id));
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
this.readers[i] = GenericVortexReaders.constants(false);
} else {
Field arrowField = arrowFieldsByName.get(field.name());
if (arrowField == null) {
// Field is neither a constant nor present in the data file; fill with null.
this.readers[i] = GenericVortexReaders.constants(null);
} else {
this.readers[i] =
VortexSchemaWithTypeVisitor.visit(
field.type(), arrowField, SparkReadBuilder.INSTANCE);
this.columnNames[i] = arrowField.getName();
}
}
}
}

@Override
public InternalRow read(VectorSchemaRoot batch, int row) {
GenericInternalRow result = new GenericInternalRow(fieldReaders.size());
for (int i = 0; i < fieldReaders.size(); i++) {
VortexValueReader<?> reader = fieldReaders.get(i);
result.update(i, reader.read(batch.getVector(i), row));
if (batchColumnIndex == null) {
this.batchColumnIndex = resolveColumns(batch);
}

GenericInternalRow result = new GenericInternalRow(readers.length);
for (int i = 0; i < readers.length; i++) {
int columnIndex = batchColumnIndex[i];
FieldVector vector = columnIndex < 0 ? null : batch.getVector(columnIndex);
result.update(i, readers[i].read(vector, row));
}
return result;
}

static class SparkReadBuilder extends VortexSchemaWithTypeVisitor<VortexValueReader<?>> {
private int[] resolveColumns(VectorSchemaRoot batch) {
List<FieldVector> vectors = batch.getFieldVectors();
Map<String, Integer> nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size());
for (int i = 0; i < vectors.size(); i++) {
nameToIndex.put(vectors.get(i).getField().getName(), i);
}

int[] indexes = new int[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
if (columnNames[i] == null) {
indexes[i] = -1;
} else {
Integer index = nameToIndex.get(columnNames[i]);
Preconditions.checkState(
index != null, "Vortex batch is missing projected column: %s", columnNames[i]);
indexes[i] = index;
}
}

return indexes;
}

static class SparkReadBuilder extends VortexSchemaWithTypeVisitor<VortexValueReader<?>> {
static final SparkReadBuilder INSTANCE = new SparkReadBuilder();

private SparkReadBuilder() {}
Expand Down Expand Up @@ -110,7 +174,6 @@ public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field prim
}

static class StructReader implements VortexValueReader<InternalRow> {

private final List<VortexValueReader<?>> fields;

private StructReader(List<VortexValueReader<?>> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/
package org.apache.iceberg.spark.data.vectorized;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.vortex.VortexBatchReader;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
Expand All @@ -38,54 +39,72 @@ public static VortexBatchReader<ColumnarBatch> buildReader(
Schema icebergSchema,
org.apache.arrow.vector.types.pojo.Schema vortexSchema,
Map<Integer, ?> idToConstant) {
return new SchemaCachingBatchReader(icebergSchema, vortexSchema, idToConstant);
return new ConstantAwareBatchReader(icebergSchema, idToConstant);
}

static final class SchemaCachingBatchReader implements VortexBatchReader<ColumnarBatch> {
private final Schema readerSchema;
static final class ConstantAwareBatchReader implements VortexBatchReader<ColumnarBatch> {
private final List<Types.NestedField> columns;
private final Map<Integer, ?> idToConstant;
private final List<Integer> schemaMapping;

SchemaCachingBatchReader(
Schema readerSchema,
org.apache.arrow.vector.types.pojo.Schema vortexSchema,
Map<Integer, ?> idToConstant) {
this.readerSchema = readerSchema;
this.idToConstant = idToConstant;
this.schemaMapping = vortexSchemaMapping(readerSchema, vortexSchema);
// Resolves expected column position -> Arrow batch column index, computed by name from the
// first batch. -1 marks a constant column not backed by a batch column. Vortex returns only the
// projected (non-constant, file-resident) columns, so the batch is not positionally aligned
// with
// the reader schema.
private int[] batchColumnIndex;

ConstantAwareBatchReader(Schema readerSchema, Map<Integer, ?> idToConstant) {
this.columns = readerSchema.columns();
this.idToConstant = idToConstant == null ? Collections.emptyMap() : idToConstant;
}

@Override
public ColumnarBatch read(VectorSchemaRoot batch) {
int rowCount = batch.getRowCount();
Map<Integer, ColumnVector> vectors = Maps.newHashMap();
List<FieldVector> fieldVectors = batch.getFieldVectors();
if (batchColumnIndex == null) {
this.batchColumnIndex = resolveColumns(fieldVectors);
}

for (Map.Entry<Integer, ?> entry : idToConstant.entrySet()) {
Integer fieldId = entry.getKey();
Object constant = entry.getValue();
if (MetadataColumns.isMetadataColumn(fieldId)) {
continue;
// Build columns in reader-schema order so they line up with Spark's expected output schema.
ColumnVector[] vectors = new ColumnVector[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Types.NestedField field = columns.get(i);
int columnIndex = batchColumnIndex[i];
if (columnIndex >= 0) {
vectors[i] = new ArrowColumnVector(fieldVectors.get(columnIndex));
} else if (idToConstant.containsKey(field.fieldId())) {
vectors[i] =
new ConstantColumnVector(field.type(), rowCount, idToConstant.get(field.fieldId()));
} else if (field.fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
vectors[i] = new ConstantColumnVector(Types.BooleanType.get(), rowCount, false);
} else {
// Column is neither a constant nor present in the data file; surface nulls.
vectors[i] = new ConstantColumnVector(field.type(), rowCount, null);
}

vectors.put(
fieldId, new ConstantColumnVector(readerSchema.findType(fieldId), rowCount, constant));
}

List<FieldVector> fieldVectors = batch.getFieldVectors();
return new ColumnarBatch(vectors, rowCount);
}

private int[] resolveColumns(List<FieldVector> fieldVectors) {
Map<String, Integer> nameToIndex = Maps.newHashMapWithExpectedSize(fieldVectors.size());
for (int i = 0; i < fieldVectors.size(); i++) {
int fieldId = schemaMapping.get(i);
vectors.put(fieldId, new ArrowColumnVector(fieldVectors.get(i)));
nameToIndex.put(fieldVectors.get(i).getField().getName(), i);
}

return new ColumnarBatch(vectors.values().toArray(new ColumnVector[0]), rowCount);
}
int[] indexes = new int[columns.size()];
for (int i = 0; i < columns.size(); i++) {
Types.NestedField field = columns.get(i);
if (idToConstant.containsKey(field.fieldId())) {
indexes[i] = -1;
} else {
Integer index = nameToIndex.get(field.name());
indexes[i] = index == null ? -1 : index;
}
}

// Mapping from Arrow Schema field index to Iceberg Field ID.
static List<Integer> vortexSchemaMapping(
Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema vortexSchema) {
return vortexSchema.getFields().stream()
.map(field -> icebergSchema.findField(field.getName()).fieldId())
.collect(Collectors.toList());
return indexes;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
*/
package org.apache.iceberg.spark.data;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.vortex.GenericVortexReaders;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.vortex.VortexRowReader;
Expand All @@ -38,35 +41,96 @@
/** Read Vortex as Spark {@link InternalRow}. */
public class SparkVortexReader implements VortexRowReader<InternalRow> {

private final List<VortexValueReader<?>> fieldReaders;
// Parallel arrays indexed by position in the expected (projected) schema. A field is read either
// from a file column ({@code columnNames[i]} is the Arrow column name) or as a constant
// ({@code columnNames[i]} is null and {@code readers[i]} is a constant reader).
private final VortexValueReader<?>[] readers;
private final String[] columnNames;

// Resolves expected field position -> Arrow batch column index. Vortex only returns the projected
// (non-constant, file-resident) columns, so this mapping is computed by name from the first batch
// rather than assuming the batch is positionally aligned with the expected schema. -1 marks a
// constant field that is not backed by a batch column.
private int[] batchColumnIndex;

public SparkVortexReader(
Schema readSchema,
org.apache.arrow.vector.types.pojo.Schema fileArrowSchema,
Map<Integer, ?> idToConstant) {
List<Field> fields = fileArrowSchema.getFields();
Map<Integer, ?> constants = idToConstant == null ? Collections.emptyMap() : idToConstant;

List<Field> fileFields = fileArrowSchema.getFields();
Map<String, Field> arrowFieldsByName = Maps.newHashMapWithExpectedSize(fileFields.size());
for (Field field : fileFields) {
arrowFieldsByName.put(field.getName(), field);
}

List<Types.NestedField> expected = readSchema.columns();
this.fieldReaders = Lists.newArrayListWithExpectedSize(expected.size());
this.readers = new VortexValueReader<?>[expected.size()];
this.columnNames = new String[expected.size()];

for (int i = 0; i < expected.size(); i++) {
Type icebergType = expected.get(i).type();
Field arrowField = fields.get(i);
this.fieldReaders.add(
VortexSchemaWithTypeVisitor.visit(icebergType, arrowField, SparkReadBuilder.INSTANCE));
Types.NestedField field = expected.get(i);
int id = field.fieldId();
if (constants.containsKey(id)) {
// Identity-partition value or metadata column (e.g. _file, _spec_id, _partition) supplied
// through idToConstant instead of being stored in the data file.
this.readers[i] = GenericVortexReaders.constants(constants.get(id));
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
this.readers[i] = GenericVortexReaders.constants(false);
} else {
Field arrowField = arrowFieldsByName.get(field.name());
if (arrowField == null) {
// Field is neither a constant nor present in the data file; fill with null.
this.readers[i] = GenericVortexReaders.constants(null);
} else {
this.readers[i] =
VortexSchemaWithTypeVisitor.visit(
field.type(), arrowField, SparkReadBuilder.INSTANCE);
this.columnNames[i] = arrowField.getName();
}
}
}
}

@Override
public InternalRow read(VectorSchemaRoot batch, int row) {
GenericInternalRow result = new GenericInternalRow(fieldReaders.size());
for (int i = 0; i < fieldReaders.size(); i++) {
VortexValueReader<?> reader = fieldReaders.get(i);
result.update(i, reader.read(batch.getVector(i), row));
if (batchColumnIndex == null) {
this.batchColumnIndex = resolveColumns(batch);
}

GenericInternalRow result = new GenericInternalRow(readers.length);
for (int i = 0; i < readers.length; i++) {
int columnIndex = batchColumnIndex[i];
FieldVector vector = columnIndex < 0 ? null : batch.getVector(columnIndex);
result.update(i, readers[i].read(vector, row));
}
return result;
}

static class SparkReadBuilder extends VortexSchemaWithTypeVisitor<VortexValueReader<?>> {
private int[] resolveColumns(VectorSchemaRoot batch) {
List<FieldVector> vectors = batch.getFieldVectors();
Map<String, Integer> nameToIndex = Maps.newHashMapWithExpectedSize(vectors.size());
for (int i = 0; i < vectors.size(); i++) {
nameToIndex.put(vectors.get(i).getField().getName(), i);
}

int[] indexes = new int[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
if (columnNames[i] == null) {
indexes[i] = -1;
} else {
Integer index = nameToIndex.get(columnNames[i]);
Preconditions.checkState(
index != null, "Vortex batch is missing projected column: %s", columnNames[i]);
indexes[i] = index;
}
}

return indexes;
}

static class SparkReadBuilder extends VortexSchemaWithTypeVisitor<VortexValueReader<?>> {
static final SparkReadBuilder INSTANCE = new SparkReadBuilder();

private SparkReadBuilder() {}
Expand Down Expand Up @@ -110,7 +174,6 @@ public VortexValueReader<?> primitive(Type.PrimitiveType icebergType, Field prim
}

static class StructReader implements VortexValueReader<InternalRow> {

private final List<VortexValueReader<?>> fields;

private StructReader(List<VortexValueReader<?>> fields) {
Expand Down
Loading
Loading