plugin/trino-iceberg/pom.xml
@@ -742,6 +742,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<!-- Allow running tests when locally `aws sso login` is required to access AWS services -->
<groupId>software.amazon.awssdk</groupId>
CommitTaskData.java
@@ -33,6 +33,7 @@ public record CommitTaskData(
Optional<String> referencedDataFile,
Optional<List<Long>> fileSplitOffsets,
int sortOrderId,
Optional<byte[]> encryptionKeyMetadata,
Optional<byte[]> serializedDeletionVector)
{
public CommitTaskData
@@ -46,6 +47,7 @@ public record CommitTaskData(
requireNonNull(referencedDataFile, "referencedDataFile is null");
requireNonNull(fileSplitOffsets, "fileSplitOffsets is null");
checkArgument(content == FileContent.DATA || sortOrderId == SortOrder.unsorted().orderId(), "Sorted order id can be present only for data files");
requireNonNull(encryptionKeyMetadata, "encryptionKeyMetadata is null");
requireNonNull(serializedDeletionVector, "serializedDeletionVector is null");
}
}
IcebergConfig.java
@@ -104,6 +104,8 @@ public class IcebergConfig
private int metadataParallelism = 8;
private boolean bucketExecutionEnabled = true;
private boolean fileBasedConflictDetectionEnabled = true;
private Optional<EncryptionKmsType> encryptionKmsType = Optional.empty();
private List<String> encryptionKmsProperties = ImmutableList.of();

public CatalogType getCatalogType()
{
@@ -695,4 +697,50 @@ public IcebergConfig setFileBasedConflictDetectionEnabled(boolean fileBasedConflictDetectionEnabled)
this.fileBasedConflictDetectionEnabled = fileBasedConflictDetectionEnabled;
return this;
}

public Optional<EncryptionKmsType> getEncryptionKmsType()
{
return encryptionKmsType;
}

@Config("iceberg.encryption.kms-type")
@ConfigDescription("KMS type for Iceberg table encryption")
public IcebergConfig setEncryptionKmsType(EncryptionKmsType encryptionKmsType)
{
this.encryptionKmsType = Optional.ofNullable(encryptionKmsType);
return this;
}

public List<String> getEncryptionKmsProperties()
{
return encryptionKmsProperties;
}

@Config("iceberg.encryption.kms-properties")
@ConfigDescription("Catalog-level KMS client properties in key=value format")
public IcebergConfig setEncryptionKmsProperties(List<String> encryptionKmsProperties)
{
this.encryptionKmsProperties = Optional.ofNullable(encryptionKmsProperties)
.map(ImmutableList::copyOf)
.orElseGet(ImmutableList::of);
return this;
}

public enum EncryptionKmsType
{
AWS("org.apache.iceberg.aws.AwsKeyManagementClient"),
GCP("org.apache.iceberg.gcp.GcpKeyManagementClient");

private final String kmsClientClassName;

EncryptionKmsType(String kmsClientClassName)
{
this.kmsClientClassName = kmsClientClassName;
}

public String getKmsClientClassName()
{
return kmsClientClassName;
}
}
}
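For orientation, a minimal usage sketch of the two new settings added above. The class name and the property key passed to setEncryptionKmsProperties are illustrative placeholders, not values defined by this change; only the config methods and the iceberg.encryption.kms-type / iceberg.encryption.kms-properties property names come from the diff.

import com.google.common.collect.ImmutableList;

import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.IcebergConfig.EncryptionKmsType;

public class IcebergEncryptionConfigExample
{
    public static void main(String[] args)
    {
        // Equivalent to setting iceberg.encryption.kms-type=AWS and
        // iceberg.encryption.kms-properties=kms.example-property=example-value
        // in the catalog properties; the key=value entry is a placeholder.
        IcebergConfig config = new IcebergConfig()
                .setEncryptionKmsType(EncryptionKmsType.AWS)
                .setEncryptionKmsProperties(ImmutableList.of("kms.example-property=example-value"));

        // The enum maps the configured KMS type to an Iceberg KMS client class name.
        System.out.println(config.getEncryptionKmsType().orElseThrow().getKmsClientClassName());
    }
}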
IcebergFileWriter.java
@@ -24,5 +24,10 @@ public interface IcebergFileWriter
{
FileMetrics getFileMetrics();

default Optional<byte[]> getEncryptionKeyMetadata()
{
return Optional.empty();
}

record FileMetrics(Metrics metrics, Optional<List<Long>> splitOffsets) {}
}
IcebergFileWriterFactory.java
@@ -33,15 +33,26 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveCompressionOption;
import io.trino.plugin.hive.orc.OrcWriterConfig;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoInputFile;
import io.trino.plugin.iceberg.fileio.EncryptedTrinoOutputFile;
import io.trino.plugin.iceberg.fileio.ForwardingInputFile;
import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
import io.trino.spi.NodeVersion;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.weakref.jmx.Managed;

import java.io.Closeable;
@@ -133,13 +144,14 @@ public IcebergFileWriter createDataFileWriter(
ConnectorSession session,
IcebergFileFormat fileFormat,
MetricsConfig metricsConfig,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
// TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties);
case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, storageProperties, encryptionManager);
};
}

@@ -148,12 +160,13 @@ public IcebergFileWriter createPositionDeleteWriter(
Location outputPath,
ConnectorSession session,
IcebergFileFormat fileFormat,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
return switch (fileFormat) {
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, encryptionManager);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE), encryptionManager);
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties, encryptionManager);
};
}

@@ -163,7 +176,8 @@ private IcebergFileWriter createParquetWriter(
Location outputPath,
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
List<String> fileColumnNames = icebergSchema.columns().stream()
.map(Types.NestedField::name)
Expand All @@ -173,7 +187,8 @@ private IcebergFileWriter createParquetWriter(
.collect(toImmutableList());

try {
TrinoOutputFile outputFile = fileSystem.newOutputFile(outputPath);
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
TrinoOutputFile outputFile = encryptedOutput.trinoOutputFile();

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -188,7 +203,7 @@
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(PARQUET, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergParquetFileWriter(
IcebergFileWriter writer = new IcebergParquetFileWriter(
Reviewer comment: native Parquet writer doesn't support encryption.
(Encryption is applied around the writer's output file instead; a sketch follows at the end of this hunk.)

metricsConfig,
outputFile,
rollbackAction,
@@ -201,6 +216,7 @@
compressionCodec.getParquetCompressionCodec()
.orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
nodeVersion.toString());
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating Parquet file", e);
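Since the native Parquet writer has no encryption support of its own (see the reviewer comment above), this change encrypts at the output-file level instead: Iceberg's EncryptionManager wraps the plain OutputFile before the writer ever sees it, mirroring the createOutputFile helper further down in this file. A minimal sketch of that pattern, with fileSystem, outputPath, and a non-empty encryptionManager assumed to be in scope:

// The EncryptionManager wraps the plain output file; bytes the writer emits through
// the wrapper are encrypted on write, so the writer itself needs no encryption logic.
OutputFile plain = new ForwardingOutputFile(fileSystem, outputPath);
EncryptedOutputFile encrypted = encryptionManager.encrypt(plain);
OutputFile encryptingOutputFile = encrypted.encryptingOutputFile();
// The key metadata is what getEncryptionKeyMetadata() exposes and what CommitTaskData
// carries (see above), presumably so it can be recorded with the file at commit time.
Optional<byte[]> keyMetadata = Optional.ofNullable(encrypted.keyMetadata().buffer())
        .map(ByteBuffers::toByteArray);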
@@ -214,10 +230,12 @@ private IcebergFileWriter createOrcWriter(
Schema icebergSchema,
ConnectorSession session,
Map<String, String> storageProperties,
DataSize stringStatisticsLimit)
DataSize stringStatisticsLimit,
Optional<EncryptionManager> encryptionManager)
{
try {
OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(fileSystem.newOutputFile(outputPath));
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);
OrcDataSink orcDataSink = OutputStreamOrcDataSink.create(encryptedOutput.trinoOutputFile());

Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -234,7 +252,7 @@
if (isOrcWriterValidate(session)) {
validationInputFactory = Optional.of(() -> {
try {
TrinoInputFile inputFile = fileSystem.newInputFile(outputPath);
TrinoInputFile inputFile = createValidationInputFile(fileSystem, outputPath, encryptedOutput.keyMetadata(), encryptionManager);
return new TrinoOrcDataSource(inputFile, new OrcReaderOptions(), readStats);
}
catch (IOException | UncheckedIOException e) {
@@ -246,7 +264,7 @@
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(ORC, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergOrcFileWriter(
IcebergFileWriter writer = new IcebergOrcFileWriter(
metricsConfig,
icebergSchema,
orcDataSink,
@@ -270,6 +288,7 @@
validationInputFactory,
getOrcWriterValidateMode(session),
orcWriterStats);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_WRITER_OPEN_ERROR, "Error creating ORC file", e);
@@ -299,7 +318,8 @@ private IcebergFileWriter createAvroWriter(
TrinoFileSystem fileSystem,
Location outputPath,
Schema icebergSchema,
Map<String, String> storageProperties)
Map<String, String> storageProperties,
Optional<EncryptionManager> encryptionManager)
{
Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

@@ -310,11 +330,113 @@
HiveCompressionCodec compressionCodec = getHiveCompressionCodec(AVRO, storageProperties)
.orElse(toCompressionCodec(hiveCompressionOption));

return new IcebergAvroFileWriter(
new ForwardingOutputFile(fileSystem, outputPath),
EncryptedOutput encryptedOutput = createOutputFile(fileSystem, outputPath, encryptionManager);

IcebergFileWriter writer = new IcebergAvroFileWriter(
encryptedOutput.icebergOutputFile(),
rollbackAction,
icebergSchema,
columnTypes,
compressionCodec);
return withEncryptionKeyMetadata(writer, encryptedOutput.keyMetadata());
}

private static TrinoInputFile createValidationInputFile(
TrinoFileSystem fileSystem,
Location outputPath,
Optional<byte[]> keyMetadata,
Optional<EncryptionManager> encryptionManager)
{
TrinoInputFile inputFile = fileSystem.newInputFile(outputPath);
if (keyMetadata.isEmpty() || encryptionManager.isEmpty()) {
return inputFile;
}
InputFile encryptedInputFile = new ForwardingInputFile(inputFile);
InputFile decryptedInputFile = encryptionManager.get().decrypt(EncryptedFiles.encryptedInput(encryptedInputFile, keyMetadata.get()));
return new EncryptedTrinoInputFile(inputFile, decryptedInputFile);
}

private EncryptedOutput createOutputFile(TrinoFileSystem fileSystem, Location outputPath, Optional<EncryptionManager> encryptionManager)
{
OutputFile icebergOutputFile = new ForwardingOutputFile(fileSystem, outputPath);
EncryptedOutputFile encryptedOutputFile = encryptionManager
.map(manager -> manager.encrypt(icebergOutputFile))
.orElseGet(() -> EncryptionUtil.plainAsEncryptedOutput(icebergOutputFile));
OutputFile encryptingOutputFile = encryptedOutputFile.encryptingOutputFile();
TrinoOutputFile trinoOutputFile = new EncryptedTrinoOutputFile(outputPath, encryptingOutputFile);
Optional<byte[]> keyMetadata = Optional.ofNullable(encryptedOutputFile.keyMetadata().buffer())
.map(ByteBuffers::toByteArray);
return new EncryptedOutput(trinoOutputFile, encryptingOutputFile, keyMetadata);
}

private static IcebergFileWriter withEncryptionKeyMetadata(IcebergFileWriter writer, Optional<byte[]> keyMetadata)
{
if (keyMetadata.isEmpty()) {
return writer;
}
return new EncryptionMetadataFileWriter(writer, keyMetadata);
}

private record EncryptedOutput(TrinoOutputFile trinoOutputFile, OutputFile icebergOutputFile, Optional<byte[]> keyMetadata) {}

private static class EncryptionMetadataFileWriter
implements IcebergFileWriter
{
private final IcebergFileWriter delegate;
private final Optional<byte[]> keyMetadata;

private EncryptionMetadataFileWriter(IcebergFileWriter delegate, Optional<byte[]> keyMetadata)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.keyMetadata = requireNonNull(keyMetadata, "keyMetadata is null");
}

@Override
public FileMetrics getFileMetrics()
{
return delegate.getFileMetrics();
}

@Override
public Optional<byte[]> getEncryptionKeyMetadata()
{
return keyMetadata;
}

@Override
public long getWrittenBytes()
{
return delegate.getWrittenBytes();
}

@Override
public long getMemoryUsage()
{
return delegate.getMemoryUsage();
}

@Override
public void appendRows(Page dataPage)
{
delegate.appendRows(dataPage);
}

@Override
public Closeable commit()
{
return delegate.commit();
}

@Override
public void rollback()
{
delegate.rollback();
}

@Override
public long getValidationCpuNanos()
{
return delegate.getValidationCpuNanos();
}
}
}