Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,23 @@ default RewriteDataFiles zOrder(String... columns) {
"Z-ORDER Rewrite Strategy not implemented for this framework");
}

  /**
   * Choose K-WAY MERGE as a strategy for this rewrite operation. This strategy merges pre-sorted
   * files in sort-key order without shuffling. All input files must already be sorted according to
   * the table's sort order. The table must have a defined sort order.
   *
   * <p>K-way merge preserves the table's sort order in output files. It is optimal for tables that
   * are already sorted but have accumulated multiple overlapping files per partition (e.g., after
   * daily ingestion into a previously compacted table).
   *
   * @return this for method chaining
   * @throws UnsupportedOperationException if the processing framework does not implement the K-WAY
   *     MERGE strategy (this default implementation always throws)
   * @since 1.11.0
   */
  default RewriteDataFiles kWayMerge() {
    throw new UnsupportedOperationException(
        "K-WAY MERGE Rewrite Strategy not implemented for this framework");
  }

/**
* A user provided filter for determining which files will be considered by the rewrite strategy.
* This will be used in addition to whatever rules the rewrite strategy generates. For example
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A planner for K-way merge rewriting that selects files based on size thresholds and orders them
 * by sort-key lower bounds before bin-packing into groups.
 *
 * <p>File selection uses the same size-based criteria as other strategies: files outside the
 * desired size range (too small or too large) are candidates for rewriting. This ensures K-way
 * merge compacts fragmented files within a partition without shuffling, preserving the existing
 * sort order.
 *
 * <p>Files are sorted by their lower bounds on the first sort field before bin-packing so that
 * adjacent files in key space are packed into the same group, minimizing cross-group overlap.
 *
 * <p>The table MUST have a defined sort order; {@link #planFileGroups(Iterable)} rejects unsorted
 * tables. Files that lack column statistics for the first sort field are ordered after all files
 * that have them.
 */
public class KWayMergeRewriteFilePlanner
    extends SizeBasedFileRewritePlanner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> {

  private static final Logger LOG = LoggerFactory.getLogger(KWayMergeRewriteFilePlanner.class);

  // Row filter applied to the table scan that discovers candidate files.
  private final Expression filter;
  // Snapshot to plan against; null means the table's current state at scan time.
  private final Long snapshotId;
  // Whether column-name resolution in the filter is case sensitive.
  private final boolean caseSensitive;

  // Resolved from options in init(); controls the order in which groups are returned by plan().
  private RewriteJobOrder rewriteJobOrder;

  /**
   * Creates a planner over all files of the table.
   *
   * <p>NOTE(review): unlike the {@code (table, filter)} constructor below, this does not pin the
   * current snapshot id (it passes {@code null}) — confirm whether that asymmetry is intended.
   *
   * @param table the table to plan a rewrite for
   */
  public KWayMergeRewriteFilePlanner(Table table) {
    this(table, Expressions.alwaysTrue(), null, false);
  }

  /**
   * Creates a planner restricted by a filter, pinned to the table's current snapshot (or no
   * snapshot if the table has none).
   *
   * @param table the table to plan a rewrite for
   * @param filter expression limiting which files are considered
   */
  public KWayMergeRewriteFilePlanner(Table table, Expression filter) {
    this(
        table,
        filter,
        table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : null,
        false);
  }

  /**
   * Creates a planner with full control over filtering, snapshot, and case sensitivity.
   *
   * @param table the table to plan a rewrite for
   * @param filter expression limiting which files are considered
   * @param snapshotId snapshot to scan, or null for the current table state
   * @param caseSensitive whether filter column resolution is case sensitive
   */
  public KWayMergeRewriteFilePlanner(
      Table table, Expression filter, Long snapshotId, boolean caseSensitive) {
    super(table);
    this.filter = filter;
    this.snapshotId = snapshotId;
    this.caseSensitive = caseSensitive;
  }

  /** Returns the size-based options from the parent plus {@code rewrite-job-order}. */
  @Override
  public Set<String> validOptions() {
    return ImmutableSet.<String>builder()
        .addAll(super.validOptions())
        .add(RewriteDataFiles.REWRITE_JOB_ORDER)
        .build();
  }

  /** Initializes size thresholds (parent) and resolves the job-order option. */
  @Override
  public void init(Map<String, String> options) {
    super.init(options);
    this.rewriteJobOrder =
        RewriteJobOrder.fromName(
            PropertyUtil.propertyAsString(
                options,
                RewriteDataFiles.REWRITE_JOB_ORDER,
                RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
  }

  /** Default target output file size: the table's write.target-file-size-bytes property. */
  @Override
  protected long defaultTargetFileSize() {
    return PropertyUtil.propertyAsLong(
        table().properties(),
        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
  }

  /** Keeps only files outside the desired size range (too small or too large). */
  @Override
  protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
    return Iterables.filter(tasks, this::outsideDesiredFileSizeRange);
  }

  /** Keeps groups worth rewriting: enough files, enough content, or too much content. */
  @Override
  protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
    return Iterables.filter(
        groups, group -> enoughInputFiles(group) || enoughContent(group) || tooMuchContent(group));
  }

  /**
   * Orders candidate files by the lower bound of the table's first sort field, then delegates to
   * the parent's bin-packing so that files adjacent in key space land in the same group.
   *
   * @throws IllegalArgumentException if the table has no sort order
   */
  @Override
  protected Iterable<List<FileScanTask>> planFileGroups(Iterable<FileScanTask> tasks) {
    SortOrder sortOrder = table().sortOrder();
    // K-way merge only makes sense for sorted tables; fail fast otherwise.
    Preconditions.checkArgument(
        !sortOrder.isUnsorted(),
        "K-way merge requires a table sort order, but table %s is unsorted. "
            + "Use the SORT strategy to sort the files first.",
        table().name());

    // Only the FIRST sort field is used for ordering; secondary fields are ignored here.
    int sortFieldId = sortOrder.fields().get(0).sourceId();
    Types.NestedField sortField = table().schema().findField(sortFieldId);
    Type.PrimitiveType sortFieldType = sortField.type().asPrimitiveType();
    Comparator<Object> valueComparator = Comparators.forType(sortFieldType);

    List<FileScanTask> taskList = Lists.newArrayList(tasks);

    taskList.sort(
        Comparator.comparing(
            (FileScanTask task) -> {
              // Files without stats for the sort field yield null and sort last (nullsLast below).
              Map<Integer, ByteBuffer> lowerBounds = task.file().lowerBounds();
              if (lowerBounds == null || !lowerBounds.containsKey(sortFieldId)) {
                return null;
              }
              return Conversions.fromByteBuffer(sortFieldType, lowerBounds.get(sortFieldId));
            },
            Comparator.nullsLast(valueComparator)));

    return super.planFileGroups(taskList);
  }

  /**
   * Scans the table, groups candidate files per partition, and produces a plan whose file groups
   * are ordered by the configured {@link RewriteJobOrder}.
   */
  @Override
  public FileRewritePlan<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> plan() {
    StructLikeMap<List<List<FileScanTask>>> groupsByPartition = planFileGroups();
    RewriteExecutionContext ctx = new RewriteExecutionContext();
    List<RewriteFileGroup> selectedFileGroups = Lists.newArrayList();

    groupsByPartition.entrySet().stream()
        .filter(e -> !e.getValue().isEmpty())
        .forEach(
            entry -> {
              StructLike partition = entry.getKey();
              entry
                  .getValue()
                  .forEach(
                      fileScanTasks -> {
                        long inputSize = inputSize(fileScanTasks);
                        selectedFileGroups.add(
                            newRewriteGroup(
                                ctx,
                                partition,
                                fileScanTasks,
                                inputSplitSize(inputSize),
                                expectedOutputFiles(inputSize)));
                      });
            });

    // Per-partition group counts, plus the overall total, for progress reporting in the plan.
    Map<StructLike, Integer> groupsInPartition = groupsByPartition.transformValues(List::size);
    int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
    return new FileRewritePlan<>(
        CloseableIterable.of(
            selectedFileGroups.stream()
                .sorted(RewriteFileGroup.comparator(rewriteJobOrder))
                .collect(Collectors.toList())),
        totalGroupCount,
        groupsInPartition);
  }

  /**
   * Runs the table scan (with column stats, needed for lower bounds) and returns the planned file
   * groups keyed by partition. The scan iterable is closed before returning; results are
   * materialized via {@code ImmutableList.copyOf} inside the try block so nothing reads the scan
   * after close.
   */
  private StructLikeMap<List<List<FileScanTask>>> planFileGroups() {
    TableScan scan =
        table()
            .newScan()
            .filter(filter)
            .caseSensitive(caseSensitive)
            .ignoreResiduals()
            .includeColumnStats();

    if (snapshotId != null) {
      scan = scan.useSnapshot(snapshotId);
    }

    CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles();

    try {
      Types.StructType partitionType = table().spec().partitionType();
      StructLikeMap<List<FileScanTask>> filesByPartition =
          groupByPartition(partitionType, fileScanTasks);
      return filesByPartition.transformValues(tasks -> ImmutableList.copyOf(planFileGroups(tasks)));
    } finally {
      try {
        fileScanTasks.close();
      } catch (IOException io) {
        // Best-effort close: log and continue, since planning results are already materialized.
        LOG.error("Cannot properly close file iterable while planning for rewrite", io);
      }
    }
  }

  /**
   * Buckets tasks by partition value. Files written under an older partition spec are all lumped
   * under a single empty-struct key so they are grouped (and rewritten) together.
   */
  private StructLikeMap<List<FileScanTask>> groupByPartition(
      Types.StructType partitionType, Iterable<FileScanTask> tasks) {
    StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
    StructLike emptyStruct = GenericRecord.create(partitionType);

    for (FileScanTask task : tasks) {
      StructLike taskPartition =
          task.file().specId() == table().spec().specId() ? task.file().partition() : emptyStruct;
      filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
    }

    return filesByPartition;
  }

  /** Builds one rewrite group with global/partition indexes assigned from the shared context. */
  private RewriteFileGroup newRewriteGroup(
      RewriteExecutionContext ctx,
      StructLike partition,
      List<FileScanTask> tasks,
      long inputSplitSize,
      int expectedOutputFiles) {
    FileGroupInfo info =
        ImmutableRewriteDataFiles.FileGroupInfo.builder()
            .globalIndex(ctx.currentGlobalIndex())
            .partitionIndex(ctx.currentPartitionIndex(partition))
            .partition(partition)
            .build();
    return new RewriteFileGroup(
        info,
        Lists.newArrayList(tasks),
        outputSpecId(),
        writeMaxFileSize(),
        inputSplitSize,
        expectedOutputFiles);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ public void testRewriteDataFilesWithInvalidInputs() {
+ "strategy => 'temp')",
catalogName, tableIdent))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("unsupported strategy: temp. Only binpack or sort is supported");
.hasMessage("unsupported strategy: temp. Only binpack, sort, or k-way-merge is supported");

// Test for sort_order with binpack strategy
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
import org.apache.iceberg.actions.FileRewritePlan;
import org.apache.iceberg.actions.FileRewritePlanner;
import org.apache.iceberg.actions.FileRewriteRunner;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
import org.apache.iceberg.actions.ImmutableRewriteDataFiles.Result.Builder;
import org.apache.iceberg.actions.KWayMergeRewriteFilePlanner;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
Expand Down Expand Up @@ -98,7 +100,8 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = SnapshotRef.MAIN_BRANCH;
private BinPackRewriteFilePlanner planner = null;
private FileRewritePlanner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> planner =
null;
private FileRewriteRunner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> runner = null;

RewriteDataFilesSparkAction(SparkSession spark, Table table) {
Expand Down Expand Up @@ -146,6 +149,13 @@ public RewriteDataFilesSparkAction zOrder(String... columnNames) {
return this;
}

  /**
   * Selects the K-way merge runner for this action.
   *
   * @return this for method chaining
   */
  @Override
  public RewriteDataFilesSparkAction kWayMerge() {
    // Only one rewrite strategy may be chosen per action; fail fast if one is already set.
    ensureRunnerNotSet();
    this.runner = new SparkKWayMergeFileRewriteRunner(spark(), table);
    return this;
  }

private void ensureRunnerNotSet() {
Preconditions.checkArgument(
runner == null,
Expand Down Expand Up @@ -205,10 +215,16 @@ public RewriteDataFiles.Result execute() {
}

private void init(long startingSnapshotId) {
this.planner =
runner instanceof SparkShufflingFileRewriteRunner
? new SparkShufflingDataRewritePlanner(table, filter, startingSnapshotId, caseSensitive)
: new BinPackRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
if (runner instanceof SparkKWayMergeFileRewriteRunner) {
this.planner =
new KWayMergeRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
} else if (runner instanceof SparkShufflingFileRewriteRunner) {
this.planner =
new SparkShufflingDataRewritePlanner(table, filter, startingSnapshotId, caseSensitive);
} else {
this.planner =
new BinPackRewriteFilePlanner(table, filter, startingSnapshotId, caseSensitive);
}

// Default to BinPack if no strategy selected
if (this.runner == null) {
Expand Down
Loading
Loading