# Patch summary (reviewer preamble; text ahead of the first "diff --git" header is not part of any hunk).
#
# ParquetWriter.java
#   - Adds the long field uncompressedBufferedSize (initially 0).
#   - add(): reads writeStore.getBufferedSize() before and after model.write(0, value) and adds the
#     difference to uncompressedBufferedSize, before writeStore.endRecord() and checkSize() run.
#   - checkSize(): takes bufferedSize from uncompressedBufferedSize instead of calling
#     writeStore.getBufferedSize().
#   - startRowGroup(): resets uncompressedBufferedSize to 0 right after recordCount is reset.
#
# TestParquet.java
#   - Adds a static import of TableProperties.PARQUET_COMPRESSION.
#   - Adds testRowGroupSizeEnforcedWithCompression: writes 500 records, each a 1024-character string
#     (10-digit zero-padded index + 1014 'a' characters), with gzip compression, a 64 KB row group
#     target and a 4 KB page size, then asserts the written file has more than one row group.
#
# NOTE(review): add() now calls writeStore.getBufferedSize() twice per record, whereas checkSize()
#   only runs its check once recordCount >= nextCheckRecordCount. Presumably that throttling exists
#   to limit how often the size is computed -- confirm the per-record cost is acceptable.
# NOTE(review): hunk headers and hunk bodies share physical lines below, so the original line breaks
#   appear to have been lost; restore them before feeding this text to a patch tool.
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 2334e75532be..967a28ec1169 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -64,6 +64,7 @@ class ParquetWriter implements FileAppender, Closeable { private ColumnWriteStore writeStore; private long recordCount = 0; private long nextCheckRecordCount = 10; + private long uncompressedBufferedSize = 0; private boolean closed; private ParquetFileWriter writer; private int rowGroupOrdinal; @@ -135,7 +136,9 @@ private void ensureWriterInitialized() { @Override public void add(T value) { recordCount += 1; + long sizeBefore = writeStore.getBufferedSize(); model.write(0, value); + uncompressedBufferedSize += writeStore.getBufferedSize() - sizeBefore; writeStore.endRecord(); checkSize(); } @@ -191,7 +194,7 @@ public List splitOffsets() { private void checkSize() { if (recordCount >= nextCheckRecordCount) { - long bufferedSize = writeStore.getBufferedSize(); + long bufferedSize = uncompressedBufferedSize; double avgRecordSize = ((double) bufferedSize) / recordCount; if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) { @@ -234,6 +237,7 @@ private void startRowGroup() { Math.max(recordCount / 2, props.getMinRowCountForPageSizeCheck()), props.getMaxRowCountForPageSizeCheck()); this.recordCount = 0; + this.uncompressedBufferedSize = 0; this.pageStore = new ColumnChunkPageWriteStore( diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index 5f1e0c83cc0f..f67015b6149e 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static 
org.apache.iceberg.Files.localInput; import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; +import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; @@ -338,6 +339,50 @@ public void testAvroWriterRejectsVariantType() { .hasMessage("Avro writer does not support variant types"); } + @Test + public void testRowGroupSizeEnforcedWithCompression() throws IOException { + // Regression test for #16325: with compression enabled, row group size check used compressed + // bytes, causing row groups to grow unbounded. + Schema schema = new Schema(optional(1, "stringCol", Types.StringType.get())); + + File file = createTempFile(temp); + + // Write data that is highly compressible but unique enough to prevent dictionary encoding + // from collapsing it. Each record has a unique prefix + repeated padding. 
+ int rowGroupSize = 64 * 1024; // 64 KB + int pageSize = 4 * 1024; // 4 KB page size to force frequent page flushes + int numRecords = 500; + + List records = Lists.newArrayListWithCapacity(numRecords); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 0; i < numRecords; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + // Unique prefix ensures no dictionary, repeated suffix ensures high compression + record.put("stringCol", String.format("%010d", i) + Strings.repeat("a", 1014)); + records.add(record); + } + + write( + file, + schema, + ImmutableMap.builder() + .put(PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(rowGroupSize)) + .put(PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT, "1") + .put(PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT, "100") + .put(PARQUET_COMPRESSION, "gzip") + .put("write.parquet.page-size-bytes", Integer.toString(pageSize)) + .buildOrThrow(), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + List rowGroups = reader.getRowGroups(); + // With 500 KB of uncompressed data and a 64 KB row group target, + // we must get multiple row groups + assertThat(rowGroups).hasSizeGreaterThan(1); + } + } + private Pair generateFile( Function> createWriterFunc, int desiredRecordCount,