Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hadoop-ozone/dist/src/main/license/bin/LICENSE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ Apache License 2.0
org.apache.hadoop:hadoop-shaded-protobuf_3_25
org.apache.httpcomponents:httpcore
org.apache.iceberg:iceberg-api
org.apache.iceberg:iceberg-bundled-guava
org.apache.iceberg:iceberg-common
org.apache.iceberg:iceberg-core
org.apache.kerby:kerb-admin
Expand Down
1 change: 1 addition & 0 deletions hadoop-ozone/dist/src/main/license/jar-report.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ share/ozone/lib/hppc.jar
share/ozone/lib/httpclient.jar
share/ozone/lib/httpcore.jar
share/ozone/lib/iceberg-api.jar
share/ozone/lib/iceberg-bundled-guava.jar
share/ozone/lib/iceberg-common.jar
share/ozone/lib/iceberg-core.jar
share/ozone/lib/istack-commons-runtime.jar
Expand Down
22 changes: 12 additions & 10 deletions hadoop-ozone/iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
Expand All @@ -60,10 +54,6 @@
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5-h2</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-bundled-guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
Expand All @@ -74,6 +64,18 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.iceberg;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.iceberg.BaseTable;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.RewriteTablePathUtil.RewriteResult;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.actions.ImmutableRewriteTablePath;
import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.util.Pair;

/**
* An implementation of {@link RewriteTablePath} for Apache Ozone backed Iceberg tables.
*
* <p>This action rewrites a table's metadata and position delete file paths by replacing a source
* prefix with a target prefix. It processes table versions, snapshots, manifests and position delete files.</p>
*
* <p>The rewrite can be scoped between optional start and end metadata versions,
* and all rewritten files are staged in a temporary directory.</p>
*/
public class RewriteTablePathOzoneAction implements RewriteTablePath {

private static final String RESULT_LOCATION = "file-list";
Expand Down Expand Up @@ -121,8 +145,9 @@ private void validateInputs() {
"Source prefix cannot be the same as target prefix (%s)", sourcePrefix));
}

validateAndSetEndVersion();
validateAndSetStartVersion();
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
validateAndSetEndVersion(tableMetadata);
validateAndSetStartVersion(tableMetadata);

if (stagingDir == null) {
stagingDir =
Expand All @@ -135,9 +160,7 @@ private void validateInputs() {
}
}

private void validateAndSetEndVersion() {
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();

private void validateAndSetEndVersion(TableMetadata tableMetadata) {
if (endVersionName == null) {
Objects.requireNonNull(
tableMetadata.metadataFileLocation(), "Metadata file location should not be null");
Expand All @@ -147,9 +170,7 @@ private void validateAndSetEndVersion() {
}
}

private void validateAndSetStartVersion() {
TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();

private void validateAndSetStartVersion(TableMetadata tableMetadata) {
if (startVersionName != null) {
this.startVersionName = validateVersion(tableMetadata, startVersionName);
}
Expand All @@ -159,11 +180,12 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileNa
String versionFile = null;
if (versionInFilePath(tableMetadata.metadataFileLocation(), versionFileName)) {
versionFile = tableMetadata.metadataFileLocation();
}

for (MetadataLogEntry log : tableMetadata.previousFiles()) {
if (versionInFilePath(log.file(), versionFileName)) {
versionFile = log.file();
} else {
for (MetadataLogEntry log : tableMetadata.previousFiles()) {
if (versionInFilePath(log.file(), versionFileName)) {
versionFile = log.file();
break;
}
}
}

Expand All @@ -184,8 +206,106 @@ private boolean versionInFilePath(String path, String version) {
}

/**
 * Rebuilds table metadata between the optional start version and the end version,
 * staging rewritten version files and producing a copy plan of
 * (staged path, final target path) pairs.
 *
 * @return location of the CSV file listing the staged files to copy
 * @throws IllegalArgumentException if the table has partition statistics files
 *     (not supported yet) or a referenced version file is missing
 */
private String rebuildMetadata() {
  // TODO need to implement rewrite of manifest list, manifest files and position delete files.
  // NOTE(review): startMetadata is not used below yet; loading it does force the start
  // version metadata file to be read — presumably an early-validation side effect. Confirm
  // before removing, or use it once manifest rewriting is implemented.
  TableMetadata startMetadata = startVersionName != null
      ? ((HasTableOperations) newStaticTable(startVersionName, table.io()))
          .operations()
          .current()
      : null;
  TableMetadata endMetadata =
      ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current();

  // Partition statistics rewriting is not implemented; fail fast instead of
  // silently producing an incomplete copy plan.
  List<PartitionStatisticsFile> partitionStats = endMetadata.partitionStatisticsFiles();
  if (partitionStats != null && !partitionStats.isEmpty()) {
    throw new IllegalArgumentException("Partition statistics files are not supported yet.");
  }

  RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata);

  Set<Pair<String, String>> copyPlan = new HashSet<>(rewriteVersionResult.copyPlan());

  return saveFileList(copyPlan);
}

/**
 * Persists the copy plan as a CSV file inside the staging directory.
 *
 * @param filesToMove pairs of (staged file path, final target path)
 * @return location of the written file list
 */
private String saveFileList(Set<Pair<String, String>> filesToMove) {
  String listLocation = stagingDir + RESULT_LOCATION;
  writeAsCsv(filesToMove, table.io().newOutputFile(listLocation));
  return listLocation;
}

/**
 * Writes each (source, target) pair as one comma-separated UTF-8 line,
 * overwriting any existing file at the target location.
 *
 * @param rows pairs to serialize, one per line
 * @param outputFile destination file
 * @throws RuntimeIOException if writing fails
 */
private void writeAsCsv(Set<Pair<String, String>> rows, OutputFile outputFile) {
  try (BufferedWriter writer = new BufferedWriter(
      new OutputStreamWriter(outputFile.createOrOverwrite(), StandardCharsets.UTF_8))) {
    for (Pair<String, String> row : rows) {
      writer.write(row.first() + "," + row.second());
      writer.newLine();
    }
  } catch (IOException ioe) {
    throw new RuntimeIOException(ioe);
  }
}

/**
 * Walks the version-file history from the end version back towards (but not
 * including) the start version, rewriting each version file and accumulating
 * the snapshots to rewrite plus the copy-plan entries.
 *
 * @param endMetadata metadata of the end (latest in scope) version
 * @return combined rewrite result over all processed versions
 * @throws IllegalArgumentException if a referenced version file does not exist
 */
private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
  RewriteResult<Snapshot> result = new RewriteResult<>();
  result.toRewrite().addAll(endMetadata.snapshots());
  result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName));

  List<MetadataLogEntry> previousVersions = endMetadata.previousFiles();
  // Iterate newest-first; everything at or before the start version is left untouched.
  for (int idx = previousVersions.size() - 1; idx >= 0; idx--) {
    String versionFile = previousVersions.get(idx).file();
    if (versionFile.equals(startVersionName)) {
      break;
    }

    if (!fileExist(versionFile)) {
      throw new IllegalArgumentException(
          String.format("Version file %s doesn't exist", versionFile));
    }

    TableMetadata versionMetadata = new StaticTableOperations(versionFile, table.io()).current();

    result.toRewrite().addAll(versionMetadata.snapshots());
    result.copyPlan().addAll(rewriteVersionFile(versionMetadata, versionFile));
  }

  return result;
}

/**
 * Rewrites a single version (metadata) file: replaces the source prefix with the
 * target prefix in all paths, writes the result to the staging directory, and
 * returns the copy-plan entries for the version file and its statistics files.
 *
 * @param metadata metadata parsed from {@code versionFilePath}
 * @param versionFilePath original location of the version file
 * @return pairs of (staged path, final target path)
 */
private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, String versionFilePath) {
  Set<Pair<String, String>> result = new HashSet<>();
  String stagingPath = RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, stagingDir);

  // Fixed: removed leftover System.out.println debug statement; progress reporting
  // should go through a proper logger if needed.
  TableMetadata newTableMetadata = RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix);
  TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath));

  result.add(Pair.of(stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix)));
  result.addAll(statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles()));

  return result;
}

/**
 * Builds the copy plan for table statistics files by pairing each pre-rewrite
 * statistics file path with its post-rewrite counterpart, position by position.
 *
 * @param beforeStats statistics files before the path rewrite
 * @param afterStats statistics files after the path rewrite
 * @return pairs of (original path, rewritten path); empty if there are no statistics files
 * @throws IllegalArgumentException if the two lists differ in length or a file's size changed
 */
private Set<Pair<String, String>> statsFileCopyPlan(List<StatisticsFile> beforeStats,
    List<StatisticsFile> afterStats) {
  Set<Pair<String, String>> plan = new HashSet<>();
  if (beforeStats.isEmpty()) {
    return plan;
  }

  if (beforeStats.size() != afterStats.size()) {
    throw new IllegalArgumentException("Before and after path rewrite, statistic files count should be same");
  }

  // Entries are matched by index: the rewrite must preserve order and content size.
  int count = beforeStats.size();
  for (int idx = 0; idx < count; idx++) {
    StatisticsFile original = beforeStats.get(idx);
    StatisticsFile rewritten = afterStats.get(idx);
    if (original.fileSizeInBytes() != rewritten.fileSizeInBytes()) {
      throw new IllegalArgumentException("Before and after path rewrite, statistic files size should be same");
    }
    plan.add(Pair.of(original.path(), rewritten.path()));
  }
  return plan;
}

private boolean fileExist(String path) {
Expand Down Expand Up @@ -216,4 +336,9 @@ private static void checkNonNullNonEmpty(String value, String name) {
throw new IllegalArgumentException(name + " is empty");
}
}

/**
 * Creates a read-only table view pinned to a specific metadata file location.
 *
 * @param metadataFileLocation path of the metadata (version) file to load
 * @param io file IO used to read the metadata
 * @return a static table backed by the given metadata file
 */
private Table newStaticTable(String metadataFileLocation, FileIO io) {
  return new BaseTable(new StaticTableOperations(metadataFileLocation, io), metadataFileLocation);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Apache Ozone integration with Apache Iceberg.
*/
package org.apache.hadoop.ozone.iceberg;
Loading