diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java index c035259e0e2c..342ce59db865 100644 --- a/api/src/main/java/org/apache/iceberg/Snapshot.java +++ b/api/src/main/java/org/apache/iceberg/Snapshot.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; /** @@ -72,6 +73,11 @@ public interface Snapshot extends Serializable { */ List allManifests(FileIO io); + default List allManifests(FileIO fileIO, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement allManifests with encryption"); + } + /** * Return a {@link ManifestFile} for each data manifest in this snapshot. * @@ -80,6 +86,11 @@ public interface Snapshot extends Serializable { */ List dataManifests(FileIO io); + default List dataManifests(FileIO io, EncryptionManager encryption) { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement dataManifests with encryption"); + } + /** * Return a {@link ManifestFile} for each delete manifest in this snapshot. * @@ -87,6 +98,7 @@ public interface Snapshot extends Serializable { * @return a list of ManifestFile */ List deleteManifests(FileIO io); + // TODO add encryption manager /** * Return the name of the {@link DataOperations data operation} that produced this snapshot. @@ -114,6 +126,7 @@ public interface Snapshot extends Serializable { * @return all data files added to the table in this snapshot. */ Iterable addedDataFiles(FileIO io); + // TODO add encryption manager /** * Return all data files removed from the table in this snapshot. @@ -126,6 +139,7 @@ public interface Snapshot extends Serializable { * @return all data files removed from the table in this snapshot. */ Iterable removedDataFiles(FileIO io); + // TODO add encryption manager /** * Return all delete files added to the table in this snapshot. @@ -140,6 +154,7 @@ default Iterable addedDeleteFiles(FileIO io) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement addedDeleteFiles"); } + // TODO add encryption manager /** * Return all delete files removed from the table in this snapshot. @@ -154,6 +169,7 @@ default Iterable removedDeleteFiles(FileIO io) { throw new UnsupportedOperationException( this.getClass().getName() + " doesn't implement removedDeleteFiles"); } + // TODO add encryption manager /** * Return the location of this snapshot's manifest list, or null if it is not separate. @@ -171,4 +187,8 @@ default Iterable removedDeleteFiles(FileIO io) { default Integer schemaId() { return null; } + + default String manifestKeyMetadata() { + return null; + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java index ac27191df594..6dd532dce4ac 100644 --- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java +++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java @@ -20,12 +20,21 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedInputFile; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -45,6 +54,7 @@ class BaseSnapshot implements Snapshot { private final Map summary; private final Integer schemaId; private final String[] v1ManifestLocations; + private final String manifestListKeyMetadata; // lazily initialized private transient List allManifests = null; @@ -64,6 +74,28 @@ class BaseSnapshot implements Snapshot { Map summary, Integer schemaId, String manifestList) { + this( + sequenceNumber, + snapshotId, + parentId, + timestampMillis, + operation, + summary, + schemaId, + manifestList, + null); + } + + BaseSnapshot( + long sequenceNumber, + long snapshotId, + Long parentId, + long timestampMillis, + String operation, + Map summary, + Integer schemaId, + String manifestList, + String manifestListKeyMetadata) { this.sequenceNumber = sequenceNumber; this.snapshotId = snapshotId; this.parentId = parentId; @@ -73,6 +105,7 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = manifestList; this.v1ManifestLocations = null; + this.manifestListKeyMetadata = manifestListKeyMetadata; } BaseSnapshot( @@ -93,6 +126,7 @@ class BaseSnapshot implements Snapshot { this.schemaId = schemaId; this.manifestListLocation = null; this.v1ManifestLocations = v1ManifestLocations; + this.manifestListKeyMetadata = null; } @Override @@ -130,7 +164,16 @@ public Integer schemaId() { return schemaId; } + @Override + public String manifestKeyMetadata() { + return manifestListKeyMetadata; + } + private void cacheManifests(FileIO fileIO) { + cacheManifests(fileIO, null); // TODO remove + } + + private void cacheManifests(FileIO fileIO, EncryptionManager encryption) { if (fileIO == null) { throw new IllegalArgumentException("Cannot cache changes: FileIO is null"); } @@ -145,7 +188,20 @@ private void cacheManifests(FileIO fileIO) { if (allManifests == null) { // if manifests isn't set, then the snapshotFile is set and should be read to get the list - this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation)); + + InputFile manifestListFile = fileIO.newInputFile(manifestListLocation); + + if (manifestListKeyMetadata != null) { + ByteBuffer keyMetadataBytes = + ByteBuffer.wrap(Base64.getDecoder().decode(manifestListKeyMetadata)); + EncryptionKeyMetadata keyMetadata = EncryptionUtil.parseKeyMetadata(keyMetadataBytes); + + EncryptedInputFile encryptedInputFile = + EncryptedFiles.encryptedInput(manifestListFile, keyMetadata); + manifestListFile = encryption.decrypt(encryptedInputFile); + } + + this.allManifests = ManifestLists.read(manifestListFile); } if (dataManifests == null || deleteManifests == null) { @@ -168,6 +224,14 @@ public List allManifests(FileIO fileIO) { return allManifests; } + @Override + public List allManifests(FileIO fileIO, EncryptionManager encryption) { + if (allManifests == null) { + cacheManifests(fileIO, encryption); + } + return allManifests; + } + @Override public List dataManifests(FileIO fileIO) { if (dataManifests == null) { @@ -176,6 +240,14 @@ public List dataManifests(FileIO fileIO) { return dataManifests; } + @Override + public List dataManifests(FileIO fileIO, EncryptionManager encryption) { + if (dataManifests == null) { + cacheManifests(fileIO, encryption); + } + return dataManifests; + } + @Override public List deleteManifests(FileIO fileIO) { if (deleteManifests == null) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java index 5c850ec6f9fb..fc7821aeb642 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java +++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -77,6 +78,7 @@ MetadataTableType metadataTableType() { protected DataTask task(TableScan scan) { FileIO io = table().io(); + EncryptionManager encryption = table().encryption(); String location = scan.snapshot().manifestListLocation(); Map specs = Maps.newHashMap(table().specs()); @@ -85,7 +87,7 @@ protected DataTask task(TableScan scan) { location != null ? location : table().operations().current().metadataFileLocation()), schema(), scan.schema(), - scan.snapshot().allManifests(io), + scan.snapshot().allManifests(io, encryption), manifest -> { PartitionSpec spec = specs.get(manifest.partitionSpecId()); return ManifestsTable.manifestFileToRow(spec, manifest); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java index bc5ef6094695..2815289b54c5 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java @@ -48,6 +48,7 @@ private SnapshotParser() {} private static final String MANIFESTS = "manifests"; private static final String MANIFEST_LIST = "manifest-list"; private static final String SCHEMA_ID = "schema-id"; + private static final String MANIFEST_LIST_KEY_METADATA = "manifest-list-key-metadata"; static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException { generator.writeStartObject(); @@ -93,6 +94,10 @@ static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOExceptio generator.writeNumberField(SCHEMA_ID, snapshot.schemaId()); } + if (snapshot.manifestKeyMetadata() != null) { + generator.writeStringField(MANIFEST_LIST_KEY_METADATA, snapshot.manifestKeyMetadata()); + } + generator.writeEndObject(); } @@ -147,6 +152,10 @@ static Snapshot fromJson(JsonNode node) { if (node.has(MANIFEST_LIST)) { // the manifest list is stored in a manifest list file String manifestList = JsonUtil.getString(MANIFEST_LIST, node); + + // Manifest list can be encrypted + String manifestListKeyMetadata = JsonUtil.getString(MANIFEST_LIST_KEY_METADATA, node); + return new BaseSnapshot( sequenceNumber, snapshotId, @@ -155,7 +164,8 @@ static Snapshot fromJson(JsonNode node) { operation, summary, schemaId, - manifestList); + manifestList, + manifestListKeyMetadata); } else { // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 226388a2b028..0992966db8e3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -218,10 +218,32 @@ public Snapshot apply() { OutputFile manifestList = manifestListPath(); + EncryptionManager encryptionManager = ops.encryption(); + EncryptedOutputFile encryptedManifestList = encryptionManager.encrypt(manifestList); + ByteBuffer manifestListEncryptionKey = encryptedManifestList.keyMetadata().encryptionKey(); + String manifestListKeyMetadata = null; + if (manifestListEncryptionKey != null) { + Preconditions.checkArgument( + encryptionManager instanceof StandardEncryptionManager, + "Encryption manager for encrypted manifest list files can currently only be an instance of " + + StandardEncryptionManager.class); + StandardEncryptionManager standardEncryptionManager = (StandardEncryptionManager) encryptionManager; + ByteBuffer wrappedEncryptionKey = standardEncryptionManager.wrapKey(manifestListEncryptionKey); + + ByteBuffer manifestListAADPrefix = encryptedManifestList.keyMetadata().aadPrefix(); + manifestListKeyMetadata = + Base64.getEncoder() + .encodeToString( + EncryptionUtil.createKeyMetadata(wrappedEncryptionKey, standardEncryptionManager.tableKeyId(), + manifestListAADPrefix) + .buffer() + .array()); + } + try (ManifestListWriter writer = ManifestLists.write( ops.current().formatVersion(), - manifestList, + encryptedManifestList.encryptingOutputFile(), snapshotId(), parentSnapshotId, sequenceNumber)) { @@ -251,7 +273,8 @@ public Snapshot apply() { operation(), summary(base), base.currentSchemaId(), - manifestList.location()); + manifestList.location(), + manifestListKeyMetadata); } protected abstract Map summary(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java index 0d7ec43f6ebc..83e1fdcdb688 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadata.java @@ -36,7 +36,8 @@ class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { private static final Schema SCHEMA_V1 = new Schema( required(0, "encryption_key", Types.BinaryType.get()), - optional(1, "aad_prefix", Types.BinaryType.get())); + optional(1, "wrapping_key_id", Types.StringType.get()), + optional(2, "aad_prefix", Types.BinaryType.get())); private static final org.apache.avro.Schema AVRO_SCHEMA_V1 = AvroSchemaUtil.convert(SCHEMA_V1, KeyMetadata.class.getCanonicalName()); @@ -48,14 +49,16 @@ class KeyMetadata implements EncryptionKeyMetadata, IndexedRecord { private static final KeyMetadataDecoder KEY_METADATA_DECODER = new KeyMetadataDecoder(V1); private ByteBuffer encryptionKey; + private String wrappingKeyId; private ByteBuffer aadPrefix; private org.apache.avro.Schema avroSchema; /** Used by Avro reflection to instantiate this class * */ KeyMetadata() {} - KeyMetadata(ByteBuffer encryptionKey, ByteBuffer aadPrefix) { + KeyMetadata(ByteBuffer encryptionKey, String wrappingKeyId, ByteBuffer aadPrefix) { this.encryptionKey = encryptionKey; + this.wrappingKeyId = wrappingKeyId; this.aadPrefix = aadPrefix; this.avroSchema = AVRO_SCHEMA_V1; } @@ -68,14 +71,20 @@ static Map supportedAvroSchemaVersions() { return avroSchemaVersions; } - ByteBuffer encryptionKey() { + @Override + public ByteBuffer encryptionKey() { return encryptionKey; } - ByteBuffer aadPrefix() { + @Override + public ByteBuffer aadPrefix() { return aadPrefix; } + String wrappingKeyId() { + return wrappingKeyId; + } + static KeyMetadata parse(ByteBuffer buffer) { try { return KEY_METADATA_DECODER.decode(buffer); @@ -95,7 +104,7 @@ public ByteBuffer buffer() { @Override public EncryptionKeyMetadata copy() { - KeyMetadata metadata = new KeyMetadata(encryptionKey(), aadPrefix()); + KeyMetadata metadata = new KeyMetadata(encryptionKey(), wrappingKeyId(), aadPrefix()); return metadata; } @@ -106,6 +115,9 @@ public void put(int i, Object v) { this.encryptionKey = (ByteBuffer) v; return; case 1: + this.wrappingKeyId = (v == null) ? null : v.toString(); + return; + case 2: this.aadPrefix = (ByteBuffer) v; return; default: @@ -119,6 +131,8 @@ public Object get(int i) { case 0: return encryptionKey; case 1: + return wrappingKeyId; + case 2: return aadPrefix; default: throw new UnsupportedOperationException("Unknown field ordinal: " + i); diff --git a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java index ec78c9c1879b..3f1ac9aefd45 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java +++ b/core/src/main/java/org/apache/iceberg/encryption/StandardEncryptionManager.java @@ -65,7 +65,7 @@ public EncryptedOutputFile encrypt(OutputFile rawOutput) { ByteBuffer aadPrefix = ByteBuffer.allocate(EncryptionProperties.ENCRYPTION_AAD_LENGTH_DEFAULT); workerRNG.nextBytes(aadPrefix.array()); - KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, aadPrefix); + KeyMetadata encryptionMetadata = new KeyMetadata(fileDek, null, aadPrefix); // return new BaseEncryptedOutputFile(rawOutput, encryptionMetadata, rawOutput); @@ -85,6 +85,11 @@ public InputFile decrypt(EncryptedInputFile encrypted) { KeyMetadata keyMetadata = KeyMetadata.parse(encrypted.keyMetadata().buffer()); byte[] fileDek = keyMetadata.encryptionKey().array(); + String wrappingKeyId = keyMetadata.wrappingKeyId(); + if (wrappingKeyId != null) { + fileDek = kmsClient.unwrapKey(keyMetadata.encryptionKey(), wrappingKeyId).array(); + } + byte[] aadPrefix = keyMetadata.aadPrefix().array(); // return null; @@ -102,7 +107,7 @@ public ByteBuffer wrapKey(ByteBuffer secretKey) { return kmsClient.wrapKey(secretKey, tableKeyId); } - public ByteBuffer unwrapKey(ByteBuffer wrappedSecretKey) { - return kmsClient.unwrapKey(wrappedSecretKey, tableKeyId); + public String tableKeyId() { + return tableKeyId; } }