From 7eb18dece304f51d71c4f05c274871657ded02c1 Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Tue, 5 May 2026 14:16:55 -0700 Subject: [PATCH 1/6] Backport ExtendedV2Writes rule for Spark 3.1 --- .../IcebergSparkSessionExtensions.scala | 6 + .../datasources/v2/ExtendedV2Writes.scala | 80 +++ .../TestRequiredDistributionAndOrdering.java | 491 ++++++++++++++++++ 3 files changed, 577 insertions(+) create mode 100644 spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala create mode 100644 spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index 30b5df5317..5460e83d21 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.optimizer.RewriteMergeInto import org.apache.spark.sql.catalyst.optimizer.RewriteUpdate import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy +import org.apache.spark.sql.execution.datasources.v2.ExtendedV2Writes class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { @@ -51,6 +52,11 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectOptimizerRule { spark => RewriteUpdate(spark) } extensions.injectOptimizerRule { spark => RewriteMergeInto(spark) } + // pre-CBO extensions + // attach the Iceberg table's required distribution and ordering to V2 writes after + // the optimizer has resolved the write target but before physical planning + extensions.injectPreCBORule { _ => ExtendedV2Writes } + // planner extensions extensions.injectPlannerStrategy { spark => ExtendedDataSourceV2Strategy(spark) } } diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala new file mode 100644 index 0000000000..4501f05d3b --- /dev/null +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.iceberg.spark.Spark3Util +import org.apache.spark.sql.catalyst.plans.logical.AppendData +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression +import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic +import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression +import org.apache.spark.sql.catalyst.plans.logical.Sort +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils +import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation + +/** + * Backport of the Spark 3.2 V2Writes idea to the v3.1 logical plan API. In Spark 3.1 + * AppendData/OverwriteByExpression/OverwritePartitionsDynamic do not carry a Write, so the + * required distribution and ordering are looked up from the Iceberg table directly via + * Spark3Util — the same helpers RewriteRowLevelOperationHelper.buildWritePlan uses. + * + * v3.1-only path: no Write is built here; ExtendedDataSourceV2Strategy still constructs the + * Write at physical planning. This rule only attaches the required distribution and ordering + * to the query feeding the write. + * + * Row-level commands (MERGE/UPDATE/DELETE) are intentionally skipped: their rewriters already + * call buildWritePlan, which prepares the query before constructing AppendData/ReplaceData. The + * Sort / RepartitionByExpression guard avoids double-wrapping that already-prepared query (and + * also respects an explicit ORDER BY / DISTRIBUTE BY in a user-issued INSERT). + */ +object ExtendedV2Writes extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown { + case a @ AppendData(r: DataSourceV2Relation, query, _, _) + if isIcebergRelation(r) && !alreadyPrepared(query) => + a.withNewQuery(prepareQuery(r, query)) + + case o @ OverwriteByExpression(r: DataSourceV2Relation, _, query, _, _) + if isIcebergRelation(r) && !alreadyPrepared(query) => + o.withNewQuery(prepareQuery(r, query)) + + case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _, _) + if isIcebergRelation(r) && !alreadyPrepared(query) => + o.withNewQuery(prepareQuery(r, query)) + } + + // DistributionAndOrderingUtils.prepareQuery wraps its input in either a RepartitionByExpression + // with no explicit numPartitions, or a non-global Sort over such a RepartitionByExpression. Only + // those exact shapes are treated as already-prepared so that user code (coalesce, repartition(N), + // global ORDER BY, sortWithinPartitions on a non-shuffled source) is left untouched. 
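+  // Illustrative shapes only (assuming Spark 3.1's logical plan signatures), matching the
+  // cases below:
+  //   RepartitionByExpression(clusterExprs, child, None)                              => true
+  //   Sort(order, global = false, RepartitionByExpression(clusterExprs, child, None)) => true
+  //   Sort(order, global = false, otherChild)      e.g. sortWithinPartitions          => false
+  //   Sort(order, global = true, child)            e.g. ORDER BY                      => false
+  //   Repartition(n, shuffle, child)               e.g. coalesce / repartition(N)     => false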
+ private def alreadyPrepared(query: LogicalPlan): Boolean = query match { + case Sort(_, false, RepartitionByExpression(_, _, None)) => true + case RepartitionByExpression(_, _, None) => true + case _ => false + } + + private def prepareQuery(r: DataSourceV2Relation, query: LogicalPlan): LogicalPlan = { + val icebergTable = Spark3Util.toIcebergTable(r.table) + val distribution = Spark3Util.buildRequiredDistribution(icebergTable) + val ordering = Spark3Util.buildRequiredOrdering(distribution, icebergTable) + DistributionAndOrderingUtils.prepareQuery(distribution, ordering, query, conf) + } +} diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java new file mode 100644 index 0000000000..89280430e9 --- /dev/null +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.extensions; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.spark.source.ThreeColumnRecord; +import org.apache.spark.SparkException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestRequiredDistributionAndOrdering extends SparkExtensionsTestBase { + + public TestRequiredDistributionAndOrdering( + String catalogName, String implementation, Map config) { + super(catalogName, implementation, config); + } + + @After + public void dropTestTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testDefaultLocalSortWithBucketTransforms() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c1))", + tableName); + + List data = + ImmutableList.of( + new ThreeColumnRecord(1, null, "A"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(4, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(5, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(6, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(7, "BBBBBBBBBB", "A")); + Dataset ds = spark.createDataFrame(data, ThreeColumnRecord.class); + Dataset inputDF = ds.coalesce(1).sortWithinPartitions("c1"); + + // should insert a local sort by partition columns by default + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(7L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testPartitionColumnsArePrependedForRangeDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c1))", + tableName); + + List data = + ImmutableList.of( + new ThreeColumnRecord(1, null, "A"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(4, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(5, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(6, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(7, "BBBBBBBBBB", "A")); + Dataset ds = spark.createDataFrame(data, ThreeColumnRecord.class); + Dataset inputDF = ds.coalesce(1).sortWithinPartitions("c1"); + + // should automatically prepend partition columns to the ordering + sql("ALTER TABLE %s WRITE ORDERED BY c1, c2", tableName); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(7L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testSortOrderIncludesPartitionColumns() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c1))", + tableName); + + List data = + ImmutableList.of( + new ThreeColumnRecord(1, null, "A"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(4, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(5, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(6, 
"BBBBBBBBBB", "B"), + new ThreeColumnRecord(7, "BBBBBBBBBB", "A")); + Dataset ds = spark.createDataFrame(data, ThreeColumnRecord.class); + Dataset inputDF = ds.coalesce(1).sortWithinPartitions("c1"); + + // should succeed with a correct sort order + sql("ALTER TABLE %s WRITE ORDERED BY bucket(2, c3), c1, c2", tableName); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(7L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testHashDistributionOnBucketedColumn() throws NoSuchTableException { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c1))", + tableName); + + List data = + ImmutableList.of( + new ThreeColumnRecord(1, null, "A"), + new ThreeColumnRecord(2, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(3, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(4, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(5, "BBBBBBBBBB", "A"), + new ThreeColumnRecord(6, "BBBBBBBBBB", "B"), + new ThreeColumnRecord(7, "BBBBBBBBBB", "A")); + Dataset ds = spark.createDataFrame(data, ThreeColumnRecord.class); + Dataset inputDF = ds.coalesce(1).sortWithinPartitions("c1"); + + // should automatically prepend partition columns to the local ordering after hash distribution + sql("ALTER TABLE %s WRITE DISTRIBUTED BY PARTITION ORDERED BY c1, c2", tableName); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(7L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testDefaultSortOnDecimalBucketedColumn() { + sql( + "CREATE TABLE %s (c1 INT, c2 DECIMAL(20, 2)) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c2))", + tableName); + + sql("INSERT INTO %s VALUES (1, 20.2), (2, 40.2), (3, 60.2)", tableName); + + List expected = + ImmutableList.of( + row(1, new BigDecimal("20.20")), + row(2, new BigDecimal("40.20")), + row(3, new BigDecimal("60.20"))); + + assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName)); + } + + @Test + public void testDefaultSortOnStringBucketedColumn() { + sql( + "CREATE TABLE %s (c1 INT, c2 STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(2, c2))", + tableName); + + sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName); + + List expected = ImmutableList.of(row(1, "A"), row(2, "B")); + + assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName)); + } + + @Test + public void testDefaultSortOnDecimalTruncatedColumn() { + sql( + "CREATE TABLE %s (c1 INT, c2 DECIMAL(20, 2)) " + + "USING iceberg " + + "PARTITIONED BY (truncate(2, c2))", + tableName); + + sql("INSERT INTO %s VALUES (1, 20.2), (2, 40.2)", tableName); + + List expected = + ImmutableList.of(row(1, new BigDecimal("20.20")), row(2, new BigDecimal("40.20"))); + + assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName)); + } + + @Test + public void testDefaultSortOnLongTruncatedColumn() { + sql( + "CREATE TABLE %s (c1 INT, c2 BIGINT) " + + "USING iceberg " + + "PARTITIONED BY (truncate(2, c2))", + tableName); + + sql("INSERT INTO %s VALUES (1, 22222222222222), (2, 444444444444)", tableName); + + List expected = ImmutableList.of(row(1, 22222222222222L), row(2, 444444444444L)); + + assertEquals("Rows must match", expected, sql("SELECT * FROM %s ORDER BY c1", tableName)); + } + + // testRangeDistributionWithQuotedColumnNames from the v3.2 suite is intentionally omitted: + // v3.1 SortOrderToSpark passes raw column 
names through Expressions.column, which can't parse + // dotted identifiers. v3.2 fixed this by indexing schema-quoted names; that fix is out of scope + // for this backport. + + // The tests below feed deliberately unclustered input to the ClusteredDataWriter. Without the + // ExtendedV2Writes rule the writer would reject the data with + // "Incoming records violate the writer assumption that records are clustered by spec and by + // partition" — succeeding here confirms the rule re-clusters using the table's configured + // distribution and ordering. + + @Test + public void testHashDistributionModeViaTableProperty() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals( + "Distribution mode must be hash", + "hash", + table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE)); + + Dataset inputDF = unclusteredInput(); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testRangeDistributionModeViaSortOrder() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + + // WRITE ORDERED BY implicitly sets the distribution mode to range + sql("ALTER TABLE %s WRITE ORDERED BY category, id", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals( + "Distribution mode must be range", + "range", + table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE)); + SortOrder expectedOrder = + SortOrder.builderFor(table.schema()) + .withOrderId(1) + .asc("category", NullOrder.NULLS_FIRST) + .asc("id", NullOrder.NULLS_FIRST) + .build(); + Assert.assertEquals("Sort order must match", expectedOrder, table.sortOrder()); + + Dataset inputDF = unclusteredInput(); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testHashDistributionWithExplicitSortOrder() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + + sql("ALTER TABLE %s WRITE DISTRIBUTED BY PARTITION ORDERED BY category", tableName); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals( + "Distribution mode must be hash", + "hash", + table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE)); + + Dataset inputDF = unclusteredInput(); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testNoneDistributionModeViaTableProperty() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='none')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals( + "Distribution mode must be none", + "none", 
+ table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE)); + + Dataset inputDF = unclusteredInput(); + + // `none` skips the repartition, but the required ordering is computed independently and + // SortOrderUtil.buildSortOrder synthesizes one from the partition fields for any partitioned + // table (it only returns unsorted when the table is also unpartitioned). The rule therefore + // attaches a local sort by the partition columns, which clusters rows within each Spark task + // and lets the ClusteredDataWriter succeed without a cross-task shuffle. + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + @Test + public void testRangeDistributionModeViaTableProperty() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='range')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + Table table = validationCatalog.loadTable(tableIdent); + Assert.assertEquals( + "Distribution mode must be range", + "range", + table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE)); + + Dataset inputDF = unclusteredInput(); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // The tests below disable ExtendedV2Writes so the same unclustered input reaches the + // ClusteredDataWriter without any rule-injected repartition or local sort. Each one asserts + // the writer rejects it — establishing the pre-rule baseline that the positive tests above + // are claiming to fix. 
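+  // (Illustrative note: the same exclusion used by assertWriterRejectsUnclusteredInput() below
+  //  can also be applied from SQL when debugging by hand, e.g.
+  //  SET spark.sql.optimizer.excludedRules=
+  //      org.apache.spark.sql.execution.datasources.v2.ExtendedV2Writes;)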
+ + @Test + public void testNoneDistributionFailsWithoutRule() { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='none')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + assertWriterRejectsUnclusteredInput(); + } + + @Test + public void testHashDistributionFailsWithoutRule() { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + assertWriterRejectsUnclusteredInput(); + } + + @Test + public void testRangeDistributionFailsWithoutRule() { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='range')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + assertWriterRejectsUnclusteredInput(); + } + + private void assertWriterRejectsUnclusteredInput() { + Dataset inputDF = unclusteredInput(); + spark + .conf() + .set( + "spark.sql.optimizer.excludedRules", + "org.apache.spark.sql.execution.datasources.v2.ExtendedV2Writes"); + try { + Assertions.assertThatThrownBy(() -> inputDF.writeTo(tableName).append()) + .as( + "ClusteredDataWriter should reject unclustered input when ExtendedV2Writes is disabled") + .isInstanceOf(SparkException.class) + .hasStackTraceContaining("Incoming records violate the writer assumption"); + } finally { + spark.conf().unset("spark.sql.optimizer.excludedRules"); + } + } + + // Builds a 20-row dataset spread across all 4 buckets and randomly shuffled across 4 Spark + // partitions, so each task sees rows for multiple buckets — the worst-case input for a + // ClusteredDataWriter without re-clustering. 
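+  // For example, within one task the writer accepts rows grouped by partition, such as
+  // (bucket=0, bucket=0, bucket=1, bucket=1), but rejects any sequence that revisits a
+  // partition it has already closed, such as (bucket=0, bucket=1, bucket=0).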
+ private Dataset unclusteredInput() { + List data = + ImmutableList.of( + new ThreeColumnRecord(0, "B", "d0"), + new ThreeColumnRecord(1, "A", "d1"), + new ThreeColumnRecord(2, "C", "d2"), + new ThreeColumnRecord(3, "B", "d3"), + new ThreeColumnRecord(4, "A", "d4"), + new ThreeColumnRecord(5, "C", "d5"), + new ThreeColumnRecord(6, "B", "d6"), + new ThreeColumnRecord(7, "A", "d7"), + new ThreeColumnRecord(8, "C", "d8"), + new ThreeColumnRecord(9, "B", "d9"), + new ThreeColumnRecord(10, "A", "d10"), + new ThreeColumnRecord(11, "C", "d11"), + new ThreeColumnRecord(12, "B", "d12"), + new ThreeColumnRecord(13, "A", "d13"), + new ThreeColumnRecord(14, "C", "d14"), + new ThreeColumnRecord(15, "B", "d15"), + new ThreeColumnRecord(16, "A", "d16"), + new ThreeColumnRecord(17, "C", "d17"), + new ThreeColumnRecord(18, "B", "d18"), + new ThreeColumnRecord(19, "A", "d19")); + return spark + .createDataFrame(data, ThreeColumnRecord.class) + .selectExpr("c1 AS id", "c2 AS category", "c3 AS data") + .repartition(4); + } +} From 77698241334619e70e8a718bc665e93ff587e4d4 Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Tue, 5 May 2026 22:32:30 -0700 Subject: [PATCH 2/6] Add tests for edge cases --- .../TestRequiredDistributionAndOrdering.java | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java index 89280430e9..12baefa171 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java @@ -457,6 +457,160 @@ private void assertWriterRejectsUnclusteredInput() { } } + // The rule still wraps the query in RepartitionByExpression + Sort; both must + // handle a zero-row plan cleanly without producing a failed snapshot. + @Test + public void testEmptyInputWithHashDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + Dataset emptyDF = unclusteredInput().where("1 = 0"); + emptyDF.writeTo(tableName).append(); + + assertEquals( + "Row count must be zero", + ImmutableList.of(row(0L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // Tests that catalyst-level handling of nulls in the local sort and (for hash mode) the cluster + // expression doesn't break. 
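+  // (Every category in this input is null, so with the identity partition on `category` all
+  //  rows map to the single category=null partition; the point is null handling in those
+  //  expressions, not partition fan-out.)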
+ @Test + public void testNullPartitionValuesWithHashDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (category)", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + List data = + ImmutableList.of( + new ThreeColumnRecord(1, null, "d1"), + new ThreeColumnRecord(2, null, "d2"), + new ThreeColumnRecord(3, null, "d3"), + new ThreeColumnRecord(4, null, "d4")); + Dataset inputDF = + spark + .createDataFrame(data, ThreeColumnRecord.class) + .selectExpr("c1 AS id", "c2 AS category", "c3 AS data") + .repartition(4); + + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(4L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // With many possible buckets the rule's local sort is the main thing keeping records clustered + // for ClusteredDataWriter. + @Test + public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(64, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + Dataset inputDF = unclusteredInput(); + inputDF.writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // The rule's local sort still has to leave that task with rows clustered by spec. + @Test + public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + String original = spark.conf().get("spark.sql.shuffle.partitions"); + spark.conf().set("spark.sql.shuffle.partitions", "1"); + try { + unclusteredInput().writeTo(tableName).append(); + } finally { + spark.conf().set("spark.sql.shuffle.partitions", original); + } + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // AQE coalesces post-shuffle partitions. Confirms the rule's RepartitionByExpression + // survives AQE's reshaping enough that the writer still sees clustered input. + @Test + public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", + tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + + String original = spark.conf().get("spark.sql.adaptive.enabled"); + spark.conf().set("spark.sql.adaptive.enabled", "true"); + try { + unclusteredInput().writeTo(tableName).append(); + } finally { + spark.conf().set("spark.sql.adaptive.enabled", original); + } + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + + // With write.spark.fanout.enabled=true the FanoutDataWriter is used, which + // doesn't require clustered input. The rule still injects a repartition + sort; this test + // confirms that injection is benign — the write succeeds and rows are accounted for. 
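+  // (Illustrative detail: the fanout writer keeps an open file per partition it has seen
+  //  within a task, trading memory for accepting rows in any order.)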
+ @Test + public void testFanoutWriterWithHashDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); + + unclusteredInput().writeTo(tableName).append(); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + // Builds a 20-row dataset spread across all 4 buckets and randomly shuffled across 4 Spark // partitions, so each task sees rows for multiple buckets — the worst-case input for a // ClusteredDataWriter without re-clustering. From 78c89619037c7d1d15ef8c9fc32b9d9b37d3b4c0 Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Wed, 6 May 2026 10:11:35 -0700 Subject: [PATCH 3/6] Alter behaviour to not infer sort based on partition sepc --- .../datasources/v2/ExtendedV2Writes.scala | 29 ++++- .../TestRequiredDistributionAndOrdering.java | 108 +++++++++++------- 2 files changed, 95 insertions(+), 42 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala index 4501f05d3b..6299185228 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala @@ -29,6 +29,9 @@ import org.apache.spark.sql.catalyst.plans.logical.Sort import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation +import org.apache.spark.sql.connector.iceberg.distributions.Distribution +import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution +import org.apache.spark.sql.connector.iceberg.expressions.SortOrder /** * Backport of the Spark 3.2 V2Writes idea to the v3.1 logical plan API. In Spark 3.1 @@ -40,6 +43,11 @@ import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation * Write at physical planning. This rule only attaches the required distribution and ordering * to the query feeding the write. * + * Ordering policy: only RANGE distribution or an explicit table sort order produces a required + * ordering. For HASH/NONE on a partitioned-but-unsorted table the rule does NOT synthesize a + * sort from the partition spec, matching Spark 3.5's SparkWrite behavior; clustering across + * partitions in that case is handled by the writer (fanout writers, in particular). + * * Row-level commands (MERGE/UPDATE/DELETE) are intentionally skipped: their rewriters already * call buildWritePlan, which prepares the query before constructing AppendData/ReplaceData. 
The * Sort / RepartitionByExpression guard avoids double-wrapping that already-prepared query (and @@ -74,7 +82,26 @@ object ExtendedV2Writes extends Rule[LogicalPlan] { private def prepareQuery(r: DataSourceV2Relation, query: LogicalPlan): LogicalPlan = { val icebergTable = Spark3Util.toIcebergTable(r.table) val distribution = Spark3Util.buildRequiredDistribution(icebergTable) - val ordering = Spark3Util.buildRequiredOrdering(distribution, icebergTable) + val ordering = requiredOrdering(distribution, icebergTable) DistributionAndOrderingUtils.prepareQuery(distribution, ordering, query, conf) } + + // Match Spark 3.5 behavior: only attach a required local sort when the distribution itself is + // ordered (RANGE) or the table has an explicit user-supplied sort order. For HASH/NONE on a + // partitioned-but-unsorted table, do NOT synthesize a sort from the partition spec — leave that + // responsibility to the writer (e.g. fanout writers handle multi-partition tasks). Spark3Util's + // buildRequiredOrdering would otherwise fall through to SortOrderUtil.buildSortOrder, which + // adds the partition columns as the ordering and forces a local Sort into the plan. + private def requiredOrdering( + distribution: Distribution, + icebergTable: org.apache.iceberg.Table): Array[SortOrder] = { + distribution match { + case od: OrderedDistribution => + od.ordering + case _ if !icebergTable.sortOrder().isUnsorted => + Spark3Util.convert(icebergTable.sortOrder()) + case _ => + Array.empty[SortOrder] + } + } } diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java index 12baefa171..318a73c480 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java @@ -48,13 +48,21 @@ public void dropTestTable() { sql("DROP TABLE IF EXISTS %s", tableName); } + // For an unsorted, partitioned table with a bucket transform, the rule must NOT synthesize a + // local sort from the partition spec (matching Spark 3.5's SparkWrite policy). Fanout is enabled + // here so the FanoutDataWriter — which doesn't require pre-clustered input — handles the bucket + // transitions; without fanout the ClusteredDataWriter would reject this same input, as the + // companion *FailsWithoutRule tests show. 
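+  // (Concretely, "fanout" here is the table property write.spark.fanout.enabled=true,
+  //  i.e. TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, set in the test body below.)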
@Test - public void testDefaultLocalSortWithBucketTransforms() throws NoSuchTableException { + public void testNoSyntheticPartitionSortWithBucketTransforms() throws NoSuchTableException { sql( "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) " + "USING iceberg " + "PARTITIONED BY (bucket(2, c1))", tableName); + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", + tableName, TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); List data = ImmutableList.of( @@ -68,7 +76,6 @@ public void testDefaultLocalSortWithBucketTransforms() throws NoSuchTableExcepti Dataset ds = spark.createDataFrame(data, ThreeColumnRecord.class); Dataset inputDF = ds.coalesce(1).sortWithinPartitions("c1"); - // should insert a local sort by partition columns by default inputDF.writeTo(tableName).append(); assertEquals( @@ -170,13 +177,19 @@ public void testHashDistributionOnBucketedColumn() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } + // The "InsertValuesOn..." tests verify that INSERT VALUES into an unsorted partitioned table + // works for various partition transform types. The rule no longer synthesizes a sort from the + // partition spec, so each table enables fanout so the FanoutDataWriter handles the unclustered + // VALUES list without requiring per-task partition clustering. + @Test - public void testDefaultSortOnDecimalBucketedColumn() { + public void testInsertValuesOnDecimalBucketedColumn() { sql( "CREATE TABLE %s (c1 INT, c2 DECIMAL(20, 2)) " + "USING iceberg " - + "PARTITIONED BY (bucket(2, c2))", - tableName); + + "PARTITIONED BY (bucket(2, c2)) " + + "TBLPROPERTIES ('%s'='true')", + tableName, TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); sql("INSERT INTO %s VALUES (1, 20.2), (2, 40.2), (3, 60.2)", tableName); @@ -190,12 +203,13 @@ public void testDefaultSortOnDecimalBucketedColumn() { } @Test - public void testDefaultSortOnStringBucketedColumn() { + public void testInsertValuesOnStringBucketedColumn() { sql( "CREATE TABLE %s (c1 INT, c2 STRING) " + "USING iceberg " - + "PARTITIONED BY (bucket(2, c2))", - tableName); + + "PARTITIONED BY (bucket(2, c2)) " + + "TBLPROPERTIES ('%s'='true')", + tableName, TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); sql("INSERT INTO %s VALUES (1, 'A'), (2, 'B')", tableName); @@ -205,12 +219,13 @@ public void testDefaultSortOnStringBucketedColumn() { } @Test - public void testDefaultSortOnDecimalTruncatedColumn() { + public void testInsertValuesOnDecimalTruncatedColumn() { sql( "CREATE TABLE %s (c1 INT, c2 DECIMAL(20, 2)) " + "USING iceberg " - + "PARTITIONED BY (truncate(2, c2))", - tableName); + + "PARTITIONED BY (truncate(2, c2)) " + + "TBLPROPERTIES ('%s'='true')", + tableName, TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); sql("INSERT INTO %s VALUES (1, 20.2), (2, 40.2)", tableName); @@ -221,12 +236,13 @@ public void testDefaultSortOnDecimalTruncatedColumn() { } @Test - public void testDefaultSortOnLongTruncatedColumn() { + public void testInsertValuesOnLongTruncatedColumn() { sql( "CREATE TABLE %s (c1 INT, c2 BIGINT) " + "USING iceberg " - + "PARTITIONED BY (truncate(2, c2))", - tableName); + + "PARTITIONED BY (truncate(2, c2)) " + + "TBLPROPERTIES ('%s'='true')", + tableName, TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); sql("INSERT INTO %s VALUES (1, 22222222222222), (2, 444444444444)", tableName); @@ -240,11 +256,13 @@ public void testDefaultSortOnLongTruncatedColumn() { // dotted identifiers. 
v3.2 fixed this by indexing schema-quoted names; that fix is out of scope // for this backport. - // The tests below feed deliberately unclustered input to the ClusteredDataWriter. Without the - // ExtendedV2Writes rule the writer would reject the data with - // "Incoming records violate the writer assumption that records are clustered by spec and by - // partition" — succeeding here confirms the rule re-clusters using the table's configured - // distribution and ordering. + // The tests below feed deliberately unclustered input. The rule clusters across tasks via a + // RepartitionByExpression for HASH/RANGE distribution, and adds a local sort only when the + // table has an explicit sort order or a RANGE distribution. For HASH/NONE on an unsorted + // partitioned table the rule does not synthesize a sort from the partition spec, so the + // ClusteredDataWriter would reject those scenarios — those cases enable fanout so the + // FanoutDataWriter handles the unclustered input. The companion *FailsWithoutRule tests below + // pin down what the writer requires when the rule is disabled. @Test public void testHashDistributionModeViaTableProperty() throws NoSuchTableException { @@ -340,9 +358,13 @@ public void testNoneDistributionModeViaTableProperty() throws NoSuchTableExcepti + "USING iceberg " + "PARTITIONED BY (bucket(4, id))", tableName); + // `none` distribution: the rule attaches no repartition and no synthesized sort. Fanout is + // enabled so the FanoutDataWriter accepts the unclustered input directly. sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='none')", - tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='none', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); Table table = validationCatalog.loadTable(tableIdent); Assert.assertEquals( @@ -352,11 +374,6 @@ public void testNoneDistributionModeViaTableProperty() throws NoSuchTableExcepti Dataset inputDF = unclusteredInput(); - // `none` skips the repartition, but the required ordering is computed independently and - // SortOrderUtil.buildSortOrder synthesizes one from the partition fields for any partitioned - // table (it only returns unsorted when the table is also unpartitioned). The rule therefore - // attaches a local sort by the partition columns, which clusters rows within each Spark task - // and lets the ClusteredDataWriter succeed without a cross-task shuffle. inputDF.writeTo(tableName).append(); assertEquals( @@ -457,7 +474,7 @@ private void assertWriterRejectsUnclusteredInput() { } } - // The rule still wraps the query in RepartitionByExpression + Sort; both must + // The rule wraps the query in a RepartitionByExpression for HASH; that wrapping must // handle a zero-row plan cleanly without producing a failed snapshot. @Test public void testEmptyInputWithHashDistribution() throws NoSuchTableException { @@ -512,8 +529,10 @@ public void testNullPartitionValuesWithHashDistribution() throws NoSuchTableExce sql("SELECT count(*) FROM %s", tableName)); } - // With many possible buckets the rule's local sort is the main thing keeping records clustered - // for ClusteredDataWriter. + // High-cardinality bucket transform with HASH distribution: hash collisions in the post-shuffle + // partitioning can co-locate multiple buckets in a single task. 
Without a synthesized local + // sort, those rows would be interleaved by bucket and ClusteredDataWriter would reject — fanout + // is enabled so FanoutDataWriter accepts the unclustered tail. @Test public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableException { sql( @@ -522,8 +541,10 @@ public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableEx + "PARTITIONED BY (bucket(64, id))", tableName); sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", - tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); Dataset inputDF = unclusteredInput(); inputDF.writeTo(tableName).append(); @@ -534,7 +555,8 @@ public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableEx sql("SELECT count(*) FROM %s", tableName)); } - // The rule's local sort still has to leave that task with rows clustered by spec. + // Forcing a single shuffle partition collapses all 4 bucket values into one task. The rule no + // longer adds a local sort, so the rows are unclustered within the task — fanout is required. @Test public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableException { sql( @@ -543,8 +565,10 @@ public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableE + "PARTITIONED BY (bucket(4, id))", tableName); sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", - tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); String original = spark.conf().get("spark.sql.shuffle.partitions"); spark.conf().set("spark.sql.shuffle.partitions", "1"); @@ -560,8 +584,8 @@ public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableE sql("SELECT count(*) FROM %s", tableName)); } - // AQE coalesces post-shuffle partitions. Confirms the rule's RepartitionByExpression - // survives AQE's reshaping enough that the writer still sees clustered input. + // AQE coalesces post-shuffle partitions, which can merge multiple buckets into one task. With + // no synthesized local sort, fanout is needed to absorb the unclustered tail. @Test public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { sql( @@ -570,8 +594,10 @@ public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { + "PARTITIONED BY (bucket(4, id))", tableName); sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", - tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); String original = spark.conf().get("spark.sql.adaptive.enabled"); spark.conf().set("spark.sql.adaptive.enabled", "true"); @@ -587,9 +613,9 @@ public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - // With write.spark.fanout.enabled=true the FanoutDataWriter is used, which - // doesn't require clustered input. The rule still injects a repartition + sort; this test - // confirms that injection is benign — the write succeeds and rows are accounted for. 
+ // With write.spark.fanout.enabled=true the FanoutDataWriter is used, which doesn't require + // clustered input. The rule still injects a repartition (no sort, since the table is unsorted); + // this test confirms the write succeeds and all rows are accounted for. @Test public void testFanoutWriterWithHashDistribution() throws NoSuchTableException { sql( From 338eabd5fa167f1acc4d1f44cf9eecd2f6e460a7 Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Wed, 6 May 2026 18:06:49 -0700 Subject: [PATCH 4/6] Remove distribution --- .../datasources/v2/ExtendedV2Writes.scala | 55 ++++++++-------- .../TestRequiredDistributionAndOrdering.java | 63 +++++++------------ 2 files changed, 49 insertions(+), 69 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala index 6299185228..8a72f17f1b 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.iceberg.spark.Spark3Util +import org.apache.iceberg.util.SortOrderUtil import org.apache.spark.sql.catalyst.plans.logical.AppendData import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression @@ -30,28 +31,25 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation import org.apache.spark.sql.connector.iceberg.distributions.Distribution +import org.apache.spark.sql.connector.iceberg.distributions.Distributions import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution import org.apache.spark.sql.connector.iceberg.expressions.SortOrder /** - * Backport of the Spark 3.2 V2Writes idea to the v3.1 logical plan API. In Spark 3.1 - * AppendData/OverwriteByExpression/OverwritePartitionsDynamic do not carry a Write, so the - * required distribution and ordering are looked up from the Iceberg table directly via - * Spark3Util — the same helpers RewriteRowLevelOperationHelper.buildWritePlan uses. + * Backport of Spark 3.2's V2Writes idea for v3.1's AppendData/OverwriteByExpression/ + * OverwritePartitionsDynamic. Attaches a local Sort to the query feeding the write when the + * table has an explicit sort order or RANGE distribution; never attaches an Exchange. * - * v3.1-only path: no Write is built here; ExtendedDataSourceV2Strategy still constructs the - * Write at physical planning. This rule only attaches the required distribution and ordering - * to the query feeding the write. + * No distribution: Spark 3.1 only has strict RepartitionByExpression. Spark 3.4+'s + * RebalancePartitions (the non-strict node Spark 3.5's V2Writes emits for Iceberg) doesn't + * exist here, and forcing a strict repartition would turn skewed partition keys into stragglers. * - * Ordering policy: only RANGE distribution or an explicit table sort order produces a required - * ordering. 
For HASH/NONE on a partitioned-but-unsorted table the rule does NOT synthesize a - * sort from the partition spec, matching Spark 3.5's SparkWrite behavior; clustering across - * partitions in that case is handled by the writer (fanout writers, in particular). + * No synthesized partition-spec sort: matches Spark 3.5. When a sort is attached, partition + * cols are prepended via SortOrderUtil so ClusteredDataWriter sees per-task clustering; + * unsorted partitioned tables need fanout (or pre-clustering by the user). * - * Row-level commands (MERGE/UPDATE/DELETE) are intentionally skipped: their rewriters already - * call buildWritePlan, which prepares the query before constructing AppendData/ReplaceData. The - * Sort / RepartitionByExpression guard avoids double-wrapping that already-prepared query (and - * also respects an explicit ORDER BY / DISTRIBUTE BY in a user-issued INSERT). + * MERGE/UPDATE/DELETE are skipped — RewriteRowLevelOperationHelper.buildWritePlan already + * prepares those queries; alreadyPrepared() detects its output shape to avoid double-wrapping. */ object ExtendedV2Writes extends Rule[LogicalPlan] { @@ -69,10 +67,9 @@ object ExtendedV2Writes extends Rule[LogicalPlan] { o.withNewQuery(prepareQuery(r, query)) } - // DistributionAndOrderingUtils.prepareQuery wraps its input in either a RepartitionByExpression - // with no explicit numPartitions, or a non-global Sort over such a RepartitionByExpression. Only - // those exact shapes are treated as already-prepared so that user code (coalesce, repartition(N), - // global ORDER BY, sortWithinPartitions on a non-shuffled source) is left untouched. + // Matches the shapes RewriteRowLevelOperationHelper.buildWritePlan produces. Bare + // Sort(_, false, _) is intentionally NOT matched — it would swallow a user's + // sortWithinPartitions on the wrong columns and skip the table-required ordering. private def alreadyPrepared(query: LogicalPlan): Boolean = query match { case Sort(_, false, RepartitionByExpression(_, _, None)) => true case RepartitionByExpression(_, _, None) => true @@ -81,17 +78,17 @@ object ExtendedV2Writes extends Rule[LogicalPlan] { private def prepareQuery(r: DataSourceV2Relation, query: LogicalPlan): LogicalPlan = { val icebergTable = Spark3Util.toIcebergTable(r.table) - val distribution = Spark3Util.buildRequiredDistribution(icebergTable) - val ordering = requiredOrdering(distribution, icebergTable) - DistributionAndOrderingUtils.prepareQuery(distribution, ordering, query, conf) + // Distribution is computed only so requiredOrdering can read OrderedDistribution.ordering; + // we then pass unspecified() so no Exchange is attached. See class doc. + val tableDistribution = Spark3Util.buildRequiredDistribution(icebergTable) + val ordering = requiredOrdering(tableDistribution, icebergTable) + DistributionAndOrderingUtils.prepareQuery( + Distributions.unspecified(), ordering, query, conf) } - // Match Spark 3.5 behavior: only attach a required local sort when the distribution itself is - // ordered (RANGE) or the table has an explicit user-supplied sort order. For HASH/NONE on a - // partitioned-but-unsorted table, do NOT synthesize a sort from the partition spec — leave that - // responsibility to the writer (e.g. fanout writers handle multi-partition tasks). Spark3Util's - // buildRequiredOrdering would otherwise fall through to SortOrderUtil.buildSortOrder, which - // adds the partition columns as the ordering and forces a local Sort into the plan. 
+ // Sort attached only for RANGE (OrderedDistribution) or an explicit table sort order. Use + // SortOrderUtil.buildSortOrder so partition cols are prepended — sorting by user fields + // alone won't cluster a partition transform (e.g. `id` doesn't cluster `bucket(N, id)`). private def requiredOrdering( distribution: Distribution, icebergTable: org.apache.iceberg.Table): Array[SortOrder] = { @@ -99,7 +96,7 @@ object ExtendedV2Writes extends Rule[LogicalPlan] { case od: OrderedDistribution => od.ordering case _ if !icebergTable.sortOrder().isUnsorted => - Spark3Util.convert(icebergTable.sortOrder()) + Spark3Util.convert(SortOrderUtil.buildSortOrder(icebergTable)) case _ => Array.empty[SortOrder] } diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java index 318a73c480..cdf6ed5722 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java @@ -48,11 +48,8 @@ public void dropTestTable() { sql("DROP TABLE IF EXISTS %s", tableName); } - // For an unsorted, partitioned table with a bucket transform, the rule must NOT synthesize a - // local sort from the partition spec (matching Spark 3.5's SparkWrite policy). Fanout is enabled - // here so the FanoutDataWriter — which doesn't require pre-clustered input — handles the bucket - // transitions; without fanout the ClusteredDataWriter would reject this same input, as the - // companion *FailsWithoutRule tests show. + // Unsorted partitioned table: rule must not synthesize a partition-spec sort. Fanout is + // enabled so FanoutDataWriter accepts the unclustered bucket transitions. @Test public void testNoSyntheticPartitionSortWithBucketTransforms() throws NoSuchTableException { sql( @@ -177,10 +174,8 @@ public void testHashDistributionOnBucketedColumn() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - // The "InsertValuesOn..." tests verify that INSERT VALUES into an unsorted partitioned table - // works for various partition transform types. The rule no longer synthesizes a sort from the - // partition spec, so each table enables fanout so the FanoutDataWriter handles the unclustered - // VALUES list without requiring per-task partition clustering. + // INSERT VALUES into an unsorted partitioned table across various transform types. Fanout is + // enabled because the rule no longer clusters unsorted tables. @Test public void testInsertValuesOnDecimalBucketedColumn() { @@ -256,14 +251,13 @@ public void testInsertValuesOnLongTruncatedColumn() { // dotted identifiers. v3.2 fixed this by indexing schema-quoted names; that fix is out of scope // for this backport. - // The tests below feed deliberately unclustered input. The rule clusters across tasks via a - // RepartitionByExpression for HASH/RANGE distribution, and adds a local sort only when the - // table has an explicit sort order or a RANGE distribution. For HASH/NONE on an unsorted - // partitioned table the rule does not synthesize a sort from the partition spec, so the - // ClusteredDataWriter would reject those scenarios — those cases enable fanout so the - // FanoutDataWriter handles the unclustered input. 
The companion *FailsWithoutRule tests below - // pin down what the writer requires when the rule is disabled. + // Unclustered input. Rule attaches a local Sort (with partition prefix) for sorted/RANGE + // tables; HASH/NONE on unsorted tables is a no-op, so those enable fanout. Distribution is + // never injected — see ExtendedV2Writes class doc. *FailsWithoutRule tests below pin down + // baseline writer behavior when the rule is disabled. + // HASH on an unsorted table: rule is a no-op. Fanout enabled. For rule-injected clustering + // here, set a sort order — see testHashDistributionWithExplicitSortOrder. @Test public void testHashDistributionModeViaTableProperty() throws NoSuchTableException { sql( @@ -272,8 +266,10 @@ public void testHashDistributionModeViaTableProperty() throws NoSuchTableExcepti + "PARTITIONED BY (bucket(4, id))", tableName); sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash')", - tableName, TableProperties.WRITE_DISTRIBUTION_MODE); + "ALTER TABLE %s SET TBLPROPERTIES ('%s'='hash', '%s'='true')", + tableName, + TableProperties.WRITE_DISTRIBUTION_MODE, + TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED); Table table = validationCatalog.loadTable(tableIdent); Assert.assertEquals( @@ -409,10 +405,7 @@ public void testRangeDistributionModeViaTableProperty() throws NoSuchTableExcept sql("SELECT count(*) FROM %s", tableName)); } - // The tests below disable ExtendedV2Writes so the same unclustered input reaches the - // ClusteredDataWriter without any rule-injected repartition or local sort. Each one asserts - // the writer rejects it — establishing the pre-rule baseline that the positive tests above - // are claiming to fix. + // Rule disabled — asserts ClusteredDataWriter rejects unclustered input. Pre-rule baseline. @Test public void testNoneDistributionFailsWithoutRule() { @@ -474,8 +467,7 @@ private void assertWriterRejectsUnclusteredInput() { } } - // The rule wraps the query in a RepartitionByExpression for HASH; that wrapping must - // handle a zero-row plan cleanly without producing a failed snapshot. + // Empty input on the rule's no-op path (HASH unsorted): must still produce a clean snapshot. @Test public void testEmptyInputWithHashDistribution() throws NoSuchTableException { sql( @@ -496,8 +488,7 @@ public void testEmptyInputWithHashDistribution() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - // Tests that catalyst-level handling of nulls in the local sort and (for hash mode) the cluster - // expression doesn't break. + // Null partition values, one row per task — trivially clustered. Null-handling guard. @Test public void testNullPartitionValuesWithHashDistribution() throws NoSuchTableException { sql( @@ -529,10 +520,7 @@ public void testNullPartitionValuesWithHashDistribution() throws NoSuchTableExce sql("SELECT count(*) FROM %s", tableName)); } - // High-cardinality bucket transform with HASH distribution: hash collisions in the post-shuffle - // partitioning can co-locate multiple buckets in a single task. Without a synthesized local - // sort, those rows would be interleaved by bucket and ClusteredDataWriter would reject — fanout - // is enabled so FanoutDataWriter accepts the unclustered tail. + // High-cardinality bucket transform: rule is a no-op (unsorted), fanout handles unclustering. 
@Test public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableException { sql( @@ -555,8 +543,7 @@ public void testHighCardinalityBucketWithHashDistribution() throws NoSuchTableEx sql("SELECT count(*) FROM %s", tableName)); } - // Forcing a single shuffle partition collapses all 4 bucket values into one task. The rule no - // longer adds a local sort, so the rows are unclustered within the task — fanout is required. + // Single shuffle partition → all bucket values in one task, unclustered. Fanout required. @Test public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableException { sql( @@ -584,8 +571,7 @@ public void testHashDistributionWithSingleShufflePartition() throws NoSuchTableE sql("SELECT count(*) FROM %s", tableName)); } - // AQE coalesces post-shuffle partitions, which can merge multiple buckets into one task. With - // no synthesized local sort, fanout is needed to absorb the unclustered tail. + // AQE coalesces post-shuffle partitions → multiple buckets in one task. Fanout required. @Test public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { sql( @@ -613,9 +599,7 @@ public void testHashDistributionWithAQEEnabled() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - // With write.spark.fanout.enabled=true the FanoutDataWriter is used, which doesn't require - // clustered input. The rule still injects a repartition (no sort, since the table is unsorted); - // this test confirms the write succeeds and all rows are accounted for. + // Fanout writer accepts unclustered input directly. Rule is a no-op (unsorted, no distribution). @Test public void testFanoutWriterWithHashDistribution() throws NoSuchTableException { sql( @@ -637,9 +621,8 @@ public void testFanoutWriterWithHashDistribution() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } - // Builds a 20-row dataset spread across all 4 buckets and randomly shuffled across 4 Spark - // partitions, so each task sees rows for multiple buckets — the worst-case input for a - // ClusteredDataWriter without re-clustering. + // 20 rows across 4 buckets, randomly spread across 4 Spark partitions. Worst-case + // unclustered input for ClusteredDataWriter. 
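(Editorial sketch; the actual helper follows below and its body may differ.) An equivalent way to build such worst-case input with the plain DataFrame API, assuming the shared spark session and the tests' (id INT, category STRING, data STRING) schema; the literal values are hypothetical:

    Dataset<Row> ds = spark.range(0, 20)
        .selectExpr(
            "CAST(id AS INT) AS id",
            "concat('c', id % 4) AS category",  // a few distinct categories, hypothetical values
            "concat('v', id) AS data")
        .repartition(4);  // round-robin: each task typically sees several bucket(4, id) values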
private Dataset unclusteredInput() { List data = ImmutableList.of( From 2214823f581dfb187006c7ad7cb4b313a0f2542d Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Fri, 8 May 2026 10:27:03 -0700 Subject: [PATCH 5/6] Update --- .../datasources/v2/ExtendedV2Writes.scala | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala index 8a72f17f1b..bdd45f843d 100644 --- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala +++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.iceberg.spark.Spark3Util -import org.apache.iceberg.util.SortOrderUtil import org.apache.spark.sql.catalyst.plans.logical.AppendData import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression @@ -36,20 +35,18 @@ import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution import org.apache.spark.sql.connector.iceberg.expressions.SortOrder /** - * Backport of Spark 3.2's V2Writes idea for v3.1's AppendData/OverwriteByExpression/ - * OverwritePartitionsDynamic. Attaches a local Sort to the query feeding the write when the - * table has an explicit sort order or RANGE distribution; never attaches an Exchange. + * Backport of Spark 3.2's V2Writes for v3.1 AppendData/OverwriteByExpression/ + * OverwritePartitionsDynamic. Attaches a local Sort only when the table has an explicit sort + * order or RANGE distribution; never attaches an Exchange. Unsorted partitioned writes thus + * require write.spark.fanout.enabled=true or pre-clustered input. * * No distribution: Spark 3.1 only has strict RepartitionByExpression. Spark 3.4+'s - * RebalancePartitions (the non-strict node Spark 3.5's V2Writes emits for Iceberg) doesn't - * exist here, and forcing a strict repartition would turn skewed partition keys into stragglers. - * - * No synthesized partition-spec sort: matches Spark 3.5. When a sort is attached, partition - * cols are prepended via SortOrderUtil so ClusteredDataWriter sees per-task clustering; - * unsorted partitioned tables need fanout (or pre-clustering by the user). + * RebalancePartitions (used by Spark 3.5's V2Writes) doesn't exist here, and a strict + * repartition would turn skewed partition keys into stragglers. * * MERGE/UPDATE/DELETE are skipped — RewriteRowLevelOperationHelper.buildWritePlan already - * prepares those queries; alreadyPrepared() detects its output shape to avoid double-wrapping. + * prepares those queries (using the unwrapped Spark3Util.buildRequiredOrdering, which still + * synthesizes the partition prefix); alreadyPrepared() detects its shape to avoid double-wrap. */ object ExtendedV2Writes extends Rule[LogicalPlan] { @@ -78,27 +75,23 @@ object ExtendedV2Writes extends Rule[LogicalPlan] { private def prepareQuery(r: DataSourceV2Relation, query: LogicalPlan): LogicalPlan = { val icebergTable = Spark3Util.toIcebergTable(r.table) - // Distribution is computed only so requiredOrdering can read OrderedDistribution.ordering; - // we then pass unspecified() so no Exchange is attached. See class doc. 
+ // Distribution is computed only to surface OrderedDistribution.ordering; we always pass + // unspecified() to prepareQuery so no Exchange is attached. val tableDistribution = Spark3Util.buildRequiredDistribution(icebergTable) val ordering = requiredOrdering(tableDistribution, icebergTable) DistributionAndOrderingUtils.prepareQuery( Distributions.unspecified(), ordering, query, conf) } - // Sort attached only for RANGE (OrderedDistribution) or an explicit table sort order. Use - // SortOrderUtil.buildSortOrder so partition cols are prepended — sorting by user fields - // alone won't cluster a partition transform (e.g. `id` doesn't cluster `bucket(N, id)`). + // Delegate to Spark3Util.buildRequiredOrdering only for OrderedDistribution or sorted tables. + // Unsorted tables get an empty ordering — fanout or pre-clustered input is required. private def requiredOrdering( distribution: Distribution, icebergTable: org.apache.iceberg.Table): Array[SortOrder] = { - distribution match { - case od: OrderedDistribution => - od.ordering - case _ if !icebergTable.sortOrder().isUnsorted => - Spark3Util.convert(SortOrderUtil.buildSortOrder(icebergTable)) - case _ => - Array.empty[SortOrder] + if (distribution.isInstanceOf[OrderedDistribution] || !icebergTable.sortOrder().isUnsorted) { + Spark3Util.buildRequiredOrdering(distribution, icebergTable) + } else { + Array.empty[SortOrder] } } } From 75be188bb79a053ac6e9aaa518aa5fa628a327ae Mon Sep 17 00:00:00 2001 From: selchen_LinkedIn Date: Fri, 8 May 2026 10:34:14 -0700 Subject: [PATCH 6/6] Update tests --- .../TestRequiredDistributionAndOrdering.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java index cdf6ed5722..4671d0dab3 100644 --- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java +++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java @@ -621,6 +621,26 @@ public void testFanoutWriterWithHashDistribution() throws NoSuchTableException { sql("SELECT count(*) FROM %s", tableName)); } + // saveAsTable("append") on an existing V2 table produces AppendData, the same plan node the + // rule matches for writeTo(...).append(). RANGE distribution keeps the rule active (sort + // attached) so this exercises the full path, not the no-op branch. + @Test + public void testSaveAsTableAppendWithRangeDistribution() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id INT, category STRING, data STRING) " + + "USING iceberg " + + "PARTITIONED BY (bucket(4, id))", + tableName); + sql("ALTER TABLE %s WRITE ORDERED BY category, id", tableName); + + unclusteredInput().write().mode("append").saveAsTable(tableName); + + assertEquals( + "Row count must match", + ImmutableList.of(row(20L)), + sql("SELECT count(*) FROM %s", tableName)); + } + // 20 rows across 4 buckets, randomly spread across 4 Spark partitions. Worst-case // unclustered input for ClusteredDataWriter. private Dataset unclusteredInput() {
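(Editorial note appended after the patch text.) The new saveAsTable test and a writeTo-based append exercise the same logical plan: in Spark 3.1 both calls below, issued against an existing Iceberg table, produce an AppendData over a DataSourceV2Relation, so ExtendedV2Writes sees the same plan shape for both. The table name and dataset are illustrative; append() declares NoSuchTableException.

    ds.write().mode("append").saveAsTable("cat.db.tbl");  // DataFrameWriter path, used by the new test
    ds.writeTo("cat.db.tbl").append();                    // DataFrameWriterV2 path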