Skip to content
68 changes: 66 additions & 2 deletions api/src/main/java/org/apache/iceberg/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

/**
Expand Down Expand Up @@ -68,24 +69,45 @@ public interface Snapshot extends Serializable {
* Return all {@link ManifestFile} instances for either data or delete manifests in this snapshot.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @param encryption a {@link EncryptionManager} instance used for decrypting manifest list files
* @return a list of ManifestFile
*/
default List<ManifestFile> allManifests(FileIO io, EncryptionManager encryption) {
  // Default behavior: reject the call, naming the concrete class that did not override
  // this encryption-aware overload.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement allManifests with encryption");
}

/**
 * Overload of {@code allManifests} that takes only a {@link FileIO}, without an
 * {@code EncryptionManager}. Used in V1 and tests.
 *
 * @param io a {@link FileIO} instance
 * @return a list of ManifestFile
 */
List<ManifestFile> allManifests(FileIO io);

/**
 * Lists the data manifests of this snapshot, one {@link ManifestFile} per manifest.
 *
 * <p>The default implementation reads nothing and always throws.
 *
 * @param io a {@link FileIO} instance used for reading files from storage
 * @param encryption an {@link EncryptionManager} instance used for decrypting manifest list files
 * @return a list of ManifestFile
 * @throws UnsupportedOperationException if the implementing class does not override this method
 */
default List<ManifestFile> dataManifests(FileIO io, EncryptionManager encryption) {
  // Name the concrete class in the message so the missing override is easy to locate.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement dataManifests with encryption");
}

/**
 * Overload of {@code dataManifests} that takes only a {@link FileIO}, without an
 * {@code EncryptionManager}. Tests only.
 *
 * @param io a {@link FileIO} instance
 * @return a list of ManifestFile
 */
List<ManifestFile> dataManifests(FileIO io);

/**
 * Lists the delete manifests of this snapshot, one {@link ManifestFile} per manifest.
 *
 * <p>The default implementation reads nothing and always throws.
 *
 * @param io a {@link FileIO} instance used for reading files from storage
 * @param encryption an {@link EncryptionManager} instance used for decrypting manifest list files
 * @return a list of ManifestFile
 * @throws UnsupportedOperationException if the implementing class does not override this method
 */
default List<ManifestFile> deleteManifests(FileIO io, EncryptionManager encryption) {
  // Name the concrete class in the message so the missing override is easy to locate.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement deleteManifests with encryption");
}

/**
 * Overload of {@code deleteManifests} that takes only a {@link FileIO}, without an
 * {@code EncryptionManager}. Tests only.
 *
 * @param io a {@link FileIO} instance
 * @return a list of ManifestFile
 */
List<ManifestFile> deleteManifests(FileIO io);

/**
Expand All @@ -111,8 +133,15 @@ public interface Snapshot extends Serializable {
* columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @param encryption a {@link EncryptionManager} instance used for decrypting manifest files
* @return all data files added to the table in this snapshot.
*/
default Iterable<DataFile> addedDataFiles(FileIO io, EncryptionManager encryption) {
  // Default behavior: reject the call, naming the concrete class that did not override
  // this encryption-aware overload.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement addedDataFiles with encryption");
}

/**
 * Overload of {@code addedDataFiles} that takes only a {@link FileIO}, without an
 * {@code EncryptionManager}. Tests and benchmarks.
 *
 * @param io a {@link FileIO} instance
 * @return an iterable of {@link DataFile}
 */
Iterable<DataFile> addedDataFiles(FileIO io);

/**
Expand All @@ -123,8 +152,15 @@ public interface Snapshot extends Serializable {
* columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @param encryption a {@link EncryptionManager} instance used for decrypting manifest files
* @return all data files removed from the table in this snapshot.
*/
default Iterable<DataFile> removedDataFiles(FileIO io, EncryptionManager encryption) {
  // Default behavior: reject the call, naming the concrete class that did not override
  // this encryption-aware overload.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement removedDataFiles with encryption");
}

/**
 * Overload of {@code removedDataFiles} that takes only a {@link FileIO}, without an
 * {@code EncryptionManager}. Tests only.
 *
 * @param io a {@link FileIO} instance
 * @return an iterable of {@link DataFile}
 */
Iterable<DataFile> removedDataFiles(FileIO io);

/**
Expand All @@ -133,9 +169,18 @@ public interface Snapshot extends Serializable {
* <p>The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @param fileIO a {@link FileIO} instance used for reading files from storage
* @param encryptionManager a {@link EncryptionManager} instance used for decrypting manifest
* files
* @return all delete files added to the table in this snapshot
*/
default Iterable<DeleteFile> addedDeleteFiles(
    FileIO fileIO, EncryptionManager encryptionManager) {
  // Default behavior: reject the call, naming the concrete class that did not override
  // this encryption-aware overload.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement addedDeleteFiles with encryption");
}

/** Tests only */
default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement addedDeleteFiles");
Expand All @@ -147,9 +192,18 @@ default Iterable<DeleteFile> addedDeleteFiles(FileIO io) {
* <p>The files returned include the following columns: file_path, file_format, partition,
* record_count, and file_size_in_bytes. Other columns will be null.
*
* @param io a {@link FileIO} instance used for reading files from storage
* @param fileIO a {@link FileIO} instance used for reading files from storage
* @param encryptionManager a {@link EncryptionManager} instance used for decrypting manifest
* files
* @return all delete files removed from the table in this snapshot
*/
default Iterable<DeleteFile> removedDeleteFiles(
    FileIO fileIO, EncryptionManager encryptionManager) {
  // Default behavior: reject the call, naming the concrete class that did not override
  // this encryption-aware overload.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement removedDeleteFiles with encryption");
}

/** Tests only */
default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement removedDeleteFiles");
Expand All @@ -171,4 +225,14 @@ default Iterable<DeleteFile> removedDeleteFiles(FileIO io) {
default Integer schemaId() {
// Default for implementations that do not override this method: no schema ID is reported.
return null;
}

/**
 * Returns the key metadata for an encrypted manifest list.
 *
 * <p>The default implementation always throws.
 *
 * @return base64-encoded key metadata for the manifest list file encryption key
 * @throws UnsupportedOperationException if the implementing class does not override this method
 */
default String manifestKeyMetadata() {
  // Name the concrete class in the message so the missing override is easy to locate.
  String implementationName = getClass().getName();
  throw new UnsupportedOperationException(
      implementationName + " doesn't implement manifestKeyMetadata");
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext

@Override
protected CloseableIterable<ManifestFile> manifests() {
// Passes reachableManifests a function that asks each snapshot for its data manifests.
// NOTE(review): two return statements appear back to back. The first (FileIO only) looks like
// the superseded line of the diff and the second (FileIO plus table().encryption()) like its
// replacement -- confirm which one the file actually contains.
return reachableManifests(snapshot -> snapshot.dataManifests(table().io()));
return reachableManifests(
snapshot -> snapshot.dataManifests(table().io(), table().encryption()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext

@Override
protected CloseableIterable<ManifestFile> manifests() {
// Passes reachableManifests a function that asks each snapshot for its delete manifests.
// NOTE(review): two return statements appear back to back. The first (FileIO only) looks like
// the superseded line of the diff and the second (FileIO plus table().encryption()) like its
// replacement -- confirm which one the file actually contains.
return reachableManifests(snapshot -> snapshot.deleteManifests(table().io()));
return reachableManifests(
snapshot -> snapshot.deleteManifests(table().io(), table().encryption()));
}
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext

@Override
protected CloseableIterable<ManifestFile> manifests() {
// Passes reachableManifests a function that asks each snapshot for all of its manifests.
// NOTE(review): two return statements appear back to back. The first (FileIO only) looks like
// the superseded line of the diff and the second (FileIO plus table().encryption()) like its
// replacement -- confirm which one the file actually contains.
return reachableManifests(snapshot -> snapshot.allManifests(table().io()));
return reachableManifests(
snapshot -> snapshot.allManifests(table().io(), table().encryption()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private List<ManifestFile> findMatchingDataManifests(Snapshot snapshot) {
}

private List<ManifestFile> findMatchingDeleteManifests(Snapshot snapshot) {
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io());
List<ManifestFile> deleteManifests = snapshot.deleteManifests(io(), table().encryption());
scanMetrics().totalDeleteManifests().increment(deleteManifests.size());

List<ManifestFile> matchingDeleteManifests = filterManifests(deleteManifests);
Expand Down Expand Up @@ -293,7 +293,8 @@ private CompletableFuture<DeleteFileIndex> newDeletesFuture(
}

private DeleteFileIndex planDeletesLocally(List<ManifestFile> deleteManifests) {
DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(io(), deleteManifests);
DeleteFileIndex.Builder builder =
DeleteFileIndex.builderFor(io(), table().encryption(), deleteManifests);

if (shouldPlanWithExecutor() && deleteManifests.size() > 1) {
builder.planWith(planExecutor());
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
Expand Down Expand Up @@ -99,6 +100,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema fileProjection;
private final Schema dataTableSchema;
private final FileIO io;
private final EncryptionManager encryption;
private final ManifestFile manifest;
private final Map<Integer, PartitionSpec> specsById;

Expand All @@ -112,6 +114,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.projection = projection;
this.io = table.io();
this.encryption = table.encryption();
this.manifest = manifest;
this.specsById = Maps.newHashMap(table.specs());
this.dataTableSchema = table.schema();
Expand Down Expand Up @@ -178,7 +181,9 @@ private StructProjection structProjection(Schema projectedSchema) {
*/
private CloseableIterable<? extends ManifestEntry<? extends ContentFile<?>>> entries(
Schema fileStructProjection) {
// Opens this task's manifest through ManifestFiles.open, applies the given projection, and
// returns the reader's entries.
// NOTE(review): two return statements appear back to back. The first (no encryption argument)
// looks like the superseded line of the diff and the second (passing the encryption field) like
// its replacement -- confirm which one the file actually contains.
return ManifestFiles.open(manifest, io, specsById).project(fileStructProjection).entries();
return ManifestFiles.open(manifest, io, encryption, specsById)
.project(fileStructProjection)
.entries();
}

/**
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
Expand Down Expand Up @@ -135,6 +136,7 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
static class ManifestReadTask extends BaseFileScanTask implements DataTask {

private final FileIO io;
private final EncryptionManager encryption;
private final Map<Integer, PartitionSpec> specsById;
private final ManifestFile manifest;
private final Schema dataTableSchema;
Expand All @@ -149,6 +151,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.io = table.io();
this.encryption = table.encryption();
this.specsById = Maps.newHashMap(table.specs());
this.manifest = manifest;
this.dataTableSchema = table.schema();
Expand All @@ -172,9 +175,10 @@ public CloseableIterable<StructLike> rows() {
private CloseableIterable<? extends ContentFile<?>> files(Schema fileProjection) {
switch (manifest.content()) {
case DATA:
return ManifestFiles.read(manifest, io, specsById).project(fileProjection);
return ManifestFiles.read(manifest, io, encryption, specsById).project(fileProjection);
case DELETES:
return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection);
return ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById)
.project(fileProjection);
default:
throw new IllegalArgumentException(
"Unsupported manifest content type:" + manifest.content());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
.toSet();

ManifestGroup manifestGroup =
new ManifestGroup(table().io(), manifests)
new ManifestGroup(table().io(), table().encryption(), manifests)
.caseSensitive(isCaseSensitive())
.select(scanColumns())
.filterData(filter())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(

Set<ManifestFile> newDataManifests =
FluentIterable.from(changelogSnapshots)
.transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
.transformAndConcat(
snapshot -> snapshot.dataManifests(table().io(), table().encryption()))
.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
.toSet();

ManifestGroup manifestGroup =
new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
new ManifestGroup(table().io(), table().encryption(), newDataManifests, ImmutableList.of())
.specsById(table().specs())
.caseSensitive(isCaseSensitive())
.select(scanColumns())
Expand Down Expand Up @@ -105,7 +106,7 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl

for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
if (!snapshot.operation().equals(DataOperations.REPLACE)) {
if (!snapshot.deleteManifests(table().io()).isEmpty()) {
if (!snapshot.deleteManifests(table().io(), table().encryption()).isEmpty()) {
throw new UnsupportedOperationException(
"Delete files are currently not supported in changelog scans");
}
Expand Down
20 changes: 14 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -154,21 +155,27 @@ public RewriteManifests addManifest(ManifestFile manifest) {

// Copies the given manifest into a newly created manifest file by delegating to
// ManifestFiles.copyRewriteManifest with the current format version, the manifest's partition
// spec ID, this operation's snapshot ID, and summaryBuilder.
// NOTE(review): toCopy and newFile are each declared twice below, and the call passes both
// `newFile` and `newFile.encryptingOutputFile(), newFile.keyMetadata().buffer()`. The plain
// InputFile/OutputFile lines look like the superseded side of the diff and the
// decrypt/EncryptedOutputFile lines like their replacements -- confirm against the real file.
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
// Variant 1: read the manifest directly through the table's FileIO.
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
// Variant 2: wrap the input with the manifest's key metadata and pass it through
// ops.encryption().decrypt(...) before reading.
InputFile toCopy =
ops.encryption()
.decrypt(
EncryptedFiles.encryptedInput(
ops.io().newInputFile(manifest.path()), manifest.keyMetadata()));
EncryptedOutputFile newFile = newEncryptedManifest();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
manifest.partitionSpecId(),
toCopy,
specsById,
// Variant 1 output argument.
newFile,
// Variant 2 output arguments: the encrypting output file plus its key metadata buffer.
newFile.encryptingOutputFile(),
newFile.keyMetadata().buffer(),
snapshotId(),
summaryBuilder);
}

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> currentManifests = base.currentSnapshot().allManifests(ops.io());
List<ManifestFile> currentManifests =
base.currentSnapshot().allManifests(ops.io(), ops.encryption());
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);

validateDeletedManifests(currentManifestSet);
Expand Down Expand Up @@ -246,7 +253,8 @@ private void performRewrite(List<ManifestFile> currentManifests) {
} else {
rewrittenManifests.add(manifest);
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
ManifestFiles.read(
manifest, ops.io(), ops.encryption(), ops.current().specsById())
.select(Collections.singletonList("*"))) {
reader
.liveEntries()
Expand Down
Loading