Add support for Iceberg table encryption #28354
Conversation
Integrate Iceberg encryption handling into read and write paths and table operations across Iceberg catalogs. Add test coverage for encrypted table behavior. Signed-off-by: kamijin_fanta <kamijin@live.jp>
        throws Exception
{
    return IcebergQueryRunner.builder()
            .addIcebergProperty("iceberg.encryption.kms-impl", TestingKmsClient.class.getName())
Can we use LocalStack instead? We should also test with real KMS such as AWS Key Management Service.
I initially thought we couldn’t override the endpoint. However, after re-reading the code, I realized we can implement our own AwsClientFactory and point to it via the catalog property client.factory. That would let us call endpointOverride(...) in the client builder.
I’ll validate whether we can run the tests against LocalStack with this approach. Thanks!
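For reference, a hypothetical catalog configuration for that approach might look like the following sketch (the factory class name and the LocalStack endpoint are assumptions for illustration, not code from this PR):

```properties
# Register a custom AwsClientFactory via Iceberg's client.factory property.
# Inside the factory, each AWS client builder would call
# endpointOverride(URI.create("http://localhost:4566")) to target LocalStack.
client.factory=io.trino.plugin.iceberg.TestingAwsClientFactory
```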
@Config("iceberg.encryption.kms-impl")
@ConfigDescription("KMS implementation class for Iceberg table encryption")
We shouldn't expose Java class name to users. Please introduce a new enum and map to class name internally.
Also, don't forget updating iceberg.md.
According to the Iceberg documentation, both encryption.kms-type and encryption.kms-impl appear to be public configuration options. However, in Iceberg 1.10.1 and also in the latest main branch, encryption.kms-type doesn’t seem to be referenced/used in the code, and users effectively must set encryption.kms-impl all the time.
Also, I think we should keep some extensibility here because there are users who use KMS providers outside of major clouds like AWS/GCP (including myself). For consistency with other configuration surfaces (e.g., catalog configs) I’m currently leaning toward accepting the Java class name for now, and then later switching to an enum-based selection once Iceberg formalizes/supports kms-type (or an equivalent stable interface). Ideally we can evolve this to an internal enum → class mapping when Iceberg’s interface becomes clearer.
What do you think, @ebyhr? Another reasonable option is to wait to merge until Iceberg finalizes this interface, but I’m trying to balance that with keeping room for non-cloud KMS implementations.
encryption.kms-type doesn’t seem to be referenced/used in the code
Did you confirm apache/iceberg#15272? We don't need to wait for 1.11.0 to use the KMS type anyway. We can introduce iceberg.encryption.kms-type and internally set encryption.kms-impl.
I’m currently leaning toward accepting the Java class name for now, and then later switching to an enum-based selection once Iceberg formalizes/supports kms-type (or an equivalent stable interface).
The expected order is different. We should begin with the strictest option, such as an enum-based approach, and only allow arbitrary options if that proves too restrictive. Note that we generally avoid such generic options in this project.
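An enum-based selection could look roughly like this sketch, where users pick a KMS type and the connector maps it to Iceberg's encryption.kms-impl class name internally. The enum values and implementation class names here are assumptions for illustration, not Trino's actual code:

```java
// Hypothetical sketch: enum-based KMS selection, mapped to Iceberg's
// encryption.kms-impl class name internally. Enum values and class names
// are illustrative assumptions.
enum EncryptionKmsType
{
    AWS_KMS("org.apache.iceberg.aws.AwsKeyManagementClient"),
    IN_MEMORY("io.trino.plugin.iceberg.TestingKmsClient");

    private final String kmsImplClassName;

    EncryptionKmsType(String kmsImplClassName)
    {
        this.kmsImplClassName = kmsImplClassName;
    }

    String kmsImplClassName()
    {
        return kmsImplClassName;
    }
}
```

With this shape, a user would set something like `iceberg.encryption.kms-type=AWS_KMS` in catalog properties, and the connector would resolve the class name internally, keeping Java class names out of user configuration.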
Thanks. I read your Iceberg PR.
One question to confirm: are you saying it’s acceptable to implement this logic on the Trino side without waiting for the next Iceberg release? For example, we could expose only the iceberg.encryption.kms-type property in Trino, and then internally set Iceberg’s encryption.kms-impl to the appropriate implementation class based on that value.
Also, my understanding is that KMS APIs don’t have an industry-standard interface like the S3 API for object storage. In particular, users who enable encryption often build their own KMS around on-prem HSMs or similar systems. Given that context, I assume Iceberg exposes kms-impl to allow users to plug in their own implementation. That said, I understand the point that Trino generally shouldn’t expect users to configure a Java class name directly, or, put differently, to distribute arbitrary jars/classes via configuration.
In that case, is there a good way to leave room for users outside cloud environments to customize KMS behavior without forking Trino? And is supporting that kind of extensibility something you’d consider in scope?
requireNonNull(schemaTableName, "schemaTableName is null");
requireNonNull(tableSchemaJson, "tableSchemaJson is null");
columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
We don't use requireNonNull with ImmutableMap.copyOf.
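The wrapper is redundant because the copy already performs the null check: Guava's ImmutableMap.copyOf throws NullPointerException on null input, just as the JDK's Map.copyOf (used in this self-contained sketch) does.

```java
import java.util.Map;

// Demonstrates that copyOf-style methods already reject null input,
// making a wrapping requireNonNull redundant. Uses the JDK's Map.copyOf,
// which behaves like Guava's ImmutableMap.copyOf in this respect.
class CopyOfNullCheck
{
    static Map<String, String> copy(Map<String, String> storageProperties)
    {
        // No requireNonNull needed: copyOf throws NullPointerException for null
        return Map.copyOf(storageProperties);
    }

    public static void main(String[] args)
    {
        boolean threw = false;
        try {
            copy(null);
        }
        catch (NullPointerException e) {
            threw = true;
        }
        System.out.println("NPE thrown for null input: " + threw); // prints true
    }
}
```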
if (catalogKmsClient != null) {
    return catalogKmsClient;
}
catalogKmsClient = EncryptionUtil.createKmsClient(properties);
I don't think calling EncryptionUtil.createKmsClient with table properties will work against a real KMS. As I understand it, it requires credentials provided via catalog properties.
This illustrates why we don't allow such generic implementations. It becomes very easy to ship a broken implementation.
I’ve removed iceberg.encryption.kms-impl for now and added iceberg.encryption.kms-type and iceberg.encryption.kms-properties.
For the KMS client configuration, table properties are not used. Instead, iceberg.encryption.kms-properties is used.
That said, my understanding is that Trino generally has very few places where we allow passing through an arbitrary set of options like this. So I’m considering listing the required properties (for example, the AWS region) and defining explicit, typed fields for them instead.
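That direction might look like the following hypothetical catalog properties; the property names are illustrative assumptions, not names introduced by this PR:

```properties
# Hypothetical typed configuration replacing a free-form kms-properties list:
# each required setting gets its own named, validated property.
iceberg.encryption.kms-type=AWS_KMS
iceberg.encryption.aws-kms.region=us-east-1
```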
try {
    return fileIo.properties();
}
catch (UnsupportedOperationException e) {
We will be able to remove this catch once apache/iceberg#15289 is released.
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergEncryptionManagerFactory
Please follow https://trino.io/docs/current/develop/tests.html
@Inject
public IcebergEncryptionManagerFactory(IcebergConfig config)
{
    requireNonNull(config, "config is null");
We don't use requireNonNull for config classes. #13940
Signed-off-by: kamijin_fanta <kamijin@live.jp>
Thanks for the PR. I've also been working on PME in Iceberg, but haven't published the draft yet. Let me go through this PR to see if it converges in the right direction.
private final long fileSize;
private final long fileRecordCount;
private final IcebergFileFormat fileFormat;
private final Optional<byte[]> encryptionKeyMetadata;
Split should contain fileKey AND aad.
encryptionKeyMetadata shouldn't be forwarded to workers, as workers will not have access to Iceberg metadata (where keys are stored).
Example:

@VisibleForTesting
record ParquetFileDecryptionData(byte[] fileEncryptionKey, byte[] fileAadPrefix)
{
    ParquetFileDecryptionData
    {
        requireNonNull(fileEncryptionKey, "fileEncryptionKey is null");
        requireNonNull(fileAadPrefix, "fileAadPrefix is null");
    }
}

private static Optional<ParquetFileDecryptionData> parquetFileDecryptionData(
        EncryptedInputFile encryptedInputFile,
        EncryptionManager encryptionManager)
{
    InputFile inputFile;
    try {
        inputFile = encryptionManager.decrypt(encryptedInputFile);
    }
    catch (RuntimeException e) {
        return Optional.empty();
    }
    if (!(inputFile instanceof NativeEncryptionInputFile nativeEncryptionInputFile)) {
        return Optional.empty();
    }
    NativeEncryptionKeyMetadata nativeKeyMetadata;
    try {
        nativeKeyMetadata = nativeEncryptionInputFile.keyMetadata();
    }
    catch (RuntimeException e) {
        return Optional.empty();
    }
    ByteBuffer encryptionKey = nativeKeyMetadata.encryptionKey();
    ByteBuffer aadPrefix = nativeKeyMetadata.aadPrefix();
    if (encryptionKey == null || aadPrefix == null) {
        return Optional.empty();
    }
    return Optional.of(new ParquetFileDecryptionData(
            ByteBuffers.toByteArray(encryptionKey),
            ByteBuffers.toByteArray(aadPrefix)));
}
return fileSystem.newInputFile(Location.of(path), fileSize);
}

private static TrinoInputFile decryptInputFileIfNeeded(
We already have framework for PME decryption (#24517) that works with Trino native Parquet reader that should be used instead.
Parquet has different encryption modes with footer/column keys, see https://parquet.apache.org/docs/file-format/data-pages/encryption/. I don't think org.apache.iceberg.encryption.StandardEncryptionManager.StandardDecryptedInputFile even works with reading encrypted parquet files.
IIUC encryptionManager.get().decrypt is for decrypting metadata files.
PTAL at #28389. Especially split handling, read-path test and …
        "encryption_key_id = 'test-key', " +
        "encryption_data_key_length = 16)")) {
    String tableName = table.getName();
    assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a'), (2, 'b'), (3, 'c')", 3);
The encrypted table is created by Trino too, right? I think that's the reason why
InputFile decryptedInputFile = encryptionManager.get().decrypt(EncryptedFiles.encryptedInput(encryptedInputFile, metadataWithLength));
works for Parquet files. However, this should really be tested with code like https://github.com/trinodb/trino/pull/28389/changes#diff-75c880cd52216050277554400af0c70c6a983d40426c347aae427123aa28542bR140 (e.g. use apache parquet writer like
try (DataWriter<Record> writer = Parquet.writeData(encryptedOutputFile)
.forTable(table)
.withSpec(table.spec())
.withPartition(null)
.withKeyMetadata(encryptedOutputFile.keyMetadata())
.createWriterFunc(GenericParquetWriter::create)
.build()) {
Add ParquetFileDecryptionData (file key + AAD prefix) to Iceberg split metadata, delete files, and table_changes splits. Resolve decryption data on the coordinator in split sources and pass it to workers. Wire Parquet decryption properties into IcebergPageSourceProvider. Keep compatibility by falling back to legacy key-metadata-based decryption when key metadata is present. Signed-off-by: kamijin_fanta <kamijin@live.jp>
@Config("iceberg.encryption.kms-properties")
@ConfigDescription("Catalog-level KMS client properties in key=value format")
public IcebergConfig setEncryptionKmsProperties(List<String> encryptionKmsProperties)
{
    this.encryptionKmsProperties = Optional.ofNullable(encryptionKmsProperties)
            .map(ImmutableList::copyOf)
            .orElseGet(ImmutableList::of);
    return this;
}
In this project, we avoid arbitrary config properties as much as possible. With that model, it's easy to miss required properties and to permit invalid combinations of settings. It also makes it harder to correctly handle values that contain a `=`. Also, the current code risks leaking credentials, since the @ConfigSecuritySensitive annotation is missing.
You can refer to IcebergRestCatalogModule for an example of how we handle multiple implementations.
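The `=` ambiguity is concrete: naively splitting a `key=value` entry silently truncates any value that itself contains `=` (base64-encoded secrets, for example, often end in `=` padding). A small self-contained illustration, not Trino code:

```java
// Illustrates why free-form key=value lists are fragile: a value containing
// '=' is ambiguous under a naive split.
class KeyValueParsing
{
    static String naiveValue(String entry)
    {
        // Naive: split on every '=' and take the second piece,
        // silently dropping everything after a second '='
        return entry.split("=")[1];
    }

    static String firstEqualsValue(String entry)
    {
        // Splitting only on the first '=' preserves the full value
        return entry.substring(entry.indexOf('=') + 1);
    }

    public static void main(String[] args)
    {
        String entry = "kms.credential=AAAA=="; // base64 value with '=' padding
        System.out.println(naiveValue(entry));       // prints AAAA (truncated)
        System.out.println(firstEqualsValue(entry)); // prints AAAA==
    }
}
```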
{
    try {
        OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(fileSystem.newOutputFile(outputPath));
        EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
Iceberg encryption is not supported yet in tables with ORC data files. In the future, the native ORC encryption will need to be leveraged.
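A minimal sketch of the fail-fast guard this implies (type, method, and exception names are illustrative assumptions, not this PR's actual code):

```java
// Hypothetical validation: reject encryption up front for data-file formats
// that have no Iceberg encryption support yet, instead of failing deep
// inside a writer.
enum DataFileFormat { PARQUET, ORC, AVRO }

class EncryptionFormatGuard
{
    static void checkEncryptionSupported(DataFileFormat format, boolean encryptionEnabled)
    {
        if (encryptionEnabled && format != DataFileFormat.PARQUET) {
            throw new IllegalArgumentException(
                    "Iceberg encryption is not supported for " + format + " data files");
        }
    }
}
```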
}

@Test
void testEncryptedOrcWriterValidation()
Iceberg throws an exception when writing an encrypted ORC table:
https://github.com/apache/iceberg/blob/main/orc/src/main/java/org/apache/iceberg/orc/ORC.java#L116
@kamijin-fanta are you going to work on this PR?
#28905 takes over this PR as we talked offline.
Integrate Iceberg encryption handling into read and write paths and table operations across Iceberg catalogs.
Description
- Add table properties encryption_key_id and encryption_data_key_length.
- Add catalog configuration property iceberg.encryption.kms-impl.
- Integrate EncryptionManager into write paths for data files, position delete files, and deletion vectors.
- Use EncryptingFileIO for encrypted tables.
Additional context and related issues
- encryption_data_key_length is validated to 16, 24, or 32 bytes and requires encryption_key_id.
- Changing encryption_key_id or encryption_data_key_length after they are set is rejected.
- Catalog-level iceberg.encryption.kms-impl is reused, while table-level encryption.kms-impl is resolved per table.
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text: