Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ implementation is used:
- Enable bucket-aware execution. This allows the engine to use physical
bucketing information to optimize queries by reducing data exchanges.
- `true`
* - `iceberg.encryption.kms-type`
Comment thread
sopel39 marked this conversation as resolved.
- Key Management Service type for
[Iceberg table encryption](https://iceberg.apache.org/docs/nightly/encryption/).
Possible values are `AWS` and `GCP`. Required to read encrypted tables.
Writing to encrypted tables is not supported.
Comment thread
sopel39 marked this conversation as resolved.
-
* - `iceberg.encryption.plaintext-files-allowed-for-encrypted-tables`
- Allow reading unencrypted files in tables with encryption enabled. When set
to `false`, an error is raised if a file with encryption key metadata is not
actually encrypted. The equivalent catalog session property is
`plaintext_files_allowed_for_encrypted_tables`.
- `false`
:::

(iceberg-fte-support)=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ public class FileDecryptionProperties
private final DecryptionKeyRetriever keyRetriever;
private final Optional<byte[]> aadPrefix;
private final boolean checkFooterIntegrity;
private final boolean plaintextFilesAllowed;
Comment thread
sopel39 marked this conversation as resolved.

private FileDecryptionProperties(DecryptionKeyRetriever keyRetriever, Optional<byte[]> aadPrefix, boolean checkFooterIntegrity)
private FileDecryptionProperties(
DecryptionKeyRetriever keyRetriever,
Optional<byte[]> aadPrefix,
boolean checkFooterIntegrity,
boolean plaintextFilesAllowed)
{
this.keyRetriever = requireNonNull(keyRetriever, "keyRetriever is null");
this.aadPrefix = requireNonNull(aadPrefix, "aadPrefix is null");
this.checkFooterIntegrity = checkFooterIntegrity;
this.plaintextFilesAllowed = plaintextFilesAllowed;
}

public static Builder builder()
Expand All @@ -50,11 +56,17 @@ public boolean isCheckFooterIntegrity()
return checkFooterIntegrity;
}

public boolean isPlaintextFilesAllowed()
{
return plaintextFilesAllowed;
}

public static class Builder
{
private DecryptionKeyRetriever keyRetriever;
private Optional<byte[]> aadPrefix = Optional.empty();
private boolean checkFooterIntegrity = true;
private boolean plaintextFilesAllowed;

public Builder withKeyRetriever(DecryptionKeyRetriever keyRetriever)
{
Expand All @@ -74,9 +86,15 @@ public Builder withCheckFooterIntegrity(boolean checkFooterIntegrity)
return this;
}

public Builder withPlaintextFilesAllowed()
{
this.plaintextFilesAllowed = true;
return this;
}

public FileDecryptionProperties build()
{
return new FileDecryptionProperties(keyRetriever, aadPrefix, checkFooterIntegrity);
return new FileDecryptionProperties(keyRetriever, aadPrefix, checkFooterIntegrity, plaintextFilesAllowed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.parquet.crypto.AesGcmEncryptor;
import io.trino.parquet.crypto.FileDecryptionContext;
import io.trino.parquet.crypto.FileDecryptionProperties;
import io.trino.parquet.crypto.ParquetCryptoException;
import io.trino.parquet.metadata.FileMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import org.apache.parquet.CorruptStatistics;
Expand Down Expand Up @@ -139,12 +140,19 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, DataSize
}

FileMetaData fileMetaData = readFileMetaData(metadataStream, footerDecryptor, aad);
if (!encryptedFooterMode && fileDecryptionProperties.isPresent() && fileMetaData.isSetEncryption_algorithm()) {
// footer is not encrypted, but some columns might be encrypted
decryptionContext = Optional.of(new FileDecryptionContext(dataSource.getId(), fileDecryptionProperties.get(), fileMetaData.getEncryption_algorithm(), Optional.ofNullable(fileMetaData.getFooter_signing_key_metadata())));
if (fileDecryptionProperties.get().isCheckFooterIntegrity()) {
// verify footer integrity
verifyFooterIntegrity(dataSource, metadataStream, decryptionContext.get(), metadataLength);
if (!encryptedFooterMode && fileDecryptionProperties.isPresent()) {
if (!fileMetaData.isSetEncryption_algorithm()) {
// Plaintext file — detect files that were not encrypted by mistake
if (!fileDecryptionProperties.get().isPlaintextFilesAllowed()) {
throw new ParquetCryptoException("Applying decryptor on plaintext file: %s", dataSource.getId());
}
}
else {
// Footer is not encrypted, but some columns might be encrypted
decryptionContext = Optional.of(new FileDecryptionContext(dataSource.getId(), fileDecryptionProperties.get(), fileMetaData.getEncryption_algorithm(), Optional.ofNullable(fileMetaData.getFooter_signing_key_metadata())));
if (fileDecryptionProperties.get().isCheckFooterIntegrity()) {
verifyFooterIntegrity(dataSource, metadataStream, decryptionContext.get(), metadataLength);
}
}
}

Expand Down Expand Up @@ -228,10 +236,10 @@ private static org.apache.parquet.column.statistics.Statistics<?> tryReadOldUtf8

return org.apache.parquet.column.statistics.Statistics
.getBuilderForReading(columnStatistics.type())
.withMin(min)
.withMax(max)
.withNumNulls(!columnStatistics.isNumNullsSet() && statistics.isSetNull_count() ? statistics.getNull_count() : columnStatistics.getNumNulls())
.build();
.withMin(min)
.withMax(max)
.withNumNulls(!columnStatistics.isNumNullsSet() && statistics.isSetNull_count() ? statistics.getNull_count() : columnStatistics.getNumNulls())
.build();
}

private static boolean isAscii(byte b)
Expand Down
26 changes: 26 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,22 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-kms</artifactId>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>listenablefuture</artifactId>
</exclusion>
<exclusion>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>http-client</artifactId>
Expand Down Expand Up @@ -807,6 +823,16 @@
</ignoredClassPatterns>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.UpdateStatistics;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -769,11 +770,6 @@ private static void validateTableForTrino(BaseTable table, OptionalLong tableSna
if (metadata.formatVersion() < 3) {
return;
}

// Reject Iceberg table encryption
if (!metadata.encryptionKeys().isEmpty() || snapshot.keyId() != null || metadata.properties().containsKey("encryption.key-id")) {
throw new TrinoException(NOT_SUPPORTED, "Iceberg table encryption is not supported");
}
}

private IcebergTableHandle tableHandleForCurrentSnapshot(ConnectorSession session, SchemaTableName tableName, BaseTable table)
Expand Down Expand Up @@ -1477,6 +1473,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TrinoException(INVALID_TABLE_PROPERTY, format("The provided location '%s' does not match the existing table location '%s'", providedTableLocation.get(), icebergTable.location()));
}
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotEncryptedForWrite(icebergTable);
tableLocation = icebergTable.location();
List<PartitionField> existingPartitionFields = getAllPartitionFields(icebergTable);
transaction = newCreateTableTransaction(catalog, tableMetadata, session, replace, tableLocation, allowedExtraProperties, existingPartitionFields);
Expand Down Expand Up @@ -1606,6 +1603,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto

validateNotModifyingOldSnapshot(table, icebergTable);
validateTableForTrino(icebergTable, getCurrentSnapshotId(icebergTable));
validateNotEncryptedForWrite(icebergTable);
Comment thread
sopel39 marked this conversation as resolved.

beginTransaction(icebergTable);

Expand Down Expand Up @@ -2093,6 +2091,7 @@ private BeginTableExecuteResult<ConnectorTableExecuteHandle, ConnectorTableHandl
BaseTable icebergTable = catalog.loadTable(session, table.getSchemaTableName());

validateNotModifyingOldSnapshot(table, icebergTable);
validateNotEncryptedForWrite(icebergTable);

verifyTableVersionForExecute(OPTIMIZE, OPTIMIZE_MAX_SUPPORTED_TABLE_VERSION, icebergTable);

Expand Down Expand Up @@ -2638,6 +2637,7 @@ public Map<String, Long> executeAddFiles(ConnectorSession session, IcebergTableE
{
IcebergAddFilesHandle addFilesHandle = (IcebergAddFilesHandle) executeHandle.procedureHandle();
Table table = catalog.loadTable(session, executeHandle.schemaTableName());
validateNotEncryptedForWrite(table);
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
long addedDataFiles = addFiles(
session,
Expand All @@ -2655,6 +2655,7 @@ public Map<String, Long> executeAddFilesFromTable(ConnectorSession session, Iceb
{
IcebergAddFilesFromTableHandle addFilesHandle = (IcebergAddFilesFromTableHandle) executeHandle.procedureHandle();
Table table = catalog.loadTable(session, executeHandle.schemaTableName());
validateNotEncryptedForWrite(table);
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
long addedDataFiles = addFilesFromTable(
session,
Expand Down Expand Up @@ -3549,6 +3550,7 @@ public ConnectorMergeTableHandle beginMerge(ConnectorSession session, ConnectorT

Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotModifyingOldSnapshot(table, icebergTable);
validateNotEncryptedForWrite(icebergTable);

beginTransaction(icebergTable);

Expand Down Expand Up @@ -3579,6 +3581,13 @@ private static void validateNotModifyingOldSnapshot(IcebergTableHandle table, Ta
}
}

private static void validateNotEncryptedForWrite(Table table)
{
if (!(table.encryption() instanceof PlaintextEncryptionManager)) {
throw new TrinoException(NOT_SUPPORTED, "Writing to encrypted Iceberg tables is not supported");
}
}

private void finishWrite(ConnectorSession session, IcebergTableHandle table, Collection<Slice> fragments)
{
Table icebergTable = transaction.table();
Expand Down Expand Up @@ -4259,6 +4268,7 @@ public ConnectorInsertTableHandle beginRefreshMaterializedView(
checkState(fromSnapshotForRefresh.isEmpty(), "From Snapshot must be empty at the start of MV refresh operation.");
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
validateNotEncryptedForWrite(icebergTable);
beginTransaction(icebergTable);

Optional<String> dependencies = Optional.ofNullable(icebergTable.currentSnapshot())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import io.trino.plugin.iceberg.cache.IcebergCacheKeyProvider;
import io.trino.plugin.iceberg.delete.DefaultDeletionVectorWriter;
import io.trino.plugin.iceberg.delete.DeletionVectorWriter;
import io.trino.plugin.iceberg.encryption.DefaultEncryptionManagerFactory;
import io.trino.plugin.iceberg.encryption.EncryptionManagerFactory;
import io.trino.plugin.iceberg.encryption.IcebergEncryptionConfig;
import io.trino.plugin.iceberg.fileio.ForwardingFileIoFactory;
import io.trino.plugin.iceberg.functions.IcebergFunctionProvider;
import io.trino.plugin.iceberg.functions.tablechanges.TableChangesFunctionProcessorProviderFactory;
Expand Down Expand Up @@ -100,6 +103,10 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(ParquetReaderConfig.class);
configBinder(binder).bindConfig(ParquetWriterConfig.class);

configBinder(binder).bindConfig(IcebergEncryptionConfig.class);
newOptionalBinder(binder, EncryptionManagerFactory.class)
.setDefault().to(DefaultEncryptionManagerFactory.class).in(Scopes.SINGLETON);

binder.bind(ForwardingFileIoFactory.class).in(Scopes.SINGLETON);
binder.bind(TableStatisticsReader.class).in(Scopes.SINGLETON);
binder.bind(TableStatisticsWriter.class).in(Scopes.SINGLETON);
Expand Down
Loading
Loading