Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public ConnectorPageSource createPageSource(
if (!deletes.isEmpty()) {
Supplier<Optional<RowPredicate>> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData)
.getDeletePredicate(
session,
path,
dataSequenceNumber,
deletes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
package io.trino.plugin.iceberg.delete;

import io.trino.plugin.iceberg.IcebergColumnHandle;
import io.trino.spi.connector.ConnectorSession;

import java.util.List;

public interface DeleteFilter
{
RowPredicate createPredicate(List<IcebergColumnHandle> columns, long dataSequenceNumber);
RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long dataSequenceNumber);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter.EqualityDeleteFilterBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.Range;
Expand Down Expand Up @@ -69,6 +70,7 @@ public DeleteManager(TypeManager typeManager)
}

public Optional<RowPredicate> getDeletePredicate(
ConnectorSession session,
String dataFilePath,
long dataSequenceNumber,
List<DeleteFile> deleteFiles,
Expand All @@ -92,9 +94,9 @@ public Optional<RowPredicate> getDeletePredicate(
}

Optional<RowPredicate> positionDeletes = createPositionDeleteFilter(dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber));
Optional<RowPredicate> equalityDeletes = createEqualityDeleteFilter(equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber))
.map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber));
Optional<RowPredicate> equalityDeletes = createEqualityDeleteFilter(session, equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
.map(filter -> filter.createPredicate(session, readColumns, dataSequenceNumber))
.reduce(RowPredicate::and);

if (positionDeletes.isEmpty()) {
Expand Down Expand Up @@ -168,7 +170,7 @@ private static boolean shouldLoadPositionDeleteFile(DeleteFile deleteFile, Optio
(positionUpperBound.isEmpty() || positionUpperBound.get() >= startRowPosition.get());
}

private List<EqualityDeleteFilter> createEqualityDeleteFilter(List<DeleteFile> equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider)
private List<EqualityDeleteFilter> createEqualityDeleteFilter(ConnectorSession session, List<DeleteFile> equalityDeleteFiles, Schema schema, DeletePageSourceProvider deletePageSourceProvider)
{
if (equalityDeleteFiles.isEmpty()) {
return List.of();
Expand All @@ -189,7 +191,7 @@ private List<EqualityDeleteFilter> createEqualityDeleteFilter(List<DeleteFile> e
EqualityDeleteFilterBuilder builder = equalityDeleteFiltersBySchema.computeIfAbsent(fieldIds, _ -> EqualityDeleteFilter.builder(schemaFromHandles(deleteColumns)));
deleteFilters.add(builder);

ListenableFuture<?> loadFuture = builder.readEqualityDeletes(deleteFile, deleteColumns, deletePageSourceProvider);
ListenableFuture<?> loadFuture = builder.readEqualityDeletes(session, deleteFile, deleteColumns, deletePageSourceProvider);
if (loadFuture.state() != SUCCESS) {
pendingLoads.add(loadFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.iceberg.delete.DeleteManager.DeletePageSourceProvider;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
Expand Down Expand Up @@ -53,7 +54,7 @@ private EqualityDeleteFilter(Schema deleteSchema, Map<StructLikeWrapper, DataSeq
}

@Override
public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long splitDataSequenceNumber)
public RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long splitDataSequenceNumber)
{
StructType fileStructType = structTypeFromHandles(columns);
StructType deleteStructType = deleteSchema.asStruct();
Expand All @@ -68,7 +69,7 @@ public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long spli
.toArray(Type[]::new);

return (page, position) -> {
StructProjection row = projection.wrap(new LazyTrinoRow(types, page, position));
StructProjection row = projection.wrap(new LazyTrinoRow(session, types, page, position));
DataSequenceNumber maxDeleteVersion = deletedRows.get(structLikeWrapper.set(row));
// clear reference to avoid memory leak
structLikeWrapper.set(null);
Expand All @@ -94,19 +95,19 @@ private EqualityDeleteFilterBuilder(Schema deleteSchema)
this.deletedRows = new ConcurrentHashMap<>();
}

public ListenableFuture<?> readEqualityDeletes(DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
public ListenableFuture<?> readEqualityDeletes(ConnectorSession session, DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
{
verify(deleteColumns.size() == deleteSchema.columns().size(), "delete columns size doesn't match delete schema size");

// ensure only one thread loads the file
ListenableFutureTask<?> futureTask = loadingFiles.computeIfAbsent(
deleteFile.path(),
key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(deleteFile, deleteColumns, deletePageSourceProvider), null));
key -> ListenableFutureTask.create(() -> readEqualityDeletesInternal(session, deleteFile, deleteColumns, deletePageSourceProvider), null));
futureTask.run();
return Futures.nonCancellationPropagating(futureTask);
}

private void readEqualityDeletesInternal(DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
private void readEqualityDeletesInternal(ConnectorSession session, DeleteFile deleteFile, List<IcebergColumnHandle> deleteColumns, DeletePageSourceProvider deletePageSourceProvider)
{
DataSequenceNumber sequenceNumber = new DataSequenceNumber(deleteFile.dataSequenceNumber());
try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, TupleDomain.all())) {
Expand All @@ -122,7 +123,7 @@ private void readEqualityDeletesInternal(DeleteFile deleteFile, List<IcebergColu
}

for (int position = 0; position < page.getPositionCount(); position++) {
TrinoRow row = new TrinoRow(types, page, position);
TrinoRow row = new TrinoRow(session, types, page, position);
deletedRows.merge(wrapper.copyFor(row), sequenceNumber, (existing, newValue) -> {
if (existing.dataSequenceNumber() > newValue.dataSequenceNumber()) {
return existing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
*/
package io.trino.plugin.iceberg.delete;

import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.type.Type;
import org.apache.iceberg.StructLike;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkElementIndex;
import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue;
import static io.trino.plugin.iceberg.delete.TrinoRow.getObjectValue;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -28,13 +29,15 @@
final class LazyTrinoRow
implements StructLike
{
private final ConnectorSession session;
private final Type[] types;
private final SourcePage page;
private final int position;
private final Object[] values;

public LazyTrinoRow(Type[] types, SourcePage page, int position)
public LazyTrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position)
{
this.session = requireNonNull(session, "session is null");
checkArgument(types.length == page.getChannelCount(), "mismatched types for page");
this.types = requireNonNull(types, "types is null");
this.page = requireNonNull(page, "page is null");
Expand Down Expand Up @@ -68,7 +71,7 @@ private Object get(int i)
return value;
}

value = getIcebergValue(page.getBlock(i), position, types[i]);
value = getObjectValue(session, page.getBlock(i), position, types[i]);
values[i] = value;
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.plugin.iceberg.IcebergColumnHandle;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
Expand All @@ -39,7 +40,7 @@ public PositionDeleteFilter(ImmutableLongBitmapDataProvider deletedRows)
}

@Override
public RowPredicate createPredicate(List<IcebergColumnHandle> columns, long dataSequenceNumber)
public RowPredicate createPredicate(ConnectorSession session, List<IcebergColumnHandle> columns, long dataSequenceNumber)
{
int filePosChannel = rowPositionChannel(columns);
return (page, position) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,49 @@
*/
package io.trino.plugin.iceberg.delete;

import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.iceberg.StructLike;

import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.iceberg.IcebergPageSink.getIcebergValue;
import static io.trino.plugin.iceberg.util.Timestamps.getTimestampTz;
import static io.trino.plugin.iceberg.util.Timestamps.timestampTzToMicros;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.Decimals.readBigDecimal;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RealType.REAL;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MICROS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;
import static io.trino.spi.type.Timestamps.PICOSECONDS_PER_MICROSECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.UuidType.UUID;
import static io.trino.spi.type.UuidType.trinoUuidToJavaUuid;

final class TrinoRow
public final class TrinoRow
implements StructLike
{
private final Object[] values;

public TrinoRow(Type[] types, SourcePage page, int position)
public TrinoRow(ConnectorSession session, Type[] types, SourcePage page, int position)
{
checkArgument(types.length == page.getChannelCount(), "mismatched types for page");
values = new Object[types.length];
for (int i = 0; i < values.length; i++) {
values[i] = getIcebergValue(page.getBlock(i), position, types[i]);
values[i] = getObjectValue(session, page.getBlock(i), position, types[i]);
}
}

Expand All @@ -59,4 +82,84 @@ public String toString()
{
return "TrinoRow" + Arrays.toString(values);
}

// Converts a single value from a Trino Block into the Java object form that
// Iceberg's StructLike/StructLikeWrapper equality comparison expects.
// Returns null for SQL NULL. Throws UnsupportedOperationException for any
// Trino type that is not supported as an equality-delete column.
static Object getObjectValue(ConnectorSession session, Block block, int position, Type type)
{
if (block.isNull(position)) {
return null;
}
if (type.equals(BIGINT)) {
return BIGINT.getLong(block, position);
}
if (type.equals(TINYINT)) {
// widened to int — Iceberg has no 8-bit integer type
return (int) TINYINT.getByte(block, position);
}
if (type.equals(SMALLINT)) {
// widened to int — Iceberg has no 16-bit integer type
return (int) SMALLINT.getShort(block, position);
}
if (type.equals(INTEGER)) {
return INTEGER.getInt(block, position);
}
if (type.equals(DATE)) {
// epoch days, matching Iceberg's date representation
return DATE.getInt(block, position);
}
if (type.equals(BOOLEAN)) {
return BOOLEAN.getBoolean(block, position);
}
if (type instanceof DecimalType decimalType) {
return readBigDecimal(decimalType, block, position);
}
if (type.equals(REAL)) {
return REAL.getFloat(block, position);
}
if (type.equals(DOUBLE)) {
return DOUBLE.getDouble(block, position);
}
if (type.equals(TIME_MICROS)) {
// Trino stores time-of-day in picoseconds; Iceberg expects microseconds
return TIME_MICROS.getLong(block, position) / PICOSECONDS_PER_MICROSECOND;
}
if (type.equals(TIMESTAMP_MICROS)) {
return TIMESTAMP_MICROS.getLong(block, position);
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
// normalize to epoch micros (UTC); zone information is dropped for comparison
return timestampTzToMicros(getTimestampTz(block, position));
}
if (type instanceof VarbinaryType varbinaryType) {
return varbinaryType.getSlice(block, position).toByteBuffer();
}
if (type instanceof VarcharType varcharType) {
return varcharType.getSlice(block, position).toStringUtf8();
}
if (type.equals(UUID)) {
return trinoUuidToJavaUuid(UUID.getSlice(block, position));
}
if (type instanceof RowType rowType) {
// getObjectValue needs the session (e.g. for session-dependent rendering of
// nested fields); result is a List of the row's field values in declaration order
List<Object> values = (List<Object>) rowType.getObjectValue(session, block, position);
// wrap as StructLike so nested rows compare structurally in the delete map
return rowData(values.toArray());
}
throw new UnsupportedOperationException("Type not supported as equality delete column: " + type.getDisplayName());
}

/**
 * Wraps the given values in an immutable {@link StructLike} view, positioned by index.
 * Used both by {@code getObjectValue} to represent nested ROW values for equality-delete
 * comparison and by tests to build expected delete keys.
 * The returned struct is read-only; {@code set} always throws.
 */
public static StructLike rowData(Object... values)
{
    return new StructLike() {
        @Override
        public int size()
        {
            return values.length;
        }

        @Override
        public <T> T get(int i, Class<T> javaClass)
        {
            return javaClass.cast(values[i]);
        }

        @Override
        public <T> void set(int i, T value)
        {
            // no longer a test-only helper: this StructLike also backs production
            // equality-delete row keys, which must be immutable
            throw new UnsupportedOperationException("rowData StructLike is read-only and does not support setting values");
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis;
import static io.trino.plugin.iceberg.IcebergTestUtils.getTrinoCatalog;
import static io.trino.plugin.iceberg.delete.TrinoRow.rowData;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -293,6 +294,18 @@ public void testV2TableWithEqualityDeleteWhenColumnIsNested()
assertQuery("SELECT array_column[1], map_column[1], row_column.x FROM " + tableName, "SELECT 1, 2, 1 FROM nation WHERE regionkey != 1");
}

@Test
public void testV2TableWithEqualityDeleteWhenColumnIsRow()
throws Exception
{
// Regression test for equality deletes keyed on a ROW-typed column:
// the delete key ROW(1, 2.0) matches every row in the table (all rows carry
// the same row_column value), so the table must read back empty afterwards.
String tableName = "test_v2_equality_delete_column_is_row" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " AS " +
"SELECT CAST(ROW(1, 2) AS ROW(x BIGINT, y DOUBLE)) row_column, regionkey FROM tpch.tiny.nation", 25);
Table icebergTable = loadTable(tableName);
// rowData(1L, 2.0) builds the StructLike delete key matching ROW(x BIGINT, y DOUBLE)
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.empty(), ImmutableMap.of("row_column", rowData(1L, 2.0)));
assertQueryReturnsEmptyResult("SELECT row_column.x FROM " + tableName);
}

@Test
public void testParquetMissingFieldId()
throws Exception
Expand Down
Loading