diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index c035259e0e2c..6e384f801a33 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; /** @@ -68,24 +69,45 @@ public interface Snapshot extends Serializable { * Return all {@link ManifestFile} instances for either data or delete manifests in this snapshot. * * @param io a {@link FileIO} instance used for reading files from storage + * @param encryption a {@link EncryptionManager} instance used for decrypting manifest list files * @return a list of ManifestFile */ + default List allManifests(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement allManifests with encryption"); + } + + /** Used in V1 and tests */ List allManifests(FileIO io); /** * Return a {@link ManifestFile} for each data manifest in this snapshot. * * @param io a {@link FileIO} instance used for reading files from storage + * @param encryption a {@link EncryptionManager} instance used for decrypting manifest list files * @return a list of ManifestFile */ + default List dataManifests(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement dataManifests with encryption"); + } + + /** Tests only */ List dataManifests(FileIO io); /** * Return a {@link ManifestFile} for each delete manifest in this snapshot. * * @param io a {@link FileIO} instance used for reading files from storage + * @param encryption a {@link EncryptionManager} instance used for decrypting manifest list files * @return a list of ManifestFile */ + default List deleteManifests(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement deleteManifests with encryption"); + } + + /** Tests only */ List deleteManifests(FileIO io); /** @@ -111,8 +133,15 @@ public interface Snapshot extends Serializable { * columns will be null. * * @param io a {@link FileIO} instance used for reading files from storage + * @param encryption a {@link EncryptionManager} instance used for decrypting manifest files * @return all data files added to the table in this snapshot. */ + default Iterable addedDataFiles(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement addedDataFiles with encryption"); + } + + /** Tests and benchmarks */ Iterable addedDataFiles(FileIO io); /** @@ -123,8 +152,15 @@ public interface Snapshot extends Serializable { * columns will be null. * * @param io a {@link FileIO} instance used for reading files from storage + * @param encryption a {@link EncryptionManager} instance used for decrypting manifest files * @return all data files removed from the table in this snapshot. */ + default Iterable removedDataFiles(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement removedDataFiles with encryption"); + } + + /** Tests only */ Iterable removedDataFiles(FileIO io); /** @@ -133,9 +169,18 @@ public interface Snapshot extends Serializable { *

The files returned include the following columns: file_path, file_format, partition, * record_count, and file_size_in_bytes. Other columns will be null. * - * @param io a {@link FileIO} instance used for reading files from storage + * @param fileIO a {@link FileIO} instance used for reading files from storage + * @param encryptionManager a {@link EncryptionManager} instance used for decrypting manifest + * files * @return all delete files added to the table in this snapshot */ + default Iterable addedDeleteFiles( + FileIO fileIO, EncryptionManager encryptionManager) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement addedDeleteFiles with encryption"); + } + + /** Tests only */ default Iterable addedDeleteFiles(FileIO io) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement addedDeleteFiles"); @@ -147,9 +192,18 @@ default Iterable addedDeleteFiles(FileIO io) { *

The files returned include the following columns: file_path, file_format, partition, * record_count, and file_size_in_bytes. Other columns will be null. * - * @param io a {@link FileIO} instance used for reading files from storage + * @param fileIO a {@link FileIO} instance used for reading files from storage + * @param encryptionManager a {@link EncryptionManager} instance used for decrypting manifest + * files * @return all delete files removed from the table in this snapshot */ + default Iterable removedDeleteFiles( + FileIO fileIO, EncryptionManager encryptionManager) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement removedDeleteFiles with encryption"); + } + + /** Tests only */ default Iterable removedDeleteFiles(FileIO io) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement removedDeleteFiles"); @@ -171,4 +225,14 @@ default Iterable removedDeleteFiles(FileIO io) { default Integer schemaId() { return null; } + + /** + * Key metadata for encrypted manifest lists. + * + * @return base64-encoded key metadata for the manifest list file encryption key + */ + default String manifestKeyMetadata() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement manifestKeyMetadata"); + } } diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java index 48cc1c1d2a53..d26a39a4268d 100644 --- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return reachableManifests(snapshot -> snapshot.dataManifests(table().io())); + return reachableManifests( + snapshot -> snapshot.dataManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java index d1c967938d5c..cd260026c8b8 100644 --- a/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllDeleteFilesTable.java @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return reachableManifests(snapshot -> snapshot.deleteManifests(table().io())); + return reachableManifests( + snapshot -> snapshot.deleteManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/AllFilesTable.java b/core/src/main/java/org/apache/iceberg/AllFilesTable.java index b296675c5c94..8686e1fef157 100644 --- a/core/src/main/java/org/apache/iceberg/AllFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/AllFilesTable.java @@ -64,7 +64,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return reachableManifests(snapshot -> snapshot.allManifests(table().io())); + return reachableManifests( + snapshot -> snapshot.allManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java index 263830a53b3c..426ab8bfce34 100644 --- a/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java @@ -205,7 +205,7 @@ private List findMatchingDataManifests(Snapshot snapshot) { } private List findMatchingDeleteManifests(Snapshot snapshot) { - List deleteManifests = snapshot.deleteManifests(io()); + List deleteManifests = snapshot.deleteManifests(io(), table().encryption()); scanMetrics().totalDeleteManifests().increment(deleteManifests.size()); List matchingDeleteManifests = filterManifests(deleteManifests); @@ -293,7 +293,8 @@ private CompletableFuture newDeletesFuture( } private DeleteFileIndex planDeletesLocally(List deleteManifests) { - DeleteFileIndex.Builder builder = DeleteFileIndex.builderFor(io(), deleteManifests); + DeleteFileIndex.Builder builder = + DeleteFileIndex.builderFor(io(), table().encryption(), deleteManifests); if (shouldPlanWithExecutor() && deleteManifests.size() > 1) { builder.planWith(planExecutor()); diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java index 43d8a71f8706..da4fa8f09cfb 100644 --- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; @@ -99,6 +100,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final Schema fileProjection; private final Schema dataTableSchema; private final FileIO io; + private final EncryptionManager encryption; private final ManifestFile manifest; private final Map specsById; @@ -112,6 +114,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); this.projection = projection; this.io = table.io(); + this.encryption = table.encryption(); this.manifest = manifest; this.specsById = Maps.newHashMap(table.specs()); this.dataTableSchema = table.schema(); @@ -178,7 +181,9 @@ private StructProjection structProjection(Schema projectedSchema) { */ private CloseableIterable>> entries( Schema fileStructProjection) { - return ManifestFiles.open(manifest, io, specsById).project(fileStructProjection).entries(); + return ManifestFiles.open(manifest, io, encryption, specsById) + .project(fileStructProjection) + .entries(); } /** diff --git a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java index 62e6f8acf7a5..9f633b541281 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseFilesTable.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.ManifestEvaluator; @@ -135,6 +136,7 @@ protected CloseableIterable doPlanFiles() { static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final FileIO io; + private final EncryptionManager encryption; private final Map specsById; private final ManifestFile manifest; private final Schema dataTableSchema; @@ -149,6 +151,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask { ResidualEvaluator residuals) { super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals); this.io = table.io(); + this.encryption = table.encryption(); this.specsById = Maps.newHashMap(table.specs()); this.manifest = manifest; this.dataTableSchema = table.schema(); @@ -172,9 +175,10 @@ public CloseableIterable rows() { private CloseableIterable> files(Schema fileProjection) { switch (manifest.content()) { case DATA: - return ManifestFiles.read(manifest, io, specsById).project(fileProjection); + return ManifestFiles.read(manifest, io, encryption, specsById).project(fileProjection); case DELETES: - return ManifestFiles.readDeleteManifest(manifest, io, specsById).project(fileProjection); + return ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById) + .project(fileProjection); default: throw new IllegalArgumentException( "Unsupported manifest content type:" + manifest.content()); diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java index 24b9ae1acead..939cb5a24b2c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java @@ -74,7 +74,7 @@ private CloseableIterable appendFilesFromSnapshots(List .toSet(); ManifestGroup manifestGroup = - new ManifestGroup(table().io(), manifests) + new ManifestGroup(table().io(), table().encryption(), manifests) .caseSensitive(isCaseSensitive()) .select(scanColumns()) .filterData(filter()) diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java index 2d54a94e8d73..f5b5431107c9 100644 --- a/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java @@ -67,12 +67,13 @@ protected CloseableIterable doPlanFiles( Set newDataManifests = FluentIterable.from(changelogSnapshots) - .transformAndConcat(snapshot -> snapshot.dataManifests(table().io())) + .transformAndConcat( + snapshot -> snapshot.dataManifests(table().io(), table().encryption())) .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId())) .toSet(); ManifestGroup manifestGroup = - new ManifestGroup(table().io(), newDataManifests, ImmutableList.of()) + new ManifestGroup(table().io(), table().encryption(), newDataManifests, ImmutableList.of()) .specsById(table().specs()) .caseSensitive(isCaseSensitive()) .select(scanColumns()) @@ -105,7 +106,7 @@ private Deque orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) { if (!snapshot.operation().equals(DataOperations.REPLACE)) { - if (!snapshot.deleteManifests(table().io()).isEmpty()) { + if (!snapshot.deleteManifests(table().io(), table().encryption()).isEmpty()) { throw new UnsupportedOperationException( "Delete files are currently not supported in changelog scans"); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index c70dda2bd6d0..2f08b4bda918 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -32,10 +32,11 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -154,21 +155,27 @@ public RewriteManifests addManifest(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest.path()); - OutputFile newFile = newManifestOutput(); + InputFile toCopy = + ops.encryption() + .decrypt( + EncryptedFiles.encryptedInput( + ops.io().newInputFile(manifest.path()), manifest.keyMetadata())); + EncryptedOutputFile newFile = newEncryptedManifest(); return ManifestFiles.copyRewriteManifest( current.formatVersion(), manifest.partitionSpecId(), toCopy, specsById, - newFile, + newFile.encryptingOutputFile(), + newFile.keyMetadata().buffer(), snapshotId(), summaryBuilder); } @Override public List apply(TableMetadata base, Snapshot snapshot) { - List currentManifests = base.currentSnapshot().allManifests(ops.io()); + List currentManifests = + base.currentSnapshot().allManifests(ops.io(), ops.encryption()); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); validateDeletedManifests(currentManifestSet); @@ -246,7 +253,8 @@ private void performRewrite(List currentManifests) { } else { rewrittenManifests.add(manifest); try (ManifestReader reader = - ManifestFiles.read(manifest, ops.io(), ops.current().specsById()) + ManifestFiles.read( + manifest, ops.io(), ops.encryption(), ops.current().specsById()) .select(Collections.singletonList("*"))) { reader .liveEntries() diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index a3c4fc8738cd..d7760d13c1fd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -20,12 +20,22 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -43,6 +53,7 @@ class BaseSnapshot implements Snapshot { private final Map summary; private final Integer schemaId; private final String[] v1ManifestLocations; + private final String manifestListKeyMetadata; // lazily initialized private transient List allManifests = null; @@ -53,6 +64,7 @@ class BaseSnapshot implements Snapshot { private transient List addedDeleteFiles = null; private transient List removedDeleteFiles = null; + /** Tests only */ BaseSnapshot( long sequenceNumber, long snapshotId, @@ -62,6 +74,28 @@ class BaseSnapshot implements Snapshot { Map summary, Integer schemaId, String manifestList) { + this( + sequenceNumber, + snapshotId, + parentId, + timestampMillis, + operation, + summary, + schemaId, + manifestList, + null); + } + + BaseSnapshot( + long sequenceNumber, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + Integer schemaId, + String manifestList, + String manifestListKeyMetadata) { this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -71,6 +105,7 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = manifestList; this.v1ManifestLocations = null; + this.manifestListKeyMetadata = manifestListKeyMetadata; } BaseSnapshot( @@ -91,6 +126,7 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = null; this.v1ManifestLocations = v1ManifestLocations; + this.manifestListKeyMetadata = null; } @Override @@ -128,7 +164,12 @@ public Integer schemaId() { return schemaId; } - private void cacheManifests(FileIO fileIO) { + @Override + public String manifestKeyMetadata() { + return manifestListKeyMetadata; + } + + private void cacheManifests(FileIO fileIO, EncryptionManager encryption) { if (fileIO == null) { throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); } @@ -143,7 +184,34 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + + InputFile manifestListFile = fileIO.newInputFile(manifestListLocation); + + if (manifestListKeyMetadata != null) { // encrypted manifest list file + Preconditions.checkArgument( + encryption != null, "Encryption manager not provided for encrypted manifest list file"); + Preconditions.checkArgument( + encryption instanceof StandardEncryptionManager, + "Encryption manager for encrypted manifest list files can currently only be an instance of " + + StandardEncryptionManager.class); + + ByteBuffer keyMetadataBytes = + ByteBuffer.wrap(Base64.getDecoder().decode(manifestListKeyMetadata)); + ByteBuffer unwrappedManfestListKey = null; + // Unwrap manifest list key + EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBytes); + unwrappedManfestListKey = + ((StandardEncryptionManager) encryption).unwrapKey(keyMetadata.encryptionKey()); + + EncryptionKeyMetadata unwrappedKeyMetadata = + EncryptionUtil.createKeyMetadata(unwrappedManfestListKey, keyMetadata.aadPrefix()); + + EncryptedInputFile encryptedInputFile = + EncryptedFiles.encryptedInput(manifestListFile, unwrappedKeyMetadata); + manifestListFile = encryption.decrypt(encryptedInputFile); + } + + this.allManifests = ManifestLists.read(manifestListFile); } if (dataManifests == null || deleteManifests == null) { @@ -160,56 +228,92 @@ private void cacheManifests(FileIO fileIO) { @Override public List allManifests(FileIO fileIO) { + return allManifests(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public List allManifests(FileIO fileIO, EncryptionManager encryption) { if (allManifests == null) { - cacheManifests(fileIO); + cacheManifests(fileIO, encryption); } return allManifests; } @Override public List dataManifests(FileIO fileIO) { + return dataManifests(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public List dataManifests(FileIO fileIO, EncryptionManager encryption) { if (dataManifests == null) { - cacheManifests(fileIO); + cacheManifests(fileIO, encryption); } return dataManifests; } @Override public List deleteManifests(FileIO fileIO) { + return deleteManifests(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public List deleteManifests(FileIO fileIO, EncryptionManager encryptionManager) { if (deleteManifests == null) { - cacheManifests(fileIO); + cacheManifests(fileIO, encryptionManager); } return deleteManifests; } @Override public List addedDataFiles(FileIO fileIO) { + return addedDataFiles(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public List addedDataFiles(FileIO fileIO, EncryptionManager encryptionManager) { if (addedDataFiles == null) { - cacheDataFileChanges(fileIO); + cacheDataFileChanges(fileIO, encryptionManager); } return addedDataFiles; } @Override public List removedDataFiles(FileIO fileIO) { + return removedDataFiles(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public List removedDataFiles(FileIO fileIO, EncryptionManager encryptionManager) { if (removedDataFiles == null) { - cacheDataFileChanges(fileIO); + cacheDataFileChanges(fileIO, encryptionManager); } return removedDataFiles; } @Override public Iterable addedDeleteFiles(FileIO fileIO) { + return addedDeleteFiles(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public Iterable addedDeleteFiles(FileIO fileIO, EncryptionManager encryptionManager) { if (addedDeleteFiles == null) { - cacheDeleteFileChanges(fileIO); + cacheDeleteFileChanges(fileIO, encryptionManager); } return addedDeleteFiles; } @Override public Iterable removedDeleteFiles(FileIO fileIO) { + return removedDeleteFiles(fileIO, PlaintextEncryptionManager.instance()); + } + + @Override + public Iterable removedDeleteFiles( + FileIO fileIO, EncryptionManager encryptionManager) { if (removedDeleteFiles == null) { - cacheDeleteFileChanges(fileIO); + cacheDeleteFileChanges(fileIO, encryptionManager); } return removedDeleteFiles; } @@ -219,7 +323,7 @@ public String manifestListLocation() { return manifestListLocation; } - private void cacheDeleteFileChanges(FileIO fileIO) { + private void cacheDeleteFileChanges(FileIO fileIO, EncryptionManager encryptionManager) { Preconditions.checkArgument(fileIO != null, "Cannot cache delete file changes: FileIO is null"); ImmutableList.Builder adds = ImmutableList.builder(); @@ -227,11 +331,12 @@ private void cacheDeleteFileChanges(FileIO fileIO) { Iterable changedManifests = Iterables.filter( - deleteManifests(fileIO), manifest -> Objects.equal(manifest.snapshotId(), snapshotId)); + deleteManifests(fileIO, encryptionManager), + manifest -> Objects.equal(manifest.snapshotId(), snapshotId)); for (ManifestFile manifest : changedManifests) { try (ManifestReader reader = - ManifestFiles.readDeleteManifest(manifest, fileIO, null)) { + ManifestFiles.readDeleteManifest(manifest, fileIO, encryptionManager, null)) { for (ManifestEntry entry : reader.entries()) { switch (entry.status()) { case ADDED: @@ -253,7 +358,7 @@ private void cacheDeleteFileChanges(FileIO fileIO) { this.removedDeleteFiles = deletes.build(); } - private void cacheDataFileChanges(FileIO fileIO) { + private void cacheDataFileChanges(FileIO fileIO, EncryptionManager encryptionManager) { Preconditions.checkArgument(fileIO != null, "Cannot cache data file changes: FileIO is null"); ImmutableList.Builder adds = ImmutableList.builder(); @@ -262,9 +367,10 @@ private void cacheDataFileChanges(FileIO fileIO) { // read only manifests that were created by this snapshot Iterable changedManifests = Iterables.filter( - dataManifests(fileIO), manifest -> Objects.equal(manifest.snapshotId(), snapshotId)); + dataManifests(fileIO, encryptionManager), + manifest -> Objects.equal(manifest.snapshotId(), snapshotId)); try (CloseableIterable> entries = - new ManifestGroup(fileIO, changedManifests).ignoreExisting().entries()) { + new ManifestGroup(fileIO, encryptionManager, changedManifests).ignoreExisting().entries()) { for (ManifestEntry entry : entries) { switch (entry.status()) { case ADDED: diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 2f051466bb8b..fe0ee19da7c2 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -529,7 +529,8 @@ private static Set committedFiles(TableOperations ops, Set snapsho Snapshot snap = ops.current().snapshot(snapshotId); if (snap != null) { committedFiles.add(snap.manifestListLocation()); - snap.allManifests(ops.io()).forEach(manifest -> committedFiles.add(manifest.path())); + snap.allManifests(ops.io(), ops.encryption()) + .forEach(manifest -> committedFiles.add(manifest.path())); } else { return null; } diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index f9af07c1a443..eb85d8dc113f 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -30,6 +30,8 @@ import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.common.DynConstructors; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.Configurable; @@ -81,9 +83,11 @@ private CatalogUtil() {} * has been dropped in the metastore. * * @param io a FileIO to use for deletes + * @param encryption an EncryptionManager to use for deletes * @param metadata the last valid TableMetadata instance for a dropped table. */ - public static void dropTableData(FileIO io, TableMetadata metadata) { + public static void dropTableData( + FileIO io, EncryptionManager encryption, TableMetadata metadata) { // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete // as much of the delete work as possible and avoid orphaned data or manifest files. @@ -91,7 +95,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { Set manifestsToDelete = Sets.newHashSet(); for (Snapshot snapshot : metadata.snapshots()) { // add all manifests to the delete set because both data and delete files should be removed - Iterables.addAll(manifestsToDelete, snapshot.allManifests(io)); + Iterables.addAll(manifestsToDelete, snapshot.allManifests(io, encryption)); // add the manifest list to the delete set, if present if (snapshot.manifestListLocation() != null) { manifestListsToDelete.add(snapshot.manifestListLocation()); @@ -107,9 +111,10 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { if (gcEnabled) { // delete data files only if we are sure this won't corrupt other tables - deleteFiles(io, manifestsToDelete); + deleteFiles(io, encryption, manifestsToDelete); } + // TODO GG encryption? deleteFiles(io, Iterables.transform(manifestsToDelete, ManifestFile::path), "manifest", true); deleteFiles(io, manifestListsToDelete, "manifest list", true); deleteFiles( @@ -130,8 +135,14 @@ public static void dropTableData(FileIO io, TableMetadata metadata) { deleteFile(io, metadata.metadataFileLocation(), "metadata"); } + /** TODO GG Temp - for catalogs other than Hive and Hadoop; and for tests */ + public static void dropTableData(FileIO io, TableMetadata metadata) { + dropTableData(io, PlaintextEncryptionManager.instance(), metadata); + } + @SuppressWarnings("DangerousStringInternUsage") - private static void deleteFiles(FileIO io, Set allManifests) { + private static void deleteFiles( + FileIO io, EncryptionManager encryption, Set allManifests) { // keep track of deleted files in a map that can be cleaned up when memory runs low Map deletedFiles = new MapMaker().concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE).weakKeys().makeMap(); @@ -145,7 +156,7 @@ private static void deleteFiles(FileIO io, Set allManifests) { LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) .run( manifest -> { - try (ManifestReader reader = ManifestFiles.open(manifest, io)) { + try (ManifestReader reader = ManifestFiles.open(manifest, io, encryption, null)) { List pathsToDelete = Lists.newArrayList(); for (ManifestEntry entry : reader.entries()) { // intern the file path because the weak key map uses identity (==) instead of diff --git a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java index 3786b1185be6..5f21ff13821d 100644 --- a/core/src/main/java/org/apache/iceberg/CherryPickOperation.java +++ b/core/src/main/java/org/apache/iceberg/CherryPickOperation.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.CherrypickAncestorCommitException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; @@ -38,6 +39,7 @@ class CherryPickOperation extends MergingSnapshotProducer { private final FileIO io; + private final EncryptionManager encryption; private final Map specsById; private Snapshot cherrypickSnapshot = null; private boolean requireFastForward = false; @@ -46,6 +48,7 @@ class CherryPickOperation extends MergingSnapshotProducer { CherryPickOperation(String tableName, TableOperations ops) { super(tableName, ops); this.io = ops.io(); + this.encryption = ops.encryption(); this.specsById = ops.current().specsById(); } @@ -79,7 +82,7 @@ public CherryPickOperation cherrypick(long snapshotId) { set(SnapshotSummary.SOURCE_SNAPSHOT_ID_PROP, String.valueOf(snapshotId)); // Pick modifications from the snapshot - for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { + for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io, encryption)) { add(addedFile); } @@ -111,13 +114,13 @@ public CherryPickOperation cherrypick(long snapshotId) { // copy adds from the picked snapshot this.replacedPartitions = PartitionSet.create(specsById); - for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io)) { + for (DataFile addedFile : cherrypickSnapshot.addedDataFiles(io, encryption)) { add(addedFile); replacedPartitions.add(addedFile.specId(), addedFile.partition()); } // copy deletes from the picked snapshot - for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io)) { + for (DataFile deletedFile : cherrypickSnapshot.removedDataFiles(io, encryption)) { delete(deletedFile); } @@ -157,7 +160,8 @@ protected void validate(TableMetadata base, Snapshot snapshot) { // case if (!isFastForward(base)) { validateNonAncestor(base, cherrypickSnapshot.snapshotId()); - validateReplacedPartitions(base, cherrypickSnapshot.parentId(), replacedPartitions, io); + validateReplacedPartitions( + base, cherrypickSnapshot.parentId(), replacedPartitions, io, encryption); WapUtil.validateWapPublish(base, cherrypickSnapshot.snapshotId()); } } @@ -208,14 +212,19 @@ private static void validateNonAncestor(TableMetadata meta, long snapshotId) { } private static void validateReplacedPartitions( - TableMetadata meta, Long parentId, PartitionSet replacedPartitions, FileIO io) { + TableMetadata meta, + Long parentId, + PartitionSet replacedPartitions, + FileIO io, + EncryptionManager encryption) { if (replacedPartitions != null && meta.currentSnapshot() != null) { ValidationException.check( parentId == null || isCurrentAncestor(meta, parentId), "Cannot cherry-pick overwrite, based on non-ancestor of the current state: %s", parentId); List newFiles = - SnapshotUtil.newFiles(parentId, meta.currentSnapshot().snapshotId(), meta::snapshot, io); + SnapshotUtil.newFiles( + parentId, meta.currentSnapshot().snapshotId(), meta::snapshot, io, encryption); for (DataFile newFile : newFiles) { ValidationException.check( !replacedPartitions.contains(newFile.specId(), newFile.partition()), diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index c099934dcc77..355e99e8814c 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -58,7 +58,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return CloseableIterable.withNoopClose(snapshot().dataManifests(table().io())); + return CloseableIterable.withNoopClose( + snapshot().dataManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/DataScan.java b/core/src/main/java/org/apache/iceberg/DataScan.java index 1c48042f52f0..8b52a8ca7e63 100644 --- a/core/src/main/java/org/apache/iceberg/DataScan.java +++ b/core/src/main/java/org/apache/iceberg/DataScan.java @@ -49,7 +49,7 @@ protected ManifestGroup newManifestGroup( boolean withColumnStats) { ManifestGroup manifestGroup = - new ManifestGroup(io(), dataManifests, deleteManifests) + new ManifestGroup(io(), table().encryption(), dataManifests, deleteManifests) .caseSensitive(isCaseSensitive()) .select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS) .filterData(filter()) diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 8463112b7a51..65e924ead543 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.util.List; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -65,12 +66,13 @@ public CloseableIterable doPlanFiles() { Snapshot snapshot = snapshot(); FileIO io = table().io(); - List dataManifests = snapshot.dataManifests(io); - List deleteManifests = snapshot.deleteManifests(io); + EncryptionManager encryption = table().encryption(); + List dataManifests = snapshot.dataManifests(io, encryption); + List deleteManifests = snapshot.deleteManifests(io, encryption); scanMetrics().totalDataManifests().increment((long) dataManifests.size()); scanMetrics().totalDeleteManifests().increment((long) deleteManifests.size()); ManifestGroup manifestGroup = - new ManifestGroup(io, dataManifests, deleteManifests) + new ManifestGroup(io, table().encryption(), dataManifests, deleteManifests) .caseSensitive(isCaseSensitive()) .select(scanColumns()) .filterData(filter()) diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 6c69a6e01370..747fadea814b 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -32,6 +32,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -60,9 +62,9 @@ /** * An index of {@link DeleteFile delete files} by sequence number. * - *

Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, - * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data - * file. + *

Use {@link #builderFor(FileIO, EncryptionManager, Iterable)} to construct an index, and {@link + * #forDataFile(long, DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to + * apply to a given data file. */ class DeleteFileIndex { private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; @@ -311,7 +313,12 @@ private static boolean containsNull(Map nullValueCounts, Types.Ne } static Builder builderFor(FileIO io, Iterable deleteManifests) { - return new Builder(io, Sets.newHashSet(deleteManifests)); + return new Builder(io, PlaintextEncryptionManager.instance(), Sets.newHashSet(deleteManifests)); + } + + static Builder builderFor( + FileIO io, EncryptionManager encryption, Iterable deleteManifests) { + return new Builder(io, encryption, Sets.newHashSet(deleteManifests)); } static Builder builderFor(Iterable deleteFiles) { @@ -320,6 +327,7 @@ static Builder builderFor(Iterable deleteFiles) { static class Builder { private final FileIO io; + private final EncryptionManager encryption; private final Set deleteManifests; private final Iterable deleteFiles; private long minSequenceNumber = 0L; @@ -331,14 +339,16 @@ static class Builder { private ExecutorService executorService = null; private ScanMetrics scanMetrics = ScanMetrics.noop(); - Builder(FileIO io, Set deleteManifests) { + Builder(FileIO io, EncryptionManager encryption, Set deleteManifests) { this.io = io; + this.encryption = encryption; this.deleteManifests = Sets.newHashSet(deleteManifests); this.deleteFiles = null; } Builder(Iterable deleteFiles) { this.io = null; + this.encryption = null; this.deleteManifests = null; this.deleteFiles = deleteFiles; } @@ -516,7 +526,7 @@ private Iterable>> deleteManifestRea return Iterables.transform( matchingManifests, manifest -> - ManifestFiles.readDeleteManifest(manifest, io, specsById) + ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById) .filterRows(dataFilter) .filterPartitions(partitionFilter) .filterPartitions(partitionSet) diff --git a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java index 1ec2ade6c607..e77555a11ae4 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFilesTable.java @@ -58,7 +58,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return CloseableIterable.withNoopClose(snapshot().deleteManifests(table().io())); + return CloseableIterable.withNoopClose( + snapshot().deleteManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index e0919d9c7b2f..dcc6172ff466 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -22,11 +22,16 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -121,14 +126,18 @@ public FastAppend appendManifest(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest.path()); - OutputFile newManifestPath = newManifestOutput(); + EncryptedInputFile encryptedFile = + EncryptedFiles.encryptedInput( + ops.io().newInputFile(manifest.path()), manifest.keyMetadata()); + InputFile toCopy = ops.encryption().decrypt(encryptedFile); + EncryptedOutputFile newManifestPath = newEncryptedManifest(); return ManifestFiles.copyAppendManifest( current.formatVersion(), manifest.partitionSpecId(), toCopy, current.specsById(), - newManifestPath, + newManifestPath.encryptingOutputFile(), + newManifestPath.keyMetadata().buffer(), snapshotId(), summaryBuilder); } @@ -153,7 +162,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { Iterables.addAll(manifests, appendManifestsWithMetadata); if (snapshot != null) { - manifests.addAll(snapshot.allManifests(ops.io())); + manifests.addAll(snapshot.allManifests(ops.io(), ops.encryption)); } return manifests; diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java index dae99c572c78..c28a8d617d69 100644 --- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java +++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; @@ -35,16 +36,19 @@ abstract class FileCleanupStrategy { private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class); protected final FileIO fileIO; + protected final EncryptionManager encryptionManager; protected final ExecutorService planExecutorService; private final Consumer deleteFunc; private final ExecutorService deleteExecutorService; protected FileCleanupStrategy( FileIO fileIO, + EncryptionManager encryption, ExecutorService deleteExecutorService, ExecutorService planExecutorService, Consumer deleteFunc) { this.fileIO = fileIO; + this.encryptionManager = encryption; this.deleteExecutorService = deleteExecutorService; this.planExecutorService = planExecutorService; this.deleteFunc = deleteFunc; @@ -62,7 +66,7 @@ protected FileCleanupStrategy( "deleted_data_files_count"); protected CloseableIterable readManifests(Snapshot snapshot) { - if (snapshot.manifestListLocation() != null) { + if (snapshot.manifestListLocation() != null) { // TODO GG encryption return Avro.read(fileIO.newInputFile(snapshot.manifestListLocation())) .rename("manifest_file", GenericManifestFile.class.getName()) .classLoader(GenericManifestFile.class.getClassLoader()) @@ -70,7 +74,7 @@ protected CloseableIterable readManifests(Snapshot snapshot) { .reuseContainers(true) .build(); } else { - return CloseableIterable.withNoopClose(snapshot.allManifests(fileIO)); + return CloseableIterable.withNoopClose(snapshot.allManifests(fileIO, encryptionManager)); } } diff --git a/core/src/main/java/org/apache/iceberg/FilesTable.java b/core/src/main/java/org/apache/iceberg/FilesTable.java index 4fa5f417479f..7fc818f0759d 100644 --- a/core/src/main/java/org/apache/iceberg/FilesTable.java +++ b/core/src/main/java/org/apache/iceberg/FilesTable.java @@ -58,7 +58,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable manifests() { - return CloseableIterable.withNoopClose(snapshot().allManifests(table().io())); + return CloseableIterable.withNoopClose( + snapshot().allManifests(table().io(), table().encryption())); } } } diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java index 2cd944d922c7..5768c57df128 100644 --- a/core/src/main/java/org/apache/iceberg/FindFiles.java +++ b/core/src/main/java/org/apache/iceberg/FindFiles.java @@ -202,7 +202,8 @@ public CloseableIterable collect() { // when snapshot is not null CloseableIterable> entries = - new ManifestGroup(ops.io(), snapshot.dataManifests(ops.io())) + new ManifestGroup( + ops.io(), ops.encryption(), snapshot.dataManifests(ops.io(), ops.encryption())) .specsById(ops.current().specsById()) .filterData(rowFilter) .filterFiles(fileFilter) diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java index 408e049ea65c..1a8924bddee8 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java @@ -93,7 +93,7 @@ public CloseableIterable planFiles() { .toSet(); ManifestGroup manifestGroup = - new ManifestGroup(table().io(), manifests) + new ManifestGroup(table().io(), table().encryption(), manifests) .caseSensitive(isCaseSensitive()) .select(scanColumns()) .filterData(filter()) diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java index 60ad46e8e864..cf6884b27ce1 100644 --- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; @@ -40,10 +41,11 @@ class IncrementalFileCleanup extends FileCleanupStrategy { IncrementalFileCleanup( FileIO fileIO, + EncryptionManager encryption, ExecutorService deleteExecutorService, ExecutorService planExecutorService, Consumer deleteFunc) { - super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); + super(fileIO, encryption, deleteExecutorService, planExecutorService, deleteFunc); } @Override @@ -286,7 +288,7 @@ private Set findFilesToDelete( manifest -> { // the manifest has deletes, scan it to find files to delete try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + ManifestFiles.open(manifest, fileIO, encryptionManager, current.specsById())) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be // deleted @@ -312,7 +314,7 @@ private Set findFilesToDelete( manifest -> { // the manifest has deletes, scan it to find files to delete try (ManifestReader reader = - ManifestFiles.open(manifest, fileIO, current.specsById())) { + ManifestFiles.open(manifest, fileIO, encryptionManager, current.specsById())) { for (ManifestEntry entry : reader.entries()) { // delete any ADDED file from manifests that were reverted if (entry.status() == ManifestEntry.Status.ADDED) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java index 6a9ca890a4c1..d3038b2bf8c2 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java @@ -65,7 +65,8 @@ protected TableScan newRefinedScan(Table table, Schema schema, TableScanContext @Override protected CloseableIterable doPlanFiles() { CloseableIterable manifests = - CloseableIterable.withNoopClose(snapshot().allManifests(table().io())); + CloseableIterable.withNoopClose( + snapshot().allManifests(table().io(), table().encryption())); return BaseEntriesTable.planFiles(table(), manifests, tableSchema(), schema(), context()); } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index c23ab667a41b..213fb2123570 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -21,10 +21,17 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.iceberg.ManifestReader.FileType; import org.apache.iceberg.avro.AvroEncoderUtil; import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.ContentCache; @@ -89,27 +96,19 @@ public static synchronized void dropCache(FileIO fileIO) { * * @param manifest a ManifestFile * @param io a FileIO + * @param encryption an EncryptionManager * @return a manifest reader */ - public static CloseableIterable readPaths(ManifestFile manifest, FileIO io) { + public static CloseableIterable readPaths( + ManifestFile manifest, FileIO io, EncryptionManager encryption) { return CloseableIterable.transform( - read(manifest, io, null).select(ImmutableList.of("file_path")).liveEntries(), + read(manifest, io, encryption, null).select(ImmutableList.of("file_path")).liveEntries(), entry -> entry.file().path().toString()); } - /** - * Returns a new {@link ManifestReader} for a {@link ManifestFile}. - * - *

Note: Callers should use {@link ManifestFiles#read(ManifestFile, FileIO, Map)} to - * ensure the schema used by filters is the latest table schema. This should be used only when - * reading a manifest without filters. - * - * @param manifest a ManifestFile - * @param io a FileIO - * @return a manifest reader - */ - public static ManifestReader read(ManifestFile manifest, FileIO io) { - return read(manifest, io, null); + // No use TODO deprecate / handle in revapi + public static CloseableIterable readPaths(ManifestFile manifest, FileIO io) { + return readPaths(manifest, io, PlaintextEncryptionManager.instance()); } /** @@ -117,33 +116,46 @@ public static ManifestReader read(ManifestFile manifest, FileIO io) { * * @param manifest a {@link ManifestFile} * @param io a {@link FileIO} + * @param encryption a {@link EncryptionManager} * @param specsById a Map from spec ID to partition spec * @return a {@link ManifestReader} */ public static ManifestReader read( - ManifestFile manifest, FileIO io, Map specsById) { + ManifestFile manifest, + FileIO io, + EncryptionManager encryption, + Map specsById) { Preconditions.checkArgument( manifest.content() == ManifestContent.DATA, "Cannot read a delete manifest with a ManifestReader: %s", manifest); - InputFile file = newInputFile(io, manifest.path(), manifest.length()); + + InputFile inputFile; + if (manifest.keyMetadata() == null) { // unencrypted manifest + inputFile = newInputFile(io, manifest.path(), manifest.length()); + } else { + EncryptedInputFile encryptedFile = + EncryptedFiles.encryptedInput( + newInputFile( + io, manifest.path(), EncryptionUtil.gcmEncryptionLength(manifest.length())), + manifest.keyMetadata()); + inputFile = encryption.decrypt(encryptedFile); + } + InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>( - file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES); + inputFile, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DATA_FILES); } - /** - * Create a new {@link ManifestWriter}. - * - *

Manifests created by this writer have all entry snapshot IDs set to null. All entries will - * inherit the snapshot ID that will be assigned to the manifest on commit. - * - * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples - * @param outputFile the destination file location - * @return a manifest writer - */ - public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { - return write(1, spec, outputFile, null); + /** Tests only. Used only when reading a manifest without filters. */ + public static ManifestReader read(ManifestFile manifest, FileIO io) { + return read(manifest, io, null); + } + + /** TODO Flink, Spark actions and benchmarks */ + public static ManifestReader read( + ManifestFile manifest, FileIO io, Map specsById) { + return read(manifest, io, PlaintextEncryptionManager.instance(), specsById); } /** @@ -152,39 +164,86 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outp * @param formatVersion a target format version * @param spec a {@link PartitionSpec} * @param outputFile an {@link OutputFile} where the manifest will be written + * @param keyMetadata a key_metadata ByteBuffer, used for manifest encryption * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID * @return a manifest writer */ public static ManifestWriter write( - int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + ByteBuffer keyMetadata, + Long snapshotId) { switch (formatVersion) { case 1: return new ManifestWriter.V1Writer(spec, outputFile, snapshotId); case 2: - return new ManifestWriter.V2Writer(spec, outputFile, snapshotId); + return new ManifestWriter.V2Writer(spec, outputFile, keyMetadata, snapshotId); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); } + /** + * TODO Spark procedures; Flink; and tests + * + *

Manifests created by this writer have all entry snapshot IDs set to null. All entries will + * inherit the snapshot ID that will be assigned to the manifest on commit. + */ + public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { + return write(1, spec, outputFile, null); + } + + /** TODO Spark actions, benchmarks; Flink; and tests */ + public static ManifestWriter write( + int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return write(formatVersion, spec, outputFile, EncryptionKeyMetadata.EMPTY.buffer(), snapshotId); + } + /** * Returns a new {@link ManifestReader} for a {@link ManifestFile}. * * @param manifest a {@link ManifestFile} * @param io a {@link FileIO} + * @param encryption a {@link EncryptionManager} * @param specsById a Map from spec ID to partition spec * @return a {@link ManifestReader} */ public static ManifestReader readDeleteManifest( - ManifestFile manifest, FileIO io, Map specsById) { + ManifestFile manifest, + FileIO io, + EncryptionManager encryption, + Map specsById) { Preconditions.checkArgument( manifest.content() == ManifestContent.DELETES, "Cannot read a data manifest with a DeleteManifestReader: %s", manifest); - InputFile file = newInputFile(io, manifest.path(), manifest.length()); + + InputFile inputFile; + if (manifest.keyMetadata() == null) { // unencrypted manifest + inputFile = newInputFile(io, manifest.path(), manifest.length()); + } else { + EncryptedInputFile encryptedFile = + EncryptedFiles.encryptedInput( + newInputFile( + io, manifest.path(), EncryptionUtil.gcmEncryptionLength(manifest.length())), + manifest.keyMetadata()); + inputFile = encryption.decrypt(encryptedFile); + } + InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); return new ManifestReader<>( - file, manifest.partitionSpecId(), specsById, inheritableMetadata, FileType.DELETE_FILES); + inputFile, + manifest.partitionSpecId(), + specsById, + inheritableMetadata, + FileType.DELETE_FILES); + } + + /** TODO Spark actions, Flink; and tests */ + public static ManifestReader readDeleteManifest( + ManifestFile manifest, FileIO io, Map specsById) { + return readDeleteManifest(manifest, io, PlaintextEncryptionManager.instance(), specsById); } /** @@ -193,21 +252,32 @@ public static ManifestReader readDeleteManifest( * @param formatVersion a target format version * @param spec a {@link PartitionSpec} * @param outputFile an {@link OutputFile} where the manifest will be written + * @param keyMetadata a key_metadata ByteBuffer, used for manifest encryption * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID * @return a manifest writer */ public static ManifestWriter writeDeleteManifest( - int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + ByteBuffer keyMetadata, + Long snapshotId) { switch (formatVersion) { case 1: throw new IllegalArgumentException("Cannot write delete files in a v1 table"); case 2: - return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId); + return new ManifestWriter.V2DeleteWriter(spec, outputFile, keyMetadata, snapshotId); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); } + // TODO Flink; and tests + public static ManifestWriter writeDeleteManifest( + int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return writeDeleteManifest(formatVersion, spec, outputFile, null, snapshotId); + } + /** * Encode the {@link ManifestFile} to a byte array by using avro encoder. * @@ -233,17 +303,16 @@ public static ManifestFile decode(byte[] manifestData) throws IOException { return AvroEncoderUtil.decode(manifestData); } - static ManifestReader open(ManifestFile manifest, FileIO io) { - return open(manifest, io, null); - } - static ManifestReader open( - ManifestFile manifest, FileIO io, Map specsById) { + ManifestFile manifest, + FileIO io, + EncryptionManager encryption, + Map specsById) { switch (manifest.content()) { case DATA: - return ManifestFiles.read(manifest, io, specsById); + return ManifestFiles.read(manifest, io, encryption, specsById); case DELETES: - return ManifestFiles.readDeleteManifest(manifest, io, specsById); + return ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById); } throw new UnsupportedOperationException( "Cannot read unknown manifest type: " + manifest.content()); @@ -255,6 +324,7 @@ static ManifestFile copyAppendManifest( InputFile toCopy, Map specsById, OutputFile outputFile, + ByteBuffer keyMetadata, long snapshotId, SnapshotSummary.Builder summaryBuilder) { // use metadata that will add the current snapshot's ID for the rewrite @@ -265,6 +335,7 @@ static ManifestFile copyAppendManifest( formatVersion, reader, outputFile, + keyMetadata, snapshotId, summaryBuilder, ManifestEntry.Status.ADDED); @@ -279,6 +350,7 @@ static ManifestFile copyRewriteManifest( InputFile toCopy, Map specsById, OutputFile outputFile, + ByteBuffer keyMetadata, long snapshotId, SnapshotSummary.Builder summaryBuilder) { // for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an @@ -290,6 +362,7 @@ static ManifestFile copyRewriteManifest( formatVersion, reader, outputFile, + keyMetadata, snapshotId, summaryBuilder, ManifestEntry.Status.EXISTING); @@ -303,10 +376,12 @@ private static ManifestFile copyManifestInternal( int formatVersion, ManifestReader reader, OutputFile outputFile, + ByteBuffer manifestKeyMetadata, long snapshotId, SnapshotSummary.Builder summaryBuilder, ManifestEntry.Status allowedEntryStatus) { - ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId); + ManifestWriter writer = + write(formatVersion, reader.spec(), outputFile, manifestKeyMetadata, snapshotId); boolean threw = true; try { for (ManifestEntry entry : reader.entries()) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 97e480837c7b..f88519b18122 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -27,6 +27,8 @@ import java.util.concurrent.ExecutorService; import java.util.function.BiFunction; import java.util.function.Predicate; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -49,6 +51,7 @@ class ManifestGroup { private static final Types.StructType EMPTY_STRUCT = Types.StructType.of(); private final FileIO io; + private final EncryptionManager encryption; private final Set dataManifests; private final DeleteFileIndex.Builder deleteIndexBuilder; private Predicate manifestPredicate; @@ -66,18 +69,28 @@ class ManifestGroup { private ExecutorService executorService; private ScanMetrics scanMetrics; - ManifestGroup(FileIO io, Iterable manifests) { + ManifestGroup(FileIO io, EncryptionManager encryption, Iterable manifests) { this( io, + encryption, Iterables.filter(manifests, manifest -> manifest.content() == ManifestContent.DATA), Iterables.filter(manifests, manifest -> manifest.content() == ManifestContent.DELETES)); } ManifestGroup( FileIO io, Iterable dataManifests, Iterable deleteManifests) { + this(io, PlaintextEncryptionManager.instance(), dataManifests, deleteManifests); + } + + ManifestGroup( + FileIO io, + EncryptionManager encryption, + Iterable dataManifests, + Iterable deleteManifests) { this.io = io; + this.encryption = encryption; this.dataManifests = Sets.newHashSet(dataManifests); - this.deleteIndexBuilder = DeleteFileIndex.builderFor(io, deleteManifests); + this.deleteIndexBuilder = DeleteFileIndex.builderFor(io, encryption, deleteManifests); this.dataFilter = Expressions.alwaysTrue(); this.fileFilter = Expressions.alwaysTrue(); this.partitionFilter = Expressions.alwaysTrue(); @@ -317,7 +330,7 @@ private Iterable> entries( @Override public CloseableIterator iterator() { ManifestReader reader = - ManifestFiles.read(manifest, io, specsById) + ManifestFiles.read(manifest, io, encryption, specsById) .filterRows(dataFilter) .filterPartitions(partitionFilter) .caseSensitive(caseSensitive) diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 4865ccfc3b2d..f52bcdb19b9a 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -19,6 +19,7 @@ package org.apache.iceberg; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; @@ -37,6 +38,7 @@ public abstract class ManifestWriter> implements FileAp static final long UNASSIGNED_SEQ = -1L; private final OutputFile file; + private final ByteBuffer keyMetadata; private final int specId; private final FileAppender> writer; private final Long snapshotId; @@ -52,13 +54,15 @@ public abstract class ManifestWriter> implements FileAp private long deletedRows = 0L; private Long minDataSequenceNumber = null; - private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { + private ManifestWriter( + PartitionSpec spec, OutputFile file, ByteBuffer keyMetadata, Long snapshotId) { this.file = file; this.specId = spec.specId(); this.writer = newAppender(spec, file); this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); + this.keyMetadata = keyMetadata; } protected abstract ManifestEntry prepare(ManifestEntry entry); @@ -204,7 +208,7 @@ public ManifestFile toManifestFile() { deletedFiles, deletedRows, stats.summaries(), - null); + keyMetadata); } @Override @@ -216,8 +220,8 @@ public void close() throws IOException { static class V2Writer extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2Writer(PartitionSpec spec, OutputFile file, ByteBuffer keyMetadata, Long snapshotId) { + super(spec, file, keyMetadata, snapshotId); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -250,8 +254,8 @@ protected FileAppender> newAppender( static class V2DeleteWriter extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2DeleteWriter(PartitionSpec spec, OutputFile file, ByteBuffer keyMetadata, Long snapshotId) { + super(spec, file, keyMetadata, snapshotId); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -290,7 +294,7 @@ static class V1Writer extends ManifestWriter { private final V1Metadata.IndexedManifestEntry entryWrapper; V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + super(spec, file, null, snapshotId); this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index 5c850ec6f9fb..fc7821aeb642 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -77,6 +78,7 @@ MetadataTableType metadataTableType() { protected DataTask task(TableScan scan) { FileIO io = table().io(); + EncryptionManager encryption = table().encryption(); String location = scan.snapshot().manifestListLocation(); Map specs = Maps.newHashMap(table().specs()); @@ -85,7 +87,7 @@ protected DataTask task(TableScan scan) { location != null ? location : table().operations().current().metadataFileLocation()), schema(), scan.schema(), - scan.snapshot().allManifests(io), + scan.snapshot().allManifests(io, encryption), manifest -> { PartitionSpec spec = specs.get(manifest.partitionSpecId()); return ManifestsTable.manifestFileToRow(spec, manifest); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 5d3ec6e35f0d..4cc00072eab1 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,6 +30,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; @@ -38,7 +41,6 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Predicate; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -275,14 +277,18 @@ protected void add(ManifestFile manifest) { private ManifestFile copyManifest(ManifestFile manifest) { TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest.path()); - OutputFile newManifestPath = newManifestOutput(); + EncryptedInputFile encryptedFile = + EncryptedFiles.encryptedInput( + ops.io().newInputFile(manifest.path()), manifest.keyMetadata()); + InputFile toCopy = ops.encryption().decrypt(encryptedFile); + EncryptedOutputFile newManifestPath = newEncryptedManifest(); return ManifestFiles.copyAppendManifest( current.formatVersion(), manifest.partitionSpecId(), toCopy, current.specsById(), - newManifestPath, + newManifestPath.encryptingOutputFile(), + newManifestPath.keyMetadata().buffer(), snapshotId(), appendedManifestsSummary); } @@ -379,7 +385,7 @@ private CloseableIterable> addedDataFiles( Set newSnapshots = history.second(); ManifestGroup manifestGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops.io(), ops.encryption(), manifests, ImmutableList.of()) .caseSensitive(caseSensitive) .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId())) .specsById(base.specsById()) @@ -542,7 +548,7 @@ protected DeleteFileIndex addedDeleteFiles( Snapshot parent) { // if there is no current table state, return empty delete file index if (parent == null || base.formatVersion() < 2) { - return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of()) + return DeleteFileIndex.builderFor(ops.io(), ops.encryption(), ImmutableList.of()) .specsById(base.specsById()) .build(); } @@ -650,7 +656,7 @@ private CloseableIterable> deletedDataFiles( Set newSnapshots = history.second(); ManifestGroup manifestGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops.io(), ops.encryption(), manifests, ImmutableList.of()) .caseSensitive(caseSensitive) .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId())) .filterManifestEntries(entry -> entry.status().equals(ManifestEntry.Status.DELETED)) @@ -689,7 +695,7 @@ private DeleteFileIndex buildDeleteFileIndex( Expression dataFilter, PartitionSet partitionSet) { DeleteFileIndex.Builder builder = - DeleteFileIndex.builderFor(ops.io(), deleteManifests) + DeleteFileIndex.builderFor(ops.io(), ops.encryption(), deleteManifests) .afterSequenceNumber(startingSequenceNumber) .caseSensitive(caseSensitive) .specsById(ops.current().specsById()); @@ -730,7 +736,7 @@ protected void validateDataFilesExist( Set newSnapshots = history.second(); ManifestGroup matchingDeletesGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops.io(), ops.encryption(), manifests, ImmutableList.of()) .filterManifestEntries( entry -> entry.status() != ManifestEntry.Status.ADDED @@ -775,13 +781,14 @@ private Pair, Set> validationHistory( if (matchingOperations.contains(currentSnapshot.operation())) { newSnapshots.add(currentSnapshot.snapshotId()); if (content == ManifestContent.DATA) { - for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io())) { + for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io(), ops.encryption())) { if (manifest.snapshotId() == currentSnapshot.snapshotId()) { manifests.add(manifest); } } } else { - for (ManifestFile manifest : currentSnapshot.deleteManifests(ops.io())) { + for (ManifestFile manifest : + currentSnapshot.deleteManifests(ops.io(), ops.encryption())) { if (manifest.snapshotId() == currentSnapshot.snapshotId()) { manifests.add(manifest); } @@ -815,7 +822,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { List filtered = filterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), - snapshot != null ? snapshot.dataManifests(ops.io()) : null); + snapshot != null ? snapshot.dataManifests(ops.io(), ops.encryption()) : null); long minDataSequenceNumber = filtered.stream() .map(ManifestFile::minSequenceNumber) @@ -829,7 +836,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { List filteredDeletes = deleteFilterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), - snapshot != null ? snapshot.deleteManifests(ops.io()) : null); + snapshot != null ? snapshot.deleteManifests(ops.io(), ops.encryption()) : null); // only keep manifests that have live data files or that were written by this commit Predicate shouldKeep = diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java index d96246f15b02..1ada5182d421 100644 --- a/core/src/main/java/org/apache/iceberg/MicroBatches.java +++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; @@ -37,12 +39,22 @@ public class MicroBatches { private MicroBatches() {} + // no use TODO Deprecate / handle in revapi public static List> skippedManifestIndexesFromSnapshot( FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) { + return null; + } + + public static List> skippedManifestIndexesFromSnapshot( + FileIO io, + EncryptionManager encryption, + Snapshot snapshot, + long startFileIndex, + boolean scanAllFiles) { List manifests = scanAllFiles - ? snapshot.dataManifests(io) - : snapshot.dataManifests(io).stream() + ? snapshot.dataManifests(io, encryption) + : snapshot.dataManifests(io, encryption).stream() .filter(m -> m.snapshotId().equals(snapshot.snapshotId())) .collect(Collectors.toList()); @@ -58,9 +70,27 @@ public static CloseableIterable openManifestFile( Snapshot snapshot, ManifestFile manifestFile, boolean scanAllFiles) { + return openManifestFile( + io, + PlaintextEncryptionManager.instance(), + specsById, + caseSensitive, + snapshot, + manifestFile, + scanAllFiles); + } + + public static CloseableIterable openManifestFile( + FileIO io, + EncryptionManager encryption, + Map specsById, + boolean caseSensitive, + Snapshot snapshot, + ManifestFile manifestFile, + boolean scanAllFiles) { ManifestGroup manifestGroup = - new ManifestGroup(io, ImmutableList.of(manifestFile)) + new ManifestGroup(io, encryption, ImmutableList.of(manifestFile)) .specsById(specsById) .caseSensitive(caseSensitive); if (!scanAllFiles) { @@ -175,8 +205,13 @@ public boolean lastIndexOfSnapshot() { } } + /** Tests only */ public static MicroBatchBuilder from(Snapshot snapshot, FileIO io) { - return new MicroBatchBuilder(snapshot, io); + return from(snapshot, io, PlaintextEncryptionManager.instance()); + } + + public static MicroBatchBuilder from(Snapshot snapshot, FileIO io, EncryptionManager encryption) { + return new MicroBatchBuilder(snapshot, io, encryption); } public static class MicroBatchBuilder { @@ -184,12 +219,14 @@ public static class MicroBatchBuilder { private final Snapshot snapshot; private final FileIO io; + private final EncryptionManager encryption; private boolean caseSensitive; private Map specsById; - private MicroBatchBuilder(Snapshot snapshot, FileIO io) { + private MicroBatchBuilder(Snapshot snapshot, FileIO io, EncryptionManager encryption) { this.snapshot = snapshot; this.io = io; + this.encryption = encryption; this.caseSensitive = true; } @@ -206,7 +243,7 @@ public MicroBatchBuilder specsById(Map specs) { public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) { return generate( startFileIndex, - Iterables.size(snapshot.addedDataFiles(io)), + Iterables.size(snapshot.addedDataFiles(io, encryption)), targetSizeInBytes, scanAllFiles); } @@ -220,7 +257,8 @@ public MicroBatch generate( targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0"); return generateMicroBatch( - skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles), + skippedManifestIndexesFromSnapshot( + io, encryption, snapshot, startFileIndex, scanAllFiles), startFileIndex, endFileIndex, targetSizeInBytes, @@ -263,6 +301,7 @@ private MicroBatch generateMicroBatch( try (CloseableIterable taskIterable = openManifestFile( io, + encryption, specsById, caseSensitive, snapshot, diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java index 5ff796e95827..c0c90840b28b 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java +++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java @@ -187,7 +187,8 @@ static CloseableIterable> planEntries(StaticTableScan scan) { Table table = scan.table(); CloseableIterable filteredManifests = - filteredManifests(scan, table, scan.snapshot().allManifests(table.io())); + filteredManifests( + scan, table, scan.snapshot().allManifests(table.io(), table.encryption())); Iterable>> tasks = CloseableIterable.transform(filteredManifests, manifest -> readEntries(manifest, scan)); @@ -199,7 +200,7 @@ private static CloseableIterable> readEntries( ManifestFile manifest, StaticTableScan scan) { Table table = scan.table(); return CloseableIterable.transform( - ManifestFiles.open(manifest, table.io(), table.specs()) + ManifestFiles.open(manifest, table.io(), table.encryption(), table.specs()) .caseSensitive(scan.isCaseSensitive()) .select(scanColumns(manifest.content())) // don't select stats columns .liveEntries(), diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 68f2e0d4522a..0f0a4d6729d4 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -234,7 +234,7 @@ protected CloseableIterable doPlanFiles() { isCaseSensitive())); // iterate through delete manifests - List manifests = snapshot().deleteManifests(table().io()); + List manifests = snapshot().deleteManifests(table().io(), table.encryption()); CloseableIterable matchingManifests = CloseableIterable.filter( @@ -289,7 +289,8 @@ public CloseableIterator iterator() { // Filter partitions CloseableIterable> deleteFileEntries = - ManifestFiles.readDeleteManifest(manifest, table().io(), transformedSpecs) + ManifestFiles.readDeleteManifest( + manifest, table().io(), table().encryption(), transformedSpecs) .caseSensitive(isCaseSensitive()) .select(scanColumns()) .filterRows(filter()) diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java index dd4239196996..111611f84825 100644 --- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java +++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; import java.util.stream.Collectors; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; @@ -42,10 +43,11 @@ class ReachableFileCleanup extends FileCleanupStrategy { ReachableFileCleanup( FileIO fileIO, + EncryptionManager encryptionManager, ExecutorService deleteExecutorService, ExecutorService planExecutorService, Consumer deleteFunc) { - super(fileIO, deleteExecutorService, planExecutorService, deleteFunc); + super(fileIO, encryptionManager, deleteExecutorService, planExecutorService, deleteFunc); } @Override @@ -164,7 +166,8 @@ private Set findFilesToDelete( "Failed to determine live files in manifest {}. Retrying", item.path(), exc)) .run( manifest -> { - try (CloseableIterable paths = ManifestFiles.readPaths(manifest, fileIO)) { + try (CloseableIterable paths = + ManifestFiles.readPaths(manifest, fileIO, encryptionManager)) { paths.forEach(filesToDelete::add); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); @@ -192,7 +195,8 @@ private Set findFilesToDelete( } // Remove all the live files from the candidate deletion set - try (CloseableIterable paths = ManifestFiles.readPaths(manifest, fileIO)) { + try (CloseableIterable paths = + ManifestFiles.readPaths(manifest, fileIO, encryptionManager)) { paths.forEach(filesToDelete::remove); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest); diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index fa6fcdf41442..07144aef3007 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -331,9 +331,9 @@ private void cleanExpiredSnapshots() { FileCleanupStrategy cleanupStrategy = incrementalCleanup ? new IncrementalFileCleanup( - ops.io(), deleteExecutorService, planExecutorService, deleteFunc) + ops.io(), ops.encryption(), deleteExecutorService, planExecutorService, deleteFunc) : new ReachableFileCleanup( - ops.io(), deleteExecutorService, planExecutorService, deleteFunc); + ops.io(), ops.encryption(), deleteExecutorService, planExecutorService, deleteFunc); cleanupStrategy.cleanFiles(base, current); } diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java index f975ef1636df..f9204f04f937 100644 --- a/core/src/main/java/org/apache/iceberg/ScanSummary.java +++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java @@ -158,7 +158,8 @@ public Map build() { removeTimeFilters(filters, Expressions.rewriteNot(scan.filter())); Expression rowFilter = joinFilters(filters); - Iterable manifests = table.currentSnapshot().dataManifests(ops.io()); + Iterable manifests = + table.currentSnapshot().dataManifests(ops.io(), ops.encryption()); boolean filterByTimestamp = !timeFilters.isEmpty(); Set snapshotsInTimeRange = Sets.newHashSet(); @@ -224,7 +225,7 @@ private Map computeTopPartitionMetrics( new TopN<>(limit, throwIfLimited, Comparators.charSequences()); try (CloseableIterable> entries = - new ManifestGroup(ops.io(), manifests) + new ManifestGroup(ops.io(), ops.encryption(), manifests) .specsById(ops.current().specsById()) .filterData(rowFilter) .ignoreDeleted() diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index bc5ef6094695..a7cc9ac18269 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -48,6 +48,7 @@ private SnapshotParser() {} private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String MANIFEST_LIST_KEY_METADATA = "manifest-list-key-metadata"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -93,6 +94,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (snapshot.manifestKeyMetadata() != null) { + generator.writeStringField(MANIFEST_LIST_KEY_METADATA, snapshot.manifestKeyMetadata()); + } + generator.writeEndObject(); } @@ -147,6 +152,13 @@ static Snapshot fromJson(JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); + + // Manifest list can be encrypted + String manifestListKeyMetadata = null; + if (node.has(MANIFEST_LIST_KEY_METADATA)) { + manifestListKeyMetadata = JsonUtil.getString(MANIFEST_LIST_KEY_METADATA, node); + } + return new BaseSnapshot( sequenceNumber, snapshotId, @@ -155,7 +167,8 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList); + manifestList, + manifestListKeyMetadata); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 757d0b78bca7..fcc4216bf1fe 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -34,7 +34,9 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Set; @@ -43,6 +45,12 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.NativeEncryptionKeyMetadata; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.events.Listeners; import org.apache.iceberg.exceptions.CleanableFailure; @@ -235,10 +243,36 @@ public Snapshot apply() { OutputFile manifestList = manifestListPath(); + EncryptionManager encryptionManager = ops.encryption(); + EncryptedOutputFile encryptedManifestList = encryptionManager.encrypt(manifestList); + + long manifestListLength; + String manifestListKeyMetadata = null; + if (encryptedManifestList.keyMetadata() != null + && encryptedManifestList.keyMetadata() != EncryptionKeyMetadata.EMPTY) { + Preconditions.checkArgument( + encryptionManager instanceof StandardEncryptionManager, + "Encryption manager for encrypted manifest list files can currently only be an instance of " + + StandardEncryptionManager.class); + NativeEncryptionKeyMetadata keyMetadata = + (NativeEncryptionKeyMetadata) encryptedManifestList.keyMetadata(); + ByteBuffer manifestListEncryptionKey = keyMetadata.encryptionKey(); + ByteBuffer wrappedEncryptionKey = + ((StandardEncryptionManager) encryptionManager).wrapKey(manifestListEncryptionKey); + + ByteBuffer manifestListAADPrefix = keyMetadata.aadPrefix(); + manifestListKeyMetadata = + Base64.getEncoder() + .encodeToString( + EncryptionUtil.createKeyMetadata(wrappedEncryptionKey, manifestListAADPrefix) + .buffer() + .array()); + } + try (ManifestListWriter writer = ManifestLists.write( ops.current().formatVersion(), - manifestList, + encryptedManifestList.encryptingOutputFile(), snapshotId(), parentSnapshotId, sequenceNumber)) { @@ -256,6 +290,7 @@ public Snapshot apply() { writer.addAll(Arrays.asList(manifestFiles)); + manifestListLength = writer.length(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to write manifest list file"); } @@ -268,7 +303,9 @@ public Snapshot apply() { operation(), summary(base), base.currentSchemaId(), - manifestList.location()); + manifestList.location(), +// manifestListLength, + manifestListKeyMetadata); } protected abstract Map summary(); @@ -425,7 +462,7 @@ public void commit() { // id in case another commit was added between this commit and the refresh. Snapshot saved = ops.refresh().snapshot(newSnapshotId.get()); if (saved != null) { - cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io()))); + cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io(), ops.encryption()))); // also clean up unused manifest lists created by multiple attempts for (String manifestList : manifestLists) { if (!saved.manifestListLocation().equals(manifestList)) { @@ -499,21 +536,39 @@ protected OutputFile manifestListPath() { "snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID)))); } + // TODO Handle in revapi protected OutputFile newManifestOutput() { - return ops.io() - .newOutputFile( - ops.metadataFileLocation( - FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()))); + return null; + } + + protected EncryptedOutputFile newEncryptedManifest() { + OutputFile outputFile = + ops.io() + .newOutputFile( + ops.metadataFileLocation( + FileFormat.AVRO.addExtension( + commitUUID + "-m" + manifestCount.getAndIncrement()))); + return ops.encryption().encrypt(outputFile); } protected ManifestWriter newManifestWriter(PartitionSpec spec) { + EncryptedOutputFile encryptedFile = newEncryptedManifest(); return ManifestFiles.write( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + ops.current().formatVersion(), + spec, + encryptedFile.encryptingOutputFile(), + encryptedFile.keyMetadata().buffer(), + snapshotId()); } protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) { + EncryptedOutputFile encryptedFile = newEncryptedManifest(); return ManifestFiles.writeDeleteManifest( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + ops.current().formatVersion(), + spec, + encryptedFile.encryptingOutputFile(), + encryptedFile.keyMetadata().buffer(), + snapshotId()); } protected RollingManifestWriter newRollingManifestWriter(PartitionSpec spec) { @@ -526,11 +581,12 @@ protected RollingManifestWriter newRollingDeleteManifestWriter(Parti } protected ManifestReader newManifestReader(ManifestFile manifest) { - return ManifestFiles.read(manifest, ops.io(), ops.current().specsById()); + return ManifestFiles.read(manifest, ops.io(), ops.encryption(), ops.current().specsById()); } protected ManifestReader newDeleteManifestReader(ManifestFile manifest) { - return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById()); + return ManifestFiles.readDeleteManifest( + manifest, ops.io(), ops.encryption(), ops.current().specsById()); } protected long snapshotId() { @@ -550,7 +606,7 @@ protected boolean canInheritSnapshotId() { private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { try (ManifestReader reader = - ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { + ManifestFiles.read(manifest, ops.io(), ops.encryption(), ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); int addedFiles = 0; long addedRows = 0L; diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index ba6bb4a31960..8f3b71d39925 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -47,7 +47,8 @@ private V2Metadata() {} ManifestFile.ADDED_ROWS_COUNT.asRequired(), ManifestFile.EXISTING_ROWS_COUNT.asRequired(), ManifestFile.DELETED_ROWS_COUNT.asRequired(), - ManifestFile.PARTITION_SUMMARIES); + ManifestFile.PARTITION_SUMMARIES, + ManifestFile.KEY_METADATA); /** * A wrapper class to write any ManifestFile implementation to Avro using the v2 write schema. diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java index e2cf98bf767f..25545c6845b3 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.encryption; +import java.nio.ByteBuffer; import java.util.Map; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TableProperties; @@ -30,6 +31,27 @@ public class EncryptionUtil { private EncryptionUtil() {} + public static EncryptionKeyMetadata createKeyMetadata(ByteBuffer key, ByteBuffer aadPrefix) { + return new StandardKeyMetadata(key.array(), aadPrefix.array()); + } + + public static NativeEncryptionKeyMetadata parseKeyMetadata(ByteBuffer keyMetadataBytes) { + return StandardKeyMetadata.parse(keyMetadataBytes); + } + + public static long gcmEncryptionLength(long plainFileLength) { + int numberOfFullBlocks = Math.toIntExact(plainFileLength / Ciphers.PLAIN_BLOCK_SIZE); + int plainBytesInLastBlock = + Math.toIntExact(plainFileLength - numberOfFullBlocks * Ciphers.PLAIN_BLOCK_SIZE); + boolean fullBlocksOnly = (0 == plainBytesInLastBlock); + int cipherBytesInLastBlock = + fullBlocksOnly ? 0 : plainBytesInLastBlock + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + int cipherBlockSize = Ciphers.PLAIN_BLOCK_SIZE + Ciphers.NONCE_LENGTH + Ciphers.GCM_TAG_LENGTH; + return (long) Ciphers.GCM_STREAM_HEADER_LENGTH + + numberOfFullBlocks * cipherBlockSize + + cipherBytesInLastBlock; + } + public static KeyManagementClient createKmsClient(Map catalogProperties) { String kmsType = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_TYPE); String kmsImpl = catalogProperties.get(CatalogProperties.ENCRYPTION_KMS_IMPL); diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java index a7fb494cc8e1..6f834c69ed86 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java @@ -24,7 +24,7 @@ import java.util.Map; /** A minimum client interface to connect to a key management service (KMS). */ -interface KeyManagementClient extends Serializable, Closeable { +public interface KeyManagementClient extends Serializable, Closeable { /** * Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID. diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java index b0ec879bda8d..78ff57d1cc6e 100644 --- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java @@ -30,6 +30,7 @@ import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -281,8 +282,18 @@ private static Iterable toIds(Iterable snapshots) { return Iterables.transform(snapshots, Snapshot::snapshotId); } + // TODO deprecate / handle in revapi public static List newFiles( Long baseSnapshotId, long latestSnapshotId, Function lookup, FileIO io) { + return null; + } + + public static List newFiles( + Long baseSnapshotId, + long latestSnapshotId, + Function lookup, + FileIO io, + EncryptionManager encryption) { List newFiles = Lists.newArrayList(); Snapshot lastSnapshot = null; for (Snapshot currentSnapshot : ancestorsOf(latestSnapshotId, lookup)) { @@ -291,7 +302,7 @@ public static List newFiles( return newFiles; } - Iterables.addAll(newFiles, currentSnapshot.addedDataFiles(io)); + Iterables.addAll(newFiles, currentSnapshot.addedDataFiles(io, encryption)); } ValidationException.check( diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java new file mode 100644 index 000000000000..77cd34a73a81 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionTestHelpers; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Types; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestManifestEncryption { + private static final FileIO FILE_IO = new TestTables.LocalFileIO(); + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + required(2, "timestamp", Types.TimestampType.withZone()), + required(3, "category", Types.StringType.get()), + required(4, "data", Types.StringType.get()), + required(5, "double", Types.DoubleType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA) + .identity("category") + .hour("timestamp") + .bucket("id", 16) + .build(); + + private static final long SNAPSHOT_ID = 987134631982734L; + private static final String PATH = + "s3://bucket/table/category=cheesy/timestamp_hour=10/id_bucket=3/file.avro"; + private static final FileFormat FORMAT = FileFormat.AVRO; + private static final PartitionData PARTITION = + DataFiles.data(SPEC, "category=cheesy/timestamp_hour=10/id_bucket=3"); + private static final Metrics METRICS = + new Metrics( + 1587L, + ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes + ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts + ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts + ImmutableMap.of(5, 10L), // nan value counts + ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds + ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds + private static final List OFFSETS = ImmutableList.of(4L); + private static final Integer SORT_ORDER_ID = 2; + + private static final ByteBuffer CONTENT_KEY_METADATA = ByteBuffer.allocate(100); + + private static final DataFile DATA_FILE = + new GenericDataFile( + 0, + PATH, + FORMAT, + PARTITION, + 150972L, + METRICS, + CONTENT_KEY_METADATA, + OFFSETS, + SORT_ORDER_ID); + + private static final List EQUALITY_IDS = ImmutableList.of(1); + private static final int[] EQUALITY_ID_ARR = new int[] {1}; + + private static final DeleteFile DELETE_FILE = + new GenericDeleteFile( + 0, + FileContent.EQUALITY_DELETES, + PATH, + FORMAT, + PARTITION, + 22905L, + METRICS, + EQUALITY_ID_ARR, + SORT_ORDER_ID, + null, + CONTENT_KEY_METADATA); + + private static final EncryptionManager ENCRYPTION_MANAGER = + EncryptionTestHelpers.createEncryptionManager(); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeManifest(2); + checkEntry( + readManifest(manifest), + ManifestWriter.UNASSIGNED_SEQ, + ManifestWriter.UNASSIGNED_SEQ, + FileContent.DATA); + } + + @Test + public void testV2WriteDelete() throws IOException { + ManifestFile manifest = writeDeleteManifest(2); + checkEntry( + readDeleteManifest(manifest), + ManifestWriter.UNASSIGNED_SEQ, + ManifestWriter.UNASSIGNED_SEQ, + FileContent.EQUALITY_DELETES); + } + + void checkEntry( + ManifestEntry entry, + Long expectedDataSequenceNumber, + Long expectedFileSequenceNumber, + FileContent content) { + Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status()); + Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId()); + Assert.assertEquals( + "Data sequence number", expectedDataSequenceNumber, entry.dataSequenceNumber()); + Assert.assertEquals( + "File sequence number", expectedFileSequenceNumber, entry.fileSequenceNumber()); + checkDataFile(entry.file(), content); + } + + void checkDataFile(ContentFile dataFile, FileContent content) { + // DataFile is the superclass of DeleteFile, so this method can check both + Assert.assertEquals("Content", content, dataFile.content()); + Assert.assertEquals("Path", PATH, dataFile.path()); + Assert.assertEquals("Format", FORMAT, dataFile.format()); + Assert.assertEquals("Partition", PARTITION, dataFile.partition()); + Assert.assertEquals("Record count", METRICS.recordCount(), (Long) dataFile.recordCount()); + Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes()); + Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts()); + Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts()); + Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(), dataFile.nanValueCounts()); + Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds()); + Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds()); + Assert.assertEquals("Sort order id", SORT_ORDER_ID, dataFile.sortOrderId()); + if (dataFile.content() == FileContent.EQUALITY_DELETES) { + Assert.assertEquals(EQUALITY_IDS, dataFile.equalityFieldIds()); + } else { + Assert.assertNull(dataFile.equalityFieldIds()); + } + } + + private ManifestFile writeManifest(int formatVersion) throws IOException { + return writeManifest(DATA_FILE, formatVersion); + } + + private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException { + OutputFile manifestFile = + Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); + EncryptedOutputFile encryptedManifest = ENCRYPTION_MANAGER.encrypt(manifestFile); + ManifestWriter writer = + ManifestFiles.write( + formatVersion, + SPEC, + encryptedManifest.encryptingOutputFile(), + encryptedManifest.keyMetadata().buffer(), + SNAPSHOT_ID); + try { + writer.add(file); + } finally { + writer.close(); + } + return writer.toManifestFile(); + } + + private ManifestEntry readManifest(ManifestFile manifest) throws IOException { + + // First try to read without decryption + Assertions.assertThatThrownBy(() -> ManifestFiles.read(manifest, FILE_IO, null)) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + try (CloseableIterable> reader = + ManifestFiles.read(manifest, FILE_IO, ENCRYPTION_MANAGER, null).entries()) { + List> files = Lists.newArrayList(reader); + Assert.assertEquals("Should contain only one data file", 1, files.size()); + return files.get(0); + } + } + + private ManifestFile writeDeleteManifest(int formatVersion) throws IOException { + OutputFile manifestFile = + Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString())); + EncryptedOutputFile encryptedManifest = ENCRYPTION_MANAGER.encrypt(manifestFile); + ManifestWriter writer = + ManifestFiles.writeDeleteManifest( + formatVersion, + SPEC, + encryptedManifest.encryptingOutputFile(), + encryptedManifest.keyMetadata().buffer(), + SNAPSHOT_ID); + try { + writer.add(DELETE_FILE); + } finally { + writer.close(); + } + return writer.toManifestFile(); + } + + private ManifestEntry readDeleteManifest(ManifestFile manifest) throws IOException { + try (CloseableIterable> reader = + ManifestFiles.readDeleteManifest(manifest, FILE_IO, ENCRYPTION_MANAGER, null).entries()) { + List> entries = Lists.newArrayList(reader); + Assert.assertEquals("Should contain only one data file", 1, entries.size()); + return entries.get(0); + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java new file mode 100644 index 000000000000..d5c7d9781c1d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.assertj.core.api.Assertions; +import org.junit.Assert; +import org.junit.Test; + +public class TestManifestListEncryption { + private static final String PATH = "s3://bucket/table/m1.avro"; + private static final long LENGTH = 1024L; + private static final int SPEC_ID = 1; + private static final long SEQ_NUM = 34L; + private static final long MIN_SEQ_NUM = 10L; + private static final long SNAPSHOT_ID = 987134631982734L; + private static final int ADDED_FILES = 2; + private static final long ADDED_ROWS = 5292L; + private static final int EXISTING_FILES = 343; + private static final long EXISTING_ROWS = 857273L; + private static final int DELETED_FILES = 1; + private static final long DELETED_ROWS = 22910L; + private static final List PARTITION_SUMMARIES = + ImmutableList.of(); + private static final ByteBuffer MANIFEST_KEY_METADATA = ByteBuffer.allocate(100); + + private static final ManifestFile TEST_MANIFEST = + new GenericManifestFile( + PATH, + LENGTH, + SPEC_ID, + ManifestContent.DATA, + SEQ_NUM, + MIN_SEQ_NUM, + SNAPSHOT_ID, + ADDED_FILES, + ADDED_ROWS, + EXISTING_FILES, + EXISTING_ROWS, + DELETED_FILES, + DELETED_ROWS, + PARTITION_SUMMARIES, + MANIFEST_KEY_METADATA); + + private static final EncryptionManager ENCRYPTION_MANAGER = + TestManifestEncryption.createEncryptionManager(); + + @Test + public void testV2Write() throws IOException { + ManifestFile manifest = writeAndReadManifestList(); + + // all v2 fields should be read correctly + Assert.assertEquals("Path", PATH, manifest.path()); + Assert.assertEquals("Length", LENGTH, manifest.length()); + Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId()); + Assert.assertEquals("Content", ManifestContent.DATA, manifest.content()); + Assert.assertEquals("Sequence number", SEQ_NUM, manifest.sequenceNumber()); + Assert.assertEquals("Min sequence number", MIN_SEQ_NUM, manifest.minSequenceNumber()); + Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId()); + Assert.assertEquals("Added files count", ADDED_FILES, (int) manifest.addedFilesCount()); + Assert.assertEquals("Added rows count", ADDED_ROWS, (long) manifest.addedRowsCount()); + Assert.assertEquals( + "Existing files count", EXISTING_FILES, (int) manifest.existingFilesCount()); + Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) manifest.existingRowsCount()); + Assert.assertEquals("Deleted files count", DELETED_FILES, (int) manifest.deletedFilesCount()); + Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) manifest.deletedRowsCount()); + } + + private ManifestFile writeAndReadManifestList() throws IOException { + OutputFile rawOutput = new InMemoryOutputFile(); + EncryptedOutputFile encryptedOutput = ENCRYPTION_MANAGER.encrypt(rawOutput); + EncryptionKeyMetadata keyMetadata = encryptedOutput.keyMetadata(); + + try (FileAppender writer = + ManifestLists.write( + 2, encryptedOutput.encryptingOutputFile(), SNAPSHOT_ID, SNAPSHOT_ID - 1, SEQ_NUM)) { + writer.add(TEST_MANIFEST); + } + + InputFile rawInput = rawOutput.toInputFile(); + + // First try to read without decryption + Assertions.assertThatThrownBy(() -> ManifestLists.read(rawInput)) + .isInstanceOf(RuntimeIOException.class) + .hasMessageContaining("Failed to open file") + .hasCauseInstanceOf(InvalidAvroMagicException.class); + + EncryptedInputFile encryptedManifestListInput = + EncryptedFiles.encryptedInput(rawInput, keyMetadata); + InputFile manifestListInput = ENCRYPTION_MANAGER.decrypt(encryptedManifestListInput); + + List manifests = ManifestLists.read(manifestListInput); + Assert.assertEquals("Should contain one manifest", 1, manifests.size()); + return manifests.get(0); + } +} diff --git a/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java new file mode 100644 index 000000000000..aa49e1c40fe2 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/EncryptionTestHelpers.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +public class EncryptionTestHelpers { + + private EncryptionTestHelpers() {} + + public static EncryptionManager createEncryptionManager() { + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put( + CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + Map tableProperties = Maps.newHashMap(); + tableProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, UnitestKMS.MASTER_KEY_NAME1); + tableProperties.put(TableProperties.FORMAT_VERSION, "2"); + + return EncryptionUtil.createEncryptionManager( + tableProperties, EncryptionUtil.createKmsClient(catalogProperties)); + } +} diff --git a/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java new file mode 100644 index 000000000000..52a0e36c0011 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/encryption/UnitestKMS.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.encryption; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class UnitestKMS extends MemoryMockKMS { + public static final String MASTER_KEY_NAME1 = "keyA"; + public static final byte[] MASTER_KEY1 = "0123456789012345".getBytes(StandardCharsets.UTF_8); + public static final String MASTER_KEY_NAME2 = "keyB"; + public static final byte[] MASTER_KEY2 = "1123456789012345".getBytes(StandardCharsets.UTF_8); + + @Override + public void initialize(Map properties) { + masterKeys = + ImmutableMap.of( + MASTER_KEY_NAME1, MASTER_KEY1, + MASTER_KEY_NAME2, MASTER_KEY2); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 4019fedcbbfa..a99fec61da3c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -238,7 +238,7 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse } MicroBatch latestMicroBatch = - MicroBatches.from(currentSnapshot, table.io()) + MicroBatches.from(currentSnapshot, table.io(), table.encryption()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate( @@ -351,7 +351,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { // generate manifest index for the curSnapshot List> indexedManifests = MicroBatches.skippedManifestIndexesFromSnapshot( - table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles); + table.io(), table.encryption(), curSnapshot, startPosOfSnapOffset, scanAllFiles); // this is under assumption we will be able to add at-least 1 file in the new offset for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) { // be rest assured curPos >= startFileIndex @@ -359,6 +359,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { try (CloseableIterable taskIterable = MicroBatches.openManifestFile( table.io(), + table.encryption(), table.specs(), caseSensitive, curSnapshot, @@ -412,7 +413,7 @@ private long addedFilesCount(Snapshot snapshot) { // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, // iterate through addedFiles iterator to find addedFilesCount. return addedFilesCount == -1 - ? Iterables.size(snapshot.addedDataFiles(table.io())) + ? Iterables.size(snapshot.addedDataFiles(table.io(), table.encryption())) : addedFilesCount; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java index 43ce2a303e2b..71969de8af10 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.ClosingIterator; @@ -235,9 +236,10 @@ private static class ReadDataManifest implements FlatMapFunction call(ManifestFileBean manifest) throws Exception { FileIO io = table.value().io(); + EncryptionManager encryption = table.value().encryption(); Map specs = table.value().specs(); return new ClosingIterator<>( - ManifestFiles.read(manifest, io, specs) + ManifestFiles.read(manifest, io, encryption, specs) .select(withStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS) .filterRows(filter) .caseSensitive(isCaseSensitive) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 4019fedcbbfa..ae8aa49f1b6c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -238,7 +238,7 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse } MicroBatch latestMicroBatch = - MicroBatches.from(currentSnapshot, table.io()) + MicroBatches.from(currentSnapshot, table.io(), table.encryption()) .caseSensitive(caseSensitive) .specsById(table.specs()) .generate( @@ -359,6 +359,7 @@ public Offset latestOffset(Offset startOffset, ReadLimit limit) { try (CloseableIterable taskIterable = MicroBatches.openManifestFile( table.io(), + table.encryption(), table.specs(), caseSensitive, curSnapshot, @@ -412,7 +413,7 @@ private long addedFilesCount(Snapshot snapshot) { // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP, // iterate through addedFiles iterator to find addedFilesCount. return addedFilesCount == -1 - ? Iterables.size(snapshot.addedDataFiles(table.io())) + ? Iterables.size(snapshot.addedDataFiles(table.io(), table.encryption())) : addedFilesCount; }