diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index a3f40512a0a4..64fe99e56ab0 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -47,6 +47,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; @@ -82,6 +84,7 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog "JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1"; private FileIO io; + private KeyManagementClient keyManagementClient; private String catalogName = "jdbc"; private String warehouseLocation; private Object conf; @@ -133,6 +136,11 @@ public void initialize(String name, Map properties) { this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf); } + if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_TYPE) + || catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.keyManagementClient = EncryptionUtil.createKmsClient(properties); + } + LOG.debug("Connecting to JDBC database {}", uri); if (null != clientPoolBuilder) { this.connections = clientPoolBuilder.apply(properties); @@ -153,6 +161,7 @@ public void initialize(String name, Map properties) { closeableGroup.addCloseable(metricsReporter()); closeableGroup.addCloseable(connections); closeableGroup.addCloseable(io); + closeableGroup.addCloseable(keyManagementClient); closeableGroup.setSuppressCloseFailure(true); } @@ -266,7 +275,13 @@ private void 
updateSchemaIfRequired() { @Override protected TableOperations newTableOps(TableIdentifier tableIdentifier) { return new JdbcTableOperations( - connections, io, catalogName, tableIdentifier, catalogProperties, schemaVersion); + connections, + io, + catalogName, + tableIdentifier, + catalogProperties, + schemaVersion, + keyManagementClient); } @Override diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java index 079faf1c5504..797e4bc4889e 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java @@ -24,18 +24,35 @@ import java.sql.SQLTimeoutException; import java.sql.SQLTransientConnectionException; import java.sql.SQLWarning; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.LocationProviders; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import 
org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +66,13 @@ class JdbcTableOperations extends BaseMetastoreTableOperations { private final JdbcClientPool connections; private final Map catalogProperties; private final JdbcUtil.SchemaVersion schemaVersion; + private final KeyManagementClient keyManagementClient; + + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String tableKeyId; + private int encryptionDekLength; + private List encryptedKeys = List.of(); protected JdbcTableOperations( JdbcClientPool dbConnPool, @@ -56,13 +80,55 @@ protected JdbcTableOperations( String catalogName, TableIdentifier tableIdentifier, Map catalogProperties, - JdbcUtil.SchemaVersion schemaVersion) { + JdbcUtil.SchemaVersion schemaVersion, + KeyManagementClient keyManagementClient) { this.catalogName = catalogName; this.tableIdentifier = tableIdentifier; this.fileIO = fileIO; this.connections = dbConnPool; this.catalogProperties = catalogProperties; this.schemaVersion = schemaVersion; + this.keyManagementClient = keyManagementClient; + } + + @Override + public FileIO io() { + if (tableKeyId == null) { + return fileIO; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (tableKeyId != null) { + // A table key without a KMS client is a misconfiguration: fail with IllegalStateException. + Preconditions.checkState( + keyManagementClient != null, + "Can't create encryption manager, because key management client is not set"); + + Map encryptionProperties =
Maps.newHashMap(); + encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, tableKeyId); + encryptionProperties.put( + TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength)); + + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeys, encryptionProperties, keyManagementClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; } @Override @@ -97,12 +163,82 @@ public void doRefresh() { "Invalid table %s: metadata location is null", tableIdentifier); refreshFromMetadataLocation(newMetadataLocation); + + // TODO: Store a metadata hash in iceberg_tables and verify it here, like Hive does with HMS, + // to protect against tampering with the unencrypted metadata.json file in untrusted storage. + TableMetadata metadata = current(); + if (metadata != null) { + String tableKeyIdFromMetadata = + metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY); + if (tableKeyIdFromMetadata != null) { + tableKeyId = tableKeyIdFromMetadata; + encryptionDekLength = + PropertyUtil.propertyAsInt( + metadata.properties(), + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + + encryptedKeys = + Optional.ofNullable(metadata.encryptionKeys()) + .map(Lists::newLinkedList) + .orElseGet(Lists::newLinkedList); + + if (encryptionManager != null) { + Set keyIdsFromMetadata = + encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet()); + + for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) { + if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) { + encryptedKeys.add(keyFromEM); + } + } + } + + // Force re-creation of encryption manager with updated keys + encryptingFileIO = null; + encryptionManager = null; + } + } } + @SuppressWarnings("checkstyle:CyclomaticComplexity") @Override public void doCommit(TableMetadata base, TableMetadata metadata) { boolean newTable = base == null; - String 
newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata); + // Check the table key before deriving any encryption state from the new metadata: + // encryptionPropsFromMetadata() sets fields on this object, so it must not run for a commit + // that is rejected here. + if (base != null) { + String baseKeyId = base.properties().get(TableProperties.ENCRYPTION_TABLE_KEY); + String newKeyId = metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY); + + if (baseKeyId != null && newKeyId == null) { + throw new IllegalArgumentException("Cannot remove key in encrypted table"); + } + + if (!Objects.equals(baseKeyId, newKeyId)) { + throw new IllegalArgumentException("Cannot modify key in encrypted table"); + } + } + + encryptionPropsFromMetadata(metadata.properties()); + + // Add the keys held by the encryption manager to the metadata that gets written. + final TableMetadata tableMetadata; + EncryptionManager encrManager = encryption(); + if (encrManager instanceof StandardEncryptionManager) { + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encrManager).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + + tableMetadata = builder.build(); + } else { + tableMetadata = metadata; + } + + String newMetadataLocation = writeNewMetadataIfRequired(newTable, tableMetadata); try { Map table = JdbcUtil.loadTable(schemaVersion, connections, catalogName, tableIdentifier); @@ -143,6 +279,68 @@ public void doCommit(TableMetadata base, TableMetadata metadata) { } } + @Override + public TableOperations temp(TableMetadata uncommittedMetadata) { + return new TableOperations() { + @Override + public TableMetadata current() { + return uncommittedMetadata; + } + + @Override + public TableMetadata refresh() { + throw new UnsupportedOperationException( + "Cannot call refresh on temporary table operations"); + } + + @Override + public void commit(TableMetadata base, TableMetadata metadata) { + throw new UnsupportedOperationException("Cannot call commit on temporary table operations"); + } + + @Override + public String metadataFileLocation(String fileName) { + return
JdbcTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName); + } + + @Override + public LocationProvider locationProvider() { + return LocationProviders.locationsFor( + uncommittedMetadata.location(), uncommittedMetadata.properties()); + } + + @Override + public FileIO io() { + JdbcTableOperations.this.encryptionPropsFromMetadata(uncommittedMetadata.properties()); + return JdbcTableOperations.this.io(); + } + + @Override + public EncryptionManager encryption() { + return JdbcTableOperations.this.encryption(); + } + + @Override + public long newSnapshotId() { + return JdbcTableOperations.this.newSnapshotId(); + } + }; + } + + private void encryptionPropsFromMetadata(Map tableProperties) { + if (tableKeyId == null) { + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + } + + if (tableKeyId != null && encryptionDekLength <= 0) { + encryptionDekLength = + PropertyUtil.propertyAsInt( + tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + } + } + private void updateTable(String newMetadataLocation, String oldMetadataLocation) throws SQLException, InterruptedException { int updatedRecords = @@ -208,11 +406,6 @@ private void validateMetadataLocation(Map table, TableMetadata b } } - @Override - public FileIO io() { - return fileIO; - } - @Override protected String tableName() { return tableIdentifier.toString(); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cda71fccda3a..7b0ac8ac9664 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -51,6 +51,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import 
org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -170,6 +172,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private CloseableGroup closeables = null; private Set endpoints; private Supplier> mutationHeaders = Map::of; + private KeyManagementClient keyManagementClient = null; private String namespaceSeparator = null; private RESTTableCache tableCache; @@ -275,6 +278,13 @@ public void initialize(String name, Map unresolved) { mergedProps, RESTCatalogProperties.METRICS_REPORTING_ENABLED, RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT); + + if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_TYPE) + || mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.keyManagementClient = EncryptionUtil.createKmsClient(mergedProps); + this.closeables.addCloseable(this.keyManagementClient); + } + this.namespaceSeparator = PropertyUtil.propertyAsString( mergedProps, @@ -579,6 +589,7 @@ private Supplier createTableSupplier( Map::of, mutationHeaders, tableFileIO(context, tableConf, credentials), + keyManagementClient, tableMetadata, endpoints); @@ -678,6 +689,7 @@ public Table registerTable( Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, response.tableMetadata(), endpoints); @@ -947,6 +959,7 @@ public Table create() { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, response.tableMetadata(), endpoints); @@ -980,6 +993,7 @@ public Transaction createTransaction() { Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta, @@ -1045,6 +1059,7 @@ public Transaction replaceTransaction() { Map::of, mutationHeaders, 
tableFileIO(context, tableConf, response.credentials()), + keyManagementClient, RESTTableOperations.UpdateType.REPLACE, changes.build(), base, @@ -1185,6 +1200,7 @@ private FileIO tableFileIO( * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files + * @param kmsClient the {@link KeyManagementClient} for encrypted tables * @param current the current table metadata * @param supportedEndpoints the set of supported REST endpoints * @return a new RESTTableOperations instance @@ -1195,10 +1211,18 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaderSupplier, FileIO fileIO, + KeyManagementClient kmsClient, TableMetadata current, Set supportedEndpoints) { return new RESTTableOperations( - restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints); + restClient, + path, + readHeaders, + mutationHeaderSupplier, + fileIO, + kmsClient, + current, + supportedEndpoints); } /** @@ -1215,6 +1239,7 @@ protected RESTTableOperations newTableOps( * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files + * @param kmsClient the {@link KeyManagementClient} for encrypted tables * @param updateType the {@link RESTTableOperations.UpdateType} being performed * @param createChanges the list of metadata updates to apply during table creation or replacement * @param current the current table metadata (may be null for CREATE operations) @@ -1227,6 +1252,7 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaderSupplier, FileIO fileIO, + KeyManagementClient kmsClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -1237,6 +1263,7 @@ 
protected RESTTableOperations newTableOps( readHeaders, mutationHeaderSupplier, fileIO, + kmsClient, updateType, createChanges, current, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index be763d30fef1..30ac08bc6235 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -21,9 +21,12 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.SnapshotRef; @@ -32,17 +35,25 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.requests.UpdateTableRequest; import 
org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; class RESTTableOperations implements TableOperations { private static final String METADATA_FOLDER_NAME = "metadata"; @@ -58,17 +69,26 @@ enum UpdateType { private final Supplier> readHeaders; private final Supplier> mutationHeaders; private final FileIO io; + private final KeyManagementClient keyManagementClient; private final List createChanges; private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String tableKeyId; + private int encryptionDekLength; + + private List encryptedKeys = List.of(); + RESTTableOperations( RESTClient client, String path, Supplier> headers, FileIO io, + KeyManagementClient keyManagementClient, TableMetadata current, Set endpoints) { this( @@ -77,6 +97,7 @@ enum UpdateType { headers, headers, io, + keyManagementClient, UpdateType.SIMPLE, Lists.newArrayList(), current, @@ -88,11 +109,22 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + KeyManagementClient keyManagementClient, UpdateType updateType, List createChanges, TableMetadata current, Set endpoints) { - this(client, path, headers, headers, io, updateType, createChanges, current, endpoints); + this( + client, + path, + headers, + headers, + io, + keyManagementClient, + updateType, + createChanges, + current, + endpoints); } RESTTableOperations( @@ -101,6 +133,7 @@ enum UpdateType { Supplier> readHeaders, Supplier> mutationHeaders, FileIO io, + KeyManagementClient keyManagementClient, TableMetadata current, Set endpoints) { this( @@ -109,6 +142,7 @@ enum UpdateType { readHeaders, mutationHeaders, io, + keyManagementClient, UpdateType.SIMPLE, Lists.newArrayList(), current, @@ -121,6 +155,7 @@ enum 
UpdateType { Supplier> readHeaders, Supplier> mutationHeaders, FileIO io, + KeyManagementClient keyManagementClient, UpdateType updateType, List createChanges, TableMetadata current, @@ -130,6 +165,7 @@ enum UpdateType { this.readHeaders = readHeaders; this.mutationHeaders = mutationHeaders; this.io = io; + this.keyManagementClient = keyManagementClient; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -139,6 +175,10 @@ enum UpdateType { this.current = current; } this.endpoints = endpoints; + + // N.B. We don't use this.current due to it being null for the CREATE update type; we still + // want encryption configured for this case. + encryptionPropsFromMetadata(current); } @Override @@ -156,6 +196,19 @@ public TableMetadata refresh() { @Override public void commit(TableMetadata base, TableMetadata metadata) { Endpoint.check(endpoints, Endpoint.V1_UPDATE_TABLE); + + EncryptionManager encryption = encryption(); + if (encryption instanceof StandardEncryptionManager) { + // Add encryption keys to the to-be-committed metadata + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + EncryptionUtil.encryptionKeys(encryption).values().forEach(builder::addEncryptionKey); + commitInternal(base, builder.build()); + } else { + commitInternal(base, metadata); + } + } + + private void commitInternal(TableMetadata base, TableMetadata metadata) { Consumer errorHandler; List requirements; List updates; @@ -196,6 +249,19 @@ public void commit(TableMetadata base, TableMetadata metadata) { String.format("Update type %s is not supported", updateType)); } + if (base != null) { + boolean encryptionKeyRemoved = + base.properties().containsKey(TableProperties.ENCRYPTION_TABLE_KEY) + && !metadata.properties().containsKey(TableProperties.ENCRYPTION_TABLE_KEY); + Preconditions.checkArgument(!encryptionKeyRemoved, "Cannot remove key in encrypted table"); + + boolean encryptionKeyUnchanged = + Objects.equals( + 
base.properties().get(TableProperties.ENCRYPTION_TABLE_KEY), + metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY)); + Preconditions.checkArgument(encryptionKeyUnchanged, "Cannot modify key in encrypted table"); + } + UpdateTableRequest request = new UpdateTableRequest(requirements, updates); // the error handler will throw necessary exceptions like CommitFailedException and @@ -245,7 +311,44 @@ private boolean reconcileOnSimpleUpdate( @Override public FileIO io() { - return io; + if (tableKeyId == null) { + return io; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(io, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (tableKeyId != null) { + Preconditions.checkArgument( + keyManagementClient != null, + "Cannot create encryption manager without a key management client. Consider setting the '%s' catalog property", + CatalogProperties.ENCRYPTION_KMS_IMPL); + + Map encryptionProperties = + ImmutableMap.of( + TableProperties.ENCRYPTION_TABLE_KEY, + tableKeyId, + TableProperties.ENCRYPTION_DEK_LENGTH, + String.valueOf(encryptionDekLength)); + + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeys, encryptionProperties, keyManagementClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; } private static Long expectedSnapshotIdIfSnapshotAddOnly(List updates) { @@ -285,6 +388,45 @@ private static Long expectedSnapshotIdIfSnapshotAddOnly(List upd return addedSnapshotId; } + private void encryptionPropsFromMetadata(TableMetadata metadata) { + if (metadata == null || metadata.properties() == null) { + return; + } + + // Refresh encryption-related properties and keys on new/refreshed metadata + Map tableProperties = metadata.properties(); + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + + if 
(tableKeyId != null) { + encryptionDekLength = + PropertyUtil.propertyAsInt( + tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + + encryptedKeys = + Optional.ofNullable(metadata.encryptionKeys()) + .map(Lists::newLinkedList) + .orElseGet(Lists::newLinkedList); + + // Include pending encryption keys from the encryption manager + if (encryptionManager != null) { + Set keyIdsFromMetadata = + encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet()); + + for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) { + if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) { + encryptedKeys.add(keyFromEM); + } + } + } + } + + // Force re-creation of encryption manager + encryptingFileIO = null; + encryptionManager = null; + } + private TableMetadata updateCurrentMetadata(LoadTableResponse response) { // LoadTableResponse is used to deserialize the response, but config is not allowed by the REST // spec so it can be @@ -292,6 +434,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) { if (current == null || !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { this.current = checkUUID(current, response.tableMetadata()); + encryptionPropsFromMetadata(current); } return current; diff --git a/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java b/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java index 80981df1fcb3..3013fbc4e126 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestFreshnessAwareLoading.java @@ -48,6 +48,7 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.NoSuchTableException; import 
org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.io.FileIO; @@ -743,9 +744,10 @@ class CustomTableOps extends RESTTableOperations { Supplier> readHeaders, Supplier> mutationHeaders, FileIO io, + KeyManagementClient kmsClient, TableMetadata current, Set endpoints) { - super(client, path, readHeaders, mutationHeaders, io, current, endpoints); + super(client, path, readHeaders, mutationHeaders, io, kmsClient, current, endpoints); } } @@ -763,10 +765,18 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaders, FileIO fileIO, + KeyManagementClient kmsClient, TableMetadata current, Set supportedEndpoints) { return new CustomTableOps( - restClient, path, readHeaders, mutationHeaders, fileIO, current, supportedEndpoints); + restClient, + path, + readHeaders, + mutationHeaders, + fileIO, + kmsClient, + current, + supportedEndpoints); } } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 56ab8f97f84a..2d97064664bd 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -76,6 +76,7 @@ import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -3102,6 +3103,7 @@ public void testCommitStateUnknownNotReconciled() { } @Test + @SuppressWarnings("MethodLength") public void testCustomTableOperationsInjection() throws IOException { AtomicBoolean customTableOpsCalled = new AtomicBoolean(); AtomicBoolean customTransactionTableOpsCalled = new AtomicBoolean(); @@ -3117,9 +3119,17 @@ class 
CustomRESTTableOperations extends RESTTableOperations { String path, Supplier> headers, FileIO fileIO, + KeyManagementClient keyManagementClient, TableMetadata current, Set supportedEndpoints) { - super(client, path, () -> customHeaders, fileIO, current, supportedEndpoints); + super( + client, + path, + () -> customHeaders, + fileIO, + keyManagementClient, + current, + supportedEndpoints); customTableOpsCalled.set(true); } @@ -3128,6 +3138,7 @@ class CustomRESTTableOperations extends RESTTableOperations { String path, Supplier> headers, FileIO fileIO, + KeyManagementClient keyManagementClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -3137,6 +3148,7 @@ class CustomRESTTableOperations extends RESTTableOperations { path, () -> customHeaders, fileIO, + keyManagementClient, updateType, createChanges, current, @@ -3160,11 +3172,12 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaders, FileIO fileIO, + KeyManagementClient kmsClient, TableMetadata current, Set supportedEndpoints) { RESTTableOperations ops = new CustomRESTTableOperations( - restClient, path, mutationHeaders, fileIO, current, supportedEndpoints); + restClient, path, mutationHeaders, fileIO, kmsClient, current, supportedEndpoints); RESTTableOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy; @@ -3177,6 +3190,7 @@ protected RESTTableOperations newTableOps( Supplier> readHeaders, Supplier> mutationHeaders, FileIO fileIO, + KeyManagementClient kmsClient, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, @@ -3187,6 +3201,7 @@ protected RESTTableOperations newTableOps( path, mutationHeaders, fileIO, + kmsClient, updateType, createChanges, current, diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanningWithEncryption.java 
b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanningWithEncryption.java new file mode 100644 index 000000000000..3f63c4396240 --- /dev/null +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanningWithEncryption.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.extensions; + +import java.util.Map; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.encryption.UnitestKMS; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.rest.RESTCatalogProperties; +import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.sql.TestTableEncryption; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestRemoteScanPlanningWithEncryption extends TestTableEncryption { + + private static Map encryptionProperties(Map props) { + Map newProps = Maps.newHashMap(); + newProps.putAll(props); + // TODO: This property is required for encrypted tables, but feels odd when scan planning is + // enabled as the client then has no KMS interaction. 
+ newProps.put(CatalogProperties.ENCRYPTION_KMS_IMPL, UnitestKMS.class.getCanonicalName()); + return newProps; + } + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") + protected static Object[][] parameters() { + return new Object[][] { + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + encryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + // this flag is typically only set by the server, but we set it from the client + // for testing + .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .build()) + } + }; + } +} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java index 7df9c75fb3dd..0c59a6a4fedc 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java @@ -69,7 +69,9 @@ public abstract class TestBaseWithCatalog extends TestBase { // status even belonging to the same catalog. 
Reference: // https://www.sqlite.org/inmemorydb.html CatalogProperties.CLIENT_POOL_SIZE, - "1")); + "1", + CatalogProperties.ENCRYPTION_KMS_IMPL, + "org.apache.iceberg.encryption.UnitestKMS")); protected static RESTCatalog restCatalog; diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java index 3dee6e1e1d54..aebc4a4ca416 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java @@ -32,6 +32,7 @@ import org.apache.iceberg.encryption.UnitestKMS; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.CatalogTestBase; @@ -57,6 +58,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java index a38506d621f9..a7d3ae04631e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java +++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java @@ -22,6 +22,7 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -50,8 +51,10 @@ import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.rest.RESTCatalog; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.types.Types; @@ -76,6 +79,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } @@ -161,7 +173,6 @@ public void testConcurrentAppendTransactions() { assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2); } - // See CatalogTests#testConcurrentReplaceTransactions @TestTemplate public void testConcurrentReplaceTransactions() { validationCatalog.initialize(catalogName, catalogConfig); @@ -170,7 +181,7 @@ public void testConcurrentReplaceTransactions() { DataFile file = currentDataFiles(table).get(0); Schema schema = table.schema(); - // Write data 
for a replace transaction that will be committed later + // Begin a replace transaction that will be committed second Transaction secondReplace = validationCatalog .buildTable(tableIdent, schema) @@ -184,12 +195,15 @@ public void testConcurrentReplaceTransactions() { .buildTable(tableIdent, schema) .withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1) .replaceTransaction(); - firstReplace.newFastAppend().appendFile(file).commit(); firstReplace.commitTransaction(); + // This second replace transaction fails but then retries after refreshing latest metadata. secondReplace.commitTransaction(); Table afterSecondReplace = validationCatalog.loadTable(tableIdent); + + // This tests that encryption keys are maintained on refreshing different metadata - if + // they are not, the table will be unreadable and this will fail. assertThat(currentDataFiles(afterSecondReplace)).hasSize(1); } @@ -223,10 +237,14 @@ public void testInsertAndDelete() { @TestTemplate public void testMetadataTamperproofing() throws IOException { + assumeThat(validationCatalog) + .as("RESTCatalog does not store metadata file hashes") + .isNotInstanceOf(RESTCatalog.class); + ChecksumFileSystem fs = ((ChecksumFileSystem) FileSystem.newInstance(new Configuration())); - catalog.initialize(catalogName, catalogConfig); + validationCatalog.initialize(catalogName, catalogConfig); - Table table = catalog.loadTable(tableIdent); + Table table = validationCatalog.loadTable(tableIdent); TableMetadata currentMetadata = ((HasTableOperations) table).operations().current(); Path metadataFile = new Path(currentMetadata.metadataFileLocation()); Path previousMetadataFile = new Path(Iterables.firstOf(currentMetadata.previousFiles()).file()); @@ -237,7 +255,7 @@ public void testMetadataTamperproofing() throws IOException { fs.delete(metadataFile, false); fs.rename(previousMetadataFile, metadataFile); - assertThatThrownBy(() -> catalog.loadTable(tableIdent)) + assertThatThrownBy(() -> 
validationCatalog.loadTable(tableIdent)) .hasMessageContaining( String.format( "The current metadata file %s might have been modified. Hash of metadata loaded from storage differs from HMS-stored metadata hash.",