docs/src/main/sphinx/connector/iceberg.md (12 additions & 3 deletions)
@@ -1490,9 +1490,9 @@ SELECT * FROM "test_table$files";
```

```text
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+----------------------
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null> | 1 | 1 | <null> | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | <null> | <null> | <null>
content | file_path | record_count | file_format | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | added_snapshot_id | file_sequence_number | data_sequence_number | referenced_data_file | pos | manifest_location | first_row_id | content_offset | content_size_in_bytes
----------+-------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------+----------------------+----------------------+-------------------+--------------------+-------------------+-----------------------------+-----------------------------+----------------+----------------+----------------+--------------------+----------------------+----------------------+----------------------+-----+------------------------------------------------------------------------------------------------------------------------------+--------------+----------------+----------------------
0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/data/c1=3/c2=2021-01-14/af9872b2-40f3-428f-9c87-186d2750d84e.parquet | 1 | PARQUET | 442 | {1=40, 2=40, 3=44} | {1=1, 2=1, 3=1} | {1=0, 2=0, 3=0} | <null> | {1=3, 2=2021-01-14, 3=1.3} | {1=3, 2=2021-01-14, 3=1.3} | <null> | <null> | <null> | 6116016324956900164 | 1 | 1 | <null> | 0 | hdfs://hadoop-master:9000/user/hive/warehouse/test_table/metadata/snap-6116016324956900164-0-3c1b2496-0670-4e37-81f6.avro | <null> | <null> | <null>
```

The output of the query has the following columns:
@@ -1570,6 +1570,15 @@ The output of the query has the following columns:
* - `readable_metrics`
- `JSON`
- File metrics in human-readable form.
* - `added_snapshot_id`
- `BIGINT`
- The ID of the snapshot in which the file was first added to the table, as
recorded in the selected snapshot's live manifest entry. This makes it
possible to join the current live files with `$snapshots` to inspect when
each file was introduced (see the example query after this excerpt). If a
manifest rewrite moves a file to a different manifest, the manifest location
may change, but `added_snapshot_id` still refers to the snapshot that
originally added the file. Use `$entries` or `$all_entries` to inspect
historical manifest references across snapshots.
* - `file_sequence_number`
- `BIGINT`
- The sequence number of the file, tracking when the file was added.
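For illustration, a join along these lines (assuming the `test_table` example
from earlier in this page) resolves each live file to the snapshot that
introduced it:

```sql
-- For each live file, look up when and how it was added.
SELECT f.file_path, f.added_snapshot_id, s.committed_at, s.operation
FROM "test_table$files" f
JOIN "test_table$snapshots" s ON f.added_snapshot_id = s.snapshot_id
ORDER BY s.committed_at;
```

Because `added_snapshot_id` is stable across manifest rewrites, this join keeps
working even after maintenance operations reorganize the manifests.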
@@ -73,6 +73,7 @@ public final class FilesTable
public static final String EQUALITY_IDS_COLUMN_NAME = "equality_ids";
public static final String SORT_ORDER_ID_COLUMN_NAME = "sort_order_id";
public static final String READABLE_METRICS_COLUMN_NAME = "readable_metrics";
public static final String ADDED_SNAPSHOT_ID_COLUMN_NAME = "added_snapshot_id";
public static final String FILE_SEQUENCE_NUMBER_COLUMN_NAME = "file_sequence_number";
public static final String DATA_SEQUENCE_NUMBER_COLUMN_NAME = "data_sequence_number";
public static final String REFERENCED_DATA_FILE_COLUMN_NAME = "referenced_data_file";
@@ -101,6 +102,7 @@ public final class FilesTable
EQUALITY_IDS_COLUMN_NAME,
SORT_ORDER_ID_COLUMN_NAME,
READABLE_METRICS_COLUMN_NAME,
ADDED_SNAPSHOT_ID_COLUMN_NAME,
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
REFERENCED_DATA_FILE_COLUMN_NAME,
@@ -180,6 +182,7 @@ public static Type getColumnType(String columnName, TypeManager typeManager)
MANIFEST_LOCATION_COLUMN_NAME -> VARCHAR;
case RECORD_COUNT_COLUMN_NAME,
FILE_SIZE_IN_BYTES_COLUMN_NAME,
ADDED_SNAPSHOT_ID_COLUMN_NAME,
FILE_SEQUENCE_NUMBER_COLUMN_NAME,
DATA_SEQUENCE_NUMBER_COLUMN_NAME,
POS_COLUMN_NAME,
@@ -33,6 +33,7 @@
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.IcebergManifestUtils.FileEntryWithMetadata;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.MetricsUtil;
import org.apache.iceberg.PartitionField;
@@ -66,19 +67,28 @@
import static io.trino.plugin.iceberg.IcebergUtil.primitiveFieldTypes;
import static io.trino.plugin.iceberg.IcebergUtil.readerForManifest;
import static io.trino.plugin.iceberg.StructLikeWrapperWithFieldIdToIndex.createStructLikeWrapper;
import static io.trino.plugin.iceberg.system.FilesTable.ADDED_SNAPSHOT_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.COLUMN_SIZES_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_OFFSET_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.EQUALITY_IDS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_FORMAT_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_PATH_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FILE_SIZE_IN_BYTES_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.FIRST_ROW_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.KEY_METADATA_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.LOWER_BOUNDS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.MANIFEST_LOCATION_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.NAN_VALUE_COUNTS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.NULL_VALUE_COUNTS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.PARTITION_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.POS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.READABLE_METRICS_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.RECORD_COUNT_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.SORT_ORDER_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.SPEC_ID_COLUMN_NAME;
import static io.trino.plugin.iceberg.system.FilesTable.SPLIT_OFFSETS_COLUMN_NAME;
@@ -95,6 +105,7 @@
import static io.trino.spi.type.VarbinaryType.VARBINARY;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.IcebergManifestUtils.liveEntriesWithMetadata;
import static org.apache.iceberg.MetricsUtil.readableMetricsStruct;

public final class FilesTablePageSource
@@ -108,7 +119,7 @@ public final class FilesTablePageSource
private final List<PartitionField> partitionFields;
private final Optional<IcebergPartitionColumn> partitionColumnType;
private final List<Types.NestedField> primitiveFields;
private final Iterator<? extends ContentFile<?>> contentIterator;
private final Iterator<FileEntryWithMetadata> entryIterator;
private final Map<String, Integer> columnNameToIndex;
private final PageBuilder pageBuilder;
private final long completedBytes;
@@ -137,7 +148,7 @@ public FilesTablePageSource(
.collect(toImmutableList());
ManifestReader<? extends ContentFile<?>> manifestReader = closer.register(readerForManifest(split.manifestFile(), fileIoFactory.create(trinoFileSystem), idToPartitionSpecMapping));
// TODO figure out why selecting the specific column causes null to be returned for offset_splits
this.contentIterator = closer.register(requireNonNull(manifestReader, "manifestReader is null").iterator());
this.entryIterator = closer.register(liveEntriesWithMetadata(requireNonNull(manifestReader, "manifestReader is null")).iterator());
this.pageBuilder = new PageBuilder(requiredColumns.stream().map(column -> {
if (column.equals(PARTITION_COLUMN_NAME)) {
return split.partitionColumnType().orElseThrow();
@@ -184,10 +195,11 @@ public SourcePage getNextSourcePage()
return null;
}

while (contentIterator.hasNext() && !pageBuilder.isFull()) {
while (entryIterator.hasNext() && !pageBuilder.isFull()) {
pageBuilder.declarePosition();
long start = System.nanoTime();
ContentFile<?> contentFile = contentIterator.next();
FileEntryWithMetadata entry = entryIterator.next();
ContentFile<?> contentFile = entry.file();

writeValueOrNull(pageBuilder, CONTENT_COLUMN_NAME, () -> contentFile.content().id(), INTEGER::writeInt);
writeValueOrNull(pageBuilder, FILE_PATH_COLUMN_NAME, contentFile::location, VARCHAR::writeString);
@@ -218,22 +230,23 @@ public SourcePage getNextSourcePage()
(blkBldr, value) -> INTEGER.writeLong(blkBldr, value));
writeValueOrNull(pageBuilder, READABLE_METRICS_COLUMN_NAME, () -> metadataSchema.findField(MetricsUtil.READABLE_METRICS),
(blkBldr, value) -> VARCHAR.writeString(blkBldr, readableMetricsToJson(readableMetricsStruct(schema, contentFile, value.type().asStructType()), primitiveFields)));
writeValueOrNull(pageBuilder, FilesTable.FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong);
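// the added snapshot ID comes from the manifest entry, not from the content file itself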
writeValueOrNull(pageBuilder, ADDED_SNAPSHOT_ID_COLUMN_NAME, entry::snapshotId, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FILE_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::fileSequenceNumber, BIGINT::writeLong);
writeValueOrNull(pageBuilder, DATA_SEQUENCE_NUMBER_COLUMN_NAME, contentFile::dataSequenceNumber, BIGINT::writeLong);
if (contentFile instanceof DeleteFile deleteFile) {
writeValueOrNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString);
writeValueOrNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong);
writeValueOrNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME, deleteFile::referencedDataFile, VARCHAR::writeString);
writeValueOrNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME, deleteFile::contentOffset, BIGINT::writeLong);
writeValueOrNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME, deleteFile::contentSizeInBytes, BIGINT::writeLong);
}
else {
// For non-delete files, these columns should be null
writeNull(pageBuilder, FilesTable.REFERENCED_DATA_FILE_COLUMN_NAME);
writeNull(pageBuilder, FilesTable.CONTENT_OFFSET_COLUMN_NAME);
writeNull(pageBuilder, FilesTable.CONTENT_SIZE_IN_BYTES_COLUMN_NAME);
writeNull(pageBuilder, REFERENCED_DATA_FILE_COLUMN_NAME);
writeNull(pageBuilder, CONTENT_OFFSET_COLUMN_NAME);
writeNull(pageBuilder, CONTENT_SIZE_IN_BYTES_COLUMN_NAME);
}
writeValueOrNull(pageBuilder, FilesTable.POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong);
writeValueOrNull(pageBuilder, FilesTable.MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString);
writeValueOrNull(pageBuilder, FilesTable.FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong);
writeValueOrNull(pageBuilder, POS_COLUMN_NAME, contentFile::pos, BIGINT::writeLong);
writeValueOrNull(pageBuilder, MANIFEST_LOCATION_COLUMN_NAME, contentFile::manifestLocation, VARCHAR::writeString);
writeValueOrNull(pageBuilder, FIRST_ROW_ID_COLUMN_NAME, contentFile::firstRowId, BIGINT::writeLong);
readTimeNanos += System.nanoTime() - start;
}

@@ -34,4 +34,11 @@ public static <F extends ContentFile<F>> CloseableIterable<ContentFile<F>> liveE
{
return CloseableIterable.transform(manifestReader.liveEntries(), ManifestEntry::file);
}

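/**
 * Pairs each live content file with the ID of the snapshot that added it, as recorded in
 * the manifest entry, so callers such as the $files table can expose added_snapshot_id.
 */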
public static <F extends ContentFile<F>> CloseableIterable<FileEntryWithMetadata> liveEntriesWithMetadata(ManifestReader<F> manifestReader)
{
return CloseableIterable.transform(manifestReader.liveEntries(), entry -> new FileEntryWithMetadata(entry.file(), entry.snapshotId()));
}

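/** A live content file together with the ID of the snapshot whose commit added it. */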
public record FileEntryWithMetadata(ContentFile<?> file, long snapshotId) {}
}
@@ -538,6 +538,7 @@ public void testFilesPartitionTable()
"('equality_ids', 'array(integer)', '', '')," +
"('sort_order_id', 'integer', '', '')," +
"('readable_metrics', 'json', '', '')," +
"('added_snapshot_id', 'bigint', '', '')," +
"('file_sequence_number', 'bigint', '', '')," +
"('data_sequence_number', 'bigint', '', '')," +
"('referenced_data_file', 'varchar', '', '')," +
@@ -558,6 +559,18 @@
.build());
}

@Test
public void testFilesTableAddedSnapshotId()
{
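// the shared test_table fixture contains four live files added across two snapshots,
// so every row has a non-null added_snapshot_id that resolves against $snapshots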
assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" WHERE added_snapshot_id IS NOT NULL"))
.matches("VALUES BIGINT '4'");
assertThat(query("SELECT count(DISTINCT added_snapshot_id) FROM test_schema.\"test_table$files\""))
.matches("VALUES BIGINT '2'");
assertThat(query("SELECT count(*) FROM test_schema.\"test_table$files\" f " +
"WHERE f.added_snapshot_id IN (SELECT snapshot_id FROM test_schema.\"test_table$snapshots\")"))
.matches("VALUES BIGINT '4'");
}

@Test
void testFilesTableReadableMetrics()
{
@@ -37,7 +37,7 @@ public class DataFileRecord
@SuppressWarnings("unchecked")
public static DataFileRecord toDataFileRecord(MaterializedRow row)
{
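// 26 columns: the previous 25 plus the new added_snapshot_id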
assertThat(row.getFieldCount()).isEqualTo(25);
assertThat(row.getFieldCount()).isEqualTo(26);
return new DataFileRecord(
(int) row.getField(0),
(String) row.getField(1),