Skip to content

Commit 3bee45e

Browse files
committed
Add added_snapshot_id column to Iceberg $files table
1 parent 2fc068b commit 3bee45e

6 files changed

Lines changed: 66 additions & 19 deletions

File tree

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,9 +1490,9 @@ SELECT * FROM "test_table$files";
14901490
```
14911491

14921492
```text
1493-
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes
1494-
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+----------------------
1495-
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null> | 1 | 1 | <null> | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | <null> | <null> | <null>
1493+
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | added_snapshot_id | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes
1494+
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+--------------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+----------------------
1495+
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null> | 6116016324956900164 | 1 | 1 | <null> | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | <null> | <null> | <null>
14961496
```
14971497

14981498
The output of the query has the following columns:
@@ -1570,6 +1570,15 @@ The output of the query has the following columns:
15701570
* - `readable_metrics`
15711571
- `JSON`
15721572
- File metrics in human-readable form.
1573+
* - `added_snapshot_id`
1574+
- `BIGINT`
1575+
- The ID of the snapshot in which the file was first added to the table, as recorded in
1576+
the selected snapshot's live manifest entry. This makes it possible to join
1577+
current live files with `$snapshots` to inspect when each file was
1578+
introduced. If a file is moved to a different manifest by a manifest rewrite,
1579+
the manifest location may change, but `added_snapshot_id` still refers to
1580+
the snapshot in which the file was originally added. Use `$entries` or
1581+
`$all_entries` to inspect historical manifest references across snapshots.
15731582
* - `file_sequence_number`
15741583
- `BIGINT`
15751584
- The sequence number of the file, tracking when the file was added.

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/FilesTable.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public final class FilesTable
7373
public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
7474
public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
7575
public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";
76+
public static final String ADDED_SNAPSHOT_ID_COLUMN_NAME = "added_snapshot_id";
7677
public static final String FILE_SEQUENCE_NUMBER_COLUMN_NAME = "file_sequence_number";
7778
public static final String DATA_SEQUENCE_NUMBER_COLUMN_NAME = "data_sequence_number";
7879
public static final String REFERENCED_DATA_FILE_COLUMN_NAME = "referenced_data_file";
@@ -101,6 +102,7 @@ public final class FilesTable
101102
EQUALITY_IDS_COLUMN_NAME,
102103
SORT_ORDER_ID_COLUMN_NAME,
103104
READABLE_METRICS_COLUMN_NAME,
105+
ADDED_SNAPSHOT_ID_COLUMN_NAME,
104106
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
105107
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
106108
REFERENCED_DATA_FILE_COLUMN_NAME,
@@ -180,6 +182,7 @@ public static Type getColumnType(String columnName, TypeManager typeManager)
180182
MANIFEST_LOCATION_COLUMN_NAME -> VARCHAR;
181183
case RECORD_COUNT_COLUMN_NAME,
182184
FILE_SIZE_IN_BYTES_COLUMN_NAME,
185+
ADDED_SNAPSHOT_ID_COLUMN_NAME,
183186
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
184187
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
185188
POS_COLUMN_NAME,

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/files/FilesTablePageSource.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.trino.spi.type.TypeManager;
3434
import org.apache.iceberg.ContentFile;
3535
import org.apache.iceberg.DeleteFile;
36+
import org.apache.iceberg.IcebergManifestUtils.FileEntryWithMetadata;
3637
import org.apache.iceberg.ManifestReader;
3738
import org.apache.iceberg.MetricsUtil;
3839
import org.apache.iceberg.PartitionField;
@@ -66,19 +67,28 @@
6667
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
6768
import static io.trino.plugin.iceberg.IcebergUtil.readerForManifest;
6869
import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
70+
import static io.trino.plugin.iceberg.system.FilesTable.ADDED_SNAPSHOT_ID_COLUMN_NAME;
6971
import static io.trino.plugin.iceberg.system.FilesTable.COLUMN_SIZES_COLUMN_NAME;
7072
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_COLUMN_NAME;
73+
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_OFFSET_COLUMN_NAME;
74+
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME;
75+
import static io.trino.plugin.iceberg.system.FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME;
7176
import static io.trino.plugin.iceberg.system.FilesTable.EQUALITY_IDS_COLUMN_NAME;
7277
import static io.trino.plugin.iceberg.system.FilesTable.FILE_FORMAT_COLUMN_NAME;
7378
import static io.trino.plugin.iceberg.system.FilesTable.FILE_PATH_COLUMN_NAME;
79+
import static io.trino.plugin.iceberg.system.FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME;
7480
import static io.trino.plugin.iceberg.system.FilesTable.FILE_SIZE_IN_BYTES_COLUMN_NAME;
81+
import static io.trino.plugin.iceberg.system.FilesTable.FIRST_ROW_ID_COLUMN_NAME;
7582
import static io.trino.plugin.iceberg.system.FilesTable.KEY_METADATA_COLUMN_NAME;
7683
import static io.trino.plugin.iceberg.system.FilesTable.LOWER_BOUNDS_COLUMN_NAME;
84+
import static io.trino.plugin.iceberg.system.FilesTable.MANIFEST_LOCATION_COLUMN_NAME;
7785
import static io.trino.plugin.iceberg.system.FilesTable.NAN_VALUE_COUNTS_COLUMN_NAME;
7886
import static io.trino.plugin.iceberg.system.FilesTable.NULL_VALUE_COUNTS_COLUMN_NAME;
7987
import static io.trino.plugin.iceberg.system.FilesTable.PARTITION_COLUMN_NAME;
88+
import static io.trino.plugin.iceberg.system.FilesTable.POS_COLUMN_NAME;
8089
import static io.trino.plugin.iceberg.system.FilesTable.READABLE_METRICS_COLUMN_NAME;
8190
import static io.trino.plugin.iceberg.system.FilesTable.RECORD_COUNT_COLUMN_NAME;
91+
import static io.trino.plugin.iceberg.system.FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME;
8292
import static io.trino.plugin.iceberg.system.FilesTable.SORT_ORDER_ID_COLUMN_NAME;
8393
import static io.trino.plugin.iceberg.system.FilesTable.SPEC_ID_COLUMN_NAME;
8494
import static io.trino.plugin.iceberg.system.FilesTable.SPLIT_OFFSETS_COLUMN_NAME;
@@ -95,6 +105,7 @@
95105
import static io.trino.spi.type.VarbinaryType.VARBINARY;
96106
import static io.trino.spi.type.VarcharType.VARCHAR;
97107
import static java.util.Objects.requireNonNull;
108+
import static org.apache.iceberg.IcebergManifestUtils.liveEntriesWithMetadata;
98109
import static org.apache.iceberg.MetricsUtil.readableMetricsStruct;
99110

100111
public final class FilesTablePageSource
@@ -108,7 +119,7 @@ public final class FilesTablePageSource
108119
private final List<PartitionField> partitionFields;
109120
private final Optional<IcebergPartitionColumn> partitionColumnType;
110121
private final List<Types.NestedField> primitiveFields;
111-
private final Iterator<? extends ContentFile<?>> contentIterator;
122+
private final Iterator<FileEntryWithMetadata> entryIterator;
112123
private final Map<String, Integer> columnNameToIndex;
113124
private final PageBuilder pageBuilder;
114125
private final long completedBytes;
@@ -137,7 +148,7 @@ public FilesTablePageSource(
137148
.collect(toImmutableList());
138149
ManifestReader<? extends ContentFile<?>> manifestReader = closer.register(readerForManifest(split.manifestFile(), fileIoFactory.create(trinoFileSystem), idToPartitionSpecMapping));
139150
// TODO figure out why selecting the specific column causes null to be returned for offset_splits
140-
this.contentIterator = closer.register(requireNonNull(manifestReader, "manifestReader is null").iterator());
151+
this.entryIterator = closer.register(liveEntriesWithMetadata(requireNonNull(manifestReader, "manifestReader is null")).iterator());
141152
this.pageBuilder = new PageBuilder(requiredColumns.stream().map(column -> {
142153
if (column.equals(PARTITION_COLUMN_NAME)) {
143154
return split.partitionColumnType().orElseThrow();
@@ -184,10 +195,11 @@ public SourcePage getNextSourcePage()
184195
return null;
185196
}
186197

187-
while (contentIterator.hasNext() && !pageBuilder.isFull()) {
198+
while (entryIterator.hasNext() && !pageBuilder.isFull()) {
188199
pageBuilder.declarePosition();
189200
long start = System.nanoTime();
190-
ContentFile<?> contentFile = contentIterator.next();
201+
FileEntryWithMetadata entry = entryIterator.next();
202+
ContentFile<?> contentFile = entry.file();
191203

192204
writeValueOrNull(pageBuilder, CONTENT_COLUMN_NAME, () -> contentFile.content().id(), INTEGER::writeInt);
193205
writeValueOrNull(pageBuilder, FILE_PATH_COLUMN_NAME, contentFile::location, VARCHAR::writeString);
@@ -218,22 +230,23 @@ public SourcePage getNextSourcePage()
218230
(blkBldr, value) -> INTEGER.writeLong(blkBldr, value));
219231
writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS),
220232
(blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields)));
221-
writeValueOrNull(pageBuilder, FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong);
222-
writeValueOrNull(pageBuilder, FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong);
233+
writeValueOrNull(pageBuilder, ADDED_SNAPSHOT_ID_COLUMN_NAME, entry::snapshotId, BIGINT::writeLong);
234+
writeValueOrNull(pageBuilder, FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong);
235+
writeValueOrNull(pageBuilder, DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong);
223236
if (contentFile instanceof DeleteFile deleteFile) {
224-
writeValueOrNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString);
225-
writeValueOrNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong);
226-
writeValueOrNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong);
237+
writeValueOrNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString);
238+
writeValueOrNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong);
239+
writeValueOrNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong);
227240
}
228241
else {
229242
// For non-delete files, these columns should be null
230-
writeNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME);
231-
writeNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME);
232-
writeNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME);
243+
writeNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME);
244+
writeNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME);
245+
writeNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME);
233246
}
234-
writeValueOrNull(pageBuilder, FilesTable.POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong);
235-
writeValueOrNull(pageBuilder, FilesTable.MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString);
236-
writeValueOrNull(pageBuilder, FilesTable.FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong);
247+
writeValueOrNull(pageBuilder, POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong);
248+
writeValueOrNull(pageBuilder, MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString);
249+
writeValueOrNull(pageBuilder, FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong);
237250
readTimeNanos += System.nanoTime() - start;
238251
}
239252

plugin/trino-iceberg/src/main/java/org/apache/iceberg/IcebergManifestUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,13 @@ public static <F extends ContentFile<F>> CloseableIterable<ContentFile<F>> liveE
3434
{
3535
return CloseableIterable.transform(manifestReader.liveEntries(), ManifestEntry::file);
3636
}
37+
38+
public static <F extends ContentFile<F>> CloseableIterable<FileEntryWithMetadata> liveEntriesWithMetadata(ManifestReader<F> manifestReader)
39+
{
40+
return CloseableIterable.transform(manifestReader.liveEntries(), entry -> new FileEntryWithMetadata(entry.file(), entry.snapshotId()));
41+
}
42+
43+
public record FileEntryWithMetadata(
44+
ContentFile<?> file,
45+
long snapshotId) {}
3746
}

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ public void testFilesPartitionTable()
538538
"('equality_ids', 'array(integer)', '', '')," +
539539
"('sort_order_id', 'integer', '', '')," +
540540
"('readable_metrics', 'json', '', '')," +
541+
"('added_snapshot_id', 'bigint', '', '')," +
541542
"('file_sequence_number', 'bigint', '', '')," +
542543
"('data_sequence_number', 'bigint', '', '')," +
543544
"('referenced_data_file', 'varchar', '', '')," +
@@ -558,6 +559,18 @@ public void testFilesPartitionTable()
558559
.build());
559560
}
560561

562+
// Exercises the new added_snapshot_id column of the $files table. Per the
// assertions below, the shared fixture is expected to expose 4 live files,
// all with a non-null added_snapshot_id spanning 2 distinct snapshots, and
// every value must join back to $snapshots — TODO(review): confirm the
// fixture's test_table state matches these counts.
@Test
563+
public void testFilesTableAddedSnapshotId()
564+
{
565+
assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" WHERE added_snapshot_id IS NOT NULL"))
566+
.matches("VALUES BIGINT '4'");
567+
assertThat(query("SELECT count(DISTINCT added_snapshot_id) FROM test_schema.\"test_table$files\""))
568+
.matches("VALUES BIGINT '2'");
569+
assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" f " +
570+
"WHERE f.added_snapshot_id IN (SELECT snapshot_id FROM test_schema.\"test_table$snapshots\")"))
571+
.matches("VALUES BIGINT '4'");
572+
}
573+
561574
@Test
562575
void testFilesTableReadableMetrics()
563576
{

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class DataFileRecord
3737
@SuppressWarnings("unchecked")
3838
public static DataFileRecord toDataFileRecord(MaterializedRow row)
3939
{
40-
assertThat(row.getFieldCount()).isEqualTo(25);
40+
assertThat(row.getFieldCount()).isEqualTo(26);
4141
return new DataFileRecord(
4242
(int) row.getField(0),
4343
(String) row.getField(1),

0 commit comments

Comments
 (0)