Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
Expand Down Expand Up @@ -82,6 +84,7 @@ public class JdbcCatalog extends BaseMetastoreViewCatalog
"JDBC catalog is initialized without view support. To auto-migrate the database's schema and enable view support, set jdbc.schema-version=V1";

private FileIO io;
private KeyManagementClient keyManagementClient;
private String catalogName = "jdbc";
private String warehouseLocation;
private Object conf;
Expand Down Expand Up @@ -133,6 +136,11 @@ public void initialize(String name, Map<String, String> properties) {
this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf);
}

if (catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_TYPE)
|| catalogProperties.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) {
this.keyManagementClient = EncryptionUtil.createKmsClient(properties);
}

LOG.debug("Connecting to JDBC database {}", uri);
if (null != clientPoolBuilder) {
this.connections = clientPoolBuilder.apply(properties);
Expand All @@ -153,6 +161,7 @@ public void initialize(String name, Map<String, String> properties) {
closeableGroup.addCloseable(metricsReporter());
closeableGroup.addCloseable(connections);
closeableGroup.addCloseable(io);
closeableGroup.addCloseable(keyManagementClient);
closeableGroup.setSuppressCloseFailure(true);
}

Expand Down Expand Up @@ -266,7 +275,13 @@ private void updateSchemaIfRequired() {
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new JdbcTableOperations(
connections, io, catalogName, tableIdentifier, catalogProperties, schemaVersion);
connections,
io,
catalogName,
tableIdentifier,
catalogProperties,
schemaVersion,
keyManagementClient);
}

@Override
Expand Down
207 changes: 200 additions & 7 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcTableOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,35 @@
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLWarning;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.encryption.EncryptedKey;
import org.apache.iceberg.encryption.EncryptingFileIO;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.encryption.StandardEncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,20 +66,69 @@ class JdbcTableOperations extends BaseMetastoreTableOperations {
private final JdbcClientPool connections;
private final Map<String, String> catalogProperties;
private final JdbcUtil.SchemaVersion schemaVersion;
private final KeyManagementClient keyManagementClient;

private EncryptionManager encryptionManager;
private EncryptingFileIO encryptingFileIO;
private String tableKeyId;
private int encryptionDekLength;
private List<EncryptedKey> encryptedKeys = List.of();

protected JdbcTableOperations(
JdbcClientPool dbConnPool,
FileIO fileIO,
String catalogName,
TableIdentifier tableIdentifier,
Map<String, String> catalogProperties,
JdbcUtil.SchemaVersion schemaVersion) {
JdbcUtil.SchemaVersion schemaVersion,
KeyManagementClient keyManagementClient) {
this.catalogName = catalogName;
this.tableIdentifier = tableIdentifier;
this.fileIO = fileIO;
this.connections = dbConnPool;
this.catalogProperties = catalogProperties;
this.schemaVersion = schemaVersion;
this.keyManagementClient = keyManagementClient;
}

@Override
public FileIO io() {
if (tableKeyId == null) {
return fileIO;
}

if (encryptingFileIO == null) {
encryptingFileIO = EncryptingFileIO.combine(fileIO, encryption());
}

return encryptingFileIO;
}

@Override
public EncryptionManager encryption() {
if (encryptionManager != null) {
return encryptionManager;
}

if (tableKeyId != null) {
if (keyManagementClient == null) {
throw new RuntimeException(
"Can't create encryption manager, because key management client is not set");
}

Map<String, String> encryptionProperties = Maps.newHashMap();
encryptionProperties.put(TableProperties.ENCRYPTION_TABLE_KEY, tableKeyId);
encryptionProperties.put(
TableProperties.ENCRYPTION_DEK_LENGTH, String.valueOf(encryptionDekLength));

encryptionManager =
EncryptionUtil.createEncryptionManager(
encryptedKeys, encryptionProperties, keyManagementClient);
} else {
return PlaintextEncryptionManager.instance();
}

return encryptionManager;
}

@Override
Expand Down Expand Up @@ -97,12 +163,82 @@ public void doRefresh() {
"Invalid table %s: metadata location is null",
tableIdentifier);
refreshFromMetadataLocation(newMetadataLocation);

// TODO: Store a metadata hash in iceberg_tables and verify it here, like Hive does with HMS,
// to protect against tampering with the unencrypted metadata.json file in untrusted storage.
TableMetadata metadata = current();
if (metadata != null) {
String tableKeyIdFromMetadata =
metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY);
if (tableKeyIdFromMetadata != null) {
tableKeyId = tableKeyIdFromMetadata;
encryptionDekLength =
PropertyUtil.propertyAsInt(
metadata.properties(),
TableProperties.ENCRYPTION_DEK_LENGTH,
TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT);

encryptedKeys =
Optional.ofNullable(metadata.encryptionKeys())
.map(Lists::newLinkedList)
.orElseGet(Lists::newLinkedList);

if (encryptionManager != null) {
Set<String> keyIdsFromMetadata =
encryptedKeys.stream().map(EncryptedKey::keyId).collect(Collectors.toSet());

for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) {
if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
encryptedKeys.add(keyFromEM);
}
}
}

// Force re-creation of encryption manager with updated keys
encryptingFileIO = null;
encryptionManager = null;
}
}
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
@Override
public void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
String newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
encryptionPropsFromMetadata(metadata.properties());

final TableMetadata tableMetadata;
EncryptionManager encrManager = encryption();
if (encrManager instanceof StandardEncryptionManager) {
TableMetadata.Builder builder = TableMetadata.buildFrom(metadata);
for (Map.Entry<String, EncryptedKey> entry :
EncryptionUtil.encryptionKeys(encrManager).entrySet()) {
builder.addEncryptionKey(entry.getValue());
}

tableMetadata = builder.build();
} else {
tableMetadata = metadata;
}

if (base != null) {
Set<String> removedProps =
base.properties().keySet().stream()
.filter(key -> !metadata.properties().containsKey(key))
.collect(Collectors.toSet());

if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) {
throw new IllegalArgumentException("Cannot remove key in encrypted table");
}

if (!Objects.equals(
base.properties().get(TableProperties.ENCRYPTION_TABLE_KEY),
metadata.properties().get(TableProperties.ENCRYPTION_TABLE_KEY))) {
throw new IllegalArgumentException("Cannot modify key in encrypted table");
}
}

String newMetadataLocation = writeNewMetadataIfRequired(newTable, tableMetadata);
try {
Map<String, String> table =
JdbcUtil.loadTable(schemaVersion, connections, catalogName, tableIdentifier);
Expand Down Expand Up @@ -143,6 +279,68 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

@Override
public TableOperations temp(TableMetadata uncommittedMetadata) {
return new TableOperations() {
@Override
public TableMetadata current() {
return uncommittedMetadata;
}

@Override
public TableMetadata refresh() {
throw new UnsupportedOperationException(
"Cannot call refresh on temporary table operations");
}

@Override
public void commit(TableMetadata base, TableMetadata metadata) {
throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
}

@Override
public String metadataFileLocation(String fileName) {
return JdbcTableOperations.this.metadataFileLocation(uncommittedMetadata, fileName);
}

@Override
public LocationProvider locationProvider() {
return LocationProviders.locationsFor(
uncommittedMetadata.location(), uncommittedMetadata.properties());
}

@Override
public FileIO io() {
JdbcTableOperations.this.encryptionPropsFromMetadata(uncommittedMetadata.properties());
return JdbcTableOperations.this.io();
}

@Override
public EncryptionManager encryption() {
return JdbcTableOperations.this.encryption();
}

@Override
public long newSnapshotId() {
return JdbcTableOperations.this.newSnapshotId();
}
};
}

private void encryptionPropsFromMetadata(Map<String, String> tableProperties) {
if (tableKeyId == null) {
tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY);
}

if (tableKeyId != null && encryptionDekLength <= 0) {
encryptionDekLength =
PropertyUtil.propertyAsInt(
tableProperties,
TableProperties.ENCRYPTION_DEK_LENGTH,
TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT);
}
}

private void updateTable(String newMetadataLocation, String oldMetadataLocation)
throws SQLException, InterruptedException {
int updatedRecords =
Expand Down Expand Up @@ -208,11 +406,6 @@ private void validateMetadataLocation(Map<String, String> table, TableMetadata b
}
}

@Override
public FileIO io() {
return fileIO;
}

@Override
protected String tableName() {
return tableIdentifier.toString();
Expand Down
Loading