From c918300a11cf018c21e5a3c42e3424973382c051 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 15 May 2026 13:58:44 +0700 Subject: [PATCH 1/3] Flink: Add decimal write/read roundtrip test for FlinkParquetReaders Co-Authored-By: Claude Opus 4.7 (1M context) --- .../flink/data/FlinkParquetReaders.java | 1 - .../flink/data/TestFlinkParquetReader.java | 35 +++++++++++++++++++ .../flink/data/FlinkParquetReaders.java | 1 - .../flink/data/TestFlinkParquetReader.java | 35 +++++++++++++++++++ .../flink/data/FlinkParquetReaders.java | 1 - .../flink/data/TestFlinkParquetReader.java | 35 +++++++++++++++++++ 6 files changed, 105 insertions(+), 3 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3de64aa99865..9627258ea3a5 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -362,7 +362,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 4e8c9f03f84c..f6d0b63d8c47 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -221,6 +222,40 @@ public void testTwoLevelList() throws IOException { } } + @Test + public void testDecimalRoundtripWithFlinkWriter() throws IOException { + Schema schema = + new Schema( + required(1, "dec_9_2", Types.DecimalType.of(9, 2)), + required(2, "dec_15_3", Types.DecimalType.of(15, 3)), + required(3, "dec_38_10", Types.DecimalType.of(38, 10))); + + List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); + + OutputFile output = new InMemoryOutputFile(); + LogicalType logicalType = FlinkSchemaUtil.convert(schema); + try (FileAppender writer = + Parquet.write(output) + .schema(schema) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) + .build()) { + writer.addAll(RandomRowData.convert(schema, data)); + } + + try (CloseableIterable reader = + Parquet.read(output.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) + .build()) { + Iterator rows = reader.iterator(); + for (Record expected : data) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); + } + assertThat(rows).isExhausted(); + } + } + private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3de64aa99865..9627258ea3a5 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -362,7 +362,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 4e8c9f03f84c..f6d0b63d8c47 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -221,6 +222,40 @@ public void testTwoLevelList() throws IOException { } } + @Test + public void testDecimalRoundtripWithFlinkWriter() throws IOException { + Schema schema = + new Schema( + required(1, "dec_9_2", Types.DecimalType.of(9, 2)), + required(2, "dec_15_3", Types.DecimalType.of(15, 3)), + required(3, "dec_38_10", Types.DecimalType.of(38, 10))); + + List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); + + OutputFile output = new InMemoryOutputFile(); + LogicalType logicalType = FlinkSchemaUtil.convert(schema); + try (FileAppender writer = + Parquet.write(output) + .schema(schema) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) + .build()) { + writer.addAll(RandomRowData.convert(schema, data)); + } + + try (CloseableIterable reader = + Parquet.read(output.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) + .build()) { + Iterator rows = reader.iterator(); + for (Record expected : data) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); + } + assertThat(rows).isExhausted(); + } + } + private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 0e6856daa67a..49d96d1325fd 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -379,7 +379,6 @@ private static class BinaryDecimalReader public DecimalData read(DecimalData ignored) { Binary binary = column.nextBinary(); BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale); - // TODO: need a unit test to write-read-validate decimal via FlinkParquetWrite/Reader return DecimalData.fromBigDecimal(bigDecimal, precision, scale); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 006c55d1b8a7..bf6a2a4ab30a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -221,6 +222,40 @@ public void testTwoLevelList() throws IOException { } } + @Test + public void testDecimalRoundtripWithFlinkWriter() throws IOException { + Schema schema = + new Schema( + required(1, "dec_9_2", Types.DecimalType.of(9, 2)), + required(2, "dec_15_3", Types.DecimalType.of(15, 3)), + required(3, "dec_38_10", Types.DecimalType.of(38, 10))); + + List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); + + OutputFile output = new InMemoryOutputFile(); + LogicalType logicalType = FlinkSchemaUtil.convert(schema); + try (FileAppender writer = + Parquet.write(output) + .schema(schema) + .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) + .build()) { + writer.addAll(RandomRowData.convert(schema, data)); + } + + try (CloseableIterable reader = + Parquet.read(output.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) + .build()) { + Iterator rows = reader.iterator(); + for (Record expected : data) { + assertThat(rows).hasNext(); + TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); + } + assertThat(rows).isExhausted(); + } + } + private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { From 6d2f777850521b3c093e6e9a4ccebbe803e4f4aa Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Fri, 15 May 2026 22:11:25 +0700 Subject: [PATCH 2/3] Data: Add engine write/read decimal roundtrip coverage to BaseFormatModelTests Per review feedback on #16346, instead of a Flink-only decimal Parquet test, add a StructWithDecimals generator to DataGenerators.ALL and a testDataWriterEngineWriteEngineRead case to BaseFormatModelTests. This exercises decimal(9,2)/(15,3)/(38,10) write-read roundtrips (Parquet INT32/INT64/FIXED_LEN_BYTE_ARRAY encodings) for Flink and Spark across Avro, Parquet and ORC, and closes the missing engine-write -> engine-read symmetry in the shared harness. The duplicated TestFlinkParquetReader decimal test is removed from Flink v1.20/v2.0/v2.1; the FlinkParquetReaders TODO stays resolved by the broader shared coverage. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../iceberg/data/BaseFormatModelTests.java | 36 +++++++++++++++++++ .../apache/iceberg/data/DataGenerators.java | 17 ++++++++- .../flink/data/TestFlinkParquetReader.java | 35 ------------------ .../flink/data/TestFlinkParquetReader.java | 35 ------------------ .../flink/data/TestFlinkParquetReader.java | 35 ------------------ 5 files changed, 52 insertions(+), 106 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java index a38b025e0f05..bdff9f27b2cf 100644 --- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java +++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java @@ -272,6 +272,42 @@ void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator d assertEquals(schema, convertToEngineRecords(genericRecords, schema), readRecords); } + /** Write with engine type T, read with engine type T */ + @ParameterizedTest + @FieldSource("FORMAT_AND_GENERATOR") + void testDataWriterEngineWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator) + throws IOException { + Schema schema = dataGenerator.schema(); + FileWriterBuilder, Object> writerBuilder = + FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile); + + DataWriter writer = writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); + + List genericRecords = dataGenerator.generateRecords(); + List engineRecords = convertToEngineRecords(genericRecords, schema); + + try (writer) { + engineRecords.forEach(writer::write); + } + + DataFile dataFile = writer.toDataFile(); + + assertThat(dataFile).isNotNull(); + assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size()); + assertThat(dataFile.format()).isEqualTo(fileFormat); + + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); + List readRecords; + try (CloseableIterable reader = + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) + .project(schema) + .build()) { + readRecords = ImmutableList.copyOf(reader); + } + + assertEquals(schema, engineRecords, readRecords); + } + /** Write with engine type T, read with Generic Record */ @ParameterizedTest @FieldSource("FORMAT_AND_GENERATOR") diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java index 390c0949cb72..7c27ca6c55b8 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java +++ b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java @@ -29,7 +29,8 @@ */ class DataGenerators { - static final DataGenerator[] ALL = new DataGenerator[] {new StructOfPrimitive()}; + static final DataGenerator[] ALL = + new DataGenerator[] {new StructOfPrimitive(), new StructWithDecimals()}; private DataGenerators() {} @@ -50,6 +51,20 @@ public Schema schema() { } } + static class StructWithDecimals implements DataGenerator { + private final Schema schema = + new Schema( + required(1, "row_id", Types.StringType.get()), + required(2, "dec_9_2", Types.DecimalType.of(9, 2)), + required(3, "dec_15_3", Types.DecimalType.of(15, 3)), + required(4, "dec_38_10", Types.DecimalType.of(38, 10))); + + @Override + public Schema schema() { + return schema; + } + } + static class DefaultSchema implements DataGenerator { private final Schema schema = new Schema( diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index f6d0b63d8c47..4e8c9f03f84c 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -222,40 +221,6 @@ public void testTwoLevelList() throws IOException { } } - @Test - public void testDecimalRoundtripWithFlinkWriter() throws IOException { - Schema schema = - new Schema( - required(1, "dec_9_2", Types.DecimalType.of(9, 2)), - required(2, "dec_15_3", Types.DecimalType.of(15, 3)), - required(3, "dec_38_10", Types.DecimalType.of(38, 10))); - - List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); - - OutputFile output = new InMemoryOutputFile(); - LogicalType logicalType = FlinkSchemaUtil.convert(schema); - try (FileAppender writer = - Parquet.write(output) - .schema(schema) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) - .build()) { - writer.addAll(RandomRowData.convert(schema, data)); - } - - try (CloseableIterable reader = - Parquet.read(output.toInputFile()) - .project(schema) - .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) - .build()) { - Iterator rows = reader.iterator(); - for (Record expected : data) { - assertThat(rows).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); - } - assertThat(rows).isExhausted(); - } - } - private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index f6d0b63d8c47..4e8c9f03f84c 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -222,40 +221,6 @@ public void testTwoLevelList() throws IOException { } } - @Test - public void testDecimalRoundtripWithFlinkWriter() throws IOException { - Schema schema = - new Schema( - required(1, "dec_9_2", Types.DecimalType.of(9, 2)), - required(2, "dec_15_3", Types.DecimalType.of(15, 3)), - required(3, "dec_38_10", Types.DecimalType.of(38, 10))); - - List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); - - OutputFile output = new InMemoryOutputFile(); - LogicalType logicalType = FlinkSchemaUtil.convert(schema); - try (FileAppender writer = - Parquet.write(output) - .schema(schema) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) - .build()) { - writer.addAll(RandomRowData.convert(schema, data)); - } - - try (CloseableIterable reader = - Parquet.read(output.toInputFile()) - .project(schema) - .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) - .build()) { - Iterator rows = reader.iterator(); - for (Record expected : data) { - assertThat(rows).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); - } - assertThat(rows).isExhausted(); - } - } - private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index bf6a2a4ab30a..006c55d1b8a7 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -19,7 +19,6 @@ package org.apache.iceberg.flink.data; import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; @@ -222,40 +221,6 @@ public void testTwoLevelList() throws IOException { } } - @Test - public void testDecimalRoundtripWithFlinkWriter() throws IOException { - Schema schema = - new Schema( - required(1, "dec_9_2", Types.DecimalType.of(9, 2)), - required(2, "dec_15_3", Types.DecimalType.of(15, 3)), - required(3, "dec_38_10", Types.DecimalType.of(38, 10))); - - List data = Lists.newArrayList(RandomGenericData.generate(schema, NUM_RECORDS, 19981L)); - - OutputFile output = new InMemoryOutputFile(); - LogicalType logicalType = FlinkSchemaUtil.convert(schema); - try (FileAppender writer = - Parquet.write(output) - .schema(schema) - .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(logicalType, msgType)) - .build()) { - writer.addAll(RandomRowData.convert(schema, data)); - } - - try (CloseableIterable reader = - Parquet.read(output.toInputFile()) - .project(schema) - .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema)) - .build()) { - Iterator rows = reader.iterator(); - for (Record expected : data) { - assertThat(rows).hasNext(); - TestHelpers.assertRowData(schema.asStruct(), logicalType, expected, rows.next()); - } - assertThat(rows).isExhausted(); - } - } - private void writeAndValidate( Iterable iterable, Schema writeSchema, Schema expectedSchema) throws IOException { From 5dc4b5d021c10982bad795e6663494d4517c64b4 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 18 May 2026 17:52:40 +0700 Subject: [PATCH 3/3] Data: Rename decimal test generator to Decimals and drop unused row_id column The generator has no nested struct, so StructWithDecimals was a misleading name; rename it to Decimals. The row_id String column is not needed for the decimal write/read roundtrip and carries no identity semantics, so drop it and renumber the decimal field IDs to 1/2/3. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../java/org/apache/iceberg/data/DataGenerators.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java index 7c27ca6c55b8..41c540ca2fa2 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java +++ b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java @@ -29,8 +29,7 @@ */ class DataGenerators { - static final DataGenerator[] ALL = - new DataGenerator[] {new StructOfPrimitive(), new StructWithDecimals()}; + static final DataGenerator[] ALL = new DataGenerator[] {new StructOfPrimitive(), new Decimals()}; private DataGenerators() {} @@ -51,13 +50,12 @@ public Schema schema() { } } - static class StructWithDecimals implements DataGenerator { + static class Decimals implements DataGenerator { private final Schema schema = new Schema( - required(1, "row_id", Types.StringType.get()), - required(2, "dec_9_2", Types.DecimalType.of(9, 2)), - required(3, "dec_15_3", Types.DecimalType.of(15, 3)), - required(4, "dec_38_10", Types.DecimalType.of(38, 10))); + required(1, "dec_9_2", Types.DecimalType.of(9, 2)), + required(2, "dec_15_3", Types.DecimalType.of(15, 3)), + required(3, "dec_38_10", Types.DecimalType.of(38, 10))); @Override public Schema schema() {