diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 021ef95d9122..bfe373e5a0e9 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -143,6 +143,11 @@ private TableProperties() {} public static final String DELETE_PARQUET_PAGE_ROW_LIMIT = "write.delete.parquet.page-row-limit"; public static final int PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20_000; + public static final String PARQUET_ROW_GROUP_ROW_LIMIT = "write.parquet.row-group-row-limit"; + public static final String DELETE_PARQUET_ROW_GROUP_ROW_LIMIT = + "write.delete.parquet.row-group-row-limit"; + public static final int PARQUET_ROW_GROUP_ROW_LIMIT_DEFAULT = Integer.MAX_VALUE; + public static final String PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"; public static final String DELETE_PARQUET_DICT_SIZE_BYTES = "write.delete.parquet.dict-size-bytes"; diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 17bf1f8ac0a1..5c7aa8bbfce0 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -47,6 +47,7 @@ Iceberg tables support table properties to configure table behavior, like the de | write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size | | write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) | | write.parquet.page-row-limit | 20000 | Parquet page row limit | +| write.parquet.row-group-row-limit | 2147483647 (INT_MAX) | Parquet row group row count limit; row groups are flushed once this many rows have been written | | write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size | | write.parquet.compression-codec | zstd | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed | | write.parquet.compression-level | null | Parquet compression level | diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index f02974d6e79c..a82ae120ecd1 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -26,6 +26,7 @@ import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_VERSION; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_ROW_LIMIT; import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX; @@ -49,6 +50,8 @@ import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_ROW_LIMIT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_ROW_LIMIT_DEFAULT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT; @@ -365,6 +368,7 @@ public FileAppender build() throws IOException { int rowGroupSize = context.rowGroupSize(); int pageSize = context.pageSize(); int pageRowLimit = context.pageRowLimit(); + int rowGroupRowLimit = context.rowGroupRowLimit(); int dictionaryPageSize = context.dictionaryPageSize(); String compressionLevel = context.compressionLevel(); CompressionCodecName codec = context.codec(); @@ -433,6 +437,7 @@ public FileAppender build() throws IOException { .withWriterVersion(context.writerVersion()) .withPageSize(pageSize) .withPageRowCountLimit(pageRowLimit) + .withRowGroupRowCountLimit(rowGroupRowLimit) .withDictionaryEncoding(dictionaryEnabled) .withDictionaryPageSize(dictionaryPageSize) .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount) @@ -476,6 +481,7 @@ public FileAppender build() throws IOException { .withRowGroupSize((long) rowGroupSize) .withPageSize(pageSize) .withPageRowCountLimit(pageRowLimit) + .withRowGroupRowCountLimit(rowGroupRowLimit) .withDictionaryEncoding(dictionaryEnabled) .withDictionaryPageSize(dictionaryPageSize) .withEncryption(fileEncryptionProperties); @@ -498,6 +504,7 @@ static class Context { private final int rowGroupSize; private final int pageSize; private final int pageRowLimit; + private final int rowGroupRowLimit; private final int dictionaryPageSize; private final WriterVersion writerVersion; private final CompressionCodecName codec; @@ -515,6 +522,7 @@ private Context( int rowGroupSize, int pageSize, int pageRowLimit, + int rowGroupRowLimit, int dictionaryPageSize, WriterVersion writerVersion, CompressionCodecName codec, @@ -530,6 +538,7 @@ private Context( this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; this.pageRowLimit = pageRowLimit; + this.rowGroupRowLimit = rowGroupRowLimit; this.dictionaryPageSize = dictionaryPageSize; this.writerVersion = writerVersion; this.codec = codec; @@ -560,6 +569,11 @@ static Context dataContext(Map config) { config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT); Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); + int rowGroupRowLimit = + PropertyUtil.propertyAsInt( + config, PARQUET_ROW_GROUP_ROW_LIMIT, PARQUET_ROW_GROUP_ROW_LIMIT_DEFAULT); + Preconditions.checkArgument(rowGroupRowLimit > 0, "Row group row count limit must be > 0"); + int dictionaryPageSize = PropertyUtil.propertyAsInt( config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT); @@ -619,6 +633,7 @@ static Context dataContext(Map config) { rowGroupSize, pageSize, pageRowLimit, + rowGroupRowLimit, dictionaryPageSize, writerVersion, codec, @@ -652,6 +667,11 @@ static Context deleteContext(Map config) { config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit()); Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0"); + int rowGroupRowLimit = + PropertyUtil.propertyAsInt( + config, DELETE_PARQUET_ROW_GROUP_ROW_LIMIT, dataContext.rowGroupRowLimit()); + Preconditions.checkArgument(rowGroupRowLimit > 0, "Row group row count limit must be > 0"); + int dictionaryPageSize = PropertyUtil.propertyAsInt( config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize()); @@ -696,6 +716,7 @@ static Context deleteContext(Map config) { rowGroupSize, pageSize, pageRowLimit, + rowGroupRowLimit, dictionaryPageSize, writerVersion, codec, @@ -739,6 +760,10 @@ int pageRowLimit() { return pageRowLimit; } + int rowGroupRowLimit() { + return rowGroupRowLimit; + } + int dictionaryPageSize() { return dictionaryPageSize; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index 2334e75532be..b9a26a859fca 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -190,6 +190,12 @@ public List splitOffsets() { } private void checkSize() { + // This comparison is cheap, so we don't need the "spacing out checks" logic below. + if (recordCount >= props.getRowGroupRowCountLimit()) { + flushRowGroup(false); + return; + } + if (recordCount >= nextCheckRecordCount) { long bufferedSize = writeStore.getBufferedSize(); double avgRecordSize = ((double) bufferedSize) / recordCount; diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index 5f1e0c83cc0f..0b71e7118f3b 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -23,6 +23,7 @@ import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT; +import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_ROW_LIMIT; import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES; import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile; import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write; @@ -105,6 +106,38 @@ public void testRowGroupSizeConfigurableWithWriter() throws IOException { } } + @Test + public void testRowGroupRowLimitConfigurable() throws IOException { + Schema schema = new Schema(optional(1, "intCol", IntegerType.get())); + + int recordCount = 25; + int rowGroupRowLimit = 10; + + List records = Lists.newArrayListWithCapacity(recordCount); + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + for (int i = 1; i <= recordCount; i++) { + GenericData.Record record = new GenericData.Record(avroSchema); + record.put("intCol", i); + records.add(record); + } + + File file = createTempFile(temp); + write( + file, + schema, + ImmutableMap.of(PARQUET_ROW_GROUP_ROW_LIMIT, Integer.toString(rowGroupRowLimit)), + ParquetAvroWriter::buildWriter, + records.toArray(new GenericData.Record[] {})); + + try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(file)))) { + List blocks = reader.getFooter().getBlocks(); + assertThat(blocks).hasSize(3); + for (BlockMetaData block : blocks) { + assertThat(block.getRowCount()).isLessThanOrEqualTo(rowGroupRowLimit); + } + } + } + @Test public void testMetricsMissingColumnStatisticsInRowGroups() throws IOException { Schema schema = new Schema(optional(1, "stringCol", Types.StringType.get()));