diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 3ddf68c57ee2..5bbadcb98d10 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -382,6 +382,12 @@ provided + + com.azure + azure-security-keyvault-keys + runtime + + com.azure azure-storage-blob @@ -406,6 +412,13 @@ + + com.google.cloud.gcs.analytics + gcs-analytics-core + 1.2.1 + runtime + + io.airlift http-client diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java index a40ba9e427bc..f0e1ade85260 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java @@ -88,14 +88,12 @@ import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Objects.requireNonNull; -import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE; import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema; import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; public class IcebergFileWriterFactory { private static final Schema POSITION_DELETE_SCHEMA = pathPosSchema(); - private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full")); private static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings(); private final TypeManager typeManager; @@ -152,8 +150,8 @@ public IcebergFileWriter createPositionDeleteWriter( Map storageProperties) { return switch (fileFormat) { - case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties); - case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE)); + case PARQUET -> createParquetWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties); + case ORC -> createOrcWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE)); case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties); }; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index c8c55610d5ac..f5afd474148c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -3294,7 +3294,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col switch (task.content()) { case DATA -> dataTasks.add(task); case POSITION_DELETES -> deleteTasks.add(task); - case EQUALITY_DELETES -> throw new UnsupportedOperationException("Unsupported task content: " + task.content()); + case EQUALITY_DELETES, DATA_MANIFEST, DELETE_MANIFEST -> throw new UnsupportedOperationException("Unsupported task content: " + task.content()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java index 8b51e6f3b3a1..b887d4924f54 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSink.java @@ -167,7 +167,7 @@ public IcebergPageSink( this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null"); this.session = requireNonNull(session, "session is null"); this.fileFormat = requireNonNull(fileFormat, "fileFormat is null"); - this.metricsConfig = MetricsConfig.fromProperties(requireNonNull(storageProperties, "storageProperties is null")); + this.metricsConfig = MetricsConfig.from(requireNonNull(storageProperties, "storageProperties is null"), null, null); this.maxOpenWriters = maxOpenWriters; this.pagePartitioner = new PagePartitioner(pageIndexerFactory, toPartitionColumns(partitionColumns, partitionSpec, outputSchema)); this.targetMaxFileSize = IcebergSessionProperties.getTargetMaxFileSize(session); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index 334f531aa9ef..d53cbe39de1d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -364,7 +364,7 @@ private synchronized Iterator prepareFileTasksIterator(L } yield isUnconstrainedPathAndTimeDomain(); } - case DATA -> throw new IllegalStateException("Unexpected delete file: " + deleteFile); + case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new IllegalStateException("Unexpected delete file: " + deleteFile); }) .collect(toImmutableList()); scannedFiles.add(new DataFileWithDeleteFiles(wholeFileTask.file(), fullyAppliedDeletes)); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java index 3fcc2d94d12a..7823f3a60865 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/delete/DeleteManager.java @@ -85,7 +85,7 @@ public Optional getDeletePredicate( } } case EQUALITY_DELETES -> equalityDeleteFiles.add(deleteFile); - case DATA -> throw new VerifyException("DATA is not delete file type"); + case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new VerifyException("DATA is not delete file type"); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java index 81e58b6b9c2d..dd92a6cc532a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java @@ -23,6 +23,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestListFile; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; @@ -149,6 +150,12 @@ public InputFile newInputFile(DeleteFile file) return SupportsBulkOperations.super.newInputFile(file); } + @Override + public InputFile newInputFile(ManifestListFile manifestList) + { + return SupportsBulkOperations.super.newInputFile(manifestList); + } + private void deleteBatch(List filesToDelete) { try { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index d90fb9e808c7..1e4756fa04e4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -8000,6 +8000,77 @@ public void testDescribeOutputWithVersionedTable() } } + @Test + public void testTimeTravelWithFilterOnRenamedColumn() + { + testTimeTravelWithFilterOnRenamedColumn(false); + testTimeTravelWithFilterOnRenamedColumn(true); + } + + private void testTimeTravelWithFilterOnRenamedColumn(boolean partitioned) + { + String partition = partitioned ? "WITH (partitioning = ARRAY['part'])" : ""; + try (TestTable table = newTrinoTable("time_travel_with_filter_on_rename_", "(x int, y int, part int)" + partition)) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)"); + long firstSnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("ALTER TABLE " + table.getName() + " RENAME COLUMN x TO renamed_x"); + + // generate a new version + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2, 3)", 1); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE x = 1")) + .matches("VALUES (1, 1, 1), (1, 2, 2)"); + } + } + + @Test + public void testTimeTravelWithFilterOnDroppedColumn() + { + testTimeTravelWithFilterOnDroppedColumn(false); + testTimeTravelWithFilterOnDroppedColumn(true); + } + + private void testTimeTravelWithFilterOnDroppedColumn(boolean partitioned) + { + String partition = partitioned ? "WITH (partitioning = ARRAY['part'])" : ""; + try (TestTable table = newTrinoTable("time_travel_with_filter_on_drop_", "(x int, y int, part int)" + partition)) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)"); + long firstSnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("ALTER TABLE " + table.getName() + " DROP COLUMN x"); + + // generate a new version + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 2)", 1); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE x = 1")) + .matches("VALUES (1, 1, 1), (1, 2, 2)"); + } + } + + @Test + public void testTimeTravelWithFilterOnRenamedPartitionColumn() + { + try (TestTable table = newTrinoTable("time_travel_with_filter_on_drop_", "(x int, part1 int, part2 int) WITH (partitioning = ARRAY['part1', 'part2'])")) { + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)", 3); + assertThat(query("SELECT * FROM " + table.getName())) + .matches("VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)"); + long firstSnapshotId = getCurrentSnapshotId(table.getName()); + + assertUpdate("ALTER TABLE " + table.getName() + " RENAME COLUMN part1 TO renamed_part"); + + // generate a new version + assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 3)", 1); + + assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE part1 = 1")) + .matches("VALUES (1, 1, 1), (1, 1, 2)"); + } + } + @Test public void testDeleteRetainsTableHistory() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index 4e865ddfc097..c20c92b3ace0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -1783,6 +1783,45 @@ void testAnalyzeNoSnapshot() catalog.dropTable(SESSION, schemaTableName); } + @Test // regression test for https://github.com/trinodb/trino/issues/20511 + void testRequiredField() + { + testRequiredField(true); + testRequiredField(false); + } + + private void testRequiredField(boolean projectionPushdown) + { + Session projectionPushdownEnabled = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "projection_pushdown_enabled", Boolean.toString(projectionPushdown)) + .build(); + + String table = "test_required_field" + randomNameSuffix(); + SchemaTableName schemaTableName = new SchemaTableName("tpch", table); + + catalog.newCreateTableTransaction( + SESSION, + schemaTableName, + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "struct", Types.StructType.of( + Types.NestedField.required(3, "field", Types.IntegerType.get())))), + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + Optional.ofNullable(catalog.defaultTableLocation(SESSION, schemaTableName)), + ImmutableMap.of()) + .commitTransaction(); + + assertUpdate("INSERT INTO " + table + " VALUES (1, row(10)), (2, NULL)", 2); + + assertThat(query(projectionPushdownEnabled, "SELECT id FROM " + table + " WHERE struct.field IS NOT NULL")) + .matches("VALUES 1"); + assertThat(query(projectionPushdownEnabled, "SELECT id FROM " + table + " WHERE struct.field IS NULL")) + .matches("VALUES 2"); + + catalog.dropTable(SESSION, schemaTableName); + } + private void testHighlyNestedFieldPartitioningWithTimestampTransform(String partitioning, String partitionDirectoryRegex, Set expectedPartitionDirectories) { String tableName = "test_highly_nested_field_partitioning_with_timestamp_transform_" + randomNameSuffix(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java index 8a591a88d3e6..f4dd5c8ff33f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV3.java @@ -37,6 +37,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotChanges; import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; @@ -704,7 +705,7 @@ void testV3WriteDefault() BaseTable tempTable = loadTable(temp); loadTable(tableName).newFastAppend() - .appendFile(getOnlyElement(tempTable.currentSnapshot().addedDataFiles(tempTable.io()))) + .appendFile(getOnlyElement(SnapshotChanges.builderFor(tempTable).build().addedDataFiles())) .commit(); // The 'value' column is missing from the data file and has no initial-default, so it should return NULL @@ -989,7 +990,7 @@ private void assertV3InsertProducesRowLineageMetadata(String fileFormat) long totalRecords = 0; Long expectedLastUpdatedSequenceNumber = null; - for (DataFile file : snapshot.addedDataFiles(table.io())) { + for (DataFile file : SnapshotChanges.builderFor(table).build().addedDataFiles()) { fileCount++; totalRecords += file.recordCount(); @@ -1291,7 +1292,7 @@ void testV3RejectsEncryptionKeysInMetadata() hadoopTableLocation.toString()); icebergTable.newFastAppend() - .appendFile(getOnlyElement(tempTable.currentSnapshot().addedDataFiles(tempTable.io()))) + .appendFile(getOnlyElement(SnapshotChanges.builderFor(tempTable).build().addedDataFiles())) .commit(); // Inject encryption-keys + snapshot key-id into the current metadata.json. diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java index 7acd6c8306cb..fa5dd57ad404 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java @@ -224,7 +224,7 @@ public void testCreateTableWithTrailingSpaceInLocation() public void testDropTableWithMissingMetadataFile() { assertThatThrownBy(super::testDropTableWithMissingMetadataFile) - .hasMessageMatching(".* Table '.*' does not exist"); + .hasMessageMatching("Failed to load table: (.*)"); } @Test @@ -248,7 +248,7 @@ public void testDropTableWithMissingManifestListFile() public void testDropTableWithNonExistentTableLocation() { assertThatThrownBy(super::testDropTableWithNonExistentTableLocation) - .hasMessageMatching(".* Table '.*' does not exist"); + .hasMessageMatching("Failed to load table: (.*)"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java index e98f0e2daea4..61e5a40f13be 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java @@ -234,7 +234,7 @@ public void testDropTableWithMissingSnapshotFile() assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) .isInstanceOf(QueryFailedException.class) .cause() - .hasMessageContaining("Failed to drop table") + .hasMessageMatching("Failed to open input stream for file: .*avro") .hasNoCause(); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java index 7138f0b08ffe..4a0b588b17d9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3TablesConnectorSmokeTest.java @@ -180,7 +180,7 @@ public void testCreateTableWithTrailingSpaceInLocation() public void testRenameTable() { assertThatThrownBy(super::testRenameTable) - .hasStackTraceContaining("Unable to process: RenameTable endpoint is not supported for Glue Catalog"); + .hasStackTraceContaining("RenameTable endpoint is not supported for Glue Catalog"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java index 16ceb005505f..43ba0346daa4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java @@ -230,7 +230,7 @@ public void testDropTableWithMissingSnapshotFile() assertThatThrownBy(super::testDropTableWithMissingSnapshotFile) .isInstanceOf(QueryFailedException.class) .cause() - .hasMessageContaining("Failed to drop table") + .hasMessageMatching("Failed to open input stream for file: .*avro") .hasNoCause(); } diff --git a/pom.xml b/pom.xml index a4ee93c4a698..8a2f751a911d 100644 --- a/pom.xml +++ b/pom.xml @@ -198,7 +198,7 @@ v24.12.0 11.7.0 5.4.2 - 1.10.2 + 1.11.0 5.18.1 0.13.0 1.20.0