Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<!-- Allow running tests when locally `aws sso login` is required to access AWS services -->
<groupId>software.amazon.awssdk</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public record CommitTaskData(
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets,
int sortOrderId,
Optional<byte[]> encryptionKeyMetadata,
Optional<byte[]> serializedDeletionVector)
{
public CommitTaskData
Expand All @@ -46,6 +47,7 @@ public record CommitTaskData(
requireNonNull(referencedDataFile, "referencedDataFile is null");
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
checkArgument(content == FileContent.DATA || sortOrderId == SortOrder.unsorted().orderId(), "Sorted order id can be present only for data files");
requireNonNull(encryptionKeyMetadata, "encryptionKeyMetadata is null");
requireNonNull(serializedDeletionVector, "serializedDeletionVector is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class IcebergConfig
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;
private Optional<EncryptionKmsType> encryptionKmsType = Optional.empty();
private List<String> encryptionKmsProperties = ImmutableList.of();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -695,4 +697,50 @@ public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConfl
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}

/**
 * Returns the configured KMS type for Iceberg table encryption,
 * or {@link Optional#empty()} when table encryption is not configured.
 */
public Optional<EncryptionKmsType> getEncryptionKmsType()
{
return encryptionKmsType;
}

@Config("iceberg.encryption.kms-type")
@ConfigDescription("KMS type for Iceberg table encryption")
public IcebergConfig setEncryptionKmsType(EncryptionKmsType encryptionKmsType)
{
// A null argument means "encryption disabled"; store it as an empty Optional.
this.encryptionKmsType = (encryptionKmsType == null) ? Optional.empty() : Optional.of(encryptionKmsType);
return this;
}

/**
 * Returns the catalog-level KMS client properties as {@code key=value} strings.
 * Never null; empty when no properties are configured.
 */
public List<String> getEncryptionKmsProperties()
{
return encryptionKmsProperties;
}

@Config("iceberg.encryption.kms-properties")
@ConfigDescription("Catalog-level KMS client properties in key=value format")
// NOTE(review): these values may carry credentials; per the review discussion the
// @ConfigSecuritySensitive annotation is likely needed here so values are masked in
// config logging — confirm and add the matching airlift import. Also note that
// key=value parsing cannot represent values containing '=' unambiguously.
public IcebergConfig setEncryptionKmsProperties(List<String> encryptionKmsProperties)
{
// Defensive immutable copy; a null argument resets the list to empty.
this.encryptionKmsProperties = Optional.ofNullable(encryptionKmsProperties)
.map(ImmutableList::copyOf)
.orElseGet(ImmutableList::of);
return this;
}
Comment on lines +719 to +727
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this project, we avoid using arbitrary config properties as much as possible. With that model, it's easy to miss required properties and to permit invalid combinations of settings. It also makes it harder to correctly handle values that contain an `=`. Also, the current code risks leaking credentials since the @ConfigSecuritySensitive annotation is missing.

You can refer to IcebergRestCatalogModule for an example of how we handle multiple implementations.


/**
 * Supported key-management-service integrations for Iceberg table encryption.
 * Each constant carries the fully qualified name of the Iceberg KMS client
 * implementation class used to instantiate the key management client.
 */
public enum EncryptionKmsType
{
    AWS("org.apache.iceberg.aws.AwsKeyManagementClient"),
    GCP("org.apache.iceberg.gcp.GcpKeyManagementClient");

    private final String clientClassName;

    EncryptionKmsType(String clientClassName)
    {
        this.clientClassName = clientClassName;
    }

    /** Fully qualified class name of the Iceberg KMS client for this KMS type. */
    public String getKmsClientClassName()
    {
        return clientClassName;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@ public interface IcebergFileWriter
{
FileMetrics getFileMetrics();

/**
 * Encryption key metadata to be recorded with the written file.
 * Defaults to {@link Optional#empty()} for writers that do not produce encrypted files.
 */
default Optional<byte[]> getEncryptionKeyMetadata()
{
return Optional.empty();
}

record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveCompressionOption;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoInputFile;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoOutputFile;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.NodeVersion;
import io.trino.spi.TrinoException;
Expand All @@ -41,7 +44,14 @@
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.weakref.jmx.Managed;

import java.io.Closeable;
Expand Down Expand Up @@ -133,13 +143,14 @@ public IcebergFileWriter createDataFileWriter(
ConnectorSession session,
IcebergFileFormat fileFormat,
MetricsConfig metricsConfig,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
// TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties);
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties, encryptionManager);
};
}

Expand All @@ -148,12 +159,13 @@ public IcebergFileWriter createPositionDeleteWriter(
Location outputPath,
ConnectorSession session,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties, encryptionManager);
};
}

Expand All @@ -163,7 +175,8 @@ private IcebergFileWriter createParquetWriter(
Location outputPath,
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
Expand All @@ -173,7 +186,8 @@ private IcebergFileWriter createParquetWriter(
.collect(toImmutableList());

try {
TrinoOutputFile outputFile = fileSystem.newOutputFile(outputPath);
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
TrinoOutputFile outputFile = encryptedOutput.trinoOutputFile();

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

Expand All @@ -188,7 +202,7 @@ private IcebergFileWriter createParquetWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(PARQUET, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergParquetFileWriter(
IcebergFileWriter writer = new IcebergParquetFileWriter(
metricsConfig,
outputFile,
rollbackAction,
Expand All @@ -201,6 +215,7 @@ private IcebergFileWriter createParquetWriter(
compressionCodec.getParquetCompressionCodec()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
nodeVersion.toString());
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
Expand All @@ -214,10 +229,12 @@ private IcebergFileWriter createOrcWriter(
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties,
DataSize stringStatisticsLimit)
DataSize stringStatisticsLimit,
Optional<EncryptionManager> encryptionManager)
{
try {
OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(fileSystem.newOutputFile(outputPath));
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg encryption is not supported yet in tables with ORC data files. In the future, the native ORC encryption will need to be leveraged.

OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(encryptedOutput.trinoOutputFile());

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

Expand All @@ -234,7 +251,7 @@ private IcebergFileWriter createOrcWriter(
if (isOrcWriterValidate(session)) {
validationInputFactory = Optional.of(() -> {
try {
TrinoInputFile inputFile = fileSystem.newInputFile(outputPath);
TrinoInputFile inputFile = createValidationInputFile(fileSystem, outputPath, encryptedOutput.keyMetadata(), encryptionManager);
return new TrinoOrcDataSource(inputFile, new OrcReaderOptions(), readStats);
}
catch (IOException | UncheckedIOException e) {
Expand All @@ -246,7 +263,7 @@ private IcebergFileWriter createOrcWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(ORC, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergOrcFileWriter(
IcebergFileWriter writer = new IcebergOrcFileWriter(
metricsConfig,
icebergSchema,
orcDataSink,
Expand All @@ -270,6 +287,7 @@ private IcebergFileWriter createOrcWriter(
validationInputFactory,
getOrcWriterValidateMode(session),
orcWriterStats);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating ORC file", e);
Expand Down Expand Up @@ -299,7 +317,8 @@ private IcebergFileWriter createAvroWriter(
TrinoFileSystem fileSystem,
Location outputPath,
Schema icebergSchema,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

Expand All @@ -310,11 +329,113 @@ private IcebergFileWriter createAvroWriter(
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(AVRO, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergAvroFileWriter(
new ForwardingOutputFile(fileSystem, outputPath),
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);

IcebergFileWriter writer = new IcebergAvroFileWriter(
encryptedOutput.icebergOutputFile(),
rollbackAction,
icebergSchema,
columnTypes,
compressionCodec);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}

/**
 * Opens the freshly written file for ORC writer validation. When both key metadata
 * and an encryption manager are present, the input is wrapped so reads are decrypted;
 * otherwise the plain file is returned as-is.
 */
private static TrinoInputFile createValidationInputFile(
TrinoFileSystem fileSystem,
Location outputPath,
Optional<byte[]> keyMetadata,
Optional<EncryptionManager> encryptionManager)
{
TrinoInputFile rawFile = fileSystem.newInputFile(outputPath);
if (keyMetadata.isPresent() && encryptionManager.isPresent()) {
InputFile wrapped = new ForwardingInputFile(rawFile);
InputFile decrypted = encryptionManager.get().decrypt(EncryptedFiles.encryptedInput(wrapped, keyMetadata.get()));
return new EncryptedTrinoInputFile(rawFile, decrypted);
}
return rawFile;
}

/**
 * Creates the output file for a new data or delete file. When an {@link EncryptionManager}
 * is configured the output is wrapped for encryption and the key metadata that must be
 * recorded with the file is captured; otherwise a plain (unencrypted) output is produced.
 */
private EncryptedOutput createOutputFile(TrinoFileSystem fileSystem, Location outputPath, Optional<EncryptionManager> encryptionManager)
{
OutputFile rawOutputFile = new ForwardingOutputFile(fileSystem, outputPath);
EncryptedOutputFile encryptedFile;
if (encryptionManager.isPresent()) {
encryptedFile = encryptionManager.get().encrypt(rawOutputFile);
}
else {
encryptedFile = EncryptionUtil.plainAsEncryptedOutput(rawOutputFile);
}
OutputFile encryptingFile = encryptedFile.encryptingOutputFile();
// keyMetadata().buffer() is null for plaintext output; propagate that as Optional.empty()
Optional<byte[]> keyMetadata = Optional.ofNullable(encryptedFile.keyMetadata().buffer())
.map(ByteBuffers::toByteArray);
return new EncryptedOutput(new EncryptedTrinoOutputFile(outputPath, encryptingFile), encryptingFile, keyMetadata);
}

/**
 * Attaches encryption key metadata to the writer so it can be reported to the commit task;
 * returns the writer unchanged when there is no key metadata to attach.
 */
private static IcebergFileWriter withEncryptionKeyMetadata(IcebergFileWriter writer, Optional<byte[]> keyMetadata)
{
return keyMetadata.isPresent()
? new EncryptionMetadataFileWriter(writer, keyMetadata)
: writer;
}

// Bundles the Trino-facing output file, the Iceberg-facing (possibly encrypting)
// output file, and the encryption key metadata (empty for plaintext files).
private record EncryptedOutput(TrinoOutputFile trinoOutputFile, OutputFile icebergOutputFile, Optional<byte[]> keyMetadata) {}

/**
 * Decorator that forwards every writer operation to an underlying
 * {@link IcebergFileWriter} while overriding {@link #getEncryptionKeyMetadata()}
 * to expose the key metadata of the encrypted file being written.
 */
private static class EncryptionMetadataFileWriter
implements IcebergFileWriter
{
private final IcebergFileWriter delegate;
private final Optional<byte[]> keyMetadata;

private EncryptionMetadataFileWriter(IcebergFileWriter delegate, Optional<byte[]> keyMetadata)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.keyMetadata = requireNonNull(keyMetadata, "keyMetadata is null");
}

// The sole behavioral difference from the delegate: report the key metadata.
@Override
public Optional<byte[]> getEncryptionKeyMetadata()
{
return keyMetadata;
}

// Everything below is plain delegation.
@Override
public FileMetrics getFileMetrics()
{
return delegate.getFileMetrics();
}

@Override
public void appendRows(io.trino.spi.Page dataPage)
{
delegate.appendRows(dataPage);
}

@Override
public Closeable commit()
{
return delegate.commit();
}

@Override
public void rollback()
{
delegate.rollback();
}

@Override
public long getWrittenBytes()
{
return delegate.getWrittenBytes();
}

@Override
public long getMemoryUsage()
{
return delegate.getMemoryUsage();
}

@Override
public long getValidationCpuNanos()
{
return delegate.getValidationCpuNanos();
}
}
}
Loading