Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

/**
Expand Down Expand Up @@ -72,6 +73,11 @@ public interface Snapshot extends Serializable {
*/
List<ManifestFile> allManifests(FileIO io);

/**
 * Variant of {@link #allManifests(FileIO)} that additionally accepts an {@link EncryptionManager}.
 *
 * <p>This default does not read anything; it always throws, so implementations that support this
 * call must override it.
 *
 * @param fileIO a {@link FileIO} instance; unused by this default implementation
 * @param encryption an {@link EncryptionManager}; unused by this default implementation
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always, with a message naming the implementing class
 */
default List<ManifestFile> allManifests(FileIO fileIO, EncryptionManager encryption) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement allManifests with encryption");
}

/**
* Return a {@link ManifestFile} for each data manifest in this snapshot.
*
Expand All @@ -80,13 +86,19 @@ public interface Snapshot extends Serializable {
*/
List<ManifestFile> dataManifests(FileIO io);

/**
 * Variant of {@link #dataManifests(FileIO)} that additionally accepts an {@link
 * EncryptionManager}.
 *
 * <p>This default does not read anything; it always throws, so implementations that support this
 * call must override it.
 *
 * @param io a {@link FileIO} instance; unused by this default implementation
 * @param encryption an {@link EncryptionManager}; unused by this default implementation
 * @return never returns normally in this default implementation
 * @throws UnsupportedOperationException always, with a message naming the implementing class
 */
default List<ManifestFile> dataManifests(FileIO io, EncryptionManager encryption) {
  String implementation = this.getClass().getName();
  throw new UnsupportedOperationException(
      implementation + " doesn't implement dataManifests with encryption");
}

/**
* Return a {@link ManifestFile} for each delete manifest in this snapshot.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @return a list of ManifestFile
*/
List<ManifestFile> deleteManifests(FileIO io);
// TODO: add a deleteManifests overload that accepts an EncryptionManager

/**
* Return the name of the {@link DataOperations data operation} that produced this snapshot.
Expand Down Expand Up @@ -114,6 +126,7 @@ public interface Snapshot extends Serializable {
* @return all data files added to the table in this snapshot.
*/
Iterable<DataFile> addedDataFiles(FileIO io);
// TODO: add an addedDataFiles overload that accepts an EncryptionManager

/**
* Return all data files removed from the table in this snapshot.
Expand All @@ -126,6 +139,7 @@ public interface Snapshot extends Serializable {
* @return all data files removed from the table in this snapshot.
*/
Iterable<DataFile> removedDataFiles(FileIO io);
// TODO: add a removedDataFiles overload that accepts an EncryptionManager

/**
* Return all delete files added to the table in this snapshot.
Expand All @@ -140,6 +154,7 @@ default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement addedDeleteFiles");
}
// TODO: add an addedDeleteFiles overload that accepts an EncryptionManager

/**
* Return all delete files removed from the table in this snapshot.
Expand All @@ -154,6 +169,7 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement removedDeleteFiles");
}
// TODO: add a removedDeleteFiles overload that accepts an EncryptionManager

/**
* Return the location of this snapshot's manifest list, or null if it is not separate.
Expand All @@ -171,4 +187,8 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
// This default reports no schema id; implementations that track one override this method.
return null;
}

/**
* Return the key metadata string associated with this snapshot, or null if there is none.
*
* <p>This default implementation always returns null.
*
* <p>NOTE(review): presumably this is the key metadata of the snapshot's manifest list rather
* than of an individual manifest; confirm and consider whether the name should say so.
*
* @return the key metadata string, or null in this default implementation
*/
default String manifestKeyMetadata() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@ default Iterable<InputFile> decrypt(Iterable<EncryptedInputFile> encrypted) {
default Iterable<EncryptedOutputFile> encrypt(Iterable<OutputFile> rawOutput) {
  // Map each raw output file through the single-file encrypt(OutputFile) overload.
  return Iterables.transform(rawOutput, outputFile -> encrypt(outputFile));
}

/**
* Variant of {@link #encrypt(OutputFile)} that takes an additional {@code wrapEncryptionKey}
* flag.
*
* <p>This default implementation ignores the flag and delegates to {@link
* #encrypt(OutputFile)}.
*
* @param rawOutput the output file handed to {@link #encrypt(OutputFile)}
* @param wrapEncryptionKey ignored by this default implementation. NOTE(review): presumably
*     asks implementations to wrap the file's encryption key; confirm the intended contract.
* @return the result of {@link #encrypt(OutputFile)} for {@code rawOutput}
*/
default EncryptedOutputFile encrypt(OutputFile rawOutput, boolean wrapEncryptionKey) {
// The flag is intentionally unused here; only overriding implementations can act on it.
return encrypt(rawOutput);
}
}
73 changes: 72 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -45,6 +53,7 @@ class BaseSnapshot implements Snapshot {
private final Map<String, String> summary;
private final Integer schemaId;
private final String[] v1ManifestLocations;
private final String manifestListKeyMetadata;

// lazily initialized
private transient List<ManifestFile> allManifests = null;
Expand All @@ -64,6 +73,28 @@ class BaseSnapshot implements Snapshot {
Map<String, String> summary,
Integer schemaId,
String manifestList) {
this(
sequenceNumber,
snapshotId,
parentId,
timestampMillis,
operation,
summary,
schemaId,
manifestList,
null);
}

BaseSnapshot(
long sequenceNumber,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
Integer schemaId,
String manifestList,
String manifestListKeyMetadata) {
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -73,6 +104,7 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = manifestList;
this.v1ManifestLocations = null;
this.manifestListKeyMetadata = manifestListKeyMetadata;
}

BaseSnapshot(
Expand All @@ -93,6 +125,7 @@ class BaseSnapshot implements Snapshot {
this.schemaId = schemaId;
this.manifestListLocation = null;
this.v1ManifestLocations = v1ManifestLocations;
this.manifestListKeyMetadata = null;
}

@Override
Expand Down Expand Up @@ -130,7 +163,16 @@ public Integer schemaId() {
return schemaId;
}

// Returns the manifest list key metadata string supplied to the constructor, or null when the
// snapshot was created without one.
@Override
public String manifestKeyMetadata() {
return manifestListKeyMetadata;
}

/**
 * Loads and caches this snapshot's manifests for callers that have no {@link EncryptionManager}.
 *
 * <p>Delegates to {@link #cacheManifests(FileIO, EncryptionManager)} with a null manager. That
 * overload needs a manager to decrypt the manifest list whenever the list still has to be read
 * and this snapshot carries manifest list key metadata, so that combination is rejected here
 * with a descriptive error instead of surfacing later as a bare NullPointerException.
 *
 * @param fileIO a {@link FileIO} instance used for reading files from storage
 * @throws IllegalStateException if the manifest list has not been loaded yet and this snapshot
 *     has manifest list key metadata
 */
private void cacheManifests(FileIO fileIO) {
  // A null fileIO is left to the delegate so its existing IllegalArgumentException is preserved.
  boolean mustReadEncryptedList = allManifests == null && manifestListKeyMetadata != null;
  if (fileIO != null && mustReadEncryptedList) {
    throw new IllegalStateException(
        "Cannot read encrypted manifest list without an EncryptionManager: "
            + manifestListLocation);
  }

  cacheManifests(fileIO, null); // TODO remove
}

private void cacheManifests(FileIO fileIO, EncryptionManager encryption) {
if (fileIO == null) {
throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
}
Expand All @@ -145,7 +187,20 @@ private void cacheManifests(FileIO fileIO) {

if (allManifests == null) {
// if manifests isn't set, then the snapshotFile is set and should be read to get the list
this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation));

InputFile manifestListFile = fileIO.newInputFile(manifestListLocation);

if (manifestListKeyMetadata != null) {
ByteBuffer keyMetadataBytes =
ByteBuffer.wrap(Base64.getDecoder().decode(manifestListKeyMetadata));
EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBytes);

EncryptedInputFile encryptedInputFile =
EncryptedFiles.encryptedInput(manifestListFile, keyMetadata);
manifestListFile = encryption.decrypt(encryptedInputFile);
}

this.allManifests = ManifestLists.read(manifestListFile);
}

if (dataManifests == null || deleteManifests == null) {
Expand All @@ -168,6 +223,14 @@ public List<ManifestFile> allManifests(FileIO fileIO) {
return allManifests;
}

/**
 * Returns all manifests of this snapshot, loading and caching them on first use.
 *
 * @param fileIO a {@link FileIO} instance passed to the cache loader
 * @param encryption an {@link EncryptionManager} passed to the cache loader
 * @return the cached list of manifests
 */
@Override
public List<ManifestFile> allManifests(FileIO fileIO, EncryptionManager encryption) {
  // Serve the cached list when an earlier call already populated it.
  if (allManifests != null) {
    return allManifests;
  }

  cacheManifests(fileIO, encryption);
  return allManifests;
}

@Override
public List<ManifestFile> dataManifests(FileIO fileIO) {
if (dataManifests == null) {
Expand All @@ -176,6 +239,14 @@ public List<ManifestFile> dataManifests(FileIO fileIO) {
return dataManifests;
}

/**
 * Returns the data manifests of this snapshot, loading and caching them on first use.
 *
 * @param fileIO a {@link FileIO} instance passed to the cache loader
 * @param encryption an {@link EncryptionManager} passed to the cache loader
 * @return the cached list of data manifests
 */
@Override
public List<ManifestFile> dataManifests(FileIO fileIO, EncryptionManager encryption) {
  // Serve the cached list when an earlier call already populated it.
  if (dataManifests != null) {
    return dataManifests;
  }

  cacheManifests(fileIO, encryption);
  return dataManifests;
}

@Override
public List<ManifestFile> deleteManifests(FileIO fileIO) {
if (deleteManifests == null) {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -77,6 +78,7 @@ MetadataTableType metadataTableType() {

protected DataTask task(TableScan scan) {
FileIO io = table().io();
EncryptionManager encryption = table().encryption();
String location = scan.snapshot().manifestListLocation();
Map<Integer, PartitionSpec> specs = Maps.newHashMap(table().specs());

Expand All @@ -85,7 +87,7 @@ protected DataTask task(TableScan scan) {
location != null ? location : table().operations().current().metadataFileLocation()),
schema(),
scan.schema(),
scan.snapshot().allManifests(io),
scan.snapshot().allManifests(io, encryption),
manifest -> {
PartitionSpec spec = specs.get(manifest.partitionSpecId());
return ManifestsTable.manifestFileToRow(spec, manifest);
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private SnapshotParser() {}
private static final String MANIFESTS = "manifests";
private static final String MANIFEST_LIST = "manifest-list";
private static final String SCHEMA_ID = "schema-id";
private static final String MANIFEST_LIST_KEY_METADATA = "manifest-list-key-metadata";

static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
generator.writeStartObject();
Expand Down Expand Up @@ -93,6 +94,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio
generator.writeNumberField(SCHEMA_ID, snapshot.schemaId());
}

if (snapshot.manifestKeyMetadata() != null) {
generator.writeStringField(MANIFEST_LIST_KEY_METADATA, snapshot.manifestKeyMetadata());
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -147,6 +152,10 @@ static Snapshot fromJson(JsonNode node) {
if (node.has(MANIFEST_LIST)) {
// the manifest list is stored in a manifest list file
String manifestList = JsonUtil.getString(MANIFEST_LIST, node);

// Manifest list can be encrypted
String manifestListKeyMetadata = JsonUtil.getString(MANIFEST_LIST_KEY_METADATA, node);

return new BaseSnapshot(
sequenceNumber,
snapshotId,
Expand All @@ -155,7 +164,8 @@ static Snapshot fromJson(JsonNode node) {
operation,
summary,
schemaId,
manifestList);
manifestList,
manifestListKeyMetadata);

} else {
// fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,23 @@ public Snapshot apply() {
List<ManifestFile> manifests = apply(base, parentSnapshot);

OutputFile manifestList = manifestListPath();
EncryptionManager encryptionManager = ops.encryption();
EncryptedOutputFile encryptedManifestList = encryptionManager.encrypt(manifestList, true);

String encodedManifestListKeyMetadata = null;
if (encryptedManifestList.keyMetadata().encryptionKey() != null) {
encodedManifestListKeyMetadata =
Base64.getEncoder()
.encodeToString(
encryptedManifestList.keyMetadata()
.buffer()
.array());
}

try (ManifestListWriter writer =
ManifestLists.write(
ops.current().formatVersion(),
manifestList,
encryptedManifestList.encryptingOutputFile(),
snapshotId(),
parentSnapshotId,
sequenceNumber)) {
Expand Down Expand Up @@ -251,7 +263,8 @@ public Snapshot apply() {
operation(),
summary(base),
base.currentSchemaId(),
manifestList.location());
manifestList.location(),
manifestListKeyMetadata);
}

protected abstract Map<String, String> summary();
Expand Down
Loading