From f5153363ab978bee19433dff4bddbb92cf154039 Mon Sep 17 00:00:00 2001 From: weijiii Date: Sun, 4 May 2025 01:24:29 -0700 Subject: [PATCH] Allow row column for equality delete --- .../iceberg/IcebergPageSourceProvider.java | 1 + .../plugin/iceberg/delete/DeleteFilter.java | 3 +- .../plugin/iceberg/delete/DeleteManager.java | 12 +- .../iceberg/delete/EqualityDeleteFilter.java | 13 +- .../plugin/iceberg/delete/LazyTrinoRow.java | 9 +- .../iceberg/delete/PositionDeleteFilter.java | 3 +- .../trino/plugin/iceberg/delete/TrinoRow.java | 111 +++++++++++++++++- .../trino/plugin/iceberg/TestIcebergV2.java | 13 ++ 8 files changed, 145 insertions(+), 20 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index bf2e9bf21af6..297bd283075c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -349,6 +349,7 @@ public ConnectorPageSource createPageSource( if (!deletes.isEmpty()) { Supplier> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData) .getDeletePredicate( + session, path, dataSequenceNumber, deletes, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java index 68ba48f49848..ed7526f4eaed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFilter.java @@ -14,10 +14,11 @@ package io.trino.plugin.iceberg.delete; import io.trino.plugin.iceberg.IcebergColumnHandle; +import io.trino.spi.connector.ConnectorSession; import java.util.List; public interface DeleteFilter { - RowPredicate createPredicate(List columns, long dataSequenceNumber); + RowPredicate createPredicate(ConnectorSession session, List columns, long dataSequenceNumber); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java index dd64f3cf8b87..69e98b622671 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java @@ -24,6 +24,7 @@ import io.trino.plugin.iceberg.delete.EqualityDeleteFilter.EqualityDeleteFilterBuilder; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.Range; @@ -69,6 +70,7 @@ public DeleteManager(TypeManager typeManager) } public Optional getDeletePredicate( + ConnectorSession session, String dataFilePath, long dataSequenceNumber, List deleteFiles, @@ -92,9 +94,9 @@ public Optional getDeletePredicate( } Optional positionDeletes = createPositionDeleteFilter(dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider) - .map(filter -> filter.createPredicate(readColumns, dataSequenceNumber)); - Optional equalityDeletes = createEqualityDeleteFilter(equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream() - .map(filter -> filter.createPredicate(readColumns, dataSequenceNumber)) + .map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber)); + Optional equalityDeletes = createEqualityDeleteFilter(session, equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream() + .map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber)) .reduce(RowPredicate::and); if (positionDeletes.isEmpty()) { @@ -168,7 +170,7 @@ private static boolean shouldLoadPositionDeleteFile(DeleteFile deleteFile, Optio (positionUpperBound.isEmpty() || positionUpperBound.get() >= startRowPosition.get()); } - private List createEqualityDeleteFilter(List equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider) + private List createEqualityDeleteFilter(ConnectorSession session, List equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider) { if (equalityDeleteFiles.isEmpty()) { return List.of(); @@ -189,7 +191,7 @@ private List createEqualityDeleteFilter(List e EqualityDeleteFilterBuilder builder = equalityDeleteFiltersBySchema.computeIfAbsent(fieldIds, _ -> EqualityDeleteFilter.builder(schemaFromHandles(deleteColumns))); deleteFilters.add(builder); - ListenableFuture loadFuture = builder.readEqualityDeletes(deleteFile, deleteColumns, deletePageSourceProvider); + ListenableFuture loadFuture = builder.readEqualityDeletes(session, deleteFile, deleteColumns, deletePageSourceProvider); if (loadFuture.state() != SUCCESS) { pendingLoads.add(loadFuture); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java index af093e5288b1..eaed0e0440eb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/EqualityDeleteFilter.java @@ -21,6 +21,7 @@ import io.trino.plugin.iceberg.delete.DeleteManager.DeletePageSourceProvider; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SourcePage; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; @@ -53,7 +54,7 @@ private EqualityDeleteFilter(Schema deleteSchema, Map columns, long splitDataSequenceNumber) + public RowPredicate createPredicate(ConnectorSession session, List columns, long splitDataSequenceNumber) { StructType fileStructType = structTypeFromHandles(columns); StructType deleteStructType = deleteSchema.asStruct(); @@ -68,7 +69,7 @@ public RowPredicate createPredicate(List columns, long spli .toArray(Type[]::new); return (page, position) -> { - StructProjection row = projection.wrap(new LazyTrinoRow(types, page, position)); + StructProjection row = projection.wrap(new LazyTrinoRow(session, types, page, position)); DataSequenceNumber maxDeleteVersion = deletedRows.get(structLikeWrapper.set(row)); // clear reference to avoid memory leak structLikeWrapper.set(null); @@ -94,19 +95,19 @@ private EqualityDeleteFilterBuilder(Schema deleteSchema) this.deletedRows = new ConcurrentHashMap<>(); } - public ListenableFuture readEqualityDeletes(DeleteFile deleteFile, List deleteColumns, DeletePageSourceProvider deletePageSourceProvider) + public ListenableFuture readEqualityDeletes(ConnectorSession session, DeleteFile deleteFile, List deleteColumns, DeletePageSourceProvider deletePageSourceProvider) { verify(deleteColumns.size() == deleteSchema.columns().size(), "delete columns size doesn't match delete schema size"); // ensure only one thread loads the file ListenableFutureTask futureTask = loadingFiles.computeIfAbsent( deleteFile.path(), - key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(deleteFile, deleteColumns, deletePageSourceProvider), null)); + key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(session, deleteFile, deleteColumns, deletePageSourceProvider), null)); futureTask.run(); return Futures.nonCancellationPropagating(futureTask); } - private void readEqualityDeletesInternal(DeleteFile deleteFile, List deleteColumns, DeletePageSourceProvider deletePageSourceProvider) + private void readEqualityDeletesInternal(ConnectorSession session, DeleteFile deleteFile, List deleteColumns, DeletePageSourceProvider deletePageSourceProvider) { DataSequenceNumber sequenceNumber = new DataSequenceNumber(deleteFile.dataSequenceNumber()); try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, TupleDomain.all())) { @@ -122,7 +123,7 @@ private void readEqualityDeletesInternal(DeleteFile deleteFile, List { if (existing.dataSequenceNumber() > newValue.dataSequenceNumber()) { return existing; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java index 02fab388b31c..1ac0a2837414 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/LazyTrinoRow.java @@ -13,13 +13,14 @@ */ package io.trino.plugin.iceberg.delete; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SourcePage; import io.trino.spi.type.Type; import org.apache.iceberg.StructLike; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkElementIndex; -import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue; +import static io.trino.plugin.iceberg.delete.TrinoRow.getObjectValue; import static java.util.Objects.requireNonNull; /** @@ -28,13 +29,15 @@ final class LazyTrinoRow implements StructLike { + private final ConnectorSession session; private final Type[] types; private final SourcePage page; private final int position; private final Object[] values; - public LazyTrinoRow(Type[] types, SourcePage page, int position) + public LazyTrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position) { + this.session = requireNonNull(session, "session is null"); checkArgument(types.length == page.getChannelCount(), "mismatched types for page"); this.types = requireNonNull(types, "types is null"); this.page = requireNonNull(page, "page is null"); @@ -68,7 +71,7 @@ private Object get(int i) return value; } - value = getIcebergValue(page.getBlock(i), position, types[i]); + value = getObjectValue(session, page.getBlock(i), position, types[i]); values[i] = value; return value; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java index f1ef60afd606..2119d6d5ac27 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFilter.java @@ -17,6 +17,7 @@ import io.trino.plugin.iceberg.IcebergColumnHandle; import io.trino.spi.block.Block; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SourcePage; import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider; import org.roaringbitmap.longlong.LongBitmapDataProvider; @@ -39,7 +40,7 @@ public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows) } @Override - public RowPredicate createPredicate(List columns, long dataSequenceNumber) + public RowPredicate createPredicate(ConnectorSession session, List columns, long dataSequenceNumber) { int filePosChannel = rowPositionChannel(columns); return (page, position) -> { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java index 490ad34cc235..7835fa7756fb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/TrinoRow.java @@ -13,26 +13,49 @@ */ package io.trino.plugin.iceberg.delete; +import io.trino.spi.block.Block; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SourcePage; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.RowType; import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; import org.apache.iceberg.StructLike; import java.util.Arrays; +import java.util.List; import static com.google.common.base.Preconditions.checkArgument; -import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue; +import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz; +import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.Decimals.readBigDecimal; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TimeType.TIME_MICROS; +import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS; +import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS; +import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.UuidType.UUID; +import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid; -final class TrinoRow +public final class TrinoRow implements StructLike { private final Object[] values; - public TrinoRow(Type[] types, SourcePage page, int position) + public TrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position) { checkArgument(types.length == page.getChannelCount(), "mismatched types for page"); values = new Object[types.length]; for (int i = 0; i < values.length; i++) { - values[i] = getIcebergValue(page.getBlock(i), position, types[i]); + values[i] = getObjectValue(session, page.getBlock(i), position, types[i]); } } @@ -59,4 +82,84 @@ public String toString() { return "TrinoRow" + Arrays.toString(values); } + + static Object getObjectValue(ConnectorSession session, Block block, int position, Type type) + { + if (block.isNull(position)) { + return null; + } + if (type.equals(BIGINT)) { + return BIGINT.getLong(block, position); + } + if (type.equals(TINYINT)) { + return (int) TINYINT.getByte(block, position); + } + if (type.equals(SMALLINT)) { + return (int) SMALLINT.getShort(block, position); + } + if (type.equals(INTEGER)) { + return INTEGER.getInt(block, position); + } + if (type.equals(DATE)) { + return DATE.getInt(block, position); + } + if (type.equals(BOOLEAN)) { + return BOOLEAN.getBoolean(block, position); + } + if (type instanceof DecimalType decimalType) { + return readBigDecimal(decimalType, block, position); + } + if (type.equals(REAL)) { + return REAL.getFloat(block, position); + } + if (type.equals(DOUBLE)) { + return DOUBLE.getDouble(block, position); + } + if (type.equals(TIME_MICROS)) { + return TIME_MICROS.getLong(block, position) / PICOSECONDS_PER_MICROSECOND; + } + if (type.equals(TIMESTAMP_MICROS)) { + return TIMESTAMP_MICROS.getLong(block, position); + } + if (type.equals(TIMESTAMP_TZ_MICROS)) { + return timestampTzToMicros(getTimestampTz(block, position)); + } + if (type instanceof VarbinaryType varbinaryType) { + return varbinaryType.getSlice(block, position).toByteBuffer(); + } + if (type instanceof VarcharType varcharType) { + return varcharType.getSlice(block, position).toStringUtf8(); + } + if (type.equals(UUID)) { + return trinoUuidToJavaUuid(UUID.getSlice(block, position)); + } + if (type instanceof RowType rowType) { + List values = (List) rowType.getObjectValue(session, block, position); + return rowData(values.toArray()); + } + throw new UnsupportedOperationException("Type not supported as equality delete column: " + type.getDisplayName()); + } + + public static StructLike rowData(Object... values) + { + return new StructLike() { + @Override + public int size() + { + return values.length; + } + + @Override + public T get(int i, Class aClass) + { + return aClass.cast(values[i]); + } + + @Override + public void set(int i, T t) + { + throw new UnsupportedOperationException("Testing StructLike does not support setting values."); + } + }; + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index d11334fbc4a8..d2d23f0fe9c2 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -95,6 +95,7 @@ import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore; import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis; import static io.trino.plugin.iceberg.IcebergTestUtils.getTrinoCatalog; +import static io.trino.plugin.iceberg.delete.TrinoRow.rowData; import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable; import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema; import static io.trino.spi.type.BigintType.BIGINT; @@ -293,6 +294,18 @@ public void testV2TableWithEqualityDeleteWhenColumnIsNested() assertQuery("SELECT array_column[1], map_column[1], row_column.x FROM " + tableName, "SELECT 1, 2, 1 FROM nation WHERE regionkey != 1"); } + @Test + public void testV2TableWithEqualityDeleteWhenColumnIsRow() + throws Exception + { + String tableName = "test_v2_equality_delete_column_is_row" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS " + + "SELECT CAST(ROW(1, 2) AS ROW(x BIGINT, y DOUBLE)) row_column, regionkey FROM tpch.tiny.nation", 25); + Table icebergTable = loadTable(tableName); + writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.empty(), ImmutableMap.of("row_column", rowData(1L, 2.0))); + assertQueryReturnsEmptyResult("SELECT row_column.x FROM " + tableName); + } + @Test public void testParquetMissingFieldId() throws Exception