Skip to content

Commit 63810f9

Browse files
committed
Iceberg: Add delete file metrics to $partitions metadata table
Add position_delete_file_count, position_delete_record_count, equality_delete_file_count, and equality_delete_record_count columns to the Iceberg $partitions system table. Previously, $partitions exposed no information about delete files, making it impossible to identify partitions with accumulated delete files without running expensive queries against the $files table. These new columns provide instant partition-level visibility into delete files.

Changes:
- IcebergStatistics: add 4 delete file fields to the record and its Builder, and add an acceptDeleteFile(DeleteFile) method
- PartitionsTable: add 4 BIGINT columns; iterate fileScanTask.deletes() with per-partition deduplication to avoid double-counting delete files shared across data files
- Update existing test assertions for the shifted column indices
- Add TestIcebergV2.testPartitionsTableDeleteFileColumns() verifying non-zero counts after position/equality deletes and a reset to zero after OPTIMIZE
1 parent 5dc36e8 commit 63810f9

5 files changed

Lines changed: 122 additions & 13 deletions

File tree

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergStatistics.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import io.trino.spi.type.TypeManager;
2020
import jakarta.annotation.Nullable;
2121
import org.apache.iceberg.DataFile;
22+
import org.apache.iceberg.DeleteFile;
23+
import org.apache.iceberg.FileContent;
2224
import org.apache.iceberg.PartitionSpec;
2325
import org.apache.iceberg.types.Conversions;
2426
import org.apache.iceberg.types.Types;
@@ -45,6 +47,10 @@ public record IcebergStatistics(
4547
long recordCount,
4648
long fileCount,
4749
long size,
50+
long positionDeleteFileCount,
51+
long positionDeleteRecordCount,
52+
long equalityDeleteFileCount,
53+
long equalityDeleteRecordCount,
4854
Map<Integer, Object> minValues,
4955
Map<Integer, Object> maxValues,
5056
Map<Integer, Long> nullCounts,
@@ -68,6 +74,10 @@ public static class Builder
6874
private long recordCount;
6975
private long fileCount;
7076
private long size;
77+
private long positionDeleteFileCount;
78+
private long positionDeleteRecordCount;
79+
private long equalityDeleteFileCount;
80+
private long equalityDeleteRecordCount;
7181
private final Map<Integer, ColumnStatistics> columnStatistics = new HashMap<>();
7282
private final Map<Integer, Long> nullCounts = new HashMap<>();
7383
private final Map<Integer, Long> nanCounts = new HashMap<>();
@@ -133,6 +143,18 @@ public void acceptDataFile(DataFile dataFile, PartitionSpec partitionSpec)
133143
}
134144
}
135145

146+
public void acceptDeleteFile(DeleteFile deleteFile)
147+
{
148+
if (deleteFile.content() == FileContent.POSITION_DELETES) {
149+
positionDeleteFileCount++;
150+
positionDeleteRecordCount += deleteFile.recordCount();
151+
}
152+
else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
153+
equalityDeleteFileCount++;
154+
equalityDeleteRecordCount += deleteFile.recordCount();
155+
}
156+
}
157+
136158
public IcebergStatistics build()
137159
{
138160
ImmutableMap.Builder<Integer, Object> minValues = ImmutableMap.builder();
@@ -147,6 +169,10 @@ public IcebergStatistics build()
147169
recordCount,
148170
fileCount,
149171
size,
172+
positionDeleteFileCount,
173+
positionDeleteRecordCount,
174+
equalityDeleteFileCount,
175+
equalityDeleteRecordCount,
150176
minValues.buildOrThrow(),
151177
maxValues.buildOrThrow(),
152178
ImmutableMap.copyOf(nullCounts),

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/system/PartitionsTable.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.trino.spi.type.RowType;
3030
import io.trino.spi.type.TypeManager;
3131
import org.apache.iceberg.DataFile;
32+
import org.apache.iceberg.DeleteFile;
3233
import org.apache.iceberg.FileScanTask;
3334
import org.apache.iceberg.PartitionField;
3435
import org.apache.iceberg.Table;
@@ -41,6 +42,7 @@
4142
import java.io.UncheckedIOException;
4243
import java.util.ArrayList;
4344
import java.util.HashMap;
45+
import java.util.HashSet;
4446
import java.util.List;
4547
import java.util.Map;
4648
import java.util.Map.Entry;
@@ -98,7 +100,9 @@ public PartitionsTable(SchemaTableName tableName, TypeManager typeManager, Table
98100
partitionColumnType.ifPresent(icebergPartitionColumn ->
99101
columnMetadataBuilder.add(new ColumnMetadata("partition", icebergPartitionColumn.rowType())));
100102

101-
Stream.of("record_count", "file_count", "total_size")
103+
Stream.of("record_count", "file_count", "total_size",
104+
"position_delete_file_count", "position_delete_record_count",
105+
"equality_delete_file_count", "equality_delete_record_count")
102106
.forEach(metric -> columnMetadataBuilder.add(new ColumnMetadata(metric, BIGINT)));
103107

104108
Set<Integer> identityPartitionIds = getIdentityPartitions(icebergTable.spec()).keySet().stream()
@@ -176,14 +180,22 @@ private Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics> getStatistic
176180
{
177181
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
178182
Map<StructLikeWrapperWithFieldIdToIndex, IcebergStatistics.Builder> partitions = new HashMap<>();
183+
Map<StructLikeWrapperWithFieldIdToIndex, Set<String>> seenDeleteFiles = new HashMap<>();
179184
for (FileScanTask fileScanTask : fileScanTasks) {
180185
DataFile dataFile = fileScanTask.file();
181186
StructLikeWrapperWithFieldIdToIndex structLikeWrapperWithFieldIdToIndex = createStructLikeWrapper(fileScanTask);
182187

183-
partitions.computeIfAbsent(
188+
IcebergStatistics.Builder builder = partitions.computeIfAbsent(
184189
structLikeWrapperWithFieldIdToIndex,
185-
_ -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager))
186-
.acceptDataFile(dataFile, fileScanTask.spec());
190+
_ -> new IcebergStatistics.Builder(icebergTable.schema().columns(), typeManager));
191+
builder.acceptDataFile(dataFile, fileScanTask.spec());
192+
193+
Set<String> seen = seenDeleteFiles.computeIfAbsent(structLikeWrapperWithFieldIdToIndex, _ -> new HashSet<>());
194+
for (DeleteFile deleteFile : fileScanTask.deletes()) {
195+
if (seen.add(deleteFile.location())) {
196+
builder.acceptDeleteFile(deleteFile);
197+
}
198+
}
187199
}
188200

189201
return partitions.entrySet().stream()
@@ -232,6 +244,10 @@ private RecordCursor buildRecordCursor(Map<StructLikeWrapperWithFieldIdToIndex,
232244
row.add(icebergStatistics.recordCount());
233245
row.add(icebergStatistics.fileCount());
234246
row.add(icebergStatistics.size());
247+
row.add(icebergStatistics.positionDeleteFileCount());
248+
row.add(icebergStatistics.positionDeleteRecordCount());
249+
row.add(icebergStatistics.equalityDeleteFileCount());
250+
row.add(icebergStatistics.equalityDeleteRecordCount());
235251

236252
// add column level metrics
237253
dataColumnType.ifPresent(dataColumnType -> {

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,7 +1041,7 @@ public void testCreatePartitionedTable()
10411041
String schema = getSession().getSchema().orElseThrow();
10421042
assertThat(query("SELECT column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'test_partitioned_table$partitions' "))
10431043
.skippingTypesCheck()
1044-
.matches("VALUES 'partition', 'record_count', 'file_count', 'total_size'");
1044+
.matches("VALUES 'partition', 'record_count', 'file_count', 'total_size', 'position_delete_file_count', 'position_delete_record_count', 'equality_delete_file_count', 'equality_delete_record_count'");
10451045
assertThat(query("SELECT " +
10461046
" record_count," +
10471047
" file_count, " +

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ public void testPartitionsTable()
142142
"('record_count', 'bigint', '', '')," +
143143
"('file_count', 'bigint', '', '')," +
144144
"('total_size', 'bigint', '', '')," +
145+
"('position_delete_file_count', 'bigint', '', '')," +
146+
"('position_delete_record_count', 'bigint', '', '')," +
147+
"('equality_delete_file_count', 'bigint', '', '')," +
148+
"('equality_delete_record_count', 'bigint', '', '')," +
145149
"('data', 'row(\"_bigint\" row(\"min\" bigint, \"max\" bigint, \"null_count\" bigint, \"nan_count\" bigint))', '', '')");
146150

147151
MaterializedResult result = computeActual("SELECT * from test_schema.\"test_table$partitions\"");
@@ -155,10 +159,18 @@ public void testPartitionsTable()
155159
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-09")).getField(1)).isEqualTo(3L);
156160
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(1)).isEqualTo(2L);
157161

162+
// Test if delete file counts are zero (no deletes in test data)
163+
for (MaterializedRow row : result.getMaterializedRows()) {
164+
assertThat(row.getField(4)).isEqualTo(0L); // position_delete_file_count
165+
assertThat(row.getField(5)).isEqualTo(0L); // position_delete_record_count
166+
assertThat(row.getField(6)).isEqualTo(0L); // equality_delete_file_count
167+
assertThat(row.getField(7)).isEqualTo(0L); // equality_delete_record_count
168+
}
169+
158170
// Test if min/max values, null value count and nan value count are computed correctly.
159-
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L, null)));
160-
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-09")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 1L, 3L, 0L, null)));
161-
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 4L, 5L, 0L, null)));
171+
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-08")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L, null)));
172+
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-09")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 1L, 3L, 0L, null)));
173+
assertThat(rowsByPartition.get(LocalDate.parse("2019-09-10")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION, new MaterializedRow(DEFAULT_PRECISION, 4L, 5L, 0L, null)));
162174
}
163175

164176
@Test
@@ -179,19 +191,19 @@ public void testPartitionsTableWithNan()
179191
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-04")).getField(1)).isEqualTo(3L);
180192

181193
// Test if min/max values, null value count and nan value count are computed correctly.
182-
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-01")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
194+
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-01")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
183195
new MaterializedRow(DEFAULT_PRECISION, 1L, 1L, 0L, null),
184196
new MaterializedRow(DEFAULT_PRECISION, 1.1d, 1.1d, 0L, null),
185197
new MaterializedRow(DEFAULT_PRECISION, 1.2f, 1.2f, 0L, null)));
186-
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-02")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
198+
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-02")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
187199
new MaterializedRow(DEFAULT_PRECISION, 2L, 2L, 0L, null),
188200
new MaterializedRow(DEFAULT_PRECISION, null, null, 0L, nanCount(1L)),
189201
new MaterializedRow(DEFAULT_PRECISION, 2.2f, 2.2f, 0L, null)));
190-
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-03")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
202+
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-03")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
191203
new MaterializedRow(DEFAULT_PRECISION, 3L, 3L, 0L, null),
192204
new MaterializedRow(DEFAULT_PRECISION, 3.3, 3.3d, 0L, null),
193205
new MaterializedRow(DEFAULT_PRECISION, null, null, 0L, nanCount(1L))));
194-
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-04")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
206+
assertThat(rowsByPartition.get(LocalDate.parse("2022-01-04")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
195207
new MaterializedRow(DEFAULT_PRECISION, 4L, 6L, 0L, null),
196208
new MaterializedRow(DEFAULT_PRECISION, null, null, 0L, nanCount(2L)),
197209
new MaterializedRow(DEFAULT_PRECISION, null, null, 0L, nanCount(2L))));
@@ -220,7 +232,7 @@ public void testPartitionsTableOnDropColumn()
220232
assertThat(resultAfterDrop.getRowCount()).isEqualTo(3);
221233
Map<LocalDate, MaterializedRow> rowsByPartitionAfterDrop = resultAfterDrop.getMaterializedRows().stream()
222234
.collect(toImmutableMap(row -> ((LocalDate) ((MaterializedRow) row.getField(0)).getField(0)), Function.identity()));
223-
assertThat(rowsByPartitionAfterDrop.get(LocalDate.parse("2019-09-08")).getField(4)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
235+
assertThat(rowsByPartitionAfterDrop.get(LocalDate.parse("2019-09-08")).getField(8)).isEqualTo(new MaterializedRow(DEFAULT_PRECISION,
224236
new MaterializedRow(DEFAULT_PRECISION, 0L, 0L, 0L, null)));
225237
}
226238

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1850,6 +1850,61 @@ private Table updateTableToV2(String tableName)
18501850
return table;
18511851
}
18521852

1853+
@Test
1854+
public void testPartitionsTableDeleteFileColumns()
1855+
throws Exception
1856+
{
1857+
try (TestTable testTable = newTrinoTable("test_partitions_delete_columns_",
1858+
"WITH (partitioning = ARRAY['regionkey']) AS SELECT * FROM tpch.tiny.nation")) {
1859+
String tableName = testTable.getName();
1860+
1861+
// Before any deletes, all delete columns should be zero
1862+
assertQuery(
1863+
"SELECT sum(position_delete_file_count), sum(position_delete_record_count), " +
1864+
"sum(equality_delete_file_count), sum(equality_delete_record_count) " +
1865+
"FROM \"" + tableName + "$partitions\"",
1866+
"VALUES (BIGINT '0', BIGINT '0', BIGINT '0', BIGINT '0')");
1867+
1868+
// Create position delete files via MOR path
1869+
assertUpdate("DELETE FROM " + tableName + " WHERE nationkey = 1", 1);
1870+
1871+
// Verify position delete file counts are non-zero for the affected partition
1872+
assertQuery(
1873+
"SELECT position_delete_file_count FROM \"" + tableName + "$partitions\" " +
1874+
"WHERE partition.regionkey = 1",
1875+
"VALUES BIGINT '1'");
1876+
assertQuery(
1877+
"SELECT position_delete_record_count FROM \"" + tableName + "$partitions\" " +
1878+
"WHERE partition.regionkey = 1",
1879+
"VALUES BIGINT '1'");
1880+
1881+
// Unaffected partitions should still have zero delete counts
1882+
assertQuery(
1883+
"SELECT DISTINCT position_delete_file_count FROM \"" + tableName + "$partitions\" " +
1884+
"WHERE partition.regionkey != 1",
1885+
"VALUES BIGINT '0'");
1886+
1887+
// Write an equality delete for regionkey=2
1888+
Table icebergTable = loadTable(tableName);
1889+
writeEqualityDeleteToNationTable(icebergTable, Optional.of(icebergTable.spec()), Optional.of(new PartitionData(new Long[] {2L})));
1890+
1891+
// Verify equality delete file counts
1892+
assertQuery(
1893+
"SELECT equality_delete_file_count FROM \"" + tableName + "$partitions\" " +
1894+
"WHERE partition.regionkey = 2",
1895+
"VALUES BIGINT '1'");
1896+
1897+
// OPTIMIZE should clear delete files
1898+
assertQuerySucceeds("ALTER TABLE " + tableName + " EXECUTE OPTIMIZE");
1899+
1900+
assertQuery(
1901+
"SELECT sum(position_delete_file_count), sum(position_delete_record_count), " +
1902+
"sum(equality_delete_file_count), sum(equality_delete_record_count) " +
1903+
"FROM \"" + tableName + "$partitions\"",
1904+
"VALUES (BIGINT '0', BIGINT '0', BIGINT '0', BIGINT '0')");
1905+
}
1906+
}
1907+
18531908
private BaseTable loadTable(String tableName)
18541909
{
18551910
return IcebergTestUtils.loadTable(tableName, metastore, fileSystemFactory, "hive", "tpch");

0 commit comments

Comments
 (0)