4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -176,6 +176,10 @@ private TableProperties() {}
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED =
"write.parquet.row-group-size-check-uncompressed";
public static final boolean PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES =
"write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
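As a usage note (not part of this diff), a minimal sketch of enabling the new property on an existing table through Iceberg's UpdateProperties API; the `catalog` handle and the `db.events` table are assumptions for illustration:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Enable uncompressed row-group size tracking for subsequent Parquet writes to this table.
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
table
    .updateProperties()
    .set(TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED, "true")
    .commit();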
1 change: 1 addition & 0 deletions docs/docs/configuration.md
@@ -44,6 +44,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
| write.parquet.row-group-size-check-uncompressed | false | Track the uncompressed row group size so the row group size target is enforced accurately when a compression codec is used |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) |
| write.parquet.page-row-limit | 20000 | Parquet page row limit |
27 changes: 23 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -51,6 +51,8 @@
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT;

import java.io.File;
import java.io.IOException;
@@ -372,6 +374,7 @@ public <D> FileAppender<D> build() throws IOException {
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
boolean dictionaryEnabled = context.dictionaryEnabled();
boolean trackUncompressedRowGroupSize = context.trackUncompressedRowGroupSize();

if (compressionLevel != null) {
switch (codec) {
@@ -462,7 +465,8 @@ public <D> FileAppender<D> build() throws IOException {
parquetProperties,
metricsConfig,
writeMode,
fileEncryptionProperties);
fileEncryptionProperties,
trackUncompressedRowGroupSize);
} else {
ParquetWriteBuilder<D> parquetWriteBuilder =
new ParquetWriteBuilder<D>(ParquetIO.file(file))
@@ -510,6 +514,7 @@ static class Context {
private final Map<String, String> columnBloomFilterEnabled;
private final Map<String, String> columnStatsEnabled;
private final boolean dictionaryEnabled;
private final boolean trackUncompressedRowGroupSize;

private Context(
int rowGroupSize,
@@ -526,7 +531,8 @@ private Context(
Map<String, String> columnBloomFilterNdv,
Map<String, String> columnBloomFilterEnabled,
Map<String, String> columnStatsEnabled,
boolean dictionaryEnabled) {
boolean dictionaryEnabled,
boolean trackUncompressedRowGroupSize) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.pageRowLimit = pageRowLimit;
@@ -542,6 +548,7 @@ private Context(
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.columnStatsEnabled = columnStatsEnabled;
this.dictionaryEnabled = dictionaryEnabled;
this.trackUncompressedRowGroupSize = trackUncompressedRowGroupSize;
}

static Context dataContext(Map<String, String> config) {
@@ -615,6 +622,12 @@ static Context dataContext(Map<String, String> config) {
boolean dictionaryEnabled =
PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);

boolean trackUncompressedRowGroupSize =
PropertyUtil.propertyAsBoolean(
config,
PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED,
PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT);

return new Context(
rowGroupSize,
pageSize,
@@ -630,7 +643,8 @@ static Context dataContext(Map<String, String> config) {
columnBloomFilterNdv,
columnBloomFilterEnabled,
columnStatsEnabled,
dictionaryEnabled);
dictionaryEnabled,
trackUncompressedRowGroupSize);
}

static Context deleteContext(Map<String, String> config) {
@@ -707,7 +721,8 @@ static Context deleteContext(Map<String, String> config) {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
dictionaryEnabled);
dictionaryEnabled,
dataContext.trackUncompressedRowGroupSize());
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -786,6 +801,10 @@ Map<String, String> columnStatsEnabled() {
boolean dictionaryEnabled() {
return dictionaryEnabled;
}

boolean trackUncompressedRowGroupSize() {
return trackUncompressedRowGroupSize;
}
}
}

parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -59,11 +59,13 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final OutputFile output;
private final Configuration conf;
private final InternalFileEncryptor fileEncryptor;
private final boolean trackUncompressedSize;

private ColumnChunkPageWriteStore pageStore = null;
private ColumnWriteStore writeStore;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
private long rowGroupUncompressedSize = 0;
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
@@ -84,10 +86,12 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
ParquetProperties properties,
MetricsConfig metricsConfig,
ParquetFileWriter.Mode writeMode,
FileEncryptionProperties encryptionProperties) {
FileEncryptionProperties encryptionProperties,
boolean trackUncompressedSize) {
this.schema = schema;
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
this.trackUncompressedSize = trackUncompressedSize;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor =
new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
@@ -135,11 +139,21 @@ private void ensureWriterInitialized() {
@Override
public void add(T value) {
recordCount += 1;
model.write(0, value);
if (trackUncompressedSize) {
writeTracked(value);
} else {
model.write(0, value);
}
writeStore.endRecord();
checkSize();
}

private void writeTracked(T value) {
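// Record how much the write store's in-memory buffer grows for this single record; the
// accumulated deltas approximate the uncompressed size of the current row group.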
long sizeBefore = writeStore.getBufferedSize();
model.write(0, value);
rowGroupUncompressedSize += writeStore.getBufferedSize() - sizeBefore;
}

@Override
public Metrics metrics() {
Preconditions.checkState(closed, "Cannot return metrics for unclosed writer");
Expand Down Expand Up @@ -190,6 +204,30 @@ public List<Long> splitOffsets() {
}

private void checkSize() {
if (trackUncompressedSize) {
checkSizeUncompressed();
} else {
checkSizeCompressed();
}
}

private void checkSizeUncompressed() {
if (rowGroupUncompressedSize >= targetRowGroupSize) {
flushRowGroup(false);
} else if (recordCount >= nextCheckRecordCount) {
double avgRecordSize = ((double) rowGroupUncompressedSize) / recordCount;
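// Flush if there is no longer room for roughly two more average-sized records below the
// target; otherwise estimate how many records still fit and schedule the next check.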
if (rowGroupUncompressedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
flushRowGroup(false);
} else {
long remainingSpace = targetRowGroupSize - rowGroupUncompressedSize;
long remainingRecords = (long) (remainingSpace / avgRecordSize);
this.nextCheckRecordCount =
recordCount + Math.min(remainingRecords / 2, props.getMaxRowCountForPageSizeCheck());
}
}
}

private void checkSizeCompressed() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();
double avgRecordSize = ((double) bufferedSize) / recordCount;
@@ -234,6 +272,7 @@ private void startRowGroup() {
Math.max(recordCount / 2, props.getMinRowCountForPageSizeCheck()),
props.getMaxRowCountForPageSizeCheck());
this.recordCount = 0;
this.rowGroupUncompressedSize = 0;

this.pageStore =
new ColumnChunkPageWriteStore(
TestParquetDataWriter.java
@@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
@@ -66,6 +67,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestParquetDataWriter {
private static final Schema SCHEMA =
@@ -541,4 +544,69 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) {
variantSchema.asStruct(), variantRecords.get(i), writtenRecords.get(i));
}
}

@ParameterizedTest
@ValueSource(strings = {"gzip", "snappy", "zstd", "uncompressed"})
public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws IOException {
// With uncompressed tracking, row groups split at the configured target
DataFile dataFile = writeCompressibleRecords(codec, true);

assertThat(dataFile.recordCount()).as("Record count should match").isEqualTo(100);
assertThat(dataFile.splitOffsets().size())
.as("Row group count should reflect enforcement of the 8 MB target")
.isGreaterThanOrEqualTo(4);
}

@Test
public void testDefaultPathUsesCompressedSize() throws IOException {
// Without uncompressed tracking, compressed bytes never hit the target
DataFile dataFile = writeCompressibleRecords("gzip", false);

assertThat(dataFile.splitOffsets().size())
.as("Default path should use compressed size (single row group due to compression)")
.isEqualTo(1);
}

// Writes 100 records of 512 KB compressible JSON (~50 MB uncompressed) with an 8 MB target.
private DataFile writeCompressibleRecords(String codec, boolean trackUncompressed)
throws IOException {
OutputFile file = Files.localOutput(createTempFile(temp));

long targetRowGroupSize = 8 * 1024 * 1024;

Parquet.DataWriteBuilder builder =
Parquet.writeData(file)
.schema(SCHEMA)
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.set("write.parquet.row-group-size-bytes", String.valueOf(targetRowGroupSize))
.set("write.parquet.page-size-bytes", "1048576")
.set("write.parquet.compression-codec", codec);

if (trackUncompressed) {
builder.set("write.parquet.row-group-size-check-uncompressed", "true");
}

DataWriter<Record> dataWriter = builder.build();

try (dataWriter) {
Random rng = new Random(42);
for (int i = 0; i < 100; i++) {
GenericRecord record = GenericRecord.create(SCHEMA);
record.setField("id", (long) i);
StringBuilder sb = new StringBuilder(512 * 1024);
sb.append("{\"id\":").append(i).append(",\"values\":[");
while (sb.length() < 512 * 1024) {
sb.append(rng.nextInt(100000)).append(',');
}
sb.setCharAt(sb.length() - 1, ']');
sb.append('}');
record.setField("data", sb.toString());
dataWriter.write(record);
}
}

return dataWriter.toDataFile();
}
}