diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md
index 3e9a2c7f5919..4ed093dd55e6 100644
--- a/docs/src/main/sphinx/connector/iceberg.md
+++ b/docs/src/main/sphinx/connector/iceberg.md
@@ -945,7 +945,7 @@ connector using a {doc}`WITH ` clause.
- Optionally specifies the file system location URI for the table.
* - `format_version`
- Optionally specifies the format version of the Iceberg specification to use
- for new tables; either `1` or `2`. Defaults to `2`. Version `2` is required
+ for new tables; either `1`, `2`, or `3`. Defaults to `2`. Version `2` is required
for row level deletes.
* - `max_commit_retry`
- Number of times to retry a commit before failing. Defaults to the value of
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index b536dc6b167b..1956fb7d1060 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -223,6 +223,12 @@
iceberg-core
+
+ org.apache.iceberg
+ iceberg-data
+ ${dep.iceberg.version}
+
+
org.apache.iceberg
iceberg-nessie
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
index 73140ce60f0a..a0ba6d319c84 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CommitTaskData.java
@@ -13,22 +13,28 @@
*/
package io.trino.plugin.iceberg;
+import com.google.common.collect.ImmutableList;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
import java.util.List;
import java.util.Optional;
+import java.util.OptionalLong;
import static java.util.Objects.requireNonNull;
public record CommitTaskData(
String path,
- IcebergFileFormat fileFormat,
+ FileFormat fileFormat,
long fileSizeInBytes,
MetricsWrapper metrics,
String partitionSpecJson,
Optional partitionDataJson,
FileContent content,
Optional referencedDataFile,
+ List deletionVectorFiles,
+ OptionalLong deletionVectorContentOffset,
+ OptionalLong deletionVectorContentSize,
Optional> fileSplitOffsets)
{
public CommitTaskData
@@ -40,6 +46,9 @@ public record CommitTaskData(
requireNonNull(partitionDataJson, "partitionDataJson is null");
requireNonNull(content, "content is null");
requireNonNull(referencedDataFile, "referencedDataFile is null");
+ deletionVectorFiles = ImmutableList.copyOf(deletionVectorFiles);
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
+ requireNonNull(deletionVectorContentOffset, "deletionVectorContentOffset is null");
+ requireNonNull(deletionVectorContentSize, "deletionVectorContentSize is null");
}
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
index 854ce8e7324b..0f3393c89a43 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java
@@ -18,6 +18,7 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.type.Type;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.Record;
@@ -36,6 +37,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.FileFormat.AVRO;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
public final class IcebergAvroFileWriter
@@ -46,6 +48,7 @@ public final class IcebergAvroFileWriter
// Use static table name instead of the actual name because it becomes outdated once the table is renamed
public static final String AVRO_TABLE_NAME = "table";
+ private final String location;
private final Schema icebergSchema;
private final List types;
private final FileAppender avroWriter;
@@ -58,6 +61,7 @@ public IcebergAvroFileWriter(
List types,
HiveCompressionCodec hiveCompressionCodec)
{
+ this.location = requireNonNull(file.location(), "location is null");
this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
@@ -71,10 +75,22 @@ public IcebergAvroFileWriter(
.build();
}
catch (IOException e) {
- throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + file.location(), e);
+ throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Avro file: " + location, e);
}
}
+ @Override
+ public FileFormat fileFormat()
+ {
+ return AVRO;
+ }
+
+ @Override
+ public String location()
+ {
+ return location;
+ }
+
@Override
public long getWrittenBytes()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
index 77de8d534b51..965436fe548e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
@@ -52,7 +52,8 @@
public class IcebergConfig
{
public static final int FORMAT_VERSION_SUPPORT_MIN = 1;
- public static final int FORMAT_VERSION_SUPPORT_MAX = 2;
+ private static final int FORMAT_VERSION_DEFAULT = 2;
+ public static final int FORMAT_VERSION_SUPPORT_MAX = 3;
public static final String EXTENDED_STATISTICS_CONFIG = "iceberg.extended-statistics.enabled";
public static final String EXTENDED_STATISTICS_DESCRIPTION = "Enable collection (ANALYZE) and use of extended statistics.";
public static final String COLLECT_EXTENDED_STATISTICS_ON_WRITE_DESCRIPTION = "Collect extended statistics during writes";
@@ -74,7 +75,7 @@ public class IcebergConfig
private boolean registerTableProcedureEnabled;
private boolean addFilesProcedureEnabled;
private Optional hiveCatalogName = Optional.empty();
- private int formatVersion = FORMAT_VERSION_SUPPORT_MAX;
+ private int formatVersion = FORMAT_VERSION_DEFAULT;
private Duration expireSnapshotsMinRetention = new Duration(7, DAYS);
private Duration removeOrphanFilesMinRetention = new Duration(7, DAYS);
private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
index f6616bab1c9e..72b4f0c5cf79 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriter.java
@@ -13,7 +13,9 @@
*/
package io.trino.plugin.iceberg;
+import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.FileWriter;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import java.util.List;
@@ -22,6 +24,15 @@
public interface IcebergFileWriter
extends FileWriter
{
+ FileFormat fileFormat();
+
+ String location();
+
+ default List rewrittenDeleteFiles()
+ {
+ return ImmutableList.of();
+ }
+
FileMetrics getFileMetrics();
record FileMetrics(Metrics metrics, Optional> splitOffsets) {}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
index 0d4a7049fff4..23cb98fe2ddc 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java
@@ -33,14 +33,19 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.orc.OrcWriterConfig;
+import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DeleteFileSet;
import org.weakref.jmx.Managed;
import java.io.Closeable;
@@ -48,10 +53,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveCompressionCodecs.toCompressionCodec;
import static io.trino.plugin.hive.HiveMetadata.TRINO_QUERY_ID_NAME;
@@ -82,6 +89,7 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;
@@ -139,16 +147,37 @@ public IcebergFileWriter createPositionDeleteWriter(
TrinoFileSystem fileSystem,
Location outputPath,
ConnectorSession session,
- IcebergFileFormat fileFormat,
- Map storageProperties)
+ String dataFilePath,
+ FileFormat fileFormat,
+ PartitionSpec partitionSpec,
+ Optional partition,
+ Map storageProperties,
+ Map previousDeleteFiles)
{
return switch (fileFormat) {
+ case PUFFIN -> createDeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteFiles);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
+ case METADATA -> throw new IllegalArgumentException("Unexpected METADATA file format");
};
}
+ private static DeletionVectorWriter createDeletionVectorWriter(
+ NodeVersion nodeVersion,
+ TrinoFileSystem fileSystem,
+ Location outputPath,
+ String dataFilePath,
+ PartitionSpec partitionSpec,
+ Optional partition,
+ Map previousDeleteFiles)
+ {
+ Function previousDeleteLoader = DeletionVectorWriter.create(fileSystem, previousDeleteFiles);
+ int positionChannel = POSITION_DELETE_SCHEMA.columns().indexOf(DELETE_FILE_POS);
+ checkState(positionChannel != -1, "positionChannel not found");
+ return new DeletionVectorWriter(nodeVersion, fileSystem, outputPath, dataFilePath, partitionSpec, partition, previousDeleteLoader::apply, positionChannel);
+ }
+
private IcebergFileWriter createParquetWriter(
MetricsConfig metricsConfig,
TrinoFileSystem fileSystem,
@@ -234,6 +263,7 @@ private IcebergFileWriter createOrcWriter(
}
return new IcebergOrcFileWriter(
+ outputPath,
metricsConfig,
icebergSchema,
orcDataSink,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
index f6095c9d4bd9..747e126ff9c6 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMergeSink.java
@@ -30,6 +30,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.DeleteFileSet;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import org.roaringbitmap.longlong.LongBitmapDataProvider;
import org.roaringbitmap.longlong.Roaring64Bitmap;
@@ -55,8 +56,10 @@ public class IcebergMergeSink
private final LocationProvider locationProvider;
private final IcebergFileWriterFactory fileWriterFactory;
private final TrinoFileSystem fileSystem;
+ private final Map previousDeleteFiles;
private final JsonCodec jsonCodec;
private final ConnectorSession session;
+ private final int formatVersion;
private final IcebergFileFormat fileFormat;
private final Map storageProperties;
private final Schema schema;
@@ -69,8 +72,10 @@ public IcebergMergeSink(
LocationProvider locationProvider,
IcebergFileWriterFactory fileWriterFactory,
TrinoFileSystem fileSystem,
+ Map previousDeleteFiles,
JsonCodec jsonCodec,
ConnectorSession session,
+ int formatVersion,
IcebergFileFormat fileFormat,
Map storageProperties,
Schema schema,
@@ -81,8 +86,10 @@ public IcebergMergeSink(
this.locationProvider = requireNonNull(locationProvider, "locationProvider is null");
this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+ this.previousDeleteFiles = ImmutableMap.copyOf(previousDeleteFiles);
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.session = requireNonNull(session, "session is null");
+ this.formatVersion = formatVersion;
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.schema = requireNonNull(schema, "schema is null");
@@ -162,8 +169,10 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
fileSystem,
jsonCodec,
session,
+ formatVersion,
fileFormat,
- storageProperties);
+ storageProperties,
+ previousDeleteFiles);
}
private static Collection writePositionDeletes(PositionDeleteWriter writer, ImmutableLongBitmapDataProvider rowsToDelete)
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index c835a2d80de1..d7f2327ed87a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -47,6 +47,7 @@
import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
import io.trino.plugin.iceberg.aggregation.IcebergThetaSketchForStats;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesFromTableHandle;
import io.trino.plugin.iceberg.procedure.IcebergAddFilesHandle;
@@ -138,6 +139,7 @@
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentFileParsers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
@@ -304,6 +306,7 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateFormatVersion;
import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -400,6 +403,7 @@
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
+import static org.apache.iceberg.TableUtil.formatVersion;
import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
import static org.apache.iceberg.types.TypeUtil.indexParents;
import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash;
@@ -410,8 +414,8 @@ public class IcebergMetadata
{
private static final Logger log = Logger.get(IcebergMetadata.class);
private static final Pattern PATH_PATTERN = Pattern.compile("(.*)/[^/]+");
- private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 2;
- private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 2;
+ private static final int OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION = 3;
+ private static final int CLEANING_UP_PROCEDURES_MAX_SUPPORTED_TABLE_VERSION = 3;
private static final String RETENTION_THRESHOLD = "retention_threshold";
private static final String UNKNOWN_SNAPSHOT_TOKEN = "UNKNOWN";
public static final Set UPDATABLE_TABLE_PROPERTIES = ImmutableSet.builder()
@@ -1281,7 +1285,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
"to use unique table locations for every table.", location));
}
}
- return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), retryMode);
+ return newWritableTableHandle(tableMetadata.getTable(), transaction.table(), ImmutableList.of(), retryMode);
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking new table's location: " + location, e);
@@ -1388,7 +1392,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
beginTransaction(icebergTable);
- return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
+ return newWritableTableHandle(table.getSchemaTableName(), icebergTable, ImmutableList.of(), retryMode);
}
private List getChildNamespaces(ConnectorSession session, String parentNamespace)
@@ -1404,15 +1408,21 @@ private List getChildNamespaces(ConnectorSession session, String parentN
.collect(toImmutableList());
}
- private IcebergWritableTableHandle newWritableTableHandle(SchemaTableName name, Table table, RetryMode retryMode)
+ private IcebergWritableTableHandle newWritableTableHandle(
+ SchemaTableName name,
+ Table table,
+ List previousDeleteFiles,
+ RetryMode retryMode)
{
return new IcebergWritableTableHandle(
name,
+ formatVersion(table),
SchemaParser.toJson(table.schema()),
transformValues(table.specs(), PartitionSpecParser::toJson),
table.spec().specId(),
getSupportedSortFields(table.schema(), table.sortOrder()),
getProjectedColumns(table.schema(), typeManager),
+ previousDeleteFiles,
table.location(),
getFileFormat(table),
table.properties(),
@@ -3031,7 +3041,11 @@ public ColumnHandle getMergeRowIdColumnHandle(ConnectorSession session, Connecto
@Override
public Optional getUpdateLayout(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- return getInsertLayout(session, tableHandle)
+ Optional insertLayout = getInsertLayout(session, tableHandle);
+ if (insertLayout.isEmpty() || insertLayout.get().getPartitioning().isEmpty()) {
+ return Optional.of(new IcebergPartitioningHandle(true, List.of()));
+ }
+ return insertLayout
.flatMap(ConnectorTableLayout::getPartitioning)
.map(IcebergPartitioningHandle.class::cast)
.map(IcebergPartitioningHandle::forUpdate);
@@ -3048,10 +3062,36 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT
beginTransaction(icebergTable);
- IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
+ List previousDeleteFiles = loadPreviousDeleteFiles(icebergTable);
+ IcebergWritableTableHandle insertHandle = newWritableTableHandle(table.getSchemaTableName(), icebergTable, previousDeleteFiles, retryMode);
return new IcebergMergeTableHandle(table, insertHandle);
}
+ private static List loadPreviousDeleteFiles(Table icebergTable)
+ {
+ int formatVersion = formatVersion(icebergTable);
+ validateFormatVersion(formatVersion);
+ if (formatVersion < 3) {
+ return ImmutableList.of();
+ }
+
+ ImmutableList.Builder rewritableDeletes = ImmutableList.builder();
+ try (CloseableIterable iterator = icebergTable.newScan().planFiles()) {
+ for (FileScanTask task : iterator) {
+ rewritableDeletes.add(new PositionDeleteFiles(
+ task.file().location(),
+ task.spec().specId(),
+ task.deletes().stream()
+ .map(deleteFile -> ContentFileParsers.toJson(deleteFile, task.spec()))
+ .collect(toImmutableList())));
+ }
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return rewritableDeletes.build();
+ }
+
@Override
public void finishMerge(ConnectorSession session, ConnectorMergeTableHandle mergeTableHandle, List sourceTableHandles, Collection fragments, Collection computedStatistics)
{
@@ -3125,10 +3165,21 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
case POSITION_DELETES -> {
FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(partitionSpec)
.withPath(task.path())
- .withFormat(task.fileFormat().toIceberg())
+ .withFormat(task.fileFormat())
.ofPositionDeletes()
.withFileSizeInBytes(task.fileSizeInBytes())
.withMetrics(task.metrics().metrics());
+
+ if (task.fileFormat() == FileFormat.PUFFIN) {
+ deleteBuilder.withRecordCount(task.metrics().recordCount());
+ deleteBuilder.withContentOffset(task.deletionVectorContentOffset().orElseThrow(() -> new IllegalStateException("deletionVectorContentOffset is missing while constructing deletion vector")));
+ deleteBuilder.withContentSizeInBytes(task.deletionVectorContentSize().orElseThrow(() -> new IllegalStateException("deletionVectorContentSize is missing while constructing deletion vector")));
+ deleteBuilder.withReferencedDataFile(task.referencedDataFile().orElseThrow(() -> new IllegalStateException("referencedDataFile is missing while constructing deletion vector")));
+ for (String rewrittenDeleteFile : task.deletionVectorFiles()) {
+ rowDelta.removeDeletes((DeleteFile) ContentFileParsers.fromJson(rewrittenDeleteFile, partitionSpec));
+ }
+ }
+
task.fileSplitOffsets().ifPresent(deleteBuilder::withSplitOffsets);
if (!partitionSpec.fields().isEmpty()) {
String partitionDataJson = task.partitionDataJson()
@@ -3142,7 +3193,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
case DATA -> {
DataFiles.Builder builder = DataFiles.builder(partitionSpec)
.withPath(task.path())
- .withFormat(task.fileFormat().toIceberg())
+ .withFormat(task.fileFormat())
.withFileSizeInBytes(task.fileSizeInBytes())
.withMetrics(task.metrics().metrics());
if (!icebergTable.spec().fields().isEmpty()) {
@@ -3679,7 +3730,8 @@ && getOnlyElement(sourceTableHandles) instanceof IcebergTableHandle handle
fromSnapshotForRefresh = Optional.of(Long.parseLong(sourceTable.getValue()));
}
- return newWritableTableHandle(table.getSchemaTableName(), icebergTable, retryMode);
+ List rewritableDeletes = loadPreviousDeleteFiles(icebergTable);
+ return newWritableTableHandle(table.getSchemaTableName(), icebergTable, rewritableDeletes, retryMode);
}
@Override
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
index 3ec48539315e..ddb8656757e3 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergOrcFileWriter.java
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;
import io.airlift.log.Logger;
+import io.trino.filesystem.Location;
import io.trino.orc.OrcDataSink;
import io.trino.orc.OrcDataSource;
import io.trino.orc.OrcWriteValidation.OrcWriteValidationMode;
@@ -28,6 +29,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.Type;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
@@ -49,6 +51,7 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
import static io.trino.plugin.iceberg.util.OrcMetrics.computeMetrics;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.FileFormat.ORC;
public final class IcebergOrcFileWriter
implements IcebergFileWriter
@@ -58,6 +61,7 @@ public final class IcebergOrcFileWriter
private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();
private final OrcWriter orcWriter;
+ private final Location outputPath;
private final Schema icebergSchema;
private final ColumnMetadata orcColumns;
private final MetricsConfig metricsConfig;
@@ -68,6 +72,7 @@ public final class IcebergOrcFileWriter
private long validationCpuNanos;
public IcebergOrcFileWriter(
+ Location outputPath,
MetricsConfig metricsConfig,
Schema icebergSchema,
OrcDataSink orcDataSink,
@@ -84,6 +89,7 @@ public IcebergOrcFileWriter(
OrcWriterStats stats)
{
requireNonNull(orcDataSink, "orcDataSink is null");
+ this.outputPath = requireNonNull(outputPath, "outputPath is null");
this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction is null");
this.fileInputColumnIndexes = requireNonNull(fileInputColumnIndexes, "fileInputColumnIndexes is null");
@@ -108,6 +114,18 @@ public IcebergOrcFileWriter(
orcColumns = fileColumnOrcTypes;
}
+ @Override
+ public FileFormat fileFormat()
+ {
+ return ORC;
+ }
+
+ @Override
+ public String location()
+ {
+ return outputPath.toString();
+ }
+
@Override
public FileMetrics getFileMetrics()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
index 3432159270f5..7826d1c71a6a 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java
@@ -54,6 +54,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -424,13 +425,16 @@ private void closeWriter(int writerIndex)
CommitTaskData task = new CommitTaskData(
writeContext.getPath(),
- fileFormat,
+ writer.fileFormat(),
writer.getWrittenBytes(),
new MetricsWrapper(writer.getFileMetrics().metrics()),
PartitionSpecParser.toJson(partitionSpec),
writeContext.getPartitionData().map(PartitionData::toJson),
DATA,
Optional.empty(),
+ writer.rewrittenDeleteFiles(),
+ OptionalLong.empty(),
+ OptionalLong.empty(),
writer.getFileMetrics().splitOffsets());
commitTasks.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
index bc4d694483f7..298230f5f093 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSinkProvider.java
@@ -17,6 +17,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;
import io.trino.plugin.hive.SortingFileWriterConfig;
+import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
import io.trino.plugin.iceberg.procedure.IcebergOptimizeHandle;
import io.trino.plugin.iceberg.procedure.IcebergTableExecuteHandle;
import io.trino.spi.PageIndexerFactory;
@@ -32,14 +33,19 @@
import io.trino.spi.connector.ConnectorTableExecuteHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.ContentFileParsers;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.DeleteFileSet;
import java.util.Map;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.transformValues;
import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider;
import static java.util.Objects.requireNonNull;
@@ -167,13 +173,19 @@ public ConnectorMergeSink createMergeSink(ConnectorTransactionHandle transaction
Schema schema = SchemaParser.fromJson(tableHandle.schemaAsJson());
Map partitionsSpecs = transformValues(tableHandle.partitionsSpecsAsJson(), json -> PartitionSpecParser.fromJson(schema, json));
ConnectorPageSink pageSink = createPageSink(session, tableHandle);
+ Map previousDeleteFiles = tableHandle.previousDeleteFiles().stream()
+ .collect(toImmutableMap(PositionDeleteFiles::dataFileLocation, file -> DeleteFileSet.of(file.deletes().stream()
+ .map(delete -> (DeleteFile) ContentFileParsers.fromJson(delete, partitionsSpecs.get(file.partitionSpecId())))
+ .collect(toImmutableList()))));
return new IcebergMergeSink(
locationProvider,
fileWriterFactory,
fileSystemFactory.create(session.getIdentity(), tableHandle.fileIoProperties()),
+ previousDeleteFiles,
jsonCodec,
session,
+ tableHandle.formatVersion(),
tableHandle.fileFormat(),
tableHandle.storageProperties(),
schema,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index bf2e9bf21af6..0f02aada2787 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -349,6 +349,7 @@ public ConnectorPageSource createPageSource(
if (!deletes.isEmpty()) {
Supplier> deletePredicate = memoize(() -> getDeleteManager(partitionSpec, partitionData)
.getDeletePredicate(
+ fileSystem,
path,
dataSequenceNumber,
deletes,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index 06cccccebe3c..666fc21f6192 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -22,6 +22,7 @@
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.type.Type;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.schema.MessageType;
@@ -38,6 +39,7 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.FileFormat.PARQUET;
public final class IcebergParquetFileWriter
implements IcebergFileWriter
@@ -77,6 +79,18 @@ public IcebergParquetFileWriter(
this.metricsConfig = requireNonNull(metricsConfig, "metricsConfig is null");
}
+ @Override
+ public FileFormat fileFormat()
+ {
+ return PARQUET;
+ }
+
+ @Override
+ public String location()
+ {
+ return location.toString();
+ }
+
@Override
public FileMetrics getFileMetrics()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
index 6dc1501affa8..c34712dbb35e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSortingFileWriter.java
@@ -23,6 +23,7 @@
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeOperators;
+import org.apache.iceberg.FileFormat;
import java.io.Closeable;
import java.util.List;
@@ -62,6 +63,18 @@ public IcebergSortingFileWriter(
OrcFileWriterFactory::createOrcDataSink);
}
+ @Override
+ public FileFormat fileFormat()
+ {
+ return outputWriter.fileFormat();
+ }
+
+ @Override
+ public String location()
+ {
+ return outputWriter.location();
+ }
+
@Override
public FileMetrics getFileMetrics()
{
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
index 42274551b5b5..74a2f68304aa 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java
@@ -244,7 +244,7 @@ public static int getFormatVersion(Map tableProperties)
return (int) tableProperties.get(FORMAT_VERSION_PROPERTY);
}
- private static void validateFormatVersion(int version)
+ public static void validateFormatVersion(int version)
{
if (version < FORMAT_VERSION_SUPPORT_MIN || version > FORMAT_VERSION_SUPPORT_MAX) {
throw new TrinoException(INVALID_TABLE_PROPERTY,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
index 7f347564c3b7..89f10aa0e68c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergWritableTableHandle.java
@@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.trino.plugin.iceberg.delete.PositionDeleteFiles;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.RetryMode;
@@ -28,11 +29,13 @@
public record IcebergWritableTableHandle(
SchemaTableName name,
+ int formatVersion,
String schemaAsJson,
Map partitionsSpecsAsJson,
int partitionSpecId,
List sortOrder,
List inputColumns,
+ List previousDeleteFiles,
String outputPath,
IcebergFileFormat fileFormat,
Map storageProperties,
@@ -47,6 +50,7 @@ public record IcebergWritableTableHandle(
partitionsSpecsAsJson = ImmutableMap.copyOf(requireNonNull(partitionsSpecsAsJson, "partitionsSpecsAsJson is null"));
sortOrder = ImmutableList.copyOf(requireNonNull(sortOrder, "sortOrder is null"));
inputColumns = ImmutableList.copyOf(requireNonNull(inputColumns, "inputColumns is null"));
+ previousDeleteFiles = ImmutableList.copyOf(previousDeleteFiles);
requireNonNull(outputPath, "outputPath is null");
requireNonNull(fileFormat, "fileFormat is null");
storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
index 94e60363d4bb..bb4600ad398e 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteFile.java
@@ -34,6 +34,8 @@ public record DeleteFile(
FileFormat format,
long recordCount,
long fileSizeInBytes,
+ Long contentOffset,
+ Long contentSizeInBytes,
List equalityFieldIds,
Optional rowPositionLowerBound,
Optional rowPositionUpperBound,
@@ -56,6 +58,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
deleteFile.format(),
deleteFile.recordCount(),
deleteFile.fileSizeInBytes(),
+ deleteFile.contentOffset(),
+ deleteFile.contentSizeInBytes(),
Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
rowPositionLowerBound,
rowPositionUpperBound,
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
index dd64f3cf8b87..8d4b039aa7a9 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java
@@ -19,6 +19,9 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.slice.Slice;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoInput;
import io.trino.plugin.iceberg.IcebergColumnHandle;
import io.trino.plugin.iceberg.IcebergPageSourceProvider.ReaderPageSourceWithRowPositions;
import io.trino.plugin.iceberg.delete.EqualityDeleteFilter.EqualityDeleteFilterBuilder;
@@ -51,10 +54,12 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA;
import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
import static io.trino.plugin.iceberg.IcebergUtil.schemaFromHandles;
+import static io.trino.plugin.iceberg.delete.DeletionVectors.readDeletionVector;
import static io.trino.plugin.iceberg.delete.PositionDeleteFilter.readPositionDeletes;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Future.State.SUCCESS;
+import static org.apache.iceberg.FileFormat.PUFFIN;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
@@ -69,6 +74,7 @@ public DeleteManager(TypeManager typeManager)
}
public Optional getDeletePredicate(
+ TrinoFileSystem fileSystem,
String dataFilePath,
long dataSequenceNumber,
List deleteFiles,
@@ -91,7 +97,7 @@ public Optional getDeletePredicate(
}
}
- Optional positionDeletes = createPositionDeleteFilter(dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
+ Optional positionDeletes = createPositionDeleteFilter(fileSystem, dataFilePath, positionDeleteFiles, readerPageSourceWithRowPositions, deletePageSourceProvider)
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber));
Optional equalityDeletes = createEqualityDeleteFilter(equalityDeleteFiles, tableSchema, deletePageSourceProvider).stream()
.map(filter -> filter.createPredicate(readColumns, dataSequenceNumber))
@@ -114,6 +120,7 @@ ConnectorPageSource openDeletes(
}
private Optional createPositionDeleteFilter(
+ TrinoFileSystem fileSystem,
String dataFilePath,
List positionDeleteFiles,
ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions,
@@ -141,11 +148,21 @@ private Optional createPositionDeleteFilter(
LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
for (DeleteFile deleteFile : positionDeleteFiles) {
if (shouldLoadPositionDeleteFile(deleteFile, startRowPosition, endRowPosition)) {
- try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, deleteDomain)) {
- readPositionDeletes(pageSource, targetPath, deletedRows);
+ if (deleteFile.format() == PUFFIN) {
+ try (TrinoInput input = fileSystem.newInputFile(Location.of(deleteFile.path())).newInput()) {
+ readDeletionVector(input, deleteFile.recordCount(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes(), deletedRows);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
- catch (IOException e) {
- throw new UncheckedIOException(e);
+ else {
+ try (ConnectorPageSource pageSource = deletePageSourceProvider.openDeletes(deleteFile, deleteColumns, deleteDomain)) {
+ readPositionDeletes(pageSource, targetPath, deletedRows);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
}
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java
new file mode 100644
index 000000000000..62caa59043dc
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorFileWriter.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.delete;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
+import jakarta.annotation.Nullable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.TrinoBitmapPositionDeleteIndex;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.BlobMetadata;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.StructLikeUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.FileFormat.PUFFIN;
+import static org.apache.iceberg.MetadataColumns.ROW_POSITION;
+import static org.apache.iceberg.puffin.StandardBlobTypes.DV_V1;
+
+/**
+ * Copy of {@link org.apache.iceberg.deletes.BaseDVFileWriter} with its Iceberg FileIO replaced by TrinoFileSystem
+ */
+public class DeletionVectorFileWriter
+ implements DVFileWriter
+{
+ private static final String REFERENCED_DATA_FILE_KEY = "referenced-data-file";
+ private static final String CARDINALITY_KEY = "cardinality";
+
+ private final NodeVersion nodeVersion;
+ private final TrinoFileSystem fileSystem;
+ private final Location location;
+ private final Function loadPreviousDeletes;
+ private final Map deletesByPath = new HashMap<>();
+ private final Map blobsByPath = new HashMap<>();
+
+ private DeleteWriteResult result;
+
+ public DeletionVectorFileWriter(
+ NodeVersion nodeVersion,
+ TrinoFileSystem fileSystem,
+ Location location,
+ Function loadPreviousDeletes)
+ {
+ this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
+ this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
+ this.location = requireNonNull(location, "location is null");
+ this.loadPreviousDeletes = requireNonNull(loadPreviousDeletes, "loadPreviousDeletes is null");
+ }
+
+ @Override
+ public void delete(String path, long pos, PartitionSpec spec, StructLike partition)
+ {
+ Deletes deletes = deletesByPath.computeIfAbsent(path, _ -> new Deletes(path, spec, partition));
+ PositionDeleteIndex positions = deletes.positions();
+ positions.delete(pos);
+ }
+
+ @Override
+ public DeleteWriteResult result()
+ {
+ checkState(result != null, "Cannot get result from unclosed writer");
+ return result;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ if (result == null) {
+ CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+ List rewrittenDeleteFiles = new ArrayList<>();
+
+ PuffinWriter writer = newWriter();
+
+ try (PuffinWriter closeableWriter = writer) {
+ for (Deletes deletes : deletesByPath.values()) {
+ String path = deletes.path();
+ PositionDeleteIndex positions = deletes.positions();
+ PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
+ if (previousPositions != null) {
+ positions.merge(previousPositions);
+ for (DeleteFile previousDeleteFile : previousPositions.deleteFiles()) {
+ // only DVs and file-scoped deletes can be discarded from the table state
+ if (ContentFileUtil.isFileScoped(previousDeleteFile)) {
+ rewrittenDeleteFiles.add(previousDeleteFile);
+ }
+ }
+ }
+ write(closeableWriter, deletes);
+ referencedDataFiles.add(path);
+ }
+ }
+
+ // DVs share the Puffin path and file size but have different offsets
+ String puffinPath = writer.location();
+ long puffinFileSize = writer.fileSize();
+
+ List dvs = deletesByPath.keySet().stream()
+ .map(path -> createDV(puffinPath, puffinFileSize, path))
+ .collect(toImmutableList());
+
+ this.result = new DeleteWriteResult(dvs, referencedDataFiles, rewrittenDeleteFiles);
+ }
+ }
+
+ private DeleteFile createDV(String path, long size, String referencedDataFile)
+ {
+ Deletes deletes = deletesByPath.get(referencedDataFile);
+ BlobMetadata blobMetadata = blobsByPath.get(referencedDataFile);
+ return FileMetadata.deleteFileBuilder(deletes.spec())
+ .ofPositionDeletes()
+ .withFormat(PUFFIN)
+ .withPath(path)
+ .withPartition(deletes.partition())
+ .withFileSizeInBytes(size)
+ .withReferencedDataFile(referencedDataFile)
+ .withContentOffset(blobMetadata.offset())
+ .withContentSizeInBytes(blobMetadata.length())
+ .withRecordCount(deletes.positions().cardinality())
+ .build();
+ }
+
+ private void write(PuffinWriter writer, Deletes deletes)
+ {
+ String path = deletes.path();
+ PositionDeleteIndex positions = deletes.positions();
+ BlobMetadata blobMetadata = writer.write(toBlob(positions, path));
+ blobsByPath.put(path, blobMetadata);
+ }
+
+ private PuffinWriter newWriter()
+ {
+ return Puffin.write(new ForwardingOutputFile(fileSystem, location))
+ .createdBy("Trino version " + nodeVersion.toString())
+ .build();
+ }
+
+ private static Blob toBlob(PositionDeleteIndex positions, String path)
+ {
+ return new Blob(
+ DV_V1,
+ ImmutableList.of(ROW_POSITION.fieldId()),
+ -1 /* snapshot ID is inherited */,
+ -1 /* sequence number is inherited */,
+ positions.serialize(),
+ null /* uncompressed */,
+ ImmutableMap.builder()
+ .put(REFERENCED_DATA_FILE_KEY, path)
+ .put(CARDINALITY_KEY, String.valueOf(positions.cardinality()))
+ .buildOrThrow());
+ }
+
+ private static class Deletes
+ {
+ private final String path;
+ private final PartitionSpec spec;
+ private final StructLike partition;
+ private final PositionDeleteIndex positions;
+
+ private Deletes(String path, PartitionSpec spec, @Nullable StructLike partition)
+ {
+ this.path = requireNonNull(path, "path is null");
+ this.spec = requireNonNull(spec, "spec is null");
+ this.partition = StructLikeUtil.copy(partition);
+ this.positions = new TrinoBitmapPositionDeleteIndex();
+ }
+
+ public String path()
+ {
+ return path;
+ }
+
+ public PartitionSpec spec()
+ {
+ return spec;
+ }
+
+ @Nullable
+ public StructLike partition()
+ {
+ return partition;
+ }
+
+ public PositionDeleteIndex positions()
+ {
+ return positions;
+ }
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java
new file mode 100644
index 000000000000..26aa74cc6081
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectorWriter.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.delete;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.iceberg.IcebergFileWriter;
+import io.trino.plugin.iceberg.PartitionData;
+import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
+import io.trino.spi.Page;
+import io.trino.spi.TrinoException;
+import io.trino.spi.block.LongArrayBlock;
+import org.apache.iceberg.ContentFileParsers;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.util.DeleteFileSet;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.slice.SizeOf.instanceSize;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iceberg.FileFormat.PUFFIN;
+
+public class DeletionVectorWriter
+ implements IcebergFileWriter
+{
+ private static final int INSTANCE_SIZE = instanceSize(DeletionVectorWriter.class);
+
+ private final DeletionVectorFileWriter writer;
+ private final String dataFilePath;
+ private final PartitionSpec partitionSpec;
+ private final PartitionData partition;
+ private final int positionChannel;
+ private final Closeable rollbackAction;
+ private DeleteWriteResult result;
+
+ public DeletionVectorWriter(
+ NodeVersion nodeVersion,
+ TrinoFileSystem fileSystem,
+ Location outputPath,
+ String dataFilePath,
+ PartitionSpec partitionSpec,
+ Optional partition,
+ Function loadPreviousDeletes,
+ int positionChannel)
+ {
+ writer = new DeletionVectorFileWriter(nodeVersion, fileSystem, outputPath, loadPreviousDeletes);
+ this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null");
+ this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
+ this.partition = requireNonNull(partition, "partition is null").orElse(null);
+ checkArgument(positionChannel >= 0, "positionChannel is negative");
+ this.positionChannel = positionChannel;
+ rollbackAction = () -> fileSystem.deleteFile(outputPath);
+ }
+
+ public static Function create(TrinoFileSystem fileSystem, Map deleteFiles)
+ {
+ if (deleteFiles == null) {
+ return _ -> null;
+ }
+ return new PreviousDeleteLoader(fileSystem, deleteFiles);
+ }
+
+ private static class PreviousDeleteLoader
+ implements Function
+ {
+ private final Map deleteFiles;
+ private final DeleteLoader deleteLoader;
+
+ private PreviousDeleteLoader(TrinoFileSystem fileSystem, Map deleteFiles)
+ {
+ requireNonNull(fileSystem, "fileSystem is null");
+ this.deleteFiles = ImmutableMap.copyOf(deleteFiles);
+ this.deleteLoader = new BaseDeleteLoader(deleteFile -> new ForwardingInputFile(fileSystem.newInputFile(Location.of(deleteFile.location()))));
+ }
+
+ @Override
+ public PositionDeleteIndex apply(CharSequence path)
+ {
+ DeleteFileSet deleteFileSet = deleteFiles.get(path.toString());
+ if (deleteFileSet == null) {
+ return null;
+ }
+
+ return deleteLoader.loadPositionDeletes(deleteFileSet, path);
+ }
+ }
+
+ @Override
+ public FileFormat fileFormat()
+ {
+ return PUFFIN;
+ }
+
+ @Override
+ public String location()
+ {
+ return deleteFile().location();
+ }
+
+ @Override
+ public List rewrittenDeleteFiles()
+ {
+ return result.rewrittenDeleteFiles().stream()
+ .map(file -> ContentFileParsers.toJson(file, partitionSpec))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public FileMetrics getFileMetrics()
+ {
+ DeleteFile deleteFile = deleteFile();
+ Metrics metrics = new Metrics(
+ deleteFile.recordCount(),
+ deleteFile.columnSizes(),
+ deleteFile.valueCounts(),
+ deleteFile.nullValueCounts(),
+ deleteFile.nanValueCounts(),
+ deleteFile.lowerBounds(),
+ deleteFile.upperBounds());
+ return new FileMetrics(metrics, Optional.ofNullable(deleteFile.splitOffsets()));
+ }
+
+ @Override
+ public long getWrittenBytes()
+ {
+ return deleteFile().fileSizeInBytes();
+ }
+
+ @Override
+ public long getMemoryUsage()
+ {
+ return INSTANCE_SIZE;
+ }
+
+ @Override
+ public void appendRows(Page dataPage)
+ {
+ LongArrayBlock block = (LongArrayBlock) dataPage.getBlock(positionChannel);
+ for (int i = 0; i < block.getPositionCount(); i++) {
+ writer.delete(dataFilePath, block.getLong(i), partitionSpec, partition);
+ }
+ }
+
+ private DeleteFile deleteFile()
+ {
+ try {
+ return result.deleteFiles().getLast();
+ }
+ catch (NoSuchElementException e) {
+ throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Delete file must exist", e);
+ }
+ }
+
+ public DeleteWriteResult result()
+ {
+ return writer.result();
+ }
+
+ @Override
+ public Closeable commit()
+ {
+ try {
+ writer.close();
+ result = writer.result();
+ }
+ catch (IOException e) {
+ try {
+ rollbackAction.close();
+ }
+ catch (Exception ex) {
+ if (!e.equals(ex)) {
+ e.addSuppressed(ex);
+ }
+ }
+ throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error closing Deletion Vector file", e);
+ }
+ return rollbackAction;
+ }
+
+ @Override
+ public void rollback()
+ {
+ try (rollbackAction) {
+ writer.close();
+ result = writer.result();
+ }
+ catch (Exception e) {
+ throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write to Deletion Vector file", e);
+ }
+ }
+
+ @Override
+ public long getValidationCpuNanos()
+ {
+ return 0;
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java
new file mode 100644
index 000000000000..9e6dbf13869d
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeletionVectors.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.delete;
+
+import io.trino.filesystem.TrinoInput;
+import org.roaringbitmap.longlong.LongBitmapDataProvider;
+
+import java.io.IOException;
+
+import static java.lang.Math.toIntExact;
+import static org.apache.iceberg.deletes.TrinoBitmapPositionDeleteIndex.deserialize;
+
+public final class DeletionVectors
+{
+ public static final int LENGTH_SIZE_BYTES = 4;
+ public static final int CRC_SIZE_BYTES = 4;
+
+ private DeletionVectors() {}
+
+ public static void readDeletionVector(TrinoInput input, long recordCount, Long contentOffset, Long contentSizeInBytes, LongBitmapDataProvider deletedRows)
+ throws IOException
+ {
+ byte[] bytes = input.readFully(contentOffset, LENGTH_SIZE_BYTES + toIntExact(contentSizeInBytes) + CRC_SIZE_BYTES).getBytes();
+ deserialize(bytes, recordCount, contentSizeInBytes).forEach(deletedRows::addLong);
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java
new file mode 100644
index 000000000000..eabbb9911a40
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteFiles.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.delete;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public record PositionDeleteFiles(String dataFileLocation, int partitionSpecId, List deletes)
+{
+ public PositionDeleteFiles
+ {
+ requireNonNull(dataFileLocation, "dataFileLocation is null");
+ deletes = ImmutableList.copyOf(deletes);
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
index 1f6bc8f71543..be627a86e7a8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/PositionDeleteWriter.java
@@ -28,17 +28,23 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorSession;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.util.DeleteFileSet;
import org.roaringbitmap.longlong.ImmutableLongBitmapDataProvider;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
+import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.Slices.utf8Slice;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.spi.predicate.Utils.nativeValueToBlock;
@@ -46,6 +52,7 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
+import static org.apache.iceberg.FileFormat.PUFFIN;
public class PositionDeleteWriter
{
@@ -53,10 +60,8 @@ public class PositionDeleteWriter
private final Block dataFilePathBlock;
private final PartitionSpec partitionSpec;
private final Optional partition;
- private final String outputPath;
private final JsonCodec jsonCodec;
private final IcebergFileWriter writer;
- private final IcebergFileFormat fileFormat;
public PositionDeleteWriter(
String dataFilePath,
@@ -67,23 +72,35 @@ public PositionDeleteWriter(
TrinoFileSystem fileSystem,
JsonCodec jsonCodec,
ConnectorSession session,
+ int formatVersion,
IcebergFileFormat fileFormat,
- Map storageProperties)
+ Map storageProperties,
+ Map previousDeleteFiles)
{
this.dataFilePath = requireNonNull(dataFilePath, "dataFilePath is null");
this.dataFilePathBlock = nativeValueToBlock(VARCHAR, utf8Slice(dataFilePath));
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
this.partitionSpec = requireNonNull(partitionSpec, "partitionSpec is null");
this.partition = requireNonNull(partition, "partition is null");
- this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
+ requireNonNull(fileFormat, "fileFormat is null");
// Prepend query ID to the file name, allowing us to determine the files written by a query.
// This is necessary for opportunistic cleanup of extra files, which may be present for
// successfully completed queries in the presence of failure recovery mechanisms.
- String fileName = fileFormat.toIceberg().addExtension(session.getQueryId() + "-" + randomUUID());
- this.outputPath = partition
+ FileFormat icebergFileFormat = formatVersion >= 3 ? PUFFIN : fileFormat.toIceberg();
+ String fileName = icebergFileFormat.addExtension(session.getQueryId() + "-" + randomUUID());
+ String outputPath = partition
.map(partitionData -> locationProvider.newDataLocation(partitionSpec, partitionData, fileName))
.orElseGet(() -> locationProvider.newDataLocation(fileName));
- this.writer = fileWriterFactory.createPositionDeleteWriter(fileSystem, Location.of(outputPath), session, fileFormat, storageProperties);
+ this.writer = fileWriterFactory.createPositionDeleteWriter(
+ fileSystem,
+ Location.of(outputPath),
+ session,
+ dataFilePath,
+ icebergFileFormat,
+ partitionSpec,
+ partition,
+ storageProperties,
+ previousDeleteFiles);
}
public Collection write(ImmutableLongBitmapDataProvider rowsToDelete)
@@ -91,15 +108,28 @@ public Collection write(ImmutableLongBitmapDataProvider rowsToDelete)
writeDeletes(rowsToDelete);
writer.commit();
+ OptionalLong contentOffset = OptionalLong.empty();
+ OptionalLong contentSize = OptionalLong.empty();
+ if (writer instanceof DeletionVectorWriter deletionVectorWriter) {
+ checkState(writer.fileFormat() == PUFFIN, "File format must be PUFFIN for deletion vector");
+ DeleteWriteResult result = deletionVectorWriter.result();
+ DeleteFile deleteFile = result.deleteFiles().getLast();
+ contentOffset = OptionalLong.of(deleteFile.contentOffset());
+ contentSize = OptionalLong.of(deleteFile.contentSizeInBytes());
+ }
+
CommitTaskData task = new CommitTaskData(
- outputPath,
- fileFormat,
+ writer.location(),
+ writer.fileFormat(),
writer.getWrittenBytes(),
new MetricsWrapper(writer.getFileMetrics().metrics()),
PartitionSpecParser.toJson(partitionSpec),
partition.map(PartitionData::toJson),
FileContent.POSITION_DELETES,
Optional.of(dataFilePath),
+ writer.rewrittenDeleteFiles(),
+ contentOffset,
+ contentSize,
writer.getFileMetrics().splitOffsets());
return List.of(wrappedBuffer(jsonCodec.toJsonBytes(task)));
diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java
new file mode 100644
index 000000000000..0cee4f358126
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/ContentFileParsers.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.airlift.json.ObjectMapperProvider;
+
+public final class ContentFileParsers
+{
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapperProvider().get();
+
+ private ContentFileParsers() {}
+
+ public static String toJson(ContentFile> contentFile, PartitionSpec spec)
+ {
+ return ContentFileParser.toJson(contentFile, spec);
+ }
+
+ public static ContentFile> fromJson(String jsonNode, PartitionSpec spec)
+ {
+ try {
+ return ContentFileParser.fromJson(OBJECT_MAPPER.readTree(jsonNode), spec);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java
new file mode 100644
index 000000000000..e7ad87100691
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoBitmapPositionDeleteIndex.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.zip.CRC32;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.trino.plugin.iceberg.delete.DeletionVectors.CRC_SIZE_BYTES;
+import static io.trino.plugin.iceberg.delete.DeletionVectors.LENGTH_SIZE_BYTES;
+
+/**
+ * Exposes the package-private {@link BitmapPositionDeleteIndex} and decodes a
+ * serialized deletion-vector blob: a 4-byte bitmap-data length, the bitmap data
+ * (magic number followed by a roaring bitmap), and a trailing 4-byte CRC-32.
+ */
+public class TrinoBitmapPositionDeleteIndex
+        extends BitmapPositionDeleteIndex
+{
+    // Bitmap payload starts right after the 4-byte length prefix
+    private static final int BITMAP_DATA_OFFSET = 4;
+    // Little-endian reading of the magic bytes 0xD1 0xD3 0x39 0x64 that mark the bitmap payload
+    private static final int MAGIC_NUMBER = 1681511377;
+
+    /**
+     * Deserializes a deletion-vector blob, validating the declared length, the
+     * magic number, the bitmap cardinality against {@code recordCount}, and the
+     * trailing CRC-32 checksum.
+     *
+     * @throws IllegalArgumentException if any validation fails or {@code contentSizeInBytes} is null
+     */
+    public static TrinoRoaringPositionBitmap deserialize(byte[] bytes, long recordCount, Long contentSizeInBytes)
+    {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        int bitmapDataLength = readBitmapDataLength(buffer, contentSizeInBytes);
+        TrinoRoaringPositionBitmap bitmap = deserializeBitmap(bytes, bitmapDataLength, recordCount);
+        int crc = computeChecksum(bytes, bitmapDataLength);
+        int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+        int expectedCrc = buffer.getInt(crcOffset);
+        checkArgument(crc == expectedCrc, "Invalid CRC");
+        return bitmap;
+    }
+
+    private static int readBitmapDataLength(ByteBuffer buffer, Long contentSizeInBytes)
+    {
+        // Fail with a clear message instead of an NPE from unboxing below
+        checkArgument(contentSizeInBytes != null, "contentSizeInBytes is null");
+        int length = buffer.getInt();
+        long expectedLength = contentSizeInBytes - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
+        checkArgument(length == expectedLength, "Invalid bitmap data length: %s, expected %s", length, expectedLength);
+        return length;
+    }
+
+    private static TrinoRoaringPositionBitmap deserializeBitmap(byte[] bytes, int bitmapDataLength, long recordCount)
+    {
+        ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+        int magicNumber = bitmapData.getInt();
+        checkArgument(magicNumber == MAGIC_NUMBER, "Invalid magic number: %s, expected %s", magicNumber, MAGIC_NUMBER);
+        TrinoRoaringPositionBitmap bitmap = TrinoRoaringPositionBitmap.deserialize(bitmapData);
+        long cardinality = bitmap.cardinality();
+        checkArgument(cardinality == recordCount, "Invalid cardinality: %s, expected %s", cardinality, recordCount);
+        return bitmap;
+    }
+
+    // CRC covers only the bitmap payload, not the length prefix or the CRC itself
+    private static int computeChecksum(byte[] bytes, int bitmapDataLength)
+    {
+        CRC32 crc = new CRC32();
+        crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+        return (int) crc.getValue();
+    }
+
+    private static ByteBuffer pointToBitmapData(byte[] bytes, int bitmapDataLength)
+    {
+        ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+        // Bitmap payload (magic number and roaring bitmap) is read little-endian
+        bitmapData.order(ByteOrder.LITTLE_ENDIAN);
+        return bitmapData;
+    }
+}
diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java
new file mode 100644
index 000000000000..0acb8da6844c
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/deletes/TrinoRoaringPositionBitmap.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Exposes the package-private {@link RoaringPositionBitmap} to Trino code
+ * outside the {@code org.apache.iceberg.deletes} package.
+ */
+public class TrinoRoaringPositionBitmap
+        extends RoaringPositionBitmap
+{
+    public static TrinoRoaringPositionBitmap deserialize(ByteBuffer buffer)
+    {
+        // Deserialize into the base type first, then copy into the exposed subclass
+        RoaringPositionBitmap deserialized = RoaringPositionBitmap.deserialize(buffer);
+        TrinoRoaringPositionBitmap result = new TrinoRoaringPositionBitmap();
+        result.setAll(deserialized);
+        return result;
+    }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java
index 2e3e8843d8a2..b7539079c4fa 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java
@@ -133,6 +133,16 @@ public void testHiddenPathColumn()
}
}
+ @Test
+ public void testDeleteWithV3Format()
+ {
+ try (TestTable table = newTrinoTable("test_delete_with_v3", "WITH (format_version = 3) AS SELECT * FROM region")) {
+ assertUpdate("DELETE FROM " + table.getName() + " WHERE regionkey = 1", 1);
+ assertThat(query("SELECT * FROM " + table.getName()))
+ .matches("SELECT * FROM region WHERE regionkey <> 1");
+ }
+ }
+
// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@RepeatedTest(4)
@Timeout(120)
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
index e3b593a127dc..fc904250149d 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java
@@ -184,13 +184,15 @@ public abstract class BaseIcebergConnectorTest
private static final Pattern WITH_CLAUSE_EXTRACTOR = Pattern.compile(".*(WITH\\s*\\([^)]*\\))\\s*$", Pattern.DOTALL);
protected final IcebergFileFormat format;
+ private final int formatVersion;
protected TrinoFileSystem fileSystem;
protected TimeUnit storageTimePrecision;
- protected BaseIcebergConnectorTest(IcebergFileFormat format)
+ protected BaseIcebergConnectorTest(IcebergFileFormat format, int formatVersion)
{
this.format = requireNonNull(format, "format is null");
+ this.formatVersion = formatVersion;
}
@Override
@@ -206,6 +208,7 @@ protected IcebergQueryRunner.Builder createQueryRunnerBuilder()
return IcebergQueryRunner.builder()
.setIcebergProperties(ImmutableMap.builder()
.put("iceberg.file-format", format.name())
+ .put("iceberg.format-version", Integer.toString(formatVersion))
// Only allow some extra properties. Add "sorted_by" so that we can test that the property is disallowed by the connector explicitly.
.put("iceberg.allowed-extra-properties", "extra.property.one,extra.property.two,extra.property.three,sorted_by")
// Allows testing the sorting writer flushing to the file system with smaller tables
@@ -377,7 +380,7 @@ public void testShowCreateTable()
")\n" +
"WITH (\n" +
" format = '" + format.name() + "',\n" +
- " format_version = 2,\n" +
+ " format_version = " + formatVersion + ",\n" +
" location = '\\E.*/tpch/orders-.*\\Q',\n" +
" max_commit_retry = 4\n" +
")\\E");
@@ -1951,12 +1954,13 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
"""
WITH (
format = '%s',
- format_version = 2,
+ format_version = %s,
location = '%s',
max_commit_retry = 4,
partitioning = ARRAY['adate']
)""",
format,
+ formatVersion,
tempDirPath));
assertUpdate("CREATE TABLE test_create_table_like_copy0 (LIKE test_create_table_like_original, col2 INTEGER)");
@@ -1968,11 +1972,12 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
"""
WITH (
format = '%s',
- format_version = 2,
+ format_version = %s,
location = '%s',
max_commit_retry = 4
)""",
format,
+ formatVersion,
getTableLocation("test_create_table_like_copy1")));
assertUpdate("CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)");
@@ -1980,11 +1985,12 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat)
"""
WITH (
format = '%s',
- format_version = 2,
+ format_version = %s,
location = '%s',
max_commit_retry = 4
)""",
format,
+ formatVersion,
getTableLocation("test_create_table_like_copy2")));
assertUpdate("DROP TABLE test_create_table_like_copy2");
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java
similarity index 98%
rename from plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java
rename to plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java
index 28481bdc8ebe..013f7bb3bfcf 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergParquetConnectorTest.java
@@ -41,12 +41,12 @@
import static java.time.ZoneOffset.UTC;
import static org.assertj.core.api.Assertions.assertThat;
-public class TestIcebergParquetConnectorTest
+public abstract class BaseIcebergParquetConnectorTest
extends BaseIcebergConnectorTest
{
- public TestIcebergParquetConnectorTest()
+ public BaseIcebergParquetConnectorTest(int formatVersion)
{
- super(PARQUET);
+ super(PARQUET, formatVersion);
}
@Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
index 0f85b2f457ab..9c578a6044ee 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;
+import com.google.common.collect.ImmutableList;
import io.trino.Session;
import io.trino.filesystem.FileEntry;
import io.trino.filesystem.FileIterator;
@@ -238,4 +239,16 @@ public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile)
throw new UncheckedIOException(e);
}
}
+
+ public static List listFiles(TrinoFileSystem trinoFileSystem, String location)
+ throws IOException
+ {
+ ImmutableList.Builder files = ImmutableList.builder();
+ FileIterator fileIterator = trinoFileSystem.listFiles(Location.of(location));
+ while (fileIterator.hasNext()) {
+ FileEntry entry = fileIterator.next();
+ files.add(entry.location().fileName());
+ }
+ return files.build();
+ }
}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java
index bbfb093e5e68..7e6bd051522a 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestFileBasedConflictDetection.java
@@ -21,6 +21,7 @@
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.RowType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -207,11 +208,11 @@ void testConflictDetectionOnEvolvedTable()
"""
{"partitionValues":[40]}
""";
- CommitTaskData commitTaskData1 = new CommitTaskData("test_location/data/new.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(currentPartitionSpec),
- Optional.of(partitionDataJson), DATA, Optional.empty(), Optional.empty());
+ CommitTaskData commitTaskData1 = new CommitTaskData("test_location/data/new.parquet", FileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(currentPartitionSpec),
+ Optional.of(partitionDataJson), DATA, Optional.empty(), List.of(), OptionalLong.empty(), OptionalLong.empty(), Optional.empty());
// Remove file from version with previous partition specification
- CommitTaskData commitTaskData2 = new CommitTaskData("test_location/data/old.parquet", IcebergFileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(previousPartitionSpec),
- Optional.of(partitionDataJson), POSITION_DELETES, Optional.empty(), Optional.empty());
+ CommitTaskData commitTaskData2 = new CommitTaskData("test_location/data/old.parquet", FileFormat.PARQUET, 0, new MetricsWrapper(new Metrics()), PartitionSpecParser.toJson(previousPartitionSpec),
+ Optional.of(partitionDataJson), POSITION_DELETES, Optional.empty(), List.of(), OptionalLong.empty(), OptionalLong.empty(), Optional.empty());
TupleDomain icebergColumnHandleTupleDomain = extractTupleDomainsFromCommitTasks(getIcebergTableHandle(currentPartitionSpec), icebergTable, List.of(commitTaskData1, commitTaskData2), null);
assertThat(icebergColumnHandleTupleDomain.getDomains().orElseThrow()).isEmpty();
@@ -223,23 +224,29 @@ private static List getCommitTaskDataForUpdate(PartitionSpec par
// Update operation contains two commit tasks
CommitTaskData commitTaskData1 = new CommitTaskData(
"test_location/data/new.parquet",
- IcebergFileFormat.PARQUET,
+ FileFormat.PARQUET,
0,
new MetricsWrapper(new Metrics()),
PartitionSpecParser.toJson(partitionSpec),
partitionDataJson,
DATA,
Optional.empty(),
+ List.of(),
+ OptionalLong.empty(),
+ OptionalLong.empty(),
Optional.empty());
CommitTaskData commitTaskData2 = new CommitTaskData(
"test_location/data/old.parquet",
- IcebergFileFormat.PARQUET,
+ FileFormat.PARQUET,
0,
new MetricsWrapper(new Metrics()),
PartitionSpecParser.toJson(partitionSpec),
partitionDataJson,
POSITION_DELETES,
Optional.empty(),
+ List.of(),
+ OptionalLong.empty(),
+ OptionalLong.empty(),
Optional.empty());
return List.of(commitTaskData1, commitTaskData2);
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java
index d85a8fe78043..9387b8e97b19 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergAvroConnectorTest.java
@@ -15,6 +15,7 @@
import org.junit.jupiter.api.Test;
+import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
import static org.junit.jupiter.api.Assumptions.abort;
@@ -23,7 +24,7 @@ public class TestIcebergAvroConnectorTest
{
public TestIcebergAvroConnectorTest()
{
- super(AVRO);
+ super(AVRO, FORMAT_VERSION_SUPPORT_MAX);
}
@Override
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java
index c87fb97bb09d..02fb3a1afe26 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergFileOperations.java
@@ -21,6 +21,7 @@
import io.trino.SystemSessionProperties;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.util.FileOperationUtils;
import io.trino.plugin.iceberg.util.FileOperationUtils.Scope;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.sql.planner.plan.FilterNode;
@@ -52,6 +53,7 @@
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DELETE;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.MANIFEST;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.METADATA_JSON;
+import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.PUFFIN;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.SNAPSHOT;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.STATS;
import static io.trino.plugin.iceberg.util.FileOperationUtils.Scope.ALL_FILES;
@@ -912,6 +914,45 @@ public void testSystemMetadataMaterializedViews()
assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
}
+ @Test
+ public void testV3MaterializedViews()
+ {
+ String schemaName = "test_v3_materialized_views_" + randomNameSuffix();
+ assertUpdate("CREATE SCHEMA " + schemaName);
+ Session session = Session.builder(getSession())
+ .setSchema(schemaName)
+ .build();
+
+ assertUpdate(session, "CREATE TABLE test_table WITH (format_version = 3) AS SELECT * FROM (VALUES 1, 2) AS t(a)", 2);
+
+ assertUpdate(session, "CREATE MATERIALIZED VIEW mv AS SELECT * FROM test_table");
+ assertFileSystemAccesses(session, "REFRESH MATERIALIZED VIEW mv",
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
+ .add(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"))
+ .add(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"))
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 2)
+ .build());
+
+ assertUpdate(session, "DELETE FROM test_table WHERE a = 1", 1);
+ assertFileSystemAccesses(session, "REFRESH MATERIALIZED VIEW mv",
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 4)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3)
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput"))
+ .build());
+
+ assertUpdate("DROP SCHEMA " + schemaName + " CASCADE");
+ }
+
@Test
public void testV2TableEnsureEqualityDeleteFilesAreReadOnce()
throws Exception
@@ -949,6 +990,89 @@ public void testV2TableEnsureEqualityDeleteFilesAreReadOnce()
assertUpdate("DROP TABLE " + tableName);
}
+ @Test
+ public void testDeletionVector()
+ {
+ String tableName = "test_deletion_vector_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + "(id INT, age INT) WITH (format_version = 3)");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20), (3, 30)", 3);
+
+ assertFileSystemAccesses(
+ "DELETE FROM " + tableName + " WHERE id = 1",
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
+ .add(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2)
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3)
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "OutputFile.create"))
+ .build());
+
+ assertFileSystemAccesses(
+ "SELECT * FROM " + tableName,
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"))
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2)
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"))
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput"))
+ .build());
+
+ assertFileSystemAccesses(
+ "DELETE FROM " + tableName + " WHERE id = 2",
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"), 2)
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 6)
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3)
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput"))
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "OutputFile.create"))
+ .build());
+
+ assertFileSystemAccesses(
+ "SELECT * FROM " + tableName,
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"))
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 2)
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"))
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput"))
+ .build());
+
+
+ assertFileSystemAccesses(
+ "ALTER TABLE " + tableName + " EXECUTE OPTIMIZE",
+ ImmutableMultiset.builder()
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"), 3)
+ .add(new FileOperationUtils.FileOperation(STATS, "InputFile.newStream"))
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"), 4)
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"), 3)
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "OutputFile.create"))
+ .add(new FileOperationUtils.FileOperation(PUFFIN, "InputFile.newInput"))
+ .addCopies(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"), 3)
+ .addCopies(new FileOperationUtils.FileOperation(METADATA_JSON, "OutputFile.create"), 2)
+ .add(new FileOperationUtils.FileOperation(STATS, "OutputFile.create"))
+ .addCopies(new FileOperationUtils.FileOperation(MANIFEST, "OutputFile.create"), 3)
+ .build());
+
+ assertFileSystemAccesses(
+ "SELECT * FROM " + tableName,
+ ImmutableMultiset.builder()
+ .add(new FileOperationUtils.FileOperation(METADATA_JSON, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(MANIFEST, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.newStream"))
+ .add(new FileOperationUtils.FileOperation(SNAPSHOT, "InputFile.length"))
+ .build());
+
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
@Test
public void testShowTables()
{
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java
index 7f6a39290266..cf90fb7f2ff8 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioOrcConnectorTest.java
@@ -29,6 +29,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.io.Resources.getResource;
+import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergFileFormat.ORC;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -50,7 +51,7 @@ public class TestIcebergMinioOrcConnectorTest
public TestIcebergMinioOrcConnectorTest()
{
- super(ORC);
+ super(ORC, FORMAT_VERSION_SUPPORT_MAX);
}
@Override
@@ -65,6 +66,7 @@ protected QueryRunner createQueryRunner()
.setIcebergProperties(
ImmutableMap.builder()
.put("iceberg.file-format", format.name())
+ .put("iceberg.format-version", "3")
.put("fs.hadoop.enabled", "true")
.put("fs.native-s3.enabled", "true")
.put("s3.aws-access-key", MINIO_ACCESS_KEY)
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java
new file mode 100644
index 000000000000..10b0d5fe8118
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV2ConnectorTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+public class TestIcebergParquetV2ConnectorTest
+ extends BaseIcebergParquetConnectorTest
+{
+ public TestIcebergParquetV2ConnectorTest()
+ {
+ super(2);
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java
new file mode 100644
index 000000000000..2a16a85e98a6
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetV3ConnectorTest.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+public class TestIcebergParquetV3ConnectorTest
+ extends BaseIcebergParquetConnectorTest
+{
+ public TestIcebergParquetV3ConnectorTest()
+ {
+ super(3);
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
index 98c3005e4a5e..5e06cb343795 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java
@@ -92,6 +92,7 @@
import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
import static io.trino.plugin.iceberg.IcebergTestUtils.getMetadataFileAndUpdatedMillis;
+import static io.trino.plugin.iceberg.IcebergTestUtils.listFiles;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTable;
import static io.trino.plugin.iceberg.util.EqualityDeleteUtils.writeEqualityDeleteForTableWithSchema;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -681,18 +682,69 @@ public void testOptimizingWholeTableRemovesDeleteFiles()
}
@Test
- public void testUpgradeTableToV2FromTrino()
+ void testRemoveOrphanDeletionVectors()
+ throws Exception
+ {
+ Session singleWriterPerTask = Session.builder(getSession())
+ .setSystemProperty("task_min_writer_count", "1")
+ .build();
+
+ Session shortRetentionUnlocked = Session.builder(getSession())
+ .setCatalogSessionProperty("iceberg", "expire_snapshots_min_retention", "0s")
+ .setCatalogSessionProperty("iceberg", "remove_orphan_files_min_retention", "0s")
+ .build();
+
+ try (TestTable table = newTrinoTable("expire_snapshots_dv", "(x int) WITH (format_version = 3)", List.of("1", "2"))) {
+ Table icebergTable = loadTable(table.getName());
+ String dataLocation = icebergTable.location() + "/data";
+
+ assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 1", 1);
+ assertThat(listFiles(fileSystemFactory.create(SESSION), dataLocation))
+ .anyMatch(file -> file.endsWith(".puffin"));
+
+ assertUpdate(singleWriterPerTask, "ALTER TABLE " + table.getName() + " EXECUTE optimize");
+ computeActual(shortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE expire_snapshots(retention_threshold => '0s')");
+ computeActual(shortRetentionUnlocked, "ALTER TABLE " + table.getName() + " EXECUTE remove_orphan_files(retention_threshold => '0s')");
+ assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 2");
+
+ assertThat(listFiles(fileSystemFactory.create(SESSION), dataLocation))
+ .noneMatch(file -> file.endsWith(".puffin"));
+ }
+ }
+
+ @Test
+ public void testUpgradeTableToV3FromTrino()
{
String tableName = "test_upgrade_table_to_v2_from_trino_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);
+
+ // v1 -> v2
assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2");
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
+
+ // v2 -> v3
+ assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3");
+ assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3);
+ assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
}
@Test
- public void testDowngradingV2TableToV1Fails()
+ public void testUpgradeTableFromV1ToV3()
+ {
+ String tableName = "test_upgrade_table_from_v1_to_v3_from_trino_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 1) AS SELECT * FROM tpch.tiny.nation", 25);
+ assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(1);
+
+ // v1 -> v3
+ assertUpdate("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 3");
+ assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3);
+ assertQuery("SELECT * FROM " + tableName, "SELECT * FROM nation");
+ }
+
+ @Test
+ public void testDowngradingFromV2Fails()
{
String tableName = "test_downgrading_v2_table_to_v1_fails_" + randomNameSuffix();
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
@@ -704,6 +756,25 @@ public void testDowngradingV2TableToV1Fails()
.hasMessage("Cannot downgrade v2 table to v1");
}
+ @Test
+ public void testDowngradingFromV3Fails()
+ {
+ String tableName = "test_downgrading_from_v3_fails_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 3) AS SELECT * FROM tpch.tiny.nation", 25);
+ assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(3);
+
+ assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 2"))
+ .failure()
+ .hasMessage("Failed to set new property values")
+ .rootCause()
+ .hasMessage("Cannot downgrade v3 table to v2");
+ assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 1"))
+ .failure()
+ .hasMessage("Failed to set new property values")
+ .rootCause()
+ .hasMessage("Cannot downgrade v3 table to v1");
+ }
+
@Test
public void testUpgradingToInvalidVersionFails()
{
@@ -711,7 +782,7 @@ public void testUpgradingToInvalidVersionFails()
assertUpdate("CREATE TABLE " + tableName + " WITH (format_version = 2) AS SELECT * FROM tpch.tiny.nation", 25);
assertThat(loadTable(tableName).operations().current().formatVersion()).isEqualTo(2);
assertThat(query("ALTER TABLE " + tableName + " SET PROPERTIES format_version = 42"))
- .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 2");
+ .failure().hasMessage("line 1:79: Unable to set catalog 'iceberg' table property 'format_version' to [42]: format_version must be between 1 and 3");
}
@Test
@@ -1415,6 +1486,21 @@ private void testMapValueSchemaChange(String format, String expectedValue)
}
}
+ @Test
+ void testPositionDeleteAndDeletionVector()
+ {
+ try (TestTable table = newTrinoTable("test_delete_v2_v3", "(x int) WITH (format_version = 2)", List.of("1", "2", "3", "4"))) {
+ assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 1", 1);
+ assertUpdate("ALTER TABLE " + table.getName() + " SET PROPERTIES format_version = 3");
+
+ assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 2", 1);
+ assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 3, 4");
+
+ assertUpdate("DELETE FROM " + table.getName() + " WHERE x = 3", 1);
+ assertThat(query("SELECT * FROM " + table.getName())).matches("VALUES 4");
+ }
+ }
+
@Test
public void testUpdateAfterEqualityDelete()
throws Exception
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
index f1052173d5d3..65322b4fd9e3 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergUnityRestCatalogConnectorSmokeTest.java
@@ -261,6 +261,14 @@ public void testDeleteRowsConcurrently()
.hasMessageContaining("Access Denied");
}
+ @Test
+ @Override
+ public void testDeleteWithV3Format()
+ {
+ assertThatThrownBy(super::testDeleteWithV3Format)
+ .hasMessageContaining("Access Denied");
+ }
+
@Test
@Override
public void testCreateOrReplaceTable()
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java
index 48c73bb6fb8a..d9dee8e88c76 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java
@@ -312,6 +312,14 @@ public void testDropTableWithNonExistentTableLocation()
.hasMessageMatching("Failed to load table: (.*)");
}
+ @Test
+ @Override
+ public void testDeleteWithV3Format()
+ {
+ assertThatThrownBy(super::testDeleteWithV3Format)
+ .hasMessageMatching("Unsupported format version: v3.*");
+ }
+
@Override
protected boolean isFileSorted(Location path, String sortColumnName)
{
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java
new file mode 100644
index 000000000000..fa76188a03dc
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/delete/TestDeletionVectors.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.delete;
+
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.TrinoInput;
+import io.trino.metastore.HiveMetastore;
+import io.trino.plugin.iceberg.IcebergQueryRunner;
+import io.trino.plugin.iceberg.IcebergTestUtils;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.TestTable;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.deletes.BaseDVFileWriter;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.LongBitmapDataProvider;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
+
+import java.util.List;
+
+import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getHiveMetastore;
+import static io.trino.plugin.iceberg.delete.DeletionVectors.readDeletionVector;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static org.apache.iceberg.FileFormat.PUFFIN;
+import static org.assertj.core.api.Assertions.assertThat;
+
+final class TestDeletionVectors
+ extends AbstractTestQueryFramework
+{
+ private HiveMetastore metastore;
+ private TrinoFileSystemFactory fileSystemFactory;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ QueryRunner queryRunner = IcebergQueryRunner.builder()
+ .addIcebergProperty("iceberg.format-version", "3")
+ .build();
+ metastore = getHiveMetastore(queryRunner);
+ fileSystemFactory = getFileSystemFactory(queryRunner);
+ return queryRunner;
+ }
+
+ @Test
+ void testReadDeletionVector()
+ throws Exception
+ {
+ try (TestTable table = newTrinoTable("test_dv", "(x int)", List.of("1", "2", "3", "4", "5"))) {
+ BaseTable icebergTable = loadTable(table.getName());
+ String filePath = (String) computeScalar("SELECT file_path FROM \"" + table.getName() + "$files\"");
+
+ // Write deletion vectors in the Puffin format
+ OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, 1, 1).format(PUFFIN).build();
+ BaseDVFileWriter writer = new BaseDVFileWriter(fileFactory, _ -> null);
+ writer.delete(filePath, 0, icebergTable.spec(), null);
+ writer.delete(filePath, 2, icebergTable.spec(), null);
+ writer.delete(filePath, 4, icebergTable.spec(), null);
+ writer.close();
+
+ DeleteWriteResult result = writer.result();
+ List deleteFiles = result.deleteFiles();
+ assertThat(deleteFiles).hasSize(1);
+ DeleteFile deleteFile = deleteFiles.getFirst();
+
+ // Verify deletion vectors
+ LongBitmapDataProvider deletedRows = new Roaring64Bitmap();
+ try (TrinoInput input = fileSystemFactory.create(SESSION).newInputFile(Location.of(deleteFile.location())).newInput()) {
+ readDeletionVector(input, deleteFile.recordCount(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes(), deletedRows);
+ }
+ assertThat(deletedRows.getLongCardinality()).isEqualTo(3);
+ assertThat(deletedRows.contains(0)).isTrue();
+ assertThat(deletedRows.contains(1)).isFalse();
+ assertThat(deletedRows.contains(2)).isTrue();
+ assertThat(deletedRows.contains(3)).isFalse();
+ assertThat(deletedRows.contains(4)).isTrue();
+ }
+ }
+
+ private BaseTable loadTable(String tableName)
+ {
+ return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "iceberg", "tpch");
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java
index d93e19527807..4e2465314ac5 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/util/FileOperationUtils.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.function.Predicate;
+import java.util.regex.Pattern;
import static io.trino.filesystem.tracing.FileSystemAttributes.FILE_LOCATION;
import static io.trino.plugin.iceberg.util.FileOperationUtils.FileType.DATA;
@@ -29,6 +30,8 @@
public final class FileOperationUtils
{
+ private static final Pattern MANIFEST_PATTERN = Pattern.compile(".*-m[0-9]*.avro");
+
private FileOperationUtils() {}
public static Multiset getOperations(List spans)
@@ -73,6 +76,7 @@ public enum FileType
SNAPSHOT,
MANIFEST,
STATS,
+ PUFFIN,
DATA,
DELETE,
METASTORE,
@@ -86,12 +90,15 @@ public static FileType fromFilePath(String path)
if (path.contains("/snap-")) {
return SNAPSHOT;
}
- if (path.endsWith("-m0.avro")) {
+ if (MANIFEST_PATTERN.matcher(path).matches()) {
return MANIFEST;
}
if (path.endsWith(".stats")) {
return STATS;
}
+ if (path.endsWith(".puffin")) {
+ return PUFFIN;
+ }
if (path.contains("/data/") && (path.endsWith(".orc") || path.endsWith(".parquet"))) {
return DATA;
}
diff --git a/pom.xml b/pom.xml
index a133129e7f63..a7b3ea6d30da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2707,6 +2707,21 @@
shaded.parquet.it.unimi.dsi.fastutil
+
+
+
+ org.apache.orc
+ orc-core
+
+
+ org.apache.orc
+ orc-format
+
+
+
+ org.apache.orc
+
+
diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java
index c7b1a17de184..a3a2bc975d24 100644
--- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java
+++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java
@@ -16,14 +16,15 @@
import io.trino.filesystem.Location;
import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin;
import io.trino.plugin.exchange.filesystem.containers.MinioStorage;
+import io.trino.plugin.iceberg.BaseIcebergParquetConnectorTest;
import io.trino.plugin.iceberg.IcebergQueryRunner;
-import io.trino.plugin.iceberg.TestIcebergParquetConnectorTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Isolated;
import static io.trino.plugin.exchange.filesystem.containers.MinioStorage.getExchangeManagerProperties;
+import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting;
import static io.trino.testing.FaultTolerantExecutionConnectorTestHelper.getExtraProperties;
import static io.trino.testing.TestingNames.randomNameSuffix;
@@ -34,10 +35,15 @@
@Isolated
@TestInstance(PER_CLASS)
public class TestIcebergParquetFaultTolerantExecutionConnectorTest
- extends TestIcebergParquetConnectorTest
+ extends BaseIcebergParquetConnectorTest
{
private MinioStorage minioStorage;
+ public TestIcebergParquetFaultTolerantExecutionConnectorTest()
+ {
+ super(FORMAT_VERSION_SUPPORT_MAX);
+ }
+
@Override
protected IcebergQueryRunner.Builder createQueryRunnerBuilder()
{
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 457cc44653c3..0cf148f26475 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -1963,12 +1963,19 @@ public void testTrinoReadsSparkRowLevelDeletesWithRowTypes(StorageFormat tableSt
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat)
+ {
+ // Format version 1 doesn't support row level deletes, so only versions 2 and 3 are tested
+ testSparkReadsTrinoRowLevelDeletes(storageFormat, 2);
+ testSparkReadsTrinoRowLevelDeletes(storageFormat, 3);
+ }
+
+ private static void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat, int formatVersion)
{
String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_%s_%s", storageFormat.name(), randomNameSuffix()));
String sparkTableName = sparkTableName(tableName);
String trinoTableName = trinoTableName(tableName);
- onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = 2, format = '" + storageFormat.name() + "')");
+ onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "')");
onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)");
// Delete one row in a file
onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a = 13");
@@ -1988,15 +1995,49 @@ public void testSparkReadsTrinoRowLevelDeletes(StorageFormat storageFormat)
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
+ public void testSparkReadsTrinoMultipleDeleteFiles(StorageFormat storageFormat)
+ {
+ // Format version 1 doesn't support row level deletes, so only versions 2 and 3 are tested
+ testSparkReadsTrinoMultipleDeleteFiles(storageFormat, 2);
+ testSparkReadsTrinoMultipleDeleteFiles(storageFormat, 3);
+ }
+
+ private static void testSparkReadsTrinoMultipleDeleteFiles(StorageFormat storageFormat, int formatVersion)
+ {
+ String tableName = toLowerCase(format("test_spark_reads_trino_multiple_delete_files_%s_%s", storageFormat.name(), randomNameSuffix()));
+ String sparkTableName = sparkTableName(tableName);
+ String trinoTableName = trinoTableName(tableName);
+
+ onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(a INT, b INT) WITH(partitioning = ARRAY['b'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "')");
+ onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)");
+
+ // Delete one row from each data file
+ onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE a IN (1, 11)");
+
+ List expected = ImmutableList.of(row(2, 2), row(3, 2), row(12, 12), row(13, 12));
+ assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected);
+ assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected);
+
+ onSpark().executeQuery("DROP TABLE " + sparkTableName);
+ }
+
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats")
public void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat)
+ {
+ // Format version 1 doesn't support row level deletes, so only versions 2 and 3 are tested
+ testSparkReadsTrinoRowLevelDeletesWithRowTypes(storageFormat, 2);
+ testSparkReadsTrinoRowLevelDeletesWithRowTypes(storageFormat, 3);
+ }
+
+ private static void testSparkReadsTrinoRowLevelDeletesWithRowTypes(StorageFormat storageFormat, int formatVersion)
{
String tableName = toLowerCase(format("test_spark_reads_trino_row_level_deletes_row_types_%s_%s", storageFormat.name(), randomNameSuffix()));
String sparkTableName = sparkTableName(tableName);
String trinoTableName = trinoTableName(tableName);
onTrino().executeQuery("CREATE TABLE " + trinoTableName + "(part_key INT, int_t INT, row_t ROW(a INT, b INT)) " +
- "WITH(partitioning = ARRAY['part_key'], format_version = 2, format = '" + storageFormat.name() + "') ");
+ "WITH(partitioning = ARRAY['part_key'], format_version = " + formatVersion + ", format = '" + storageFormat.name() + "') ");
onTrino().executeQuery("INSERT INTO " + trinoTableName + " VALUES (1, 1, row(1, 2)), (1, 2, row(3, 4)), (1, 3, row(5, 6)), (2, 4, row(1, 2))");
onTrino().executeQuery("DELETE FROM " + trinoTableName + " WHERE int_t = 2");
@@ -2075,14 +2116,20 @@ public void testMissingMetrics()
}
@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
- public void testOptimizeOnV2IcebergTable()
+ public void testOptimizeIcebergTable()
+ {
+ testOptimizeIcebergTable(2);
+ testOptimizeIcebergTable(3);
+ }
+
+ private static void testOptimizeIcebergTable(int formatVersion)
{
String tableName = format("test_optimize_on_v2_iceberg_table_%s", randomNameSuffix());
String sparkTableName = sparkTableName(tableName);
String trinoTableName = trinoTableName(tableName);
onSpark().executeQuery("CREATE TABLE " + sparkTableName + "(a INT, b INT) " +
"USING ICEBERG PARTITIONED BY (b) " +
- "TBLPROPERTIES ('format-version'='2', 'write.delete.mode'='merge-on-read')");
+ "TBLPROPERTIES ('format-version'='" + formatVersion + "', 'write.delete.mode'='merge-on-read')");
onSpark().executeQuery("INSERT INTO " + sparkTableName + " VALUES (1, 2), (2, 2), (3, 2), (11, 12), (12, 12), (13, 12)");
onTrino().executeQuery(format("ALTER TABLE %s EXECUTE OPTIMIZE", trinoTableName));