@@ -33,6 +33,7 @@ public record CommitTaskData(
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets,
int sortOrderId,
Optional<byte[]> encryptionKeyMetadata,
Optional<byte[]> serializedDeletionVector)
{
public CommitTaskData
@@ -46,6 +47,7 @@ public record CommitTaskData(
requireNonNull(referencedDataFile, "referencedDataFile is null");
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
checkArgument(content == FileContent.DATA || sortOrderId == SortOrder.unsorted().orderId(), "Sorted order id can be present only for data files");
requireNonNull(encryptionKeyMetadata, "encryptionKeyMetadata is null");
requireNonNull(serializedDeletionVector, "serializedDeletionVector is null");
}
}
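For context on where the new field ends up: on the commit side, the serialized task presumably has to replay this key metadata onto the Iceberg `DataFile`. A rough sketch, assuming Iceberg's `DataFiles.Builder#withEncryptionKeyMetadata(ByteBuffer)`; the other accessors on `task` are simplified placeholders, not the exact `CommitTaskData` shape:

```java
import java.nio.ByteBuffer;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;

final class CommitTaskReplaySketch
{
    private CommitTaskReplaySketch() {}

    // Rebuild the Iceberg DataFile from a finished write task, replaying the
    // optional encryption key metadata captured by the file writer.
    static DataFile toDataFile(PartitionSpec spec, CommitTaskData task)
    {
        DataFiles.Builder builder = DataFiles.builder(spec)
                .withPath(task.path())                  // placeholder accessor
                .withFileSizeInBytes(task.fileSizeInBytes())
                .withMetrics(task.metrics().metrics()); // record count, column stats
        task.encryptionKeyMetadata()
                .map(ByteBuffer::wrap)
                .ifPresent(builder::withEncryptionKeyMetadata);
        return builder.build();
    }
}
```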
@@ -104,6 +104,7 @@ public class IcebergConfig
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;
private Optional<String> encryptionKmsImpl = Optional.empty();

public CatalogType getCatalogType()
{
@@ -695,4 +696,17 @@ public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConfl
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}

public Optional<String> getEncryptionKmsImpl()
{
return encryptionKmsImpl;
}

@Config("iceberg.encryption.kms-impl")
@ConfigDescription("KMS implementation class for Iceberg table encryption")
Member:

We shouldn't expose Java class names to users. Please introduce a new enum and map it to the class name internally.

Also, don't forget to update iceberg.md.

Author:

According to the Iceberg documentation, both encryption.kms-type and encryption.kms-impl appear to be public configuration options. However, in Iceberg 1.10.1, and also on the latest main branch, encryption.kms-type doesn't seem to be referenced anywhere in the code, so users effectively must always set encryption.kms-impl.

Also, I think we should keep some extensibility here, because some users rely on KMS providers outside the major clouds like AWS/GCP (including myself). For consistency with other configuration surfaces (e.g., catalog configs), I'm currently leaning toward accepting the Java class name for now, and then later switching to an enum-based selection once Iceberg formalizes/supports kms-type (or an equivalent stable interface). Ideally we can evolve this into an internal enum-to-class mapping when Iceberg's interface becomes clearer.

What do you think, @ebyhr? Another reasonable option is to wait to merge until Iceberg finalizes this interface, but I'm trying to balance that against keeping room for non-cloud KMS implementations.

Ref: https://github.com/apache/iceberg/blob/a6c4e6aef06103db38be064534b7b0be09b3ebca/core/src/main/java/org/apache/iceberg/encryption/EncryptionUtil.java#L38-L80

Member (@ebyhr, Feb 19, 2026):

> encryption.kms-type doesn't seem to be referenced/used in the code

Did you confirm apache/iceberg#15272? We don't need to wait for 1.11.0 to use the KMS type anyway: we can introduce iceberg.encryption.kms-type and internally set encryption.kms-impl.

> I'm currently leaning toward accepting the Java class name for now, and then later switching to an enum-based selection once Iceberg formalizes/supports kms-type (or an equivalent stable interface).

The expected order is the opposite. We should begin with the strictest option, such as an enum-based approach, and only allow arbitrary options if that proves too restrictive. Note that we generally avoid such generic options in this project.
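A minimal sketch of that enum-based approach; the enum constants and implementation class names below are placeholders, not real Iceberg or Trino classes:

```java
// Trino would expose iceberg.encryption.kms-type as a strict enum and map it
// internally to Iceberg's encryption.kms-impl property, so users never supply
// a raw Java class name via configuration.
public enum IcebergEncryptionKmsType
{
    AWS_KMS("com.example.kms.AwsKeyManagementClient"), // placeholder class
    GCP_KMS("com.example.kms.GcpKeyManagementClient"); // placeholder class

    private final String kmsImplClassName;

    IcebergEncryptionKmsType(String kmsImplClassName)
    {
        this.kmsImplClassName = kmsImplClassName;
    }

    public String toKmsImpl()
    {
        return kmsImplClassName;
    }
}
```

The connector would then set `encryption.kms-impl` internally, e.g. `properties.put("encryption.kms-impl", kmsType.toKmsImpl())`, without ever accepting an arbitrary class name from users.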

Author:

Thanks. I read your Iceberg PR.

One question to confirm: are you saying it's acceptable to implement this logic on the Trino side without waiting for the next Iceberg release? For example, we could expose only the iceberg.encryption.kms-type property in Trino and internally set Iceberg's encryption.kms-impl to the appropriate implementation class based on that value.

Also, my understanding is that KMS APIs don't have an industry-standard interface the way object storage has the S3 API. In particular, users who enable encryption often build their own KMS around on-prem HSMs or similar systems. Given that context, I assume Iceberg exposes kms-impl to let users plug in their own implementation. That said, I understand the point that Trino generally shouldn't expect users to configure a Java class name directly, or, put differently, to distribute arbitrary jars/classes via configuration.

In that case, is there a good way to leave room for users outside cloud environments to customize KMS behavior without forking Trino? And is supporting that kind of extensibility something you'd consider in scope?
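To make the extensibility question concrete: a custom KMS plugs in by implementing the interface that `EncryptionUtil` instantiates from `encryption.kms-impl`. A deliberately insecure in-memory sketch, assuming Iceberg's `org.apache.iceberg.encryption.KeyManagementClient` with `wrapKey`/`unwrapKey` taking a `ByteBuffer` and a wrapping-key id; a real implementation would delegate to an HSM or KMS service:

```java
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.iceberg.encryption.KeyManagementClient;

// Demo only: "wraps" data-encryption keys by XOR-ing with a configured byte,
// purely to show the shape of the extension point. Never use in production.
public class XorDemoKmsClient
        implements KeyManagementClient
{
    private byte mask;

    @Override
    public void initialize(Map<String, String> properties)
    {
        this.mask = (byte) Integer.parseInt(properties.getOrDefault("demo.mask", "42"));
    }

    @Override
    public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId)
    {
        return xor(key);
    }

    @Override
    public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId)
    {
        return xor(wrappedKey); // XOR is its own inverse
    }

    @Override
    public void close() {}

    private ByteBuffer xor(ByteBuffer input)
    {
        byte[] bytes = new byte[input.remaining()];
        input.duplicate().get(bytes);
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] ^= mask;
        }
        return ByteBuffer.wrap(bytes);
    }
}
```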

public IcebergConfig setEncryptionKmsImpl(String encryptionKmsImpl)
{
this.encryptionKmsImpl = Optional.ofNullable(encryptionKmsImpl);
return this;
}
}
@@ -24,5 +24,10 @@ public interface IcebergFileWriter
{
FileMetrics getFileMetrics();

default Optional<byte[]> getEncryptionKeyMetadata()
{
return Optional.empty();
}

record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
}
@@ -33,6 +33,9 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveCompressionOption;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoInputFile;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoOutputFile;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.NodeVersion;
import io.trino.spi.TrinoException;
@@ -41,7 +44,14 @@
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.weakref.jmx.Managed;

import java.io.Closeable;
@@ -133,13 +143,14 @@ public IcebergFileWriter createDataFileWriter(
ConnectorSession session,
IcebergFileFormat fileFormat,
MetricsConfig metricsConfig,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
// TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties);
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties, encryptionManager);
};
}

@@ -148,12 +159,13 @@ public IcebergFileWriter createPositionDeleteWriter(
Location outputPath,
ConnectorSession session,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties, encryptionManager);
};
}

@@ -163,7 +175,8 @@ private IcebergFileWriter createParquetWriter(
Location outputPath,
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
@@ -173,7 +186,8 @@ private IcebergFileWriter createParquetWriter(
.collect(toImmutableList());

try {
TrinoOutputFile outputFile = fileSystem.newOutputFile(outputPath);
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
TrinoOutputFile outputFile = encryptedOutput.trinoOutputFile();

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -188,7 +202,7 @@ private IcebergFileWriter createParquetWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(PARQUET, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergParquetFileWriter(
IcebergFileWriter writer = new IcebergParquetFileWriter(
metricsConfig,
outputFile,
rollbackAction,
@@ -201,6 +215,7 @@
compressionCodec.getParquetCompressionCodec()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
nodeVersion.toString());
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
@@ -214,10 +229,12 @@ private IcebergFileWriter createOrcWriter(
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties,
DataSize stringStatisticsLimit)
DataSize stringStatisticsLimit,
Optional<EncryptionManager> encryptionManager)
{
try {
OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(fileSystem.newOutputFile(outputPath));
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
Comment:

Iceberg encryption is not yet supported for tables with ORC data files. In the future, native ORC encryption will need to be leveraged here.
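If so, failing fast in createOrcWriter seems safer than writing ORC files that silently bypass the table's encryption; a sketch of such a guard, reusing this PR's names (the error message and the choice of NOT_SUPPORTED are illustrative):

```java
// Reject encrypted ORC writes until native ORC encryption is wired up.
if (encryptionManager.isPresent()) {
    throw new TrinoException(NOT_SUPPORTED, "Iceberg table encryption is not yet supported for ORC data files");
}
```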

OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(encryptedOutput.trinoOutputFile());

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -234,7 +251,7 @@ private IcebergFileWriter createOrcWriter(
if (isOrcWriterValidate(session)) {
validationInputFactory = Optional.of(() -> {
try {
TrinoInputFile inputFile = fileSystem.newInputFile(outputPath);
TrinoInputFile inputFile = createValidationInputFile(fileSystem, outputPath, encryptedOutput.keyMetadata(), encryptionManager);
return new TrinoOrcDataSource(inputFile, new OrcReaderOptions(), readStats);
}
catch (IOException | UncheckedIOException e) {
@@ -246,7 +263,7 @@ private IcebergFileWriter createOrcWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(ORC, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergOrcFileWriter(
IcebergFileWriter writer = new IcebergOrcFileWriter(
metricsConfig,
icebergSchema,
orcDataSink,
@@ -270,6 +287,7 @@
validationInputFactory,
getOrcWriterValidateMode(session),
orcWriterStats);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating ORC file", e);
@@ -299,7 +317,8 @@ private IcebergFileWriter createAvroWriter(
TrinoFileSystem fileSystem,
Location outputPath,
Schema icebergSchema,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -310,11 +329,113 @@ private IcebergFileWriter createAvroWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(AVRO, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergAvroFileWriter(
new ForwardingOutputFile(fileSystem, outputPath),
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);

IcebergFileWriter writer = new IcebergAvroFileWriter(
encryptedOutput.icebergOutputFile(),
rollbackAction,
icebergSchema,
columnTypes,
compressionCodec);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}

private static TrinoInputFile createValidationInputFile(
TrinoFileSystem fileSystem,
Location outputPath,
Optional<byte[]> keyMetadata,
Optional<EncryptionManager> encryptionManager)
{
TrinoInputFile inputFile = fileSystem.newInputFile(outputPath);
if (keyMetadata.isEmpty() || encryptionManager.isEmpty()) {
return inputFile;
}
InputFile encryptedInputFile = new ForwardingInputFile(inputFile);
InputFile decryptedInputFile = encryptionManager.get().decrypt(EncryptedFiles.encryptedInput(encryptedInputFile, keyMetadata.get()));
return new EncryptedTrinoInputFile(inputFile, decryptedInputFile);
}

private EncryptedOutput createOutputFile(TrinoFileSystem fileSystem, Location outputPath, Optional<EncryptionManager> encryptionManager)
{
OutputFile icebergOutputFile = new ForwardingOutputFile(fileSystem, outputPath);
EncryptedOutputFile encryptedOutputFile = encryptionManager
.map(manager -> manager.encrypt(icebergOutputFile))
.orElseGet(() -> EncryptionUtil.plainAsEncryptedOutput(icebergOutputFile));
OutputFile encryptingOutputFile = encryptedOutputFile.encryptingOutputFile();
TrinoOutputFile trinoOutputFile = new EncryptedTrinoOutputFile(outputPath, encryptingOutputFile);
Optional<byte[]> keyMetadata = Optional.ofNullable(encryptedOutputFile.keyMetadata().buffer())
.map(ByteBuffers::toByteArray);
return new EncryptedOutput(trinoOutputFile, encryptingOutputFile, keyMetadata);
}

private static IcebergFileWriter withEncryptionKeyMetadata(IcebergFileWriter writer, Optional<byte[]> keyMetadata)
{
if (keyMetadata.isEmpty()) {
return writer;
}
return new EncryptionMetadataFileWriter(writer, keyMetadata);
}

private record EncryptedOutput(TrinoOutputFile trinoOutputFile, OutputFile icebergOutputFile, Optional<byte[]> keyMetadata) {}

private static class EncryptionMetadataFileWriter
implements IcebergFileWriter
{
private final IcebergFileWriter delegate;
private final Optional<byte[]> keyMetadata;

private EncryptionMetadataFileWriter(IcebergFileWriter delegate, Optional<byte[]> keyMetadata)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.keyMetadata = requireNonNull(keyMetadata, "keyMetadata is null");
}

@Override
public FileMetrics getFileMetrics()
{
return delegate.getFileMetrics();
}

@Override
public Optional<byte[]> getEncryptionKeyMetadata()
{
return keyMetadata;
}

@Override
public long getWrittenBytes()
{
return delegate.getWrittenBytes();
}

@Override
public long getMemoryUsage()
{
return delegate.getMemoryUsage();
}

@Override
public void appendRows(io.trino.spi.Page dataPage)
{
delegate.appendRows(dataPage);
}

@Override
public Closeable commit()
{
return delegate.commit();
}

@Override
public void rollback()
{
delegate.rollback();
}

@Override
public long getValidationCpuNanos()
{
return delegate.getValidationCpuNanos();
}
}
}
@@ -34,6 +34,7 @@
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.types.Type;

@@ -65,6 +66,7 @@ public class IcebergMergeSink
private final ConnectorSession session;
private final IcebergFileFormat fileFormat;
private final Map<String, String> storageProperties;
private final Optional<EncryptionManager> encryptionManager;
private final Schema schema;
private final Map<Integer, PartitionSpec> partitionsSpecs;
private final ConnectorPageSink insertPageSink;
@@ -81,6 +83,7 @@ public IcebergMergeSink(
ConnectorSession session,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager,
Schema schema,
Map<Integer, PartitionSpec> partitionsSpecs,
ConnectorPageSink insertPageSink,
@@ -94,6 +97,7 @@ public IcebergMergeSink(
this.session = requireNonNull(session, "session is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.encryptionManager = requireNonNull(encryptionManager, "encryptionManager is null");
this.schema = requireNonNull(schema, "schema is null");
this.partitionsSpecs = ImmutableMap.copyOf(requireNonNull(partitionsSpecs, "partitionsSpecs is null"));
this.insertPageSink = requireNonNull(insertPageSink, "insertPageSink is null");
@@ -171,6 +175,7 @@ else if (formatVersion == 3) {
Optional.of(dataFilePath.toStringUtf8()),
Optional.empty(), // unused for v3
SortOrder.unsorted().orderId(),
Optional.empty(),
Optional.of(deletionVector.serialize().getBytes()));
fragments.add(wrappedBuffer(jsonCodec.toJsonBytes(task)));
}));
@@ -199,7 +204,8 @@ private PositionDeleteWriter createPositionDeleteWriter(String dataFilePath, Par
fileSystem,
session,
fileFormat,
storageProperties);
storageProperties,
encryptionManager);
}

private Slice writePositionDeletes(PositionDeleteWriter writer, DeletionVector rowsToDelete)