diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index 0459538d1cda..f46ede4e7240 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -188,6 +188,23 @@ default RewriteDataFiles zOrder(String... columns) {
"Z-ORDER Rewrite Strategy not implemented for this framework");
}
+ /**
+ * Choose K-WAY MERGE as a strategy for this rewrite operation. This strategy merges pre-sorted
+ * files in sort-key order without shuffling. All input files must already be sorted according to
+ * the table's sort order. The table must have a defined sort order.
+ *
+ *
K-way merge preserves the table's sort order in output files. It is optimal for tables that
+ * are already sorted but have accumulated multiple overlapping files per partition (e.g., after
+ * daily ingestion into a previously compacted table).
+ *
+ * @return this for method chaining
+ * @since 1.11.0
+ */
+ default RewriteDataFiles kWayMerge() {
+ throw new UnsupportedOperationException(
+ "K-WAY MERGE Rewrite Strategy not implemented for this framework");
+ }
+
/**
* A user provided filter for determining which files will be considered by the rewrite strategy.
* This will be used in addition to whatever rules the rewrite strategy generates. For example
diff --git a/core/src/main/java/org/apache/iceberg/actions/KWayMergeRewriteFilePlanner.java b/core/src/main/java/org/apache/iceberg/actions/KWayMergeRewriteFilePlanner.java
new file mode 100644
index 000000000000..7e5a9f853ba7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/KWayMergeRewriteFilePlanner.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A planner for K-way merge rewriting that selects files based on size thresholds and orders them
+ * by sort-key lower bounds before bin-packing into groups.
+ *
+ *
File selection uses the same size-based criteria as other strategies: files outside the
+ * desired size range (too small or too large) are candidates for rewriting. This ensures K-way
+ * merge compacts fragmented files within a partition without shuffling, preserving the existing
+ * sort order.
+ *
+ *
Files are sorted by their lower bounds on the first sort field before bin-packing so that
+ * adjacent files in key space are packed into the same group, minimizing cross-group overlap.
+ */
+public class KWayMergeRewriteFilePlanner
+ extends SizeBasedFileRewritePlanner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KWayMergeRewriteFilePlanner.class);
+
+ private final Expression filter;
+ private final Long snapshotId;
+ private final boolean caseSensitive;
+
+ private RewriteJobOrder rewriteJobOrder;
+
+ public KWayMergeRewriteFilePlanner(Table table) {
+ this(table, Expressions.alwaysTrue(), null, false);
+ }
+
+ public KWayMergeRewriteFilePlanner(Table table, Expression filter) {
+ this(
+ table,
+ filter,
+ table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : null,
+ false);
+ }
+
+ public KWayMergeRewriteFilePlanner(
+ Table table, Expression filter, Long snapshotId, boolean caseSensitive) {
+ super(table);
+ this.filter = filter;
+ this.snapshotId = snapshotId;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ public Set validOptions() {
+ return ImmutableSet.builder()
+ .addAll(super.validOptions())
+ .add(RewriteDataFiles.REWRITE_JOB_ORDER)
+ .build();
+ }
+
+ @Override
+ public void init(Map options) {
+ super.init(options);
+ this.rewriteJobOrder =
+ RewriteJobOrder.fromName(
+ PropertyUtil.propertyAsString(
+ options,
+ RewriteDataFiles.REWRITE_JOB_ORDER,
+ RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
+ }
+
+ @Override
+ protected long defaultTargetFileSize() {
+ return PropertyUtil.propertyAsLong(
+ table().properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+
+ @Override
+ protected Iterable filterFiles(Iterable tasks) {
+ return Iterables.filter(tasks, this::outsideDesiredFileSizeRange);
+ }
+
+ @Override
+ protected Iterable> filterFileGroups(List> groups) {
+ return Iterables.filter(
+ groups, group -> enoughInputFiles(group) || enoughContent(group) || tooMuchContent(group));
+ }
+
+ @Override
+ protected Iterable> planFileGroups(Iterable tasks) {
+ SortOrder sortOrder = table().sortOrder();
+ Preconditions.checkArgument(
+ !sortOrder.isUnsorted(),
+ "K-way merge requires a table sort order, but table %s is unsorted. "
+ + "Use the SORT strategy to sort the files first.",
+ table().name());
+
+ int sortFieldId = sortOrder.fields().get(0).sourceId();
+ Types.NestedField sortField = table().schema().findField(sortFieldId);
+ Type.PrimitiveType sortFieldType = sortField.type().asPrimitiveType();
+ Comparator valueComparator = Comparators.forType(sortFieldType);
+
+ List taskList = Lists.newArrayList(tasks);
+
+ taskList.sort(
+ Comparator.comparing(
+ (FileScanTask task) -> {
+ Map lowerBounds = task.file().lowerBounds();
+ if (lowerBounds == null || !lowerBounds.containsKey(sortFieldId)) {
+ return null;
+ }
+ return Conversions.fromByteBuffer(sortFieldType, lowerBounds.get(sortFieldId));
+ },
+ Comparator.nullsLast(valueComparator)));
+
+ return super.planFileGroups(taskList);
+ }
+
+ @Override
+ public FileRewritePlan plan() {
+ StructLikeMap>> groupsByPartition = planFileGroups();
+ RewriteExecutionContext ctx = new RewriteExecutionContext();
+ List selectedFileGroups = Lists.newArrayList();
+
+ groupsByPartition.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .forEach(
+ entry -> {
+ StructLike partition = entry.getKey();
+ entry
+ .getValue()
+ .forEach(
+ fileScanTasks -> {
+ long inputSize = inputSize(fileScanTasks);
+ selectedFileGroups.add(
+ newRewriteGroup(
+ ctx,
+ partition,
+ fileScanTasks,
+ inputSplitSize(inputSize),
+ expectedOutputFiles(inputSize)));
+ });
+ });
+
+ Map groupsInPartition = groupsByPartition.transformValues(List::size);
+ int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
+ return new FileRewritePlan<>(
+ CloseableIterable.of(
+ selectedFileGroups.stream()
+ .sorted(RewriteFileGroup.comparator(rewriteJobOrder))
+ .collect(Collectors.toList())),
+ totalGroupCount,
+ groupsInPartition);
+ }
+
+ private StructLikeMap>> planFileGroups() {
+ TableScan scan =
+ table()
+ .newScan()
+ .filter(filter)
+ .caseSensitive(caseSensitive)
+ .ignoreResiduals()
+ .includeColumnStats();
+
+ if (snapshotId != null) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+
+ CloseableIterable fileScanTasks = scan.planFiles();
+
+ try {
+ Types.StructType partitionType = table().spec().partitionType();
+ StructLikeMap> filesByPartition =
+ groupByPartition(partitionType, fileScanTasks);
+ return filesByPartition.transformValues(tasks -> ImmutableList.copyOf(planFileGroups(tasks)));
+ } finally {
+ try {
+ fileScanTasks.close();
+ } catch (IOException io) {
+ LOG.error("Cannot properly close file iterable while planning for rewrite", io);
+ }
+ }
+ }
+
+ private StructLikeMap> groupByPartition(
+ Types.StructType partitionType, Iterable tasks) {
+ StructLikeMap> filesByPartition = StructLikeMap.create(partitionType);
+ StructLike emptyStruct = GenericRecord.create(partitionType);
+
+ for (FileScanTask task : tasks) {
+ StructLike taskPartition =
+ task.file().specId() == table().spec().specId() ? task.file().partition() : emptyStruct;
+ filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
+ }
+
+ return filesByPartition;
+ }
+
+ private RewriteFileGroup newRewriteGroup(
+ RewriteExecutionContext ctx,
+ StructLike partition,
+ List tasks,
+ long inputSplitSize,
+ int expectedOutputFiles) {
+ FileGroupInfo info =
+ ImmutableRewriteDataFiles.FileGroupInfo.builder()
+ .globalIndex(ctx.currentGlobalIndex())
+ .partitionIndex(ctx.currentPartitionIndex(partition))
+ .partition(partition)
+ .build();
+ return new RewriteFileGroup(
+ info,
+ Lists.newArrayList(tasks),
+ outputSpecId(),
+ writeMaxFileSize(),
+ inputSplitSize,
+ expectedOutputFiles);
+ }
+}
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index b37422beacf4..6dd6e9a7fd75 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -684,7 +684,7 @@ public void testRewriteDataFilesWithInvalidInputs() {
+ "strategy => 'temp')",
catalogName, tableIdent))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("unsupported strategy: temp. Only binpack or sort is supported");
+ .hasMessage("unsupported strategy: temp. Only binpack, sort, or k-way-merge is supported");
// Test for sort_order with binpack strategy
assertThatThrownBy(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 3415b6a551ae..dee6e3dad592 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -37,9 +37,11 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
+import org.apache.iceberg.actions.FileRewritePlanner;
import org.apache.iceberg.actions.FileRewriteRunner;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder;
+import org.apache.iceberg.actions.KWayMergeRewriteFilePlanner;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
@@ -98,7 +100,8 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = SnapshotRef.MAIN_BRANCH;
- private BinPackRewriteFilePlanner planner = null;
+ private FileRewritePlanner planner =
+ null;
private FileRewriteRunner runner = null;
RewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -146,6 +149,13 @@ public RewriteDataFilesSparkAction zOrder(String... columnNames) {
return this;
}
+ @Override
+ public RewriteDataFilesSparkAction kWayMerge() {
+ ensureRunnerNotSet();
+ this.runner = new SparkKWayMergeFileRewriteRunner(spark(), table);
+ return this;
+ }
+
private void ensureRunnerNotSet() {
Preconditions.checkArgument(
runner == null,
@@ -205,10 +215,16 @@ public RewriteDataFiles.Result execute() {
}
private void init(long startingSnapshotId) {
- this.planner =
- runner instanceof SparkShufflingFileRewriteRunner
- ? new SparkShufflingDataRewritePlanner(table, filter, startingSnapshotId, caseSensitive)
- : new BinPackRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
+ if (runner instanceof SparkKWayMergeFileRewriteRunner) {
+ this.planner =
+ new KWayMergeRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
+ } else if (runner instanceof SparkShufflingFileRewriteRunner) {
+ this.planner =
+ new SparkShufflingDataRewritePlanner(table, filter, startingSnapshotId, caseSensitive);
+ } else {
+ this.planner =
+ new BinPackRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
+ }
// Default to BinPack if no strategy selected
if (this.runner == null) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkKWayMergeFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkKWayMergeFileRewriteRunner.java
new file mode 100644
index 000000000000..a686fa370410
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkKWayMergeFileRewriteRunner.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteFileGroup;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericDeleteFilter;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.TaskContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file rewrite runner that performs K-way merge of pre-sorted data files.
+ *
+ * This runner implements a K-way merge algorithm: for each file in the group, it opens a
+ * streaming iterator and uses a priority queue (via {@link SortedMerge}) to merge all iterators in
+ * sort-key order. Output is written to new files with size-based rotation.
+ *
+ *
All input files must be pre-sorted according to the table's sort order. The runner validates
+ * this precondition before execution.
+ *
+ *
Unlike BinPack and Sort runners which use Spark SQL read/write paths, this runner operates
+ * directly on Iceberg's generic reader/writer stack. This eliminates shuffle entirely, making it
+ * optimal for tables with pre-sorted overlapping files.
+ *
+ *
Note: The parent class {@link SparkDataFileRewriteRunner#rewrite} stages tasks in {@link
+ * org.apache.iceberg.spark.ScanTaskSetManager} for use by the Iceberg Spark datasource. K-way merge
+ * bypasses that read path and instead broadcasts range assignments directly to executors.
+ */
+class SparkKWayMergeFileRewriteRunner extends SparkDataFileRewriteRunner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkKWayMergeFileRewriteRunner.class);
+
+ static final String RANGE_PARALLELISM_ENABLED = "range-parallelism-enabled";
+ static final boolean RANGE_PARALLELISM_ENABLED_DEFAULT = true;
+
+ static final String RANGES_PER_GROUP = "ranges-per-group";
+ static final int RANGES_PER_GROUP_DEFAULT = 25;
+
+ static final String MIN_FILES_FOR_RANGE_PARALLELISM = "min-files-for-range-parallelism";
+ static final int MIN_FILES_FOR_RANGE_PARALLELISM_DEFAULT = 10;
+
+ private boolean rangeParallelismEnabled;
+ private int rangesPerGroup;
+ private int minFilesForRangeParallelism;
+
+ private volatile Broadcast
cachedTableBroadcast = null;
+
+ SparkKWayMergeFileRewriteRunner(SparkSession spark, Table table) {
+ super(spark, table);
+ }
+
+ @Override
+ public String description() {
+ return "K-WAY-MERGE";
+ }
+
+ @Override
+ public Set validOptions() {
+ return ImmutableSet.builder()
+ .addAll(super.validOptions())
+ .add(RANGE_PARALLELISM_ENABLED)
+ .add(RANGES_PER_GROUP)
+ .add(MIN_FILES_FOR_RANGE_PARALLELISM)
+ .build();
+ }
+
+ @Override
+ public void init(Map options) {
+ super.init(options);
+ this.rangeParallelismEnabled =
+ PropertyUtil.propertyAsBoolean(
+ options, RANGE_PARALLELISM_ENABLED, RANGE_PARALLELISM_ENABLED_DEFAULT);
+ this.rangesPerGroup =
+ PropertyUtil.propertyAsInt(options, RANGES_PER_GROUP, RANGES_PER_GROUP_DEFAULT);
+ Preconditions.checkArgument(
+ rangesPerGroup > 0, "'%s' must be > 0, got %s", RANGES_PER_GROUP, rangesPerGroup);
+ this.minFilesForRangeParallelism =
+ PropertyUtil.propertyAsInt(
+ options, MIN_FILES_FOR_RANGE_PARALLELISM, MIN_FILES_FOR_RANGE_PARALLELISM_DEFAULT);
+ Preconditions.checkArgument(
+ minFilesForRangeParallelism > 0,
+ "'%s' must be > 0, got %s",
+ MIN_FILES_FOR_RANGE_PARALLELISM,
+ minFilesForRangeParallelism);
+ }
+
+ @Override
+ void doRewrite(String groupId, RewriteFileGroup group) {
+ List tasks = group.fileScanTasks();
+ validateSortOrder(groupId, tasks);
+
+ JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ Broadcast tableBroadcast = tableBroadcast(jsc);
+
+ SortOrder fileSortOrder = resolveOutputSortOrder(table(), outputSortOrderId(tasks));
+ long targetFileSize = group.maxOutputFileSize();
+ int outputSpecId = group.outputSpecId();
+ int numRanges = calculateNumRanges(tasks, targetFileSize);
+
+ List> rangeAssignments = assignFilesBySize(tasks, numRanges);
+
+ // Broadcast range assignments to executors. ScanTaskSetManager is driver-local and cannot
+ // be accessed from executors. Broadcasting is safe here because range counts are bounded
+ // by ranges-per-group (default 25) and each range contains only file metadata references.
+ Broadcast>> broadcastAssignments = jsc.broadcast(rangeAssignments);
+
+ try {
+ List rangeIndices = Lists.newArrayListWithCapacity(numRanges);
+ for (int i = 0; i < numRanges; i++) {
+ rangeIndices.add(i);
+ }
+
+ Set allOutputFiles =
+ Sets.newHashSet(
+ jsc.parallelize(rangeIndices, numRanges)
+ .flatMap(
+ rangeIndex -> {
+ List files = broadcastAssignments.value().get(rangeIndex);
+ if (files.isEmpty()) {
+ return Collections.emptyList().iterator();
+ }
+
+ Table table = tableBroadcast.value();
+ return mergeAndWriteFiles(
+ files, table, fileSortOrder, targetFileSize, outputSpecId)
+ .iterator();
+ })
+ .collect());
+
+ FileRewriteCoordinator.get().stageRewrite(table(), groupId, allOutputFiles);
+
+ LOG.info(
+ "K-way merge completed for group {}: {} input files in {} ranges -> {} output files",
+ groupId,
+ tasks.size(),
+ numRanges,
+ allOutputFiles.size());
+ } finally {
+ broadcastAssignments.destroy();
+ }
+ }
+
+ private Broadcast tableBroadcast(JavaSparkContext jsc) {
+ if (cachedTableBroadcast == null) {
+ synchronized (this) {
+ if (cachedTableBroadcast == null) {
+ cachedTableBroadcast = jsc.broadcast(SerializableTable.copyOf(table()));
+ }
+ }
+ }
+ return cachedTableBroadcast;
+ }
+
+ private static List mergeAndWriteFiles(
+ List files,
+ Table table,
+ SortOrder fileSortOrder,
+ long targetFileSize,
+ int outputSpecId) {
+
+ CloseableGroup resources = new CloseableGroup();
+ resources.setSuppressCloseFailure(true);
+
+ try {
+ List> fileIterables = Lists.newArrayList();
+
+ for (FileScanTask task : files) {
+ FileScanTask readTask =
+ new BaseFileScanTask(
+ task.file(),
+ task.deletes().toArray(new DeleteFile[0]),
+ SchemaParser.toJson(table.schema()),
+ PartitionSpecParser.toJson(table.spec()),
+ ResidualEvaluator.unpartitioned(Expressions.alwaysTrue()));
+
+ GenericDeleteFilter deleteFilter =
+ new GenericDeleteFilter(table.io(), readTask, table.schema(), table.schema());
+
+ Schema readSchema = deleteFilter.requiredSchema();
+
+ EncryptedInputFile encryptedInput =
+ EncryptedFiles.encryptedInput(
+ table.io().newInputFile(task.file().location()), task.file().keyMetadata());
+ InputFile input = table.encryption().decrypt(encryptedInput);
+
+ CloseableIterable records = openFile(input, readSchema, readTask, task);
+ resources.addCloseable(records);
+ records = deleteFilter.filter(records);
+ fileIterables.add(records);
+ }
+
+ @SuppressWarnings("unchecked")
+ Comparator comparator =
+ (Comparator)
+ (Comparator>) SortOrderComparators.forSchema(table.schema(), fileSortOrder);
+
+ SortedMerge sortedMerge = new SortedMerge<>(comparator, fileIterables);
+ resources.addCloseable(sortedMerge);
+
+ return writeOutputFiles(
+ sortedMerge.iterator(), table, fileSortOrder, targetFileSize, outputSpecId);
+
+ } catch (Exception e) {
+ LOG.error("K-way merge failed on executor", e);
+ throw new RuntimeException("K-way merge failed", e);
+ } finally {
+ try {
+ resources.close();
+ } catch (IOException closeException) {
+ LOG.warn("Failed to close resources during cleanup", closeException);
+ }
+ }
+ }
+
+ private static CloseableIterable openFile(
+ InputFile input, Schema readSchema, FileScanTask readTask, FileScanTask originalTask) {
+ switch (originalTask.file().format()) {
+ case PARQUET:
+ return Parquet.read(input)
+ .project(readSchema)
+ .filter(readTask.residual())
+ .createReaderFunc(
+ fileSchema ->
+ GenericParquetReaders.buildReader(
+ readSchema, fileSchema, Collections.emptyMap()))
+ .split(originalTask.start(), originalTask.length())
+ .build();
+ case AVRO:
+ return Avro.read(input)
+ .project(readSchema)
+ .createReaderFunc(
+ avroSchema -> DataReader.create(readSchema, avroSchema, Collections.emptyMap()))
+ .split(originalTask.start(), originalTask.length())
+ .build();
+ case ORC:
+ return ORC.read(input)
+ .project(readSchema)
+ .filter(readTask.residual())
+ .createReaderFunc(
+ fileSchema ->
+ GenericOrcReader.buildReader(readSchema, fileSchema, Collections.emptyMap()))
+ .split(originalTask.start(), originalTask.length())
+ .build();
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported file format: " + originalTask.file().format());
+ }
+ }
+
+ private static List writeOutputFiles(
+ CloseableIterator records,
+ Table table,
+ SortOrder outputSortOrder,
+ long targetFileSize,
+ int outputSpecId)
+ throws IOException {
+
+ PartitionSpec spec = table.specs().get(outputSpecId);
+ Schema schema = table.schema();
+ PartitionKey outputPartitionKey = new PartitionKey(spec, schema);
+
+ TaskContext taskContext = TaskContext.get();
+ int partitionId = taskContext != null ? taskContext.partitionId() : 0;
+ long taskId = taskContext != null ? taskContext.taskAttemptId() : 0L;
+ OutputFileFactory fileFactory =
+ OutputFileFactory.builderFor(table, partitionId, taskId).defaultSpec(spec).build();
+ FileFormat outputFormat =
+ FileFormat.fromString(
+ table
+ .properties()
+ .getOrDefault(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
+
+ GenericAppenderFactory appenderFactory =
+ new GenericAppenderFactory(schema, spec, null, null, null).setAll(table.properties());
+
+ List writtenFiles = Lists.newArrayList();
+ DataWriter writer = null;
+ String currentOutputFilePath = null;
+ StructLike currentPartition = null;
+
+ try {
+ while (records.hasNext()) {
+ Record record = records.next();
+ outputPartitionKey.partition(record);
+
+ boolean partitionChanged =
+ currentPartition != null && !currentPartition.equals(outputPartitionKey);
+ if (writer == null || writer.length() >= targetFileSize || partitionChanged) {
+ if (writer != null) {
+ writer.close();
+ writtenFiles.add(writer.toDataFile());
+ writer = null;
+ currentOutputFilePath = null;
+ }
+
+ currentPartition = outputPartitionKey.copy();
+ EncryptedOutputFile encryptedOutputFile =
+ fileFactory.newOutputFile(spec, currentPartition);
+ currentOutputFilePath = encryptedOutputFile.encryptingOutputFile().location();
+
+ writer =
+ new DataWriter<>(
+ appenderFactory.newAppender(encryptedOutputFile, outputFormat),
+ outputFormat,
+ currentOutputFilePath,
+ spec,
+ currentPartition,
+ encryptedOutputFile.keyMetadata(),
+ outputSortOrder);
+ }
+
+ writer.write(record);
+ }
+
+ if (writer != null) {
+ writer.close();
+ writtenFiles.add(writer.toDataFile());
+ writer = null;
+ currentOutputFilePath = null;
+ }
+ } finally {
+ try {
+ records.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close records iterator during cleanup", e);
+ }
+
+ if (writer != null) {
+ cleanupPartialWriter(writer, currentOutputFilePath, table);
+ }
+ }
+
+ return writtenFiles;
+ }
+
+ private void validateSortOrder(String groupId, List group) {
+ Preconditions.checkArgument(!group.isEmpty(), "Cannot rewrite empty file group: %s", groupId);
+
+ Integer firstSortOrderId = group.get(0).file().sortOrderId();
+
+ if (firstSortOrderId == null || firstSortOrderId == 0) {
+ throw new ValidationException(
+ "Group %s has files with missing or unsorted sort order. "
+ + "K-way merge requires all files to be pre-sorted. "
+ + "Use the SORT strategy first.",
+ groupId);
+ }
+
+ for (FileScanTask task : group) {
+ Integer sortOrderId = task.file().sortOrderId();
+ if (!Objects.equals(sortOrderId, firstSortOrderId)) {
+ throw new ValidationException(
+ "Group %s has files with incompatible sort orders (found %s and %s). "
+ + "K-way merge requires all files to have the same sort order. "
+ + "Use the SORT strategy first.",
+ groupId, firstSortOrderId, sortOrderId);
+ }
+ }
+ }
+
+ private int outputSortOrderId(List group) {
+ Integer inputSortOrderId = group.get(0).file().sortOrderId();
+ if (inputSortOrderId != null && inputSortOrderId > 0) {
+ if (group.stream().allMatch(t -> Objects.equals(t.file().sortOrderId(), inputSortOrderId))) {
+ return inputSortOrderId;
+ }
+ }
+ return table().sortOrder().orderId();
+ }
+
+ private static SortOrder resolveOutputSortOrder(Table table, int sortOrderId) {
+ if (sortOrderId > 0 && table.sortOrders().containsKey(sortOrderId)) {
+ return table.sortOrders().get(sortOrderId);
+ }
+
+ Preconditions.checkArgument(
+ !table.sortOrder().isUnsorted(),
+ "Cannot resolve output sort order: table %s is unsorted and sort order ID %s is not in "
+ + "the table's sort order map. Use the SORT strategy first.",
+ table.name(),
+ sortOrderId);
+
+ return table.sortOrder();
+ }
+
+ private int calculateNumRanges(List files, long targetFileSize) {
+ if (!rangeParallelismEnabled || files.size() < minFilesForRangeParallelism) {
+ return 1;
+ }
+
+ long totalInputSize = files.stream().mapToLong(f -> f.file().fileSizeInBytes()).sum();
+
+ if (totalInputSize < targetFileSize * 1.5) {
+ return 1;
+ }
+
+ long expectedOutputFiles = (totalInputSize + targetFileSize - 1) / targetFileSize;
+ int targetRanges = Math.min(rangesPerGroup, (int) expectedOutputFiles);
+ return Math.max(1, targetRanges);
+ }
+
+ private static List> assignFilesBySize(
+ List files, int numRanges) {
+ if (numRanges <= 1) {
+ return ImmutableList.of(Lists.newArrayList(files));
+ }
+
+ long totalInputSize = files.stream().mapToLong(f -> f.file().fileSizeInBytes()).sum();
+ long targetSizePerRange = totalInputSize / numRanges;
+
+ List> ranges = Lists.newArrayListWithCapacity(numRanges);
+ for (int i = 0; i < numRanges; i++) {
+ ranges.add(Lists.newArrayList());
+ }
+
+ int currentRange = 0;
+ long currentRangeSize = 0;
+
+ for (FileScanTask file : files) {
+ if (currentRange < numRanges - 1
+ && currentRangeSize > 0
+ && currentRangeSize + file.file().fileSizeInBytes() > targetSizePerRange) {
+ currentRange++;
+ currentRangeSize = 0;
+ }
+
+ ranges.get(currentRange).add(file);
+ currentRangeSize += file.file().fileSizeInBytes();
+ }
+
+ return ranges;
+ }
+
+ private static void cleanupPartialWriter(
+ DataWriter writer, String outputFilePath, Table table) {
+ try {
+ writer.close();
+ } catch (IOException closeException) {
+ LOG.error("Failed to close writer during exception cleanup", closeException);
+ }
+ if (outputFilePath != null) {
+ LOG.warn("Attempting to delete partial output file: {}", outputFilePath);
+ try {
+ table.io().deleteFile(outputFilePath);
+ } catch (Exception deleteException) {
+ LOG.warn(
+ "Could not delete partial output file {} - may need manual cleanup",
+ outputFilePath,
+ deleteException);
+ }
+ }
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 5ab07f4d0a67..06bce98230fa 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -29,6 +29,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.NamedReference;
import org.apache.iceberg.expressions.Zorder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.ExtendedParser;
@@ -200,9 +201,17 @@ private RewriteDataFilesSparkAction checkAndApplyStrategy(
return binPackAction.sort(buildSortOrder(sortOrderFields, schema));
}
return binPackAction;
+ } else if (strategy.equalsIgnoreCase("k-way-merge")) {
+ Preconditions.checkArgument(
+ sortOrderString == null,
+ "Cannot specify sort_order with k-way-merge strategy. "
+ + "K-way merge uses the table's existing sort order.");
+ return action.kWayMerge();
} else {
throw new IllegalArgumentException(
- "unsupported strategy: " + strategy + ". Only binpack or sort is supported");
+ "unsupported strategy: "
+ + strategy
+ + ". Only binpack, sort, or k-way-merge is supported");
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestKWayMergeFileRewriteRunner.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestKWayMergeFileRewriteRunner.java
new file mode 100644
index 000000000000..7403d13d389b
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestKWayMergeFileRewriteRunner.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewriteDataFiles;
+import org.apache.iceberg.actions.SizeBasedFileRewritePlanner;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.spark.source.ThreeColumnRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestKWayMergeFileRewriteRunner extends TestBase {
+
+ @TempDir private File tableDir;
+
+ private static final HadoopTables TABLES = new HadoopTables(new Configuration());
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "c2", Types.StringType.get()),
+ Types.NestedField.optional(3, "c3", Types.StringType.get()));
+
+ private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("c1").build();
+
+ private String tableLocation = null;
+
+ @BeforeAll
+ static void setupSpark() {
+ spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
+ }
+
+ @BeforeEach
+ void setupTableLocation() {
+ this.tableLocation = tableDir.toURI().toString();
+ }
+
+ @Test
+ void testKWayMergeUnpartitionedTable() {
+ Table table = createSortedTable(4);
+ int initialFiles = fileCount(table);
+ assertThat(initialFiles).as("Table should have multiple files").isGreaterThanOrEqualTo(4);
+ List expectedRecords = currentData();
+
+ RewriteDataFiles.Result result =
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite all data files")
+ .isEqualTo(initialFiles);
+ assertThat(result.addedDataFilesCount())
+ .as("Action should add at least 1 data file")
+ .isGreaterThanOrEqualTo(1);
+
+ table.refresh();
+ assertThat(fileCount(table))
+ .as("File count should decrease or stay same")
+ .isLessThanOrEqualTo(initialFiles);
+ List actual = currentData();
+ assertEquals("Rows must match", expectedRecords, actual);
+ }
+
+ @Test
+ void testKWayMergePartitionedTable() {
+ Table table = createSortedTablePartitioned(4, 2);
+ int initialFiles = fileCount(table);
+ assertThat(initialFiles).as("Table should have multiple files").isGreaterThanOrEqualTo(8);
+ List expectedRecords = currentData();
+
+ RewriteDataFiles.Result result =
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite data files")
+ .isEqualTo(initialFiles);
+ assertThat(result.addedDataFilesCount()).as("Action should add files").isGreaterThan(0);
+
+ List actual = currentData();
+ assertEquals("Rows must match", expectedRecords, actual);
+ }
+
+ @Test
+ void testKWayMergePreservesSort() {
+ Table table = createSortedTable(3);
+ assertThat(fileCount(table)).as("Table should have multiple files").isGreaterThanOrEqualTo(3);
+
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute();
+
+ table.refresh();
+
+ try (CloseableIterable tasks = table.newScan().planFiles()) {
+ for (FileScanTask task : tasks) {
+ assertThat(task.file().sortOrderId())
+ .as("Output files must have valid sort order ID")
+ .isGreaterThan(0);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void testKWayMergeWithFilter() {
+ Table table = createSortedTablePartitioned(4, 2);
+ assertThat(fileCount(table)).as("Table should have multiple files").isGreaterThanOrEqualTo(8);
+ List expectedRecords = currentData();
+
+ RewriteDataFiles.Result result =
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .filter(Expressions.equal("c1", 1))
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount())
+ .as("Action should rewrite files in filtered partition")
+ .isGreaterThan(0);
+
+ List actual = currentData();
+ assertEquals("Rows must match", expectedRecords, actual);
+ }
+
+ @Test
+ void testKWayMergeRejectsUnsortedTable() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ writeUnsortedRecords(table, 3);
+ table.refresh();
+ assertThat(fileCount(table)).as("Table should have files").isGreaterThanOrEqualTo(3);
+
+ assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("K-way merge requires a table sort order");
+ }
+
+ @Test
+ void testKWayMergeRejectsUnsortedFiles() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+
+ writeUnsortedRecords(table, 3);
+ table.refresh();
+ assertThat(fileCount(table)).as("Table should have files").isGreaterThanOrEqualTo(3);
+
+ assertThatThrownBy(
+ () ->
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .execute())
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("K-way merge requires all files to be pre-sorted");
+ }
+
+ @Test
+ void testKWayMergeValidOptions() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+
+ SparkKWayMergeFileRewriteRunner runner = new SparkKWayMergeFileRewriteRunner(spark, table);
+
+ assertThat(runner.validOptions())
+ .as("Runner must report all supported options")
+ .contains(
+ SparkKWayMergeFileRewriteRunner.RANGE_PARALLELISM_ENABLED,
+ SparkKWayMergeFileRewriteRunner.RANGES_PER_GROUP,
+ SparkKWayMergeFileRewriteRunner.MIN_FILES_FOR_RANGE_PARALLELISM);
+ }
+
+ @Test
+ void testKWayMergeInvalidOptionRangesPerGroup() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+
+ SparkKWayMergeFileRewriteRunner runner = new SparkKWayMergeFileRewriteRunner(spark, table);
+
+ assertThatThrownBy(
+ () ->
+ runner.init(ImmutableMap.of(SparkKWayMergeFileRewriteRunner.RANGES_PER_GROUP, "0")))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("must be > 0");
+ }
+
+ @Test
+ void testKWayMergeDisabledRangeParallelism() {
+ Table table = createSortedTable(3);
+ assertThat(fileCount(table)).as("Table should have files").isGreaterThanOrEqualTo(3);
+ List expectedRecords = currentData();
+
+ RewriteDataFiles.Result result =
+ actions()
+ .rewriteDataFiles(table)
+ .kWayMerge()
+ .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1")
+ .option(SparkKWayMergeFileRewriteRunner.RANGE_PARALLELISM_ENABLED, "false")
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount()).as("Action should rewrite files").isGreaterThan(0);
+
+ List actual = currentData();
+ assertEquals("Rows must match", expectedRecords, actual);
+ }
+
+ @Test
+ void testKWayMergeDescription() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+
+ SparkKWayMergeFileRewriteRunner runner = new SparkKWayMergeFileRewriteRunner(spark, table);
+ assertThat(runner.description()).isEqualTo("K-WAY-MERGE");
+ }
+
+ private SparkActions actions() {
+ return SparkActions.get();
+ }
+
+ private Table createSortedTable(int numFiles) {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+ table
+ .updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(20 * 1024))
+ .commit();
+
+ writeSortedRecords(table, numFiles);
+ table.refresh();
+ return table;
+ }
+
+ private Table createSortedTablePartitioned(int partitions, int filesPerPartition) {
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+ Map options = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, SORT_ORDER, options, tableLocation);
+ table
+ .updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, Integer.toString(20 * 1024))
+ .commit();
+
+ for (int p = 0; p < partitions; p++) {
+ for (int f = 0; f < filesPerPartition; f++) {
+ int partition = p;
+ List records =
+ IntStream.range(f * 100, (f + 1) * 100)
+ .mapToObj(i -> new ThreeColumnRecord(partition, "foo" + i, "bar" + i))
+ .collect(Collectors.toList());
+ Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).repartition(1);
+ df.select("c1", "c2", "c3")
+ .sortWithinPartitions("c1", "c2")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "true")
+ .save(tableLocation);
+ }
+ }
+
+ table.refresh();
+ return table;
+ }
+
+ private void writeSortedRecords(Table table, int numFiles) {
+ for (int f = 0; f < numFiles; f++) {
+ List records =
+ IntStream.range(f * 200, (f + 1) * 200)
+ .mapToObj(i -> new ThreeColumnRecord(i, "foo" + i, "bar" + i))
+ .collect(Collectors.toList());
+ Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).repartition(1);
+ df.select("c1", "c2", "c3")
+ .sortWithinPartitions("c1", "c2")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "true")
+ .save(tableLocation);
+ }
+ }
+
+ private void writeUnsortedRecords(Table table, int numFiles) {
+ List records =
+ IntStream.range(0, 500)
+ .mapToObj(i -> new ThreeColumnRecord(500 - i, "foo" + i, "bar" + i))
+ .collect(Collectors.toList());
+ Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).repartition(numFiles);
+ df.select("c1", "c2", "c3")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .save(tableLocation);
+ }
+
+ private List currentData() {
+ Dataset ds = spark.read().format("iceberg").load(tableLocation);
+ return ds.orderBy("c1", "c2", "c3").collectAsList().stream()
+ .map(
+ row ->
+ new Object[] {
+ row.isNullAt(0) ? null : row.getInt(0), row.getString(1), row.getString(2)
+ })
+ .collect(Collectors.toList());
+ }
+
+ private int fileCount(Table table) {
+ table.refresh();
+ int numFiles = 0;
+ try (CloseableIterable tasks = table.newScan().planFiles()) {
+ for (FileScanTask task : tasks) {
+ numFiles++;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return numFiles;
+ }
+}