Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand All @@ -73,7 +74,9 @@ protected static List<Object> parameters() {
new Object[] {2, FileFormat.PARQUET, false},
new Object[] {2, FileFormat.PARQUET, true},
new Object[] {2, FileFormat.ORC, false},
new Object[] {2, FileFormat.ORC, true});
new Object[] {2, FileFormat.ORC, true},
new Object[] {2, FileFormat.VORTEX, false},
new Object[] {2, FileFormat.VORTEX, true});
}

private static final String PARTITION_VALUE = "aaa";
Expand Down Expand Up @@ -162,6 +165,9 @@ public void testEqualityDeleteWriter() throws IOException {
@TestTemplate
public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {
assumeThat(partitioned).isFalse();
// Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader
// does not yet inject (no constant readers — see GenericVortexReader TODO).
assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX);

List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
Expand Down Expand Up @@ -217,6 +223,9 @@ public void testEqualityDeleteWriterWithMultipleSpecs() throws IOException {

@TestTemplate
public void testPositionDeleteWriter() throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// write a data file
Expand Down Expand Up @@ -355,6 +364,9 @@ public void testPositionDeleteWriterWithRow() throws IOException {

@TestTemplate
public void testPositionDeleteWriterMultipleDataFiles() throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// write two data files
Expand Down Expand Up @@ -506,6 +518,16 @@ private List<Record> readFile(Schema schema, InputFile inputFile) throws IOExcep
return ImmutableList.copyOf(records);
}

case VORTEX:
try (CloseableIterable<Record> records =
FormatModelRegistry.<Record, Schema>readBuilder(
FileFormat.VORTEX, Record.class, inputFile)
.project(schema)
.build()) {

return ImmutableList.copyOf(records);
}

default:
throw new UnsupportedOperationException("Unsupported read file format: " + fileFormat);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ protected static List<Object> parameters() {
return Arrays.asList(
new Object[] {2, FileFormat.AVRO},
new Object[] {2, FileFormat.PARQUET},
new Object[] {2, FileFormat.ORC});
new Object[] {2, FileFormat.ORC},
new Object[] {2, FileFormat.VORTEX});
}

private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
Expand Down Expand Up @@ -175,6 +176,9 @@ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {

@TestTemplate
public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException {
// Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader
// does not yet inject (no constant readers — see GenericVortexReader TODO).
assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX);
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
FileWriterFactory<T> writerFactory =
Expand Down Expand Up @@ -240,6 +244,9 @@ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException

@TestTemplate
public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
// Mixed-spec scans supply partition values via idToConstant, which the Vortex generic reader
// does not yet inject (no constant readers — see GenericVortexReader TODO).
assumeThat(fileFormat).isNotEqualTo(FileFormat.VORTEX);
List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
Schema equalityDeleteRowSchema = table.schema().select("id");
FileWriterFactory<T> writerFactory =
Expand Down Expand Up @@ -294,6 +301,9 @@ public void testClusteredPositionDeleteWriterNoRecordsFileGranularity() throws I

private void checkClusteredPositionDeleteWriterNoRecords(DeleteGranularity deleteGranularity)
throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
ClusteredPositionDeleteWriter<T> writer =
new ClusteredPositionDeleteWriter<>(
Expand Down Expand Up @@ -323,6 +333,9 @@ public void testClusteredPositionDeleteWriterMultipleSpecsFileGranularity() thro

private void checkClusteredPositionDeleteWriterMultipleSpecs(DeleteGranularity deleteGranularity)
throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add an unpartitioned data file
Expand Down Expand Up @@ -406,6 +419,9 @@ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitionsFileGra

private void checkClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions(
DeleteGranularity deleteGranularity) throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
Expand Down Expand Up @@ -472,6 +488,9 @@ public void testClusteredPositionDeleteWriterFileGranularity() throws IOExceptio

private void checkClusteredPositionDeleteWriterGranularity(DeleteGranularity deleteGranularity)
throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add the first data file
Expand Down Expand Up @@ -571,6 +590,9 @@ public void testFanoutPositionOnlyDeleteWriterNoRecordsFileGranularity() throws

private void checkFanoutPositionOnlyDeleteWriterNoRecords(DeleteGranularity deleteGranularity)
throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
FanoutPositionOnlyDeleteWriter<T> writer =
new FanoutPositionOnlyDeleteWriter<>(
Expand Down Expand Up @@ -601,6 +623,9 @@ public void testFanoutPositionOnlyDeleteWriterOutOfOrderRecordsFileGranularity()

private void checkFanoutPositionOnlyDeleteWriterOutOfOrderRecords(
DeleteGranularity deleteGranularity) throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add an unpartitioned data file
Expand Down Expand Up @@ -691,6 +716,9 @@ public void testFanoutPositionOnlyDeleteWriterFileGranularity() throws IOExcepti

private void checkFanoutPositionOnlyDeleteWriterGranularity(DeleteGranularity deleteGranularity)
throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

// add the first data file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg.io;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -49,7 +50,9 @@ protected static List<Object> parameters() {
new Object[] {2, FileFormat.PARQUET, false},
new Object[] {2, FileFormat.PARQUET, true},
new Object[] {2, FileFormat.ORC, false},
new Object[] {2, FileFormat.ORC, true});
new Object[] {2, FileFormat.ORC, true},
new Object[] {2, FileFormat.VORTEX, false},
new Object[] {2, FileFormat.VORTEX, true});
}

private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 1000;
Expand Down Expand Up @@ -172,6 +175,9 @@ public void testRollingEqualityDeleteWriterSplitDeletes() throws IOException {

@TestTemplate
public void testRollingPositionDeleteWriterNoRecords() throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
RollingPositionDeleteWriter<T> writer =
new RollingPositionDeleteWriter<>(
Expand All @@ -190,6 +196,9 @@ public void testRollingPositionDeleteWriterNoRecords() throws IOException {

@TestTemplate
public void testRollingPositionDeleteWriterSplitDeletes() throws IOException {
assumeThat(fileFormat)
.as("Vortex does not support position deletes")
.isNotEqualTo(FileFormat.VORTEX);
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
RollingPositionDeleteWriter<T> writer =
new RollingPositionDeleteWriter<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public VortexValueReader<?> struct(
@Override
public VortexValueReader<?> list(
Types.ListType iList, Field listField, VortexValueReader<?> element) {
throw new UnsupportedOperationException("LIST TYPES!");
return GenericVortexReaders.list(element);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.ZoneId;
import java.util.List;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
Expand All @@ -44,6 +45,7 @@
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
Expand Down Expand Up @@ -139,7 +141,6 @@ public Record readNonNull(FieldVector vector, int row) {
}
}

@SuppressWarnings("UnusedVariable")
private static class ListReader<T> implements VortexValueReader<List<T>> {
private final VortexValueReader<T> elementReader;

Expand All @@ -149,7 +150,13 @@ private ListReader(VortexValueReader<T> elementReader) {

@Override
public List<T> readNonNull(FieldVector vector, int row) {
throw new UnsupportedOperationException("Reading lists from Vortex not supported yet");
ListVector listVector = (ListVector) vector;
int start = listVector.getElementStartIndex(row);
int end = listVector.getElementEndIndex(row);
FieldVector elementVector = listVector.getDataVector();
return IntStream.range(start, end)
.mapToObj(i -> elementReader.read(elementVector, i))
.toList();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
Expand Down Expand Up @@ -131,16 +132,21 @@ private static void writeValue(
((VarCharVector) vector).setSafe(rowIndex, strBytes);
break;
case BINARY:
case FIXED:
byte[] bytes;
byte[] binaryBytes;
if (value instanceof ByteBuffer) {
bytes = ByteBuffers.toByteArray((ByteBuffer) value);
binaryBytes = ByteBuffers.toByteArray((ByteBuffer) value);
} else {
bytes = (byte[]) value;
binaryBytes = (byte[]) value;
}

((VarBinaryVector) vector).setSafe(rowIndex, bytes);
((VarBinaryVector) vector).setSafe(rowIndex, binaryBytes);
break;
case FIXED:
// FIXED maps to Arrow FixedSizeBinaryVector, not VarBinaryVector. Until the writer is
// updated to use FixedSizeBinaryVector and validate the byte length, refuse the write
// rather than failing with a cryptic ClassCastException at runtime.
throw new UnsupportedOperationException(
"Writing Iceberg FIXED columns to Vortex is not yet supported");
case DECIMAL:
((DecimalVector) vector).setSafe(rowIndex, (BigDecimal) value);
break;
Expand Down Expand Up @@ -180,6 +186,24 @@ private static void writeValue(
((TimeStampNanoVector) vector).setSafe(rowIndex, localEpochNanos);
}

break;
case LIST:
Types.ListType listType = (Types.ListType) type;
org.apache.iceberg.types.Type elementType = listType.elementType();
ListVector listVector = (ListVector) vector;
FieldVector elementVector = listVector.getDataVector();
List<?> elements = (List<?>) value;
int elementStart = listVector.startNewValue(rowIndex);
for (int i = 0; i < elements.size(); i++) {
Object elementValue = elements.get(i);
int elementIdx = elementStart + i;
if (elementValue == null) {
elementVector.setNull(elementIdx);
} else {
writeValue(elementVector, elementType, elementValue, elementIdx);
}
}
listVector.endValue(rowIndex, elements.size());
break;
default:
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -229,6 +253,10 @@ private static ColumnMetricsTracker<?> newTracker(Types.NestedField field) {
v -> ChronoUnit.NANOS.between(LOCAL_EPOCH, (LocalDateTime) v));
}
default:
if (field.type().isNestedType()) {
// Lists, maps, and structs have no natural ordering — track counts only.
return new ColumnMetricsTracker<>(field.fieldId(), null);
}
return new ColumnMetricsTracker<>(field.fieldId(), (Comparator) Comparator.naturalOrder());
}
}
Expand Down Expand Up @@ -271,6 +299,9 @@ void incrementValueCount() {
@SuppressWarnings("unchecked")
void addValue(Object value) {
valueCount++;
if (comparator == null) {
return;
}
T typedValue = converter != null ? converter.apply(value) : (T) value;
if (min == null || comparator.compare(typedValue, min) < 0) {
min = typedValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {

@Override
public ReadBuilder<D, S> filter(Expression filter) {
this.filterPredicate = Optional.of(filter);
this.filterPredicate = Optional.ofNullable(filter);
return this;
}

Expand Down
Loading
Loading