From 8ccb4ca35da38cf48250f8744668dc932736fadf Mon Sep 17 00:00:00 2001 From: xuba Date: Thu, 11 Dec 2025 21:51:20 +0800 Subject: [PATCH 01/10] Core: Add total size fields for position and equality delete files in PartitionsTable Signed-off-by: xuba --- .../org/apache/iceberg/PartitionsTable.java | 22 ++++++++++ .../source/TestIcebergSourceTablesBase.java | 44 +++++++++++++++++++ .../source/TestIcebergSourceTablesBase.java | 44 +++++++++++++++++++ .../source/TestIcebergSourceTablesBase.java | 44 +++++++++++++++++++ 4 files changed, 154 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 09c6e7893b7e..1b7ea1ef9076 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -71,6 +71,11 @@ public class PartitionsTable extends BaseMetadataTable { "position_delete_file_count", Types.IntegerType.get(), "Count of position delete files"), + Types.NestedField.required( + 12, + "total_position_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of position delete files"), Types.NestedField.required( 7, "equality_delete_record_count", @@ -81,6 +86,11 @@ public class PartitionsTable extends BaseMetadataTable { "equality_delete_file_count", Types.IntegerType.get(), "Count of equality delete files"), + Types.NestedField.required( + 13, + "total_equality_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of equality delete files"), Types.NestedField.optional( 9, "last_updated_at", @@ -108,8 +118,10 @@ public Schema schema() { "total_data_file_size_in_bytes", "position_delete_record_count", "position_delete_file_count", + "total_position_delete_file_size_in_bytes", "equality_delete_record_count", "equality_delete_file_count", + "total_equality_delete_file_size_in_bytes", "last_updated_at", "last_updated_snapshot_id"); } @@ -137,8 +149,10 @@ private DataTask 
task(StaticTableScan scan) { root.dataFileSizeInBytes, root.posDeleteRecordCount, root.posDeleteFileCount, + root.posDeleteFileSizeInBytes, root.eqDeleteRecordCount, root.eqDeleteFileCount, + root.eqDeleteFileSizeInBytes, root.lastUpdatedAt, root.lastUpdatedSnapshotId)); } else { @@ -160,8 +174,10 @@ private static StaticDataTask.Row convertPartition(Partition partition) { partition.dataFileSizeInBytes, partition.posDeleteRecordCount, partition.posDeleteFileCount, + partition.posDeleteFileSizeInBytes, partition.eqDeleteRecordCount, partition.eqDeleteFileCount, + partition.eqDeleteFileSizeInBytes, partition.lastUpdatedAt, partition.lastUpdatedSnapshotId); } @@ -277,8 +293,10 @@ static class Partition { private long dataFileSizeInBytes; private long posDeleteRecordCount; private int posDeleteFileCount; + private long posDeleteFileSizeInBytes; private long eqDeleteRecordCount; private int eqDeleteFileCount; + private long eqDeleteFileSizeInBytes; private Long lastUpdatedAt; private Long lastUpdatedSnapshotId; @@ -290,8 +308,10 @@ static class Partition { this.dataFileSizeInBytes = 0L; this.posDeleteRecordCount = 0L; this.posDeleteFileCount = 0; + this.posDeleteFileSizeInBytes = 0L; this.eqDeleteRecordCount = 0L; this.eqDeleteFileCount = 0; + this.eqDeleteFileSizeInBytes = 0L; } void update(ContentFile file, Snapshot snapshot) { @@ -314,10 +334,12 @@ void update(ContentFile file, Snapshot snapshot) { case POSITION_DELETES: this.posDeleteRecordCount += file.recordCount(); this.posDeleteFileCount += 1; + this.posDeleteFileSizeInBytes += file.fileSizeInBytes(); break; case EQUALITY_DELETES: this.eqDeleteRecordCount += file.recordCount(); this.eqDeleteFileCount += 1; + this.eqDeleteFileSizeInBytes += file.fileSizeInBytes(); break; default: throw new UnsupportedOperationException( diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 53c9ac6b2257..af3f532b6b1e 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.StringJoiner; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,6 +46,7 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -1285,6 +1287,11 @@ public void testUnpartitionedPartitionsTable() { "position_delete_file_count", Types.IntegerType.get(), "Count of position delete files"), + required( + 12, + "total_position_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of position delete files"), required( 7, "equality_delete_record_count", @@ -1295,6 +1302,11 @@ public void testUnpartitionedPartitionsTable() { "equality_delete_file_count", Types.IntegerType.get(), "Count of equality delete files"), + required( + 13, + "total_equality_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of equality delete files"), optional( 9, "last_updated_at", @@ -1325,8 +1337,10 @@ public void testUnpartitionedPartitionsTable() { totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .build(); List actual = @@ -1394,8 +1408,10 @@ 
public void testPartitionsTable() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1410,8 +1426,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1530,8 +1548,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1546,8 +1566,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) 
.set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1674,8 +1696,10 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1690,8 +1714,14 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 2L) // should be incremented now .set("position_delete_file_count", 2) // should be incremented now + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(posDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(posDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", posDeleteCommitId) @@ -1725,8 +1755,14 @@ public void testPartitionsTableDeleteStats() { .set("file_count", 1) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 2L) // should be incremented now .set("equality_delete_file_count", 2) // should be incremented now + .set( + "total_equality_delete_file_size_in_bytes", + 
totalDeleteFileSizeInBytes( + table.snapshot(eqDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.EQUALITY_DELETES)) .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", eqDeleteCommitId) .build()); @@ -2446,6 +2482,14 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } + private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + return Lists.newArrayList(deleteFiles).stream() + .filter(Objects::nonNull) + .filter(f -> f.content() == content) + .mapToLong(DeleteFile::fileSizeInBytes) + .sum(); + } + private void assertDataFilePartitions( List dataFiles, List expectedPartitionIds) { assertThat(dataFiles) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 67647925e59d..1846da51464f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.StringJoiner; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,6 +46,7 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -1286,6 +1288,11 @@ public void testUnpartitionedPartitionsTable() { "position_delete_file_count", Types.IntegerType.get(), "Count of position delete files"), + required( + 12, + 
"total_position_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of position delete files"), required( 7, "equality_delete_record_count", @@ -1296,6 +1303,11 @@ public void testUnpartitionedPartitionsTable() { "equality_delete_file_count", Types.IntegerType.get(), "Count of equality delete files"), + required( + 13, + "total_equality_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of equality delete files"), optional( 9, "last_updated_at", @@ -1326,8 +1338,10 @@ public void testUnpartitionedPartitionsTable() { totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .build(); List actual = @@ -1395,8 +1409,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1411,8 +1427,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) 
.set("last_updated_snapshot_id", secondCommitId) @@ -1533,8 +1551,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1549,8 +1569,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1677,8 +1699,10 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1693,8 +1717,14 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 2L) // should be incremented now .set("position_delete_file_count", 2) 
// should be incremented now + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(posDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(posDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", posDeleteCommitId) @@ -1728,8 +1758,14 @@ public void testPartitionsTableDeleteStats() { .set("file_count", 1) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 2L) // should be incremented now .set("equality_delete_file_count", 2) // should be incremented now + .set( + "total_equality_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(eqDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.EQUALITY_DELETES)) .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", eqDeleteCommitId) .build()); @@ -2449,6 +2485,14 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } + private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + return Lists.newArrayList(deleteFiles).stream() + .filter(Objects::nonNull) + .filter(f -> f.content() == content) + .mapToLong(DeleteFile::fileSizeInBytes) + .sum(); + } + private void assertDataFilePartitions( List dataFiles, List expectedPartitionIds) { assertThat(dataFiles) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 67647925e59d..1846da51464f 100644 --- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.StringJoiner; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,6 +46,7 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -1286,6 +1288,11 @@ public void testUnpartitionedPartitionsTable() { "position_delete_file_count", Types.IntegerType.get(), "Count of position delete files"), + required( + 12, + "total_position_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of position delete files"), required( 7, "equality_delete_record_count", @@ -1296,6 +1303,11 @@ public void testUnpartitionedPartitionsTable() { "equality_delete_file_count", Types.IntegerType.get(), "Count of equality delete files"), + required( + 13, + "total_equality_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of equality delete files"), optional( 9, "last_updated_at", @@ -1326,8 +1338,10 @@ public void testUnpartitionedPartitionsTable() { totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .build(); List actual = @@ -1395,8 +1409,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) 
.set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1411,8 +1427,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1533,8 +1551,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1549,8 +1569,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", 
table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1677,8 +1699,10 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1693,8 +1717,14 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 2L) // should be incremented now .set("position_delete_file_count", 2) // should be incremented now + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(posDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(posDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", posDeleteCommitId) @@ -1728,8 +1758,14 @@ public void testPartitionsTableDeleteStats() { .set("file_count", 1) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 2L) // should be incremented now .set("equality_delete_file_count", 2) // should be incremented now + .set( + "total_equality_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(eqDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.EQUALITY_DELETES)) .set("last_updated_at", 
table.snapshot(eqDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", eqDeleteCommitId) .build()); @@ -2449,6 +2485,14 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } + private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + return Lists.newArrayList(deleteFiles).stream() + .filter(Objects::nonNull) + .filter(f -> f.content() == content) + .mapToLong(DeleteFile::fileSizeInBytes) + .sum(); + } + private void assertDataFilePartitions( List dataFiles, List expectedPartitionIds) { assertThat(dataFiles) From c7bfb4102d0a427e3cc1cd4440acdfca45893176 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 12 Dec 2025 13:33:18 +0800 Subject: [PATCH 02/10] address Signed-off-by: xuba --- docs/docs/flink-queries.md | 12 ++++++------ docs/docs/spark-queries.md | 12 ++++++------ .../spark/source/TestIcebergSourceTablesBase.java | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/docs/flink-queries.md b/docs/docs/flink-queries.md index 8518f2f40339..5681719e1a87 100644 --- a/docs/docs/flink-queries.md +++ b/docs/docs/flink-queries.md @@ -492,12 +492,12 @@ To show a table's current partitions: SELECT * FROM prod.db.table$partitions; ``` -| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | equality_delete_record_count | equality_delete_file_count | last_updated_at(μs) | last_updated_snapshot_id | -| -------------- |---------|---------------|------------|--------------------------|------------------------------|----------------------------|------------------------------|----------------------------|---------------------|--------------------------| -| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 0 | 0 | 1633086034192000 | 9205185327307503337 | -| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 0 | 0 | 1633172537358000 | 
867027598972211003 | -| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 | -| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 1 | 1 | 1633169159489000 | 6941468797545315876 | +| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | total_position_delete_file_size_in_bytes | equality_delete_record_count | equality_delete_file_count | total_equality_delete_file_size_in_bytes | last_updated_at(μs) | last_updated_snapshot_id | +| -------------- |---------|---------------|------------|--------------------------|------------------------------|----------------------------|------------------------------------------|------------------------------|----------------------------|------------------------------------------|---------------------|--------------------------| +| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 50 | 0 | 0 | 0 | 1633086034192000 | 9205185327307503337 | +| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 100 | 0 | 0 | 0 | 1633172537358000 | 867027598972211003 | +| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 | +| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 0 | 1 | 1 | 400 | 1633169159489000 | 6941468797545315876 | Note: For unpartitioned tables, the partitions table will not contain the partition and spec_id fields. 
diff --git a/docs/docs/spark-queries.md b/docs/docs/spark-queries.md index ce626f42b14b..12d94705a12d 100644 --- a/docs/docs/spark-queries.md +++ b/docs/docs/spark-queries.md @@ -427,12 +427,12 @@ To show a table's current partitions: SELECT * FROM prod.db.table.partitions; ``` -| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | equality_delete_record_count | equality_delete_file_count | last_updated_at(μs) | last_updated_snapshot_id | -| -------------- |---------|---------------|------------|--------------------------|------------------------------|----------------------------|------------------------------|----------------------------|---------------------|--------------------------| -| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 0 | 0 | 1633086034192000 | 9205185327307503337 | -| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 0 | 0 | 1633172537358000 | 867027598972211003 | -| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 | -| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 1 | 1 | 1633169159489000 | 6941468797545315876 | +| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | total_position_delete_file_size_in_bytes | equality_delete_record_count | equality_delete_file_count | total_equality_delete_file_size_in_bytes | last_updated_at(μs) | last_updated_snapshot_id | +| -------------- |---------|---------------|------------|--------------------------|------------------------------|----------------------------|------------------------------------------|------------------------------|----------------------------|------------------------------------------|---------------------|--------------------------| +| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 50 | 0 | 0 | 0 | 1633086034192000 | 9205185327307503337 | +| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 
100 | 0 | 0 | 0 | 1633172537358000 | 867027598972211003 | +| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 | +| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 0 | 1 | 1 | 400 | 1633169159489000 | 6941468797545315876 | Note: diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 1846da51464f..0bd0d36d882f 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2485,7 +2485,7 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) From a1d2758caa5cdc542e7e97d6b841ea5f9a0957cb Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 12 Dec 2025 13:37:53 +0800 Subject: [PATCH 03/10] change totalDeleteFileSizeInBytes method to static Signed-off-by: xuba --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 2 +- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index af3f532b6b1e..fc0a5e79d14b 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2482,7 +2482,7 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 1846da51464f..0bd0d36d882f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2485,7 +2485,7 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) From 999b8e301cf7d2326bfa9d4b1fba7c81344f6297 Mon Sep 17 00:00:00 2001 From: xuba Date: Fri, 12 Dec 2025 13:48:50 +0800 Subject: [PATCH 04/10] spotless Signed-off-by: xuba --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 3 ++- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 3 ++- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index fc0a5e79d14b..b0b0e41b427d 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2482,7 +2482,8 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes( + Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 0bd0d36d882f..5986156a54e7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2485,7 +2485,8 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes( + Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 0bd0d36d882f..5986156a54e7 100644 --- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -2485,7 +2485,8 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } - private static long totalDeleteFileSizeInBytes(Iterable deleteFiles, FileContent content) { + private static long totalDeleteFileSizeInBytes( + Iterable deleteFiles, FileContent content) { return Lists.newArrayList(deleteFiles).stream() .filter(Objects::nonNull) .filter(f -> f.content() == content) From f6c9a558e700cda10a36e384ffe4deda4305e9e6 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 16 Dec 2025 10:25:32 +0800 Subject: [PATCH 05/10] Add total size fields for position and equality delete files Signed-off-by: xuba --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 5986156a54e7..c1d96828d6f2 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1609,8 +1609,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", null) .set("last_updated_snapshot_id", null) From 4545ecac2d5cb27b9c9df1e1df56d3b6d790de9b 
Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 16 Dec 2025 15:24:38 +0800 Subject: [PATCH 06/10] add for spark3.5, spark3.4 Signed-off-by: xuba --- .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 2 ++ .../iceberg/spark/source/TestIcebergSourceTablesBase.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index b0b0e41b427d..09da70eafcff 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1606,8 +1606,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", null) .set("last_updated_snapshot_id", null) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 5986156a54e7..c1d96828d6f2 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1609,8 +1609,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + 
.set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", null) .set("last_updated_snapshot_id", null) From 45783c3effb68679563e181ab34ffe1d1cd9a992 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 10 Feb 2026 11:25:07 +0800 Subject: [PATCH 07/10] Add tests for partitions table schema and data validation Signed-off-by: xuba --- .../flink/source/TestFlinkMetaDataTable.java | 126 ++++++++++++++++++ .../flink/source/TestFlinkMetaDataTable.java | 126 ++++++++++++++++++ .../flink/source/TestFlinkMetaDataTable.java | 126 ++++++++++++++++++ 3 files changed, 378 insertions(+) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index 6ae7bfb53a2e..fa299515382b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -419,6 +419,43 @@ public void testPartitionedTable() throws Exception { assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1); } + // check partitions table schema + List actualPartitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", TABLE_NAME); + assertThat(actualPartitions).hasSize(2); + + Row partitionA = actualPartitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + 
assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + Row partitionB = actualPartitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + // Check files table List expectedFiles = Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream()) @@ -592,6 +629,95 @@ public void testAllFilesPartitioned() throws Exception { TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles); } + @TestTemplate + public void testPartitionsTable() { + assumeThat(isPartition).isTrue(); + + // 2 
inserts for partition 'a' and 'b' in @BeforeEach, 2 records each + // Check partitions table + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", TABLE_NAME); + assertThat(partitions).hasSize(2); + + // Verify partition 'a' + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + // Verify partition 'b' + Row partitionB = partitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + 
assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + + long partitionASnapshotId = (Long) partitionA.getField("last_updated_snapshot_id"); + long partitionBSnapshotId = (Long) partitionB.getField("last_updated_snapshot_id"); + + // partitions 'a' and 'b' were inserted in separate commits + assertThat(partitionASnapshotId).isNotEqualTo(partitionBSnapshotId); + + // Test filtering by partition + List filteredPartitions = + sql("SELECT * FROM %s$partitions WHERE `partition`.`data` = 'a'", TABLE_NAME); + assertThat(filteredPartitions).hasSize(1); + assertThat(filteredPartitions.get(0).getField("partition")).isEqualTo(Row.of("a")); + + // Test projection + List projectedPartitions = + sql( + "SELECT record_count, file_count FROM %s$partitions ORDER BY `partition`.`data`", + TABLE_NAME); + assertThat(projectedPartitions).hasSize(2); + assertThat(projectedPartitions.get(0).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(0).getField("file_count")).isEqualTo(1); + assertThat(projectedPartitions.get(1).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); + } + + @TestTemplate + public void testPartitionsTableForUnpartitionedTable() { + assumeThat(isPartition).isFalse(); + + // For unpartitioned tables, there should be one row representing the whole table + List partitions = sql("SELECT * FROM %s$partitions", TABLE_NAME); + assertThat(partitions).hasSize(1); + + Row partition = partitions.get(0); + // Unpartitioned table should have record count = 4 (3 from first insert + 1 from second insert) + 
assertThat(partition.getField("record_count")).isEqualTo(4L); + assertThat(partition.getField("file_count")).isEqualTo(2); + assertThat((Long) partition.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partition.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("last_updated_at")).isNotNull(); + assertThat(partition.getField("last_updated_snapshot_id")).isNotNull(); + } + @TestTemplate public void testMetadataLogEntries() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index 6ae7bfb53a2e..fa299515382b 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -419,6 +419,43 @@ public void testPartitionedTable() throws Exception { assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1); } + // check partitions table schema + List actualPartitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", TABLE_NAME); + assertThat(actualPartitions).hasSize(2); + + Row partitionA = actualPartitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + 
assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + Row partitionB = actualPartitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + // Check files table List expectedFiles = 
Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream()) @@ -592,6 +629,95 @@ public void testAllFilesPartitioned() throws Exception { TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles); } + @TestTemplate + public void testPartitionsTable() { + assumeThat(isPartition).isTrue(); + + // 2 inserts for partition 'a' and 'b' in @BeforeEach, 2 records each + // Check partitions table + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", TABLE_NAME); + assertThat(partitions).hasSize(2); + + // Verify partition 'a' + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + // Verify partition 'b' + Row partitionB = partitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) 
partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + + long partitionASnapshotId = (Long) partitionA.getField("last_updated_snapshot_id"); + long partitionBSnapshotId = (Long) partitionB.getField("last_updated_snapshot_id"); + + // partitions 'a' and 'b' were inserted in separate commits + assertThat(partitionASnapshotId).isNotEqualTo(partitionBSnapshotId); + + // Test filtering by partition + List filteredPartitions = + sql("SELECT * FROM %s$partitions WHERE `partition`.`data` = 'a'", TABLE_NAME); + assertThat(filteredPartitions).hasSize(1); + assertThat(filteredPartitions.get(0).getField("partition")).isEqualTo(Row.of("a")); + + // Test projection + List projectedPartitions = + sql( + "SELECT record_count, file_count FROM %s$partitions ORDER BY `partition`.`data`", + TABLE_NAME); + assertThat(projectedPartitions).hasSize(2); + assertThat(projectedPartitions.get(0).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(0).getField("file_count")).isEqualTo(1); + assertThat(projectedPartitions.get(1).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); + } + + @TestTemplate + public void testPartitionsTableForUnpartitionedTable() { + assumeThat(isPartition).isFalse(); + + // For unpartitioned tables, there should be one row 
representing the whole table + List partitions = sql("SELECT * FROM %s$partitions", TABLE_NAME); + assertThat(partitions).hasSize(1); + + Row partition = partitions.get(0); + // Unpartitioned table should have record count = 4 (3 from first insert + 1 from second insert) + assertThat(partition.getField("record_count")).isEqualTo(4L); + assertThat(partition.getField("file_count")).isEqualTo(2); + assertThat((Long) partition.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partition.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("last_updated_at")).isNotNull(); + assertThat(partition.getField("last_updated_snapshot_id")).isNotNull(); + } + @TestTemplate public void testMetadataLogEntries() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index 6ae7bfb53a2e..fa299515382b 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -419,6 +419,43 @@ public void testPartitionedTable() throws Exception { assertThat(actualPartitionsWithProjection.get(i).getField(0)).isEqualTo(1); } + // check partitions table schema + List actualPartitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", 
TABLE_NAME); + assertThat(actualPartitions).hasSize(2); + + Row partitionA = actualPartitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + Row partitionB = actualPartitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(1); + assertThat((Long) 
partitionB.getField("total_equality_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + // Check files table List expectedFiles = Stream.concat(expectedDataFiles.stream(), expectedDeleteFiles.stream()) @@ -592,6 +629,95 @@ public void testAllFilesPartitioned() throws Exception { TestHelpers.assertEquals(filesTableSchema, expectedFiles, actualFiles); } + @TestTemplate + public void testPartitionsTable() { + assumeThat(isPartition).isTrue(); + + // 2 inserts for partition 'a' and 'b' in @BeforeEach, 2 records each + // Check partitions table + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", TABLE_NAME); + assertThat(partitions).hasSize(2); + + // Verify partition 'a' + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("spec_id")).isEqualTo(0); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionA.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("last_updated_at")).isNotNull(); + assertThat(partitionA.getField("last_updated_snapshot_id")).isNotNull(); + + // Verify partition 'b' + Row partitionB = partitions.get(1); + 
assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("spec_id")).isEqualTo(0); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionB.getField("last_updated_at")).isNotNull(); + assertThat(partitionB.getField("last_updated_snapshot_id")).isNotNull(); + + long partitionASnapshotId = (Long) partitionA.getField("last_updated_snapshot_id"); + long partitionBSnapshotId = (Long) partitionB.getField("last_updated_snapshot_id"); + + // partitions 'a' and 'b' were inserted in separate commits + assertThat(partitionASnapshotId).isNotEqualTo(partitionBSnapshotId); + + // Test filtering by partition + List filteredPartitions = + sql("SELECT * FROM %s$partitions WHERE `partition`.`data` = 'a'", TABLE_NAME); + assertThat(filteredPartitions).hasSize(1); + assertThat(filteredPartitions.get(0).getField("partition")).isEqualTo(Row.of("a")); + + // Test projection + List projectedPartitions = + sql( + "SELECT record_count, file_count FROM %s$partitions ORDER BY `partition`.`data`", + TABLE_NAME); + assertThat(projectedPartitions).hasSize(2); + assertThat(projectedPartitions.get(0).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(0).getField("file_count")).isEqualTo(1); + 
assertThat(projectedPartitions.get(1).getField("record_count")).isEqualTo(2L); + assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); + } + + @TestTemplate + public void testPartitionsTableForUnpartitionedTable() { + assumeThat(isPartition).isFalse(); + + // For unpartitioned tables, there should be one row representing the whole table + List partitions = sql("SELECT * FROM %s$partitions", TABLE_NAME); + assertThat(partitions).hasSize(1); + + Row partition = partitions.get(0); + // Unpartitioned table should have record count = 4 (3 from first insert + 1 from second insert) + assertThat(partition.getField("record_count")).isEqualTo(4L); + assertThat(partition.getField("file_count")).isEqualTo(2); + assertThat((Long) partition.getField("total_data_file_size_in_bytes")).isGreaterThan(0L); + assertThat(partition.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partition.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partition.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partition.getField("last_updated_at")).isNotNull(); + assertThat(partition.getField("last_updated_snapshot_id")).isNotNull(); + } + @TestTemplate public void testMetadataLogEntries() { Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME)); From ec90f6c4b0760fb4cc8ea692c37da57dc1b34731 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 10 Feb 2026 13:14:44 +0800 Subject: [PATCH 08/10] Add total size fields for position and equality delete files in partitions table Signed-off-by: xuba --- .../source/TestIcebergSourceTablesBase.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 67647925e59d..21430861935b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.StringJoiner; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -45,6 +46,7 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -1286,6 +1288,11 @@ public void testUnpartitionedPartitionsTable() { "position_delete_file_count", Types.IntegerType.get(), "Count of position delete files"), + required( + 12, + "total_position_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of position delete files"), required( 7, "equality_delete_record_count", @@ -1296,6 +1303,11 @@ public void testUnpartitionedPartitionsTable() { "equality_delete_file_count", Types.IntegerType.get(), "Count of equality delete files"), + required( + 13, + "total_equality_delete_file_size_in_bytes", + Types.LongType.get(), + "Total size in bytes of equality delete files"), optional( 9, "last_updated_at", @@ -1326,8 +1338,10 @@ public void testUnpartitionedPartitionsTable() { totalSizeInBytes(table.currentSnapshot().addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + 
.set("total_equality_delete_file_size_in_bytes", 0L) .build(); List actual = @@ -1395,8 +1409,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1411,8 +1427,10 @@ public void testPartitionsTable() { totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1533,8 +1551,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1549,8 +1569,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { dataFiles.get(1).fileSizeInBytes() + dataFiles.get(2).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + 
.set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(secondCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", secondCommitId) @@ -1587,8 +1609,10 @@ public void testPartitionsTableLastUpdatedSnapshot() { .set("total_data_file_size_in_bytes", dataFiles.get(0).fileSizeInBytes()) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", null) .set("last_updated_snapshot_id", null) @@ -1677,8 +1701,10 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", firstCommitId) @@ -1693,8 +1719,14 @@ public void testPartitionsTableDeleteStats() { totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) .set("position_delete_record_count", 2L) // should be incremented now .set("position_delete_file_count", 2) // should be incremented now + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(posDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) .set("equality_delete_record_count", 0L) .set("equality_delete_file_count", 0) + 
.set("total_equality_delete_file_size_in_bytes", 0L) .set("spec_id", 0) .set("last_updated_at", table.snapshot(posDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", posDeleteCommitId) @@ -1730,6 +1762,11 @@ public void testPartitionsTableDeleteStats() { .set("position_delete_file_count", 0) .set("equality_delete_record_count", 2L) // should be incremented now .set("equality_delete_file_count", 2) // should be incremented now + .set( + "total_equality_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(eqDeleteCommitId).addedDeleteFiles(table.io()), + FileContent.EQUALITY_DELETES)) .set("last_updated_at", table.snapshot(eqDeleteCommitId).timestampMillis() * 1000) .set("last_updated_snapshot_id", eqDeleteCommitId) .build()); @@ -2449,6 +2486,15 @@ private long totalSizeInBytes(Iterable dataFiles) { return Lists.newArrayList(dataFiles).stream().mapToLong(DataFile::fileSizeInBytes).sum(); } + private static long totalDeleteFileSizeInBytes( + Iterable deleteFiles, FileContent content) { + return Lists.newArrayList(deleteFiles).stream() + .filter(Objects::nonNull) + .filter(f -> f.content() == content) + .mapToLong(DeleteFile::fileSizeInBytes) + .sum(); + } + private void assertDataFilePartitions( List dataFiles, List expectedPartitionIds) { assertThat(dataFiles) From e9e40ffca334edbb55743f61c215051ba7662656 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 10 Feb 2026 14:50:44 +0800 Subject: [PATCH 09/10] fix Signed-off-by: xuba --- .../apache/iceberg/spark/source/TestIcebergSourceTablesBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 21430861935b..c1d96828d6f2 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -1760,6 +1760,7 @@ public void testPartitionsTableDeleteStats() { .set("file_count", 1) .set("position_delete_record_count", 0L) .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) .set("equality_delete_record_count", 2L) // should be incremented now .set("equality_delete_file_count", 2) // should be incremented now .set( From 8c46f88eb779a009814640ffd69a1694de07c763 Mon Sep 17 00:00:00 2001 From: xuba Date: Tue, 17 Mar 2026 14:08:13 +0800 Subject: [PATCH 10/10] Add tests for partitions table delete stats with delete vectors Signed-off-by: xuba --- .../flink/source/TestFlinkMetaDataTable.java | 74 ++++++++++ .../flink/source/TestFlinkMetaDataTable.java | 74 ++++++++++ .../flink/source/TestFlinkMetaDataTable.java | 74 ++++++++++ .../source/TestIcebergSourceTablesBase.java | 126 ++++++++++++++++++ .../source/TestIcebergSourceTablesBase.java | 126 ++++++++++++++++++ .../source/TestIcebergSourceTablesBase.java | 126 ++++++++++++++++++ .../source/TestIcebergSourceTablesBase.java | 126 ++++++++++++++++++ 7 files changed, 726 insertions(+) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index fa299515382b..334b83d76128 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; @@ -51,6 +52,7 @@ import org.apache.iceberg.Schema; import 
org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.avro.Avro; @@ -59,11 +61,15 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -695,6 +701,60 @@ public void testPartitionsTable() { assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); } + @TestTemplate + public void testPartitionsTableWithDVs() throws IOException { + assumeThat(isPartition).isTrue(); + + String dvTableName = "test_dv_table"; + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) PARTITIONED BY (data) WITH ('format-version'='3', 'write.format.default'='%s')", + dvTableName, format.name()); + try { + sql("INSERT INTO %s VALUES (1,'a',10),(2,'a',20)", dvTableName); + sql("INSERT INTO %s VALUES (1,'b',10),(2,'b',20)", dvTableName); + + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, dvTableName)); + + // Write DV against the data file in partition 'b' + DataFile dataFile = + Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 1); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); + 
table.refresh(); + + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", dvTableName); + assertThat(partitions).hasSize(2); + + // partition 'a' should have no deletes + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + + // partition 'b' should have DV delete stats + Row partitionB = partitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_position_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + } finally { + sql("DROP TABLE IF EXISTS %s", dvTableName); + } + } + @TestTemplate public void testPartitionsTableForUnpartitionedTable() { assumeThat(isPartition).isFalse(); @@ -939,4 +999,18 @@ private List allDataManifests(Table table) { private List 
deleteManifests(Table table) { return table.currentSnapshot().deleteManifests(table.io()); } + + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index fa299515382b..334b83d76128 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; @@ -51,6 +52,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.avro.Avro; @@ -59,11 +61,15 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.CatalogTestBase; import 
org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -695,6 +701,60 @@ public void testPartitionsTable() { assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); } + @TestTemplate + public void testPartitionsTableWithDVs() throws IOException { + assumeThat(isPartition).isTrue(); + + String dvTableName = "test_dv_table"; + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) PARTITIONED BY (data) WITH ('format-version'='3', 'write.format.default'='%s')", + dvTableName, format.name()); + try { + sql("INSERT INTO %s VALUES (1,'a',10),(2,'a',20)", dvTableName); + sql("INSERT INTO %s VALUES (1,'b',10),(2,'b',20)", dvTableName); + + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, dvTableName)); + + // Write DV against the data file in partition 'b' + DataFile dataFile = + Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 1); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); + table.refresh(); + + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", dvTableName); + assertThat(partitions).hasSize(2); + + // partition 'a' should have no deletes + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + 
assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + + // partition 'b' should have DV delete stats + Row partitionB = partitions.get(1); + assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_position_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + } finally { + sql("DROP TABLE IF EXISTS %s", dvTableName); + } + } + @TestTemplate public void testPartitionsTableForUnpartitionedTable() { assumeThat(isPartition).isFalse(); @@ -939,4 +999,18 @@ private List allDataManifests(Table table) { private List deleteManifests(Table table) { return table.currentSnapshot().deleteManifests(table.io()); } + + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + 
closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java index fa299515382b..334b83d76128 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkMetaDataTable.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; +import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileFormat; @@ -51,6 +52,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.avro.Avro; @@ -59,11 +61,15 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.flink.CatalogTestBase; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -695,6 +701,60 @@ public void testPartitionsTable() { 
assertThat(projectedPartitions.get(1).getField("file_count")).isEqualTo(1); } + @TestTemplate + public void testPartitionsTableWithDVs() throws IOException { + assumeThat(isPartition).isTrue(); + + String dvTableName = "test_dv_table"; + sql( + "CREATE TABLE %s (id INT, data VARCHAR,d DOUBLE) PARTITIONED BY (data) WITH ('format-version'='3', 'write.format.default'='%s')", + dvTableName, format.name()); + try { + sql("INSERT INTO %s VALUES (1,'a',10),(2,'a',20)", dvTableName); + sql("INSERT INTO %s VALUES (1,'b',10),(2,'b',20)", dvTableName); + + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, dvTableName)); + + // Write DV against the data file in partition 'b' + DataFile dataFile = + Iterables.getFirst(table.currentSnapshot().addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 1); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); + table.refresh(); + + List partitions = + sql("SELECT * FROM %s$partitions ORDER BY `partition`.`data`", dvTableName); + assertThat(partitions).hasSize(2); + + // partition 'a' should have no deletes + Row partitionA = partitions.get(0); + assertThat(partitionA.getField("partition")).isEqualTo(Row.of("a")); + assertThat(partitionA.getField("record_count")).isEqualTo(2L); + assertThat(partitionA.getField("file_count")).isEqualTo(1); + assertThat(partitionA.getField("position_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("position_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_position_delete_file_size_in_bytes")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionA.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionA.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + + // partition 'b' should have DV delete stats + Row partitionB = partitions.get(1); + 
assertThat(partitionB.getField("partition")).isEqualTo(Row.of("b")); + assertThat(partitionB.getField("record_count")).isEqualTo(2L); + assertThat(partitionB.getField("file_count")).isEqualTo(1); + assertThat(partitionB.getField("position_delete_record_count")).isEqualTo(1L); + assertThat(partitionB.getField("position_delete_file_count")).isEqualTo(1); + assertThat((Long) partitionB.getField("total_position_delete_file_size_in_bytes")) + .isGreaterThan(0L); + assertThat(partitionB.getField("equality_delete_record_count")).isEqualTo(0L); + assertThat(partitionB.getField("equality_delete_file_count")).isEqualTo(0); + assertThat(partitionB.getField("total_equality_delete_file_size_in_bytes")).isEqualTo(0L); + } finally { + sql("DROP TABLE IF EXISTS %s", dvTableName); + } + } + @TestTemplate public void testPartitionsTableForUnpartitionedTable() { assumeThat(isPartition).isFalse(); @@ -939,4 +999,18 @@ private List allDataManifests(Table table) { private List deleteManifests(Table table) { return table.currentSnapshot().deleteManifests(table.io()); } + + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index 09da70eafcff..1a17401cc4ba 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -47,6 +47,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -64,6 +65,8 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -1774,6 +1777,115 @@ public void testPartitionsTableDeleteStats() { } } + @Test + public void testPartitionsTableDeleteStatsWithDVs() throws IOException { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_dv_test"); + Table table = + createTable( + tableIdentifier, SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "3")); + Table partitionsTable = loadTable(tableIdentifier, "partitions"); + + Dataset df1 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")), + SimpleRecord.class); + Dataset df2 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")), + SimpleRecord.class); + + df1.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstCommitId = table.currentSnapshot().snapshotId(); + + // add a second file + df2.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long secondCommitId = table.currentSnapshot().snapshotId(); + + // 
write DVs against the data file in partition id=2 + DataFile dataFile = + Iterables.getFirst(table.snapshot(secondCommitId).addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 2); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); + table.refresh(); + long dvCommitId = table.currentSnapshot().snapshotId(); + + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "partitions")) + .orderBy("partition.id") + .collectAsList(); + assertThat(actual).as("Actual results should have two rows").hasSize(2); + + GenericRecordBuilder builder = + new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions")); + GenericRecordBuilder partitionBuilder = + new GenericRecordBuilder( + AvroSchemaUtil.convert( + partitionsTable.schema().findType("partition").asStructType(), "partition")); + List expected = Lists.newArrayList(); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 1).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 0L) + .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) + .set("last_updated_snapshot_id", firstCommitId) + .build()); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 2).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 2L) + .set("position_delete_file_count", 1) + .set( + 
"total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(dvCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(dvCommitId).timestampMillis() * 1000) + .set("last_updated_snapshot_id", dvCommitId) + .build()); + + for (int i = 0; i < 2; i += 1) { + TestHelpers.assertEqualsSafe( + partitionsTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + @Test public synchronized void testSnapshotReadAfterAddColumn() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -2505,4 +2617,18 @@ private void assertDataFilePartitions( .isEqualTo(expectedPartitionIds.get(i).intValue()); } } + + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index c1d96828d6f2..0958da203e48 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -47,6 +47,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import 
org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -64,6 +65,8 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -1777,6 +1780,115 @@ public void testPartitionsTableDeleteStats() { } } + /** Checks the rows Spark reads from the partitions metadata table of a format-version-3 table after two appends (ids 1 and 2) and one row-delta commit that adds a delete file produced by writeDV for the id=2 data file. The id=2 row is expected to report 2 position-delete records, 1 position-delete file, a position-delete size computed from the delete files added by that commit, and zero equality-delete stats; the id=1 row is expected to report zero for every delete stat. */ @Test + public void testPartitionsTableDeleteStatsWithDVs() throws IOException { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_dv_test"); + Table table = + createTable( + tableIdentifier, SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "3")); + Table partitionsTable = loadTable(tableIdentifier, "partitions"); + + Dataset df1 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")), + SimpleRecord.class); + Dataset df2 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")), + SimpleRecord.class); + + df1.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstCommitId = table.currentSnapshot().snapshotId(); + + // add a second file + df2.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long secondCommitId = table.currentSnapshot().snapshotId(); + + // write DVs against the data file in partition id=2 + DataFile dataFile = + Iterables.getFirst(table.snapshot(secondCommitId).addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 2); + 
table.newRowDelta().addDeletes(dvs.get(0)).commit(); /* only the first delete file returned by writeDV is committed */ + table.refresh(); + long dvCommitId = table.currentSnapshot().snapshotId(); + + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "partitions")) + .orderBy("partition.id") + .collectAsList(); + assertThat(actual).as("Actual results should have two rows").hasSize(2); + + GenericRecordBuilder builder = + new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions")); + GenericRecordBuilder partitionBuilder = + new GenericRecordBuilder( + AvroSchemaUtil.convert( + partitionsTable.schema().findType("partition").asStructType(), "partition")); + List expected = Lists.newArrayList(); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 1).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 0L) + .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) /* NOTE(review): snapshot millis scaled by 1000, presumably to microseconds for this column - confirm */ + .set("last_updated_snapshot_id", firstCommitId) + .build()); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 2).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 2L) + .set("position_delete_file_count", 1) + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(dvCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) + .set("equality_delete_record_count", 0L) + 
.set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(dvCommitId).timestampMillis() * 1000) + .set("last_updated_snapshot_id", dvCommitId) + .build()); + + for (int i = 0; i < 2; i += 1) { + TestHelpers.assertEqualsSafe( + partitionsTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + @Test public synchronized void testSnapshotReadAfterAddColumn() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -2508,4 +2620,18 @@ private void assertDataFilePartitions( .isEqualTo(expectedPartitionIds.get(i).intValue()); } } + + /** Deletes positions 0 through numPositionsToDelete - 1 of the data file at the given path through a BaseDVFileWriter fed by a PUFFIN-format OutputFileFactory, then returns the delete files from the writer result after the writer has been closed. NOTE(review): the second constructor argument always yields null, presumably meaning there are no previously written deletes to merge - confirm against BaseDVFileWriter. */ private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index c1d96828d6f2..0958da203e48 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -47,6 +47,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -64,6 +65,8 @@ import org.apache.iceberg.data.FileHelpers; import 
org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -1777,6 +1780,115 @@ public void testPartitionsTableDeleteStats() { } } + /** Checks the rows Spark reads from the partitions metadata table of a format-version-3 table after two appends (ids 1 and 2) and one row-delta commit that adds a delete file produced by writeDV for the id=2 data file. The id=2 row is expected to report 2 position-delete records, 1 position-delete file, a position-delete size computed from the delete files added by that commit, and zero equality-delete stats; the id=1 row is expected to report zero for every delete stat. */ @Test + public void testPartitionsTableDeleteStatsWithDVs() throws IOException { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_dv_test"); + Table table = + createTable( + tableIdentifier, SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "3")); + Table partitionsTable = loadTable(tableIdentifier, "partitions"); + + Dataset df1 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")), + SimpleRecord.class); + Dataset df2 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")), + SimpleRecord.class); + + df1.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstCommitId = table.currentSnapshot().snapshotId(); + + // add a second file + df2.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long secondCommitId = table.currentSnapshot().snapshotId(); + + // write DVs against the data file in partition id=2 + DataFile dataFile = + Iterables.getFirst(table.snapshot(secondCommitId).addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 2); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); /* only the first delete file returned by writeDV is committed */ + table.refresh(); + long dvCommitId = table.currentSnapshot().snapshotId(); + + List actual = + spark + .read() + .format("iceberg") + 
.load(loadLocation(tableIdentifier, "partitions")) + .orderBy("partition.id") + .collectAsList(); + assertThat(actual).as("Actual results should have two rows").hasSize(2); + + GenericRecordBuilder builder = + new GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions")); + GenericRecordBuilder partitionBuilder = + new GenericRecordBuilder( + AvroSchemaUtil.convert( + partitionsTable.schema().findType("partition").asStructType(), "partition")); + List expected = Lists.newArrayList(); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 1).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 0L) + .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) /* NOTE(review): snapshot millis scaled by 1000, presumably to microseconds for this column - confirm */ + .set("last_updated_snapshot_id", firstCommitId) + .build()); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 2).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 2L) + .set("position_delete_file_count", 1) + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(dvCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(dvCommitId).timestampMillis() * 1000) + 
.set("last_updated_snapshot_id", dvCommitId) + .build()); + + for (int i = 0; i < 2; i += 1) { + TestHelpers.assertEqualsSafe( + partitionsTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + @Test public synchronized void testSnapshotReadAfterAddColumn() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -2508,4 +2620,18 @@ private void assertDataFilePartitions( .isEqualTo(expectedPartitionIds.get(i).intValue()); } } + + /** Deletes positions 0 through numPositionsToDelete - 1 of the data file at the given path through a BaseDVFileWriter fed by a PUFFIN-format OutputFileFactory, then returns the delete files from the writer result after the writer has been closed. NOTE(review): the second constructor argument always yields null, presumably meaning there are no previously written deletes to merge - confirm against BaseDVFileWriter. */ private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java index c1d96828d6f2..0958da203e48 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java @@ -47,6 +47,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Files; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.ManifestFile; @@ -64,6 +65,8 @@ import org.apache.iceberg.data.FileHelpers; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import 
org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -1777,6 +1780,115 @@ public void testPartitionsTableDeleteStats() { } } + /** Checks the rows Spark reads from the partitions metadata table of a format-version-3 table after two appends (ids 1 and 2) and one row-delta commit that adds a delete file produced by writeDV for the id=2 data file. The id=2 row is expected to report 2 position-delete records, 1 position-delete file, a position-delete size computed from the delete files added by that commit, and zero equality-delete stats; the id=1 row is expected to report zero for every delete stat. */ @Test + public void testPartitionsTableDeleteStatsWithDVs() throws IOException { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "partitions_dv_test"); + Table table = + createTable( + tableIdentifier, SCHEMA, SPEC, ImmutableMap.of(TableProperties.FORMAT_VERSION, "3")); + Table partitionsTable = loadTable(tableIdentifier, "partitions"); + + Dataset df1 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(1, "b"), new SimpleRecord(1, "c")), + SimpleRecord.class); + Dataset df2 = + spark.createDataFrame( + Lists.newArrayList( + new SimpleRecord(2, "d"), new SimpleRecord(2, "e"), new SimpleRecord(2, "f")), + SimpleRecord.class); + + df1.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long firstCommitId = table.currentSnapshot().snapshotId(); + + // add a second file + df2.select("id", "data") + .write() + .format("iceberg") + .mode("append") + .save(loadLocation(tableIdentifier)); + + table.refresh(); + long secondCommitId = table.currentSnapshot().snapshotId(); + + // write DVs against the data file in partition id=2 + DataFile dataFile = + Iterables.getFirst(table.snapshot(secondCommitId).addedDataFiles(table.io()), null); + List dvs = writeDV(table, dataFile.partition(), dataFile.location(), 2); + table.newRowDelta().addDeletes(dvs.get(0)).commit(); /* only the first delete file returned by writeDV is committed */ + table.refresh(); + long dvCommitId = table.currentSnapshot().snapshotId(); + + List actual = + spark + .read() + .format("iceberg") + .load(loadLocation(tableIdentifier, "partitions")) + .orderBy("partition.id") + .collectAsList(); + assertThat(actual).as("Actual results should have two rows").hasSize(2); + + GenericRecordBuilder builder = + new 
GenericRecordBuilder(AvroSchemaUtil.convert(partitionsTable.schema(), "partitions")); + GenericRecordBuilder partitionBuilder = + new GenericRecordBuilder( + AvroSchemaUtil.convert( + partitionsTable.schema().findType("partition").asStructType(), "partition")); + List expected = Lists.newArrayList(); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 1).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(firstCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 0L) + .set("position_delete_file_count", 0) + .set("total_position_delete_file_size_in_bytes", 0L) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(firstCommitId).timestampMillis() * 1000) /* NOTE(review): snapshot millis scaled by 1000, presumably to microseconds for this column - confirm */ + .set("last_updated_snapshot_id", firstCommitId) + .build()); + expected.add( + builder + .set("partition", partitionBuilder.set("id", 2).build()) + .set("record_count", 3L) + .set("file_count", 1) + .set( + "total_data_file_size_in_bytes", + totalSizeInBytes(table.snapshot(secondCommitId).addedDataFiles(table.io()))) + .set("position_delete_record_count", 2L) + .set("position_delete_file_count", 1) + .set( + "total_position_delete_file_size_in_bytes", + totalDeleteFileSizeInBytes( + table.snapshot(dvCommitId).addedDeleteFiles(table.io()), + FileContent.POSITION_DELETES)) + .set("equality_delete_record_count", 0L) + .set("equality_delete_file_count", 0) + .set("total_equality_delete_file_size_in_bytes", 0L) + .set("spec_id", 0) + .set("last_updated_at", table.snapshot(dvCommitId).timestampMillis() * 1000) + .set("last_updated_snapshot_id", dvCommitId) + .build()); + + for (int i = 0; i < 2; i += 1) { + TestHelpers.assertEqualsSafe( + partitionsTable.schema().asStruct(), expected.get(i), actual.get(i)); + } + } + @Test public synchronized void 
testSnapshotReadAfterAddColumn() { TableIdentifier tableIdentifier = TableIdentifier.of("db", "table"); @@ -2508,4 +2620,18 @@ private void assertDataFilePartitions( .isEqualTo(expectedPartitionIds.get(i).intValue()); } } + + /** Deletes positions 0 through numPositionsToDelete - 1 of the data file at the given path through a BaseDVFileWriter fed by a PUFFIN-format OutputFileFactory, then returns the delete files from the writer result after the writer has been closed. NOTE(review): the second constructor argument always yields null, presumably meaning there are no previously written deletes to merge - confirm against BaseDVFileWriter. */ private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } }