From 93827a1d699feaf88cc7605255b171ad5673d314 Mon Sep 17 00:00:00 2001 From: kaveti Date: Fri, 17 Apr 2026 12:37:22 +0530 Subject: [PATCH] Add added_snapshot_id column to Iceberg $files table --- docs/src/main/sphinx/connector/iceberg.md | 15 +++++-- .../plugin/iceberg/system/FilesTable.java | 3 ++ .../system/files/FilesTablePageSource.java | 43 ++++++++++++------- .../apache/iceberg/IcebergManifestUtils.java | 7 +++ .../iceberg/BaseIcebergSystemTables.java | 13 ++++++ .../trino/plugin/iceberg/DataFileRecord.java | 2 +- 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index 98a453b8cf4f..dad8700a1c0b 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -1490,9 +1490,9 @@ SELECT * FROM "test_table$files"; ``` ```text - content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes -----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+---------------------- - 0 | 
hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | | | | 1 | 1 | | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | | | + content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | added_snapshot_id | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes +----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+--------------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+---------------------- + 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | | | | 6116016324956900164 | 1 | 1 | | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | | | ``` The output of the query has the following columns: @@ -1570,6 +1570,15 @@ The 
output of the query has the following columns: * - `readable_metrics` - `JSON` - File metrics in human-readable form. +* - `added_snapshot_id` + - `BIGINT` + - The ID of the snapshot in which the file was first added to the table, as + recorded in the selected snapshot's live manifest entry. This makes it + possible to join current live files with `$snapshots` to inspect when each + file was introduced. If a file is moved to a different manifest by a + manifest rewrite, the manifest location may change, but `added_snapshot_id` + still refers to the snapshot in which the file was originally added. Use + `$entries` or `$all_entries` to inspect historical manifest references + across snapshots. * - `file_sequence_number` - `BIGINT` - The sequence number of the file, tracking when the file was added. diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java index 150d5a8fb0a6..41655354826e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java @@ -73,6 +73,7 @@ public final class FilesTable public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids"; public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id"; public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics"; + public static final String ADDED_SNAPSHOT_ID_COLUMN_NAME = "added_snapshot_id"; public static final String FILE_SEQUENCE_NUMBER_COLUMN_NAME = "file_sequence_number"; public static final String DATA_SEQUENCE_NUMBER_COLUMN_NAME = "data_sequence_number"; public static final String REFERENCED_DATA_FILE_COLUMN_NAME = "referenced_data_file"; @@ -101,6 +102,7 @@ public final class FilesTable EQUALITY_IDS_COLUMN_NAME, SORT_ORDER_ID_COLUMN_NAME, READABLE_METRICS_COLUMN_NAME, + ADDED_SNAPSHOT_ID_COLUMN_NAME, FILE_SEQUENCE_NUMBER_COLUMN_NAME,
DATA_SEQUENCE_NUMBER_COLUMN_NAME, REFERENCED_DATA_FILE_COLUMN_NAME, @@ -180,6 +182,7 @@ public static Type getColumnType(String columnName, TypeManager typeManager) MANIFEST_LOCATION_COLUMN_NAME -> VARCHAR; case RECORD_COUNT_COLUMN_NAME, FILE_SIZE_IN_BYTES_COLUMN_NAME, + ADDED_SNAPSHOT_ID_COLUMN_NAME, FILE_SEQUENCE_NUMBER_COLUMN_NAME, DATA_SEQUENCE_NUMBER_COLUMN_NAME, POS_COLUMN_NAME, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java index a4f85e1a581b..3cc04ddcde73 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java @@ -33,6 +33,7 @@ import io.trino.spi.type.TypeManager; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.IcebergManifestUtils.FileEntryWithMetadata; import org.apache.iceberg.ManifestReader; import org.apache.iceberg.MetricsUtil; import org.apache.iceberg.PartitionField; @@ -66,19 +67,28 @@ import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes; import static io.trino.plugin.iceberg.IcebergUtil.readerForManifest; import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper; +import static io.trino.plugin.iceberg.system.FilesTable.ADDED_SNAPSHOT_ID_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.COLUMN_SIZES_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_OFFSET_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME; import static 
io.trino.plugin.iceberg.system.FilesTable.EQUALITY_IDS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.FILE_FORMAT_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.FILE_PATH_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.FILE_SIZE_IN_BYTES_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.FIRST_ROW_ID_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.KEY_METADATA_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.LOWER_BOUNDS_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.MANIFEST_LOCATION_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.NAN_VALUE_COUNTS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.NULL_VALUE_COUNTS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.PARTITION_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.POS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.READABLE_METRICS_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.RECORD_COUNT_COLUMN_NAME; +import static io.trino.plugin.iceberg.system.FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.SORT_ORDER_ID_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.SPEC_ID_COLUMN_NAME; import static io.trino.plugin.iceberg.system.FilesTable.SPLIT_OFFSETS_COLUMN_NAME; @@ -95,6 +105,7 @@ import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.IcebergManifestUtils.liveEntriesWithMetadata; import static org.apache.iceberg.MetricsUtil.readableMetricsStruct; public final class FilesTablePageSource @@ -108,7 +119,7 @@ public final class FilesTablePageSource 
private final List partitionFields; private final Optional partitionColumnType; private final List primitiveFields; - private final Iterator> contentIterator; + private final Iterator entryIterator; private final Map columnNameToIndex; private final PageBuilder pageBuilder; private final long completedBytes; @@ -137,7 +148,7 @@ public FilesTablePageSource( .collect(toImmutableList()); ManifestReader> manifestReader = closer.register(readerForManifest(split.manifestFile(), fileIoFactory.create(trinoFileSystem), idToPartitionSpecMapping)); // TODO figure out why selecting the specific column causes null to be returned for offset_splits - this.contentIterator = closer.register(requireNonNull(manifestReader, "manifestReader is null").iterator()); + this.entryIterator = closer.register(liveEntriesWithMetadata(requireNonNull(manifestReader, "manifestReader is null")).iterator()); this.pageBuilder = new PageBuilder(requiredColumns.stream().map(column -> { if (column.equals(PARTITION_COLUMN_NAME)) { return split.partitionColumnType().orElseThrow(); @@ -184,10 +195,11 @@ public SourcePage getNextSourcePage() return null; } - while (contentIterator.hasNext() && !pageBuilder.isFull()) { + while (entryIterator.hasNext() && !pageBuilder.isFull()) { pageBuilder.declarePosition(); long start = System.nanoTime(); - ContentFile contentFile = contentIterator.next(); + FileEntryWithMetadata entry = entryIterator.next(); + ContentFile contentFile = entry.file(); writeValueOrNull(pageBuilder, CONTENT_COLUMN_NAME, () -> contentFile.content().id(), INTEGER::writeInt); writeValueOrNull(pageBuilder, FILE_PATH_COLUMN_NAME, contentFile::location, VARCHAR::writeString); @@ -218,22 +230,23 @@ public SourcePage getNextSourcePage() (blkBldr, value) -> INTEGER.writeLong(blkBldr, value)); writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS), (blkBldr, value) -> VARCHAR.writeString(blkBldr, 
readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields))); - writeValueOrNull(pageBuilder, FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong); - writeValueOrNull(pageBuilder, FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong); + writeValueOrNull(pageBuilder, ADDED_SNAPSHOT_ID_COLUMN_NAME, entry::snapshotId, BIGINT::writeLong); + writeValueOrNull(pageBuilder, FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong); + writeValueOrNull(pageBuilder, DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong); if (contentFile instanceof DeleteFile deleteFile) { - writeValueOrNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString); - writeValueOrNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong); - writeValueOrNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong); + writeValueOrNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString); + writeValueOrNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong); + writeValueOrNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong); } else { // For non-delete files, these columns should be null - writeNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME); - writeNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME); - writeNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME); + writeNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME); + writeNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME); + writeNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME); } - writeValueOrNull(pageBuilder, 
FilesTable.POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong); - writeValueOrNull(pageBuilder, FilesTable.MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString); - writeValueOrNull(pageBuilder, FilesTable.FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong); + writeValueOrNull(pageBuilder, POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong); + writeValueOrNull(pageBuilder, MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString); + writeValueOrNull(pageBuilder, FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong); readTimeNanos += System.nanoTime() - start; } diff --git a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/IcebergManifestUtils.java b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/IcebergManifestUtils.java index 0536fc08e03f..c7ce021ed57a 100644 --- a/plugin/trino-iceberg/src/main/java/org/apache/iceberg/IcebergManifestUtils.java +++ b/plugin/trino-iceberg/src/main/java/org/apache/iceberg/IcebergManifestUtils.java @@ -34,4 +34,11 @@ public static > CloseableIterable> liveE { return CloseableIterable.transform(manifestReader.liveEntries(), ManifestEntry::file); } + + public static > CloseableIterable liveEntriesWithMetadata(ManifestReader manifestReader) + { + return CloseableIterable.transform(manifestReader.liveEntries(), entry -> new FileEntryWithMetadata(entry.file(), entry.snapshotId())); + } + + public record FileEntryWithMetadata(ContentFile file, long snapshotId) {} } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java index e2417925e3ec..dd9e2b1c92a5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java @@ -538,6 +538,7 @@ public void testFilesPartitionTable() 
"('equality_ids', 'array(integer)', '', '')," + "('sort_order_id', 'integer', '', '')," + "('readable_metrics', 'json', '', '')," + + "('added_snapshot_id', 'bigint', '', '')," + "('file_sequence_number', 'bigint', '', '')," + "('data_sequence_number', 'bigint', '', '')," + "('referenced_data_file', 'varchar', '', '')," + @@ -558,6 +559,18 @@ public void testFilesPartitionTable() .build()); } + @Test + public void testFilesTableAddedSnapshotId() + { + assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" WHERE added_snapshot_id IS NOT NULL")) + .matches("VALUES BIGINT '4'"); + assertThat(query("SELECT count(DISTINCT added_snapshot_id) FROM test_schema.\"test_table$files\"")) + .matches("VALUES BIGINT '2'"); + assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" f " + + "WHERE f.added_snapshot_id IN (SELECT snapshot_id FROM test_schema.\"test_table$snapshots\")")) + .matches("VALUES BIGINT '4'"); + } + @Test void testFilesTableReadableMetrics() { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java index 4f5bd9b9c2e0..bbeabeb1e4db 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java @@ -37,7 +37,7 @@ public class DataFileRecord @SuppressWarnings("unchecked") public static DataFileRecord toDataFileRecord(MaterializedRow row) { - assertThat(row.getFieldCount()).isEqualTo(25); + assertThat(row.getFieldCount()).isEqualTo(26); return new DataFileRecord( (int) row.getField(0), (String) row.getField(1),