From 74daded57943791c0a12548370540aa0c6fe88ec Mon Sep 17 00:00:00 2001
From: Neelesh Salian
Date: Wed, 13 May 2026 18:46:58 -0700
Subject: [PATCH 1/2] Parquet: Add opt-in uncompressed row group size tracking

---
 .../org/apache/iceberg/TableProperties.java   |  4 ++
 docs/docs/configuration.md                    |  1 +
 .../org/apache/iceberg/parquet/Parquet.java   | 27 ++++++++--
 .../apache/iceberg/parquet/ParquetWriter.java | 43 +++++++++++++++-
 .../parquet/TestParquetDataWriter.java        | 49 +++++++++++++++++++
 5 files changed, 118 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 021ef95d9122..e1e7cbfc4147 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -176,6 +176,10 @@ private TableProperties() {}
       "write.delete.parquet.row-group-check-max-record-count";
   public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;
 
+  public static final String PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED =
+      "write.parquet.row-group-size-check-uncompressed";
+  public static final boolean PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT = false;
+
   public static final String PARQUET_BLOOM_FILTER_MAX_BYTES =
       "write.parquet.bloom-filter-max-bytes";
   public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md
index 17bf1f8ac0a1..6e0ca18b6a4b 100644
--- a/docs/docs/configuration.md
+++ b/docs/docs/configuration.md
@@ -44,6 +44,7 @@ Iceberg tables support table properties to configure table behavior, like the de
 | write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
 | write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
 | write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
+| write.parquet.row-group-size-check-uncompressed | false | Track uncompressed row group size so that the target size is enforced accurately when a compression codec is used |
 | write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
 | write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) |
 | write.parquet.page-row-limit | 20000 | Parquet page row limit |
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index f02974d6e79c..0ffceb76bf15 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -51,6 +51,8 @@
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT;
 
 import java.io.File;
 import java.io.IOException;
@@ -372,6 +374,7 @@ public <D> FileAppender<D> build() throws IOException {
       int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
       int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
       boolean dictionaryEnabled = context.dictionaryEnabled();
+      boolean trackUncompressedRowGroupSize = context.trackUncompressedRowGroupSize();
 
       if (compressionLevel != null) {
         switch (codec) {
@@ -462,7 +465,8 @@ public <D> FileAppender<D> build() throws IOException {
             parquetProperties,
             metricsConfig,
             writeMode,
-            fileEncryptionProperties);
+            fileEncryptionProperties,
+            trackUncompressedRowGroupSize);
       } else {
         ParquetWriteBuilder<D> parquetWriteBuilder =
             new ParquetWriteBuilder<D>(ParquetIO.file(file))
@@ -510,6 +514,7 @@ static class Context {
     private final Map<String, String> columnBloomFilterEnabled;
     private final Map<String, String> columnStatsEnabled;
     private final boolean dictionaryEnabled;
+    private final boolean trackUncompressedRowGroupSize;
 
     private Context(
         int rowGroupSize,
@@ -526,7 +531,8 @@ private Context(
         Map<String, String> columnBloomFilterNdv,
         Map<String, String> columnBloomFilterEnabled,
         Map<String, String> columnStatsEnabled,
-        boolean dictionaryEnabled) {
+        boolean dictionaryEnabled,
+        boolean trackUncompressedRowGroupSize) {
       this.rowGroupSize = rowGroupSize;
       this.pageSize = pageSize;
       this.pageRowLimit = pageRowLimit;
@@ -542,6 +548,7 @@ private Context(
       this.columnBloomFilterEnabled = columnBloomFilterEnabled;
       this.columnStatsEnabled = columnStatsEnabled;
       this.dictionaryEnabled = dictionaryEnabled;
+      this.trackUncompressedRowGroupSize = trackUncompressedRowGroupSize;
     }
 
     static Context dataContext(Map<String, String> config) {
@@ -615,6 +622,12 @@ static Context dataContext(Map<String, String> config) {
       boolean dictionaryEnabled =
           PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
 
+      boolean trackUncompressedRowGroupSize =
+          PropertyUtil.propertyAsBoolean(
+              config,
+              PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED,
+              PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT);
+
       return new Context(
           rowGroupSize,
           pageSize,
@@ -630,7 +643,8 @@ static Context dataContext(Map<String, String> config) {
           columnBloomFilterNdv,
           columnBloomFilterEnabled,
           columnStatsEnabled,
-          dictionaryEnabled);
+          dictionaryEnabled,
+          trackUncompressedRowGroupSize);
     }
 
     static Context deleteContext(Map<String, String> config) {
@@ -707,7 +721,8 @@ static Context deleteContext(Map<String, String> config) {
           ImmutableMap.of(),
           ImmutableMap.of(),
           ImmutableMap.of(),
-          dictionaryEnabled);
+          dictionaryEnabled,
+          dataContext.trackUncompressedRowGroupSize());
     }
 
     private static CompressionCodecName toCodec(String codecAsString) {
@@ -786,6 +801,10 @@ Map<String, String> columnStatsEnabled() {
 
     boolean dictionaryEnabled() {
       return dictionaryEnabled;
     }
+
+    boolean trackUncompressedRowGroupSize() {
+      return trackUncompressedRowGroupSize;
+    }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 2334e75532be..660eb1f53e03 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -59,11 +59,13 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final OutputFile output;
   private final Configuration conf;
   private final InternalFileEncryptor fileEncryptor;
+  private final boolean trackUncompressedSize;
 
   private ColumnChunkPageWriteStore pageStore = null;
   private ColumnWriteStore writeStore;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
+  private long rowGroupUncompressedSize = 0;
   private boolean closed;
   private ParquetFileWriter writer;
   private int rowGroupOrdinal;
@@ -84,10 +86,12 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
       ParquetProperties properties,
       MetricsConfig metricsConfig,
       ParquetFileWriter.Mode writeMode,
-      FileEncryptionProperties encryptionProperties) {
+      FileEncryptionProperties encryptionProperties,
+      boolean trackUncompressedSize) {
     this.schema = schema;
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
+    this.trackUncompressedSize = trackUncompressedSize;
     this.metadata = ImmutableMap.copyOf(metadata);
     this.compressor =
         new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
@@ -135,11 +139,21 @@ private void ensureWriterInitialized() {
   @Override
   public void add(T value) {
     recordCount += 1;
-    model.write(0, value);
+    if (trackUncompressedSize) {
+      writeTracked(value);
+    } else {
+      model.write(0, value);
+    }
     writeStore.endRecord();
     checkSize();
   }
 
+  private void writeTracked(T value) {
+    long sizeBefore = writeStore.getBufferedSize();
+    model.write(0, value);
+    rowGroupUncompressedSize += writeStore.getBufferedSize() - sizeBefore;
+  }
+
   @Override
   public Metrics metrics() {
     Preconditions.checkState(closed, "Cannot return metrics for unclosed writer");
@@ -190,6 +204,30 @@ public List<Long> splitOffsets() {
   }
 
   private void checkSize() {
+    if (trackUncompressedSize) {
+      checkSizeUncompressed();
+    } else {
+      checkSizeDefault();
+    }
+  }
+
+  private void checkSizeUncompressed() {
+    if (rowGroupUncompressedSize >= targetRowGroupSize) {
+      flushRowGroup(false);
+    } else if (recordCount >= nextCheckRecordCount) {
+      double avgRecordSize = ((double) rowGroupUncompressedSize) / recordCount;
+      if (rowGroupUncompressedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
+        flushRowGroup(false);
+      } else {
+        long remainingSpace = targetRowGroupSize - rowGroupUncompressedSize;
+        long remainingRecords = (long) (remainingSpace / avgRecordSize);
+        this.nextCheckRecordCount =
+            recordCount + Math.min(remainingRecords / 2, props.getMaxRowCountForPageSizeCheck());
+      }
+    }
+  }
+
+  private void checkSizeDefault() {
     if (recordCount >= nextCheckRecordCount) {
       long bufferedSize = writeStore.getBufferedSize();
       double avgRecordSize = ((double) bufferedSize) / recordCount;
@@ -234,6 +272,7 @@ private void startRowGroup() {
             Math.max(recordCount / 2, props.getMinRowCountForPageSizeCheck()),
             props.getMaxRowCountForPageSizeCheck());
     this.recordCount = 0;
+    this.rowGroupUncompressedSize = 0;
 
     this.pageStore =
         new ColumnChunkPageWriteStore(
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
index 36e254628a6a..67c029af9d7f 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -26,6 +26,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
@@ -66,6 +67,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestParquetDataWriter {
   private static final Schema SCHEMA =
@@ -541,4 +544,50 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) {
           variantSchema.asStruct(), variantRecords.get(i), writtenRecords.get(i));
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"gzip", "snappy", "zstd", "uncompressed"})
+  public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws IOException {
+    // 50 MB uncompressed data with 8 MB target; verifies row group splits when
+    // uncompressed size tracking is enabled
+    OutputFile file = Files.localOutput(createTempFile(temp));
+
+    long targetRowGroupSize = 8 * 1024 * 1024;
+
+    DataWriter<Record> dataWriter =
+        Parquet.writeData(file)
+            .schema(SCHEMA)
+            .createWriterFunc(GenericParquetWriter::create)
+            .overwrite()
+            .withSpec(PartitionSpec.unpartitioned())
+            .set("write.parquet.row-group-size-bytes", String.valueOf(targetRowGroupSize))
+            .set("write.parquet.page-size-bytes", "1048576")
+            .set("write.parquet.compression-codec", codec)
+            .set("write.parquet.row-group-size-check-uncompressed", "true")
+            .build();
+
+    try (dataWriter) {
+      Random rng = new Random(42);
+      for (int i = 0; i < 100; i++) {
+        GenericRecord record = GenericRecord.create(SCHEMA);
+        record.setField("id", (long) i);
+        StringBuilder sb = new StringBuilder(512 * 1024);
+        sb.append("{\"id\":").append(i).append(",\"values\":[");
+        while (sb.length() < 512 * 1024) {
+          sb.append(rng.nextInt(100000)).append(',');
+        }
+        sb.setCharAt(sb.length() - 1, ']');
+        sb.append('}');
+        record.setField("data", sb.toString());
+        dataWriter.write(record);
+      }
+    }
+
+    DataFile dataFile = dataWriter.toDataFile();
+
+    assertThat(dataFile.recordCount()).as("Record count should match").isEqualTo(100);
+    assertThat(dataFile.splitOffsets().size())
+        .as("Row group count should reflect enforcement of the 8 MB target")
+        .isGreaterThanOrEqualTo(4);
+  }
 }

From 56720406e8ef5698ed66cd92bb88e50d483a6dbb Mon Sep 17 00:00:00 2001
From: Neelesh Salian
Date: Fri, 15 May 2026 11:24:01 -0700
Subject: [PATCH 2/2] Add default-path test and rename checkSize method

---
 .../apache/iceberg/parquet/ParquetWriter.java |  4 +-
 .../parquet/TestParquetDataWriter.java        | 43 +++++++++++++------
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 660eb1f53e03..dd9d20bc79dc 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -207,7 +207,7 @@ private void checkSize() {
     if (trackUncompressedSize) {
       checkSizeUncompressed();
     } else {
-      checkSizeDefault();
+      checkSizeCompressed();
     }
   }
 
@@ -227,7 +227,7 @@ private void checkSizeUncompressed() {
     }
   }
 
-  private void checkSizeDefault() {
+  private void checkSizeCompressed() {
     if (recordCount >= nextCheckRecordCount) {
       long bufferedSize = writeStore.getBufferedSize();
       double avgRecordSize = ((double) bufferedSize) / recordCount;
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
index 67c029af9d7f..cec3660c0bd7 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -548,13 +548,33 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) {
   @ParameterizedTest
   @ValueSource(strings = {"gzip", "snappy", "zstd", "uncompressed"})
   public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws IOException {
-    // 50 MB uncompressed data with 8 MB target; verifies row group splits when
-    // uncompressed size tracking is enabled
+    // With uncompressed tracking, row groups split at the configured target
+    DataFile dataFile = writeCompressibleRecords(codec, true);
+
+    assertThat(dataFile.recordCount()).as("Record count should match").isEqualTo(100);
+    assertThat(dataFile.splitOffsets().size())
+        .as("Row group count should reflect enforcement of the 8 MB target")
+        .isGreaterThanOrEqualTo(4);
+  }
+
+  @Test
+  public void testDefaultPathUsesCompressedSize() throws IOException {
+    // Without uncompressed tracking, compressed bytes never hit the target
+    DataFile dataFile = writeCompressibleRecords("gzip", false);
+
+    assertThat(dataFile.splitOffsets().size())
+        .as("Default path should use compressed size (single row group due to compression)")
+        .isEqualTo(1);
+  }
+
+  // Writes 100 records of 512 KB compressible JSON (~50 MB uncompressed) with an 8 MB target.
+  private DataFile writeCompressibleRecords(String codec, boolean trackUncompressed)
+      throws IOException {
     OutputFile file = Files.localOutput(createTempFile(temp));
 
     long targetRowGroupSize = 8 * 1024 * 1024;
 
-    DataWriter<Record> dataWriter =
+    Parquet.DataWriteBuilder builder =
         Parquet.writeData(file)
             .schema(SCHEMA)
             .createWriterFunc(GenericParquetWriter::create)
@@ -562,9 +582,13 @@ public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws
             .withSpec(PartitionSpec.unpartitioned())
             .set("write.parquet.row-group-size-bytes", String.valueOf(targetRowGroupSize))
             .set("write.parquet.page-size-bytes", "1048576")
-            .set("write.parquet.compression-codec", codec)
-            .set("write.parquet.row-group-size-check-uncompressed", "true")
-            .build();
+            .set("write.parquet.compression-codec", codec);
+
+    if (trackUncompressed) {
+      builder.set("write.parquet.row-group-size-check-uncompressed", "true");
+    }
+
+    DataWriter<Record> dataWriter = builder.build();
 
     try (dataWriter) {
       Random rng = new Random(42);
@@ -583,11 +607,6 @@ public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws
       }
     }
 
-    DataFile dataFile = dataWriter.toDataFile();
-
-    assertThat(dataFile.recordCount()).as("Record count should match").isEqualTo(100);
-    assertThat(dataFile.splitOffsets().size())
-        .as("Row group count should reflect enforcement of the 8 MB target")
-        .isGreaterThanOrEqualTo(4);
+    return dataWriter.toDataFile();
   }
 }
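
Reviewer note on usage: a minimal sketch of how a user would opt in once this series lands. The only assumed name is the existing Table handle "table"; the property constant is the one added by PATCH 1, and updateProperties() is the standard Iceberg API for changing table properties:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    void enableUncompressedRowGroupCheck(Table table) {
      // Opt in per table. With the default of false, the writer sizes row
      // groups from the compressed bytes, so highly compressible data can
      // produce row groups far larger than write.parquet.row-group-size-bytes
      // once decompressed, as testDefaultPathUsesCompressedSize shows.
      table
          .updateProperties()
          .set(TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED, "true")
          .commit();
    }

The same key can also be supplied per write, as the tests do through Parquet.DataWriteBuilder.set(...).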