4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -176,6 +176,10 @@ private TableProperties() {}
"write.delete.parquet.row-group-check-max-record-count";
public static final int PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT = 10000;

public static final String PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED =
"write.parquet.row-group-size-check-uncompressed";
public static final boolean PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT = false;

public static final String PARQUET_BLOOM_FILTER_MAX_BYTES =
"write.parquet.bloom-filter-max-bytes";
public static final int PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024;
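For table-wide use, the new constant plugs into the standard table-properties API. A minimal sketch, assuming `table` is a `Table` already loaded from a catalog (the class and method names here are illustrative):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class EnableUncompressedSizeCheck {
  // Enables uncompressed row-group size tracking for future Parquet writes
  // to this table; "true" overrides the `false` default added above.
  static void enable(Table table) {
    table
        .updateProperties()
        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED, "true")
        .commit();
  }
}
```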
1 change: 1 addition & 0 deletions docs/docs/configuration.md
@@ -44,6 +44,7 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
| write.parquet.row-group-size-check-uncompressed | false | Track the uncompressed row group size so that the target size is enforced accurately when a compression codec is used |
| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
| write.parquet.page-version | v1 | Parquet data page version: v1 (DataPage V1) or v2 (DataPage V2) |
| write.parquet.page-row-limit | 20000 | Parquet page row limit |
27 changes: 23 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -51,6 +51,8 @@
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT;

import java.io.File;
import java.io.IOException;
@@ -372,6 +374,7 @@ public <D> FileAppender<D> build() throws IOException {
int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
boolean dictionaryEnabled = context.dictionaryEnabled();
boolean trackUncompressedRowGroupSize = context.trackUncompressedRowGroupSize();

if (compressionLevel != null) {
switch (codec) {
@@ -462,7 +465,8 @@ public <D> FileAppender<D> build() throws IOException {
parquetProperties,
metricsConfig,
writeMode,
fileEncryptionProperties);
fileEncryptionProperties,
trackUncompressedRowGroupSize);
} else {
ParquetWriteBuilder<D> parquetWriteBuilder =
new ParquetWriteBuilder<D>(ParquetIO.file(file))
@@ -510,6 +514,7 @@ static class Context {
private final Map<String, String> columnBloomFilterEnabled;
private final Map<String, String> columnStatsEnabled;
private final boolean dictionaryEnabled;
private final boolean trackUncompressedRowGroupSize;

private Context(
int rowGroupSize,
@@ -526,7 +531,8 @@ private Context(
Map<String, String> columnBloomFilterNdv,
Map<String, String> columnBloomFilterEnabled,
Map<String, String> columnStatsEnabled,
boolean dictionaryEnabled) {
boolean dictionaryEnabled,
boolean trackUncompressedRowGroupSize) {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.pageRowLimit = pageRowLimit;
@@ -542,6 +548,7 @@ private Context(
this.columnBloomFilterEnabled = columnBloomFilterEnabled;
this.columnStatsEnabled = columnStatsEnabled;
this.dictionaryEnabled = dictionaryEnabled;
this.trackUncompressedRowGroupSize = trackUncompressedRowGroupSize;
}

static Context dataContext(Map<String, String> config) {
@@ -615,6 +622,12 @@ static Context dataContext(Map<String, String> config) {
boolean dictionaryEnabled =
PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);

boolean trackUncompressedRowGroupSize =
PropertyUtil.propertyAsBoolean(
config,
PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED,
PARQUET_ROW_GROUP_SIZE_CHECK_UNCOMPRESSED_DEFAULT);

return new Context(
rowGroupSize,
pageSize,
@@ -630,7 +643,8 @@ static Context dataContext(Map<String, String> config) {
columnBloomFilterNdv,
columnBloomFilterEnabled,
columnStatsEnabled,
dictionaryEnabled);
dictionaryEnabled,
trackUncompressedRowGroupSize);
}

static Context deleteContext(Map<String, String> config) {
@@ -707,7 +721,8 @@ static Context deleteContext(Map<String, String> config) {
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
dictionaryEnabled);
dictionaryEnabled,
dataContext.trackUncompressedRowGroupSize());
}

private static CompressionCodecName toCodec(String codecAsString) {
@@ -786,6 +801,10 @@ Map<String, String> columnStatsEnabled() {
boolean dictionaryEnabled() {
return dictionaryEnabled;
}

boolean trackUncompressedRowGroupSize() {
return trackUncompressedRowGroupSize;
}
}
}
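The flag can also be supplied per writer through the builder's config map, since `dataContext` reads it from the same property key. A minimal sketch, assuming `outputFile` and `schema` are already constructed (the class and method names here are illustrative):

```java
import java.io.IOException;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;

class UncompressedCheckedWriter {
  // Builds a plain FileAppender with the uncompressed size check enabled;
  // Context.dataContext(config) picks the flag up from the builder's .set() map.
  static FileAppender<Record> newAppender(OutputFile outputFile, Schema schema)
      throws IOException {
    return Parquet.write(outputFile)
        .schema(schema)
        .createWriterFunc(GenericParquetWriter::create)
        .set("write.parquet.row-group-size-check-uncompressed", "true")
        .build();
  }
}
```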

parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -59,11 +59,13 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final OutputFile output;
private final Configuration conf;
private final InternalFileEncryptor fileEncryptor;
private final boolean trackUncompressedSize;

private ColumnChunkPageWriteStore pageStore = null;
private ColumnWriteStore writeStore;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
private long rowGroupUncompressedSize = 0;
private boolean closed;
private ParquetFileWriter writer;
private int rowGroupOrdinal;
@@ -84,10 +86,12 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
ParquetProperties properties,
MetricsConfig metricsConfig,
ParquetFileWriter.Mode writeMode,
FileEncryptionProperties encryptionProperties) {
FileEncryptionProperties encryptionProperties,
boolean trackUncompressedSize) {
this.schema = schema;
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
this.trackUncompressedSize = trackUncompressedSize;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor =
new ParquetCodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
@@ -135,11 +139,21 @@ private void ensureWriterInitialized() {
@Override
public void add(T value) {
recordCount += 1;
model.write(0, value);
if (trackUncompressedSize) {
writeTracked(value);
} else {
model.write(0, value);
}
writeStore.endRecord();
checkSize();
}

private void writeTracked(T value) {
long sizeBefore = writeStore.getBufferedSize();
model.write(0, value);
rowGroupUncompressedSize += writeStore.getBufferedSize() - sizeBefore;
}

@Override
public Metrics metrics() {
Preconditions.checkState(closed, "Cannot return metrics for unclosed writer");
@@ -190,6 +204,30 @@ public List<Long> splitOffsets() {
}

private void checkSize() {
if (trackUncompressedSize) {
checkSizeUncompressed();
} else {
checkSizeDefault();
}
}

private void checkSizeUncompressed() {
if (rowGroupUncompressedSize >= targetRowGroupSize) {
flushRowGroup(false);
} else if (recordCount >= nextCheckRecordCount) {
double avgRecordSize = ((double) rowGroupUncompressedSize) / recordCount;
if (rowGroupUncompressedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
flushRowGroup(false);
} else {
long remainingSpace = targetRowGroupSize - rowGroupUncompressedSize;
long remainingRecords = (long) (remainingSpace / avgRecordSize);
this.nextCheckRecordCount =
recordCount + Math.min(remainingRecords / 2, props.getMaxRowCountForPageSizeCheck());
}
}
}

private void checkSizeDefault() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();
double avgRecordSize = ((double) bufferedSize) / recordCount;
@@ -234,6 +272,7 @@ private void startRowGroup() {
Math.max(recordCount / 2, props.getMinRowCountForPageSizeCheck()),
props.getMaxRowCountForPageSizeCheck());
this.recordCount = 0;
this.rowGroupUncompressedSize = 0;

this.pageStore =
new ColumnChunkPageWriteStore(
Expand Down
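To make the scheduling in `checkSizeUncompressed` concrete, here is a standalone sketch of the same arithmetic with hypothetical numbers: an 8 MB target with 4 MB buffered after 1,000 records, using 20,000 in place of `props.getMaxRowCountForPageSizeCheck()`:

```java
class CheckScheduleSketch {
  public static void main(String[] args) {
    long targetRowGroupSize = 8L * 1024 * 1024;       // 8 MB target
    long rowGroupUncompressedSize = 4L * 1024 * 1024; // 4 MB buffered so far
    long recordCount = 1_000;

    // ~4 KB per record on average
    double avgRecordSize = ((double) rowGroupUncompressedSize) / recordCount;

    // Buffered size is not yet within two average records of the target,
    // so estimate how many records still fit and check again halfway there.
    long remainingSpace = targetRowGroupSize - rowGroupUncompressedSize; // 4 MB
    long remainingRecords = (long) (remainingSpace / avgRecordSize);     // 1000
    long nextCheckRecordCount =
        recordCount + Math.min(remainingRecords / 2, 20_000);

    System.out.println(nextCheckRecordCount); // 1500: next check after 500 more records
  }
}
```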
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
@@ -66,6 +67,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestParquetDataWriter {
private static final Schema SCHEMA =
@@ -541,4 +544,50 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) {
variantSchema.asStruct(), variantRecords.get(i), writtenRecords.get(i));
}
}

@ParameterizedTest
@ValueSource(strings = {"gzip", "snappy", "zstd", "uncompressed"})
public void testRowGroupSizeEnforcedWhenCompressionEnabled(String codec) throws IOException {
// 50 MB uncompressed data with 8 MB target; verifies row group splits when
// uncompressed size tracking is enabled
OutputFile file = Files.localOutput(createTempFile(temp));

long targetRowGroupSize = 8 * 1024 * 1024;

DataWriter<Record> dataWriter =
Parquet.writeData(file)
.schema(SCHEMA)
.createWriterFunc(GenericParquetWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.set("write.parquet.row-group-size-bytes", String.valueOf(targetRowGroupSize))
.set("write.parquet.page-size-bytes", "1048576")
.set("write.parquet.compression-codec", codec)
.set("write.parquet.row-group-size-check-uncompressed", "true")
.build();

try (dataWriter) {
Random rng = new Random(42);
for (int i = 0; i < 100; i++) {
GenericRecord record = GenericRecord.create(SCHEMA);
record.setField("id", (long) i);
StringBuilder sb = new StringBuilder(512 * 1024);
sb.append("{\"id\":").append(i).append(",\"values\":[");
while (sb.length() < 512 * 1024) {
sb.append(rng.nextInt(100000)).append(',');
}
sb.setCharAt(sb.length() - 1, ']');
sb.append('}');
record.setField("data", sb.toString());
dataWriter.write(record);
}
}

DataFile dataFile = dataWriter.toDataFile();

assertThat(dataFile.recordCount()).as("Record count should match").isEqualTo(100);
assertThat(dataFile.splitOffsets().size())
.as("Row group count should reflect enforcement of the 8 MB target")
.isGreaterThanOrEqualTo(4);
}
}
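Beyond counting split offsets, the footer can be read back to compare each row group's uncompressed and on-disk sizes directly. A hedged sketch using parquet-mr's footer API (the `location` parameter and class name are illustrative):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

class RowGroupSizeInspector {
  // Prints per-row-group stats: getTotalByteSize() is the uncompressed total,
  // getCompressedSize() the on-disk total. With the uncompressed check enabled,
  // getTotalByteSize() should land near the configured target regardless of codec.
  static void printRowGroupSizes(String location) throws IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path(location), new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        System.out.printf(
            "rows=%d uncompressed=%d compressed=%d%n",
            block.getRowCount(), block.getTotalByteSize(), block.getCompressedSize());
      }
    }
  }
}
```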