Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.DeleteSchemaUtil;
Expand Down Expand Up @@ -91,6 +92,13 @@ public static WriteBuilder write(OutputFile file) {
return new WriteBuilder(file);
}

public static WriteBuilder write(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of data files in Avro format is not supported");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not supported? The AES GCM stream could easily be used here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also run TestAvroFileSplit on Avro inside of AES GCM streams

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How nice. I expected Avro table encryption to work directly with AES GCM Streams - but not without some hiccups and fixes, since I never ran this usecase before. Turns out it just works out of box.
Now I have a functioning e2e unitest that encrypts/decrypts an Iceberg table with the Avro data format. The unitest is based on Spark SQL and catalog clients, so will go into the integration PR.
I'll add an encrypting version of the TestAvroFileSplit to this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll verify of course where the file length comes from for the Avro reader.

return new WriteBuilder(file.encryptingOutputFile());
}

public static class WriteBuilder {
private final OutputFile file;
private final Map<String, String> config = Maps.newHashMap();
Expand Down Expand Up @@ -272,6 +280,13 @@ public static DataWriteBuilder writeData(OutputFile file) {
return new DataWriteBuilder(file);
}

public static DataWriteBuilder writeData(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of data files in Avro format is not supported");
return new DataWriteBuilder(file.encryptingOutputFile());
}

public static class DataWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
Expand Down Expand Up @@ -368,6 +383,13 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
return new DeleteWriteBuilder(file);
}

public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
Preconditions.checkState(
file.keyMetadata() == null || file.keyMetadata() == EncryptionKeyMetadata.EMPTY,
"Currenty, encryption of delete files in Avro format is not supported");
return new DeleteWriteBuilder(file.encryptingOutputFile());
}

public static class DeleteWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.PropertyUtil;

Expand Down Expand Up @@ -86,10 +87,7 @@ public static EncryptionManager createEncryptionManager(
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);

if (FileFormat.fromString(fileFormat) != FileFormat.PARQUET) {
throw new UnsupportedOperationException(
"Iceberg encryption currently supports only parquet format for data files");
}
boolean nativeDataEncryption = (FileFormat.fromString(fileFormat) == FileFormat.PARQUET);

int dataKeyLength =
PropertyUtil.propertyAsInt(
Expand All @@ -102,6 +100,11 @@ public static EncryptionManager createEncryptionManager(
"Invalid data key length: %s (must be 16, 24, or 32)",
dataKeyLength);

return new StandardEncryptionManager(tableKeyId, dataKeyLength, kmsClient);
return new StandardEncryptionManager(
tableKeyId, dataKeyLength, kmsClient, nativeDataEncryption);
}

public static EncryptedOutputFile plainAsEncryptedOutput(OutputFile encryptingOutputFile) {
return new BaseEncryptedOutputFile(encryptingOutputFile, EncryptionKeyMetadata.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class StandardEncryptionManager implements EncryptionManager {
private final transient KeyManagementClient kmsClient;
private final String tableKeyId;
private final int dataKeyLength;
private final boolean nativeDataEncryption;

private transient volatile SecureRandom lazyRNG = null;

Expand All @@ -41,7 +42,10 @@ public class StandardEncryptionManager implements EncryptionManager {
* @param kmsClient Client of KMS used to wrap/unwrap keys in envelope encryption
*/
public StandardEncryptionManager(
String tableKeyId, int dataKeyLength, KeyManagementClient kmsClient) {
String tableKeyId,
int dataKeyLength,
KeyManagementClient kmsClient,
boolean nativeDataEncryption) {
Copy link
Copy Markdown
Contributor

@rdblue rdblue Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that this should be passed in. The encryption manager needs to support files that use both native encryption (Parquet) and files that use AES GCM streams (Avro). There is no way to set this correctly because the behavior depends on the file type.

Preconditions.checkNotNull(tableKeyId, "Invalid encryption key ID: null");
Preconditions.checkArgument(
dataKeyLength == 16 || dataKeyLength == 24 || dataKeyLength == 32,
Expand All @@ -51,6 +55,7 @@ public StandardEncryptionManager(
this.tableKeyId = tableKeyId;
this.kmsClient = kmsClient;
this.dataKeyLength = dataKeyLength;
this.nativeDataEncryption = nativeDataEncryption;
}

@Override
Expand All @@ -67,7 +72,15 @@ public InputFile decrypt(EncryptedInputFile encrypted) {
@Override
public Iterable<InputFile> decrypt(Iterable<EncryptedInputFile> encrypted) {
// Bulk decrypt is only applied to data files. Returning source input files for parquet.
return Iterables.transform(encrypted, this::decrypt);
if (nativeDataEncryption) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no way to know the intended file format or whether it will use native encryption at this point. I think this needs to always return the result of decrypt.

return Iterables.transform(encrypted, this::getSourceFile);
} else {
return Iterables.transform(encrypted, this::decrypt);
}
}

private InputFile getSourceFile(EncryptedInputFile encryptedFile) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: Iceberg method names should not include get. Instead, use a more helpful verb like create or find, or simply leave it out.

return encryptedFile.encryptedInputFile();
}

private SecureRandom workerRNG() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
public class StandardKeyMetadata implements EncryptionKeyMetadata, IndexedRecord {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't make this class public because it implements an Avro interface. If it needs to be public, we will have to extract StandardKeyMetadata as an interface.

private static final byte V1 = 1;
private static final Schema SCHEMA_V1 =
new Schema(
Expand Down Expand Up @@ -73,11 +73,11 @@ static Map<Byte, org.apache.avro.Schema> supportedAvroSchemaVersions() {
return avroSchemaVersions;
}

ByteBuffer encryptionKey() {
public ByteBuffer encryptionKey() {
return encryptionKey;
}

ByteBuffer aadPrefix() {
public ByteBuffer aadPrefix() {
return aadPrefix;
}

Expand All @@ -95,7 +95,7 @@ static StandardKeyMetadata castOrParse(EncryptionKeyMetadata keyMetadata) {
return parse(kmBuffer);
}

static StandardKeyMetadata parse(ByteBuffer buffer) {
public static StandardKeyMetadata parse(ByteBuffer buffer) {
try {
return KEY_METADATA_DECODER.decode(buffer);
} catch (IOException e) {
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public interface FileAppenderFactory<T> {
*/
FileAppender<T> newAppender(OutputFile outputFile, FileFormat fileFormat);

/**
* Create a new {@link FileAppender}.
*
* @param outputFile an EncryptedOutputFile used to create an output stream.
* @param fileFormat File format.
* @return a newly created {@link FileAppender}
*/
default FileAppender<T> newAppender(EncryptedOutputFile outputFile, FileFormat fileFormat) {
return newAppender(outputFile.encryptingOutputFile(), fileFormat);
}

/**
* Create a new {@link DataWriter}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

Expand Down Expand Up @@ -93,7 +92,6 @@ protected BaseFileWriterFactory(
@Override
public DataWriter<T> newDataWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Expand All @@ -102,7 +100,7 @@ public DataWriter<T> newDataWriter(
switch (dataFileFormat) {
case AVRO:
Avro.DataWriteBuilder avroBuilder =
Avro.writeData(outputFile)
Avro.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -118,7 +116,7 @@ public DataWriter<T> newDataWriter(

case PARQUET:
Parquet.DataWriteBuilder parquetBuilder =
Parquet.writeData(outputFile)
Parquet.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -134,7 +132,7 @@ public DataWriter<T> newDataWriter(

case ORC:
ORC.DataWriteBuilder orcBuilder =
ORC.writeData(outputFile)
ORC.writeData(file)
.schema(dataSchema)
.setAll(properties)
.metricsConfig(metricsConfig)
Expand All @@ -160,7 +158,6 @@ public DataWriter<T> newDataWriter(
@Override
public EqualityDeleteWriter<T> newEqualityDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Expand All @@ -169,7 +166,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(outputFile)
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -186,7 +183,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(outputFile)
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -203,7 +200,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(outputFile)
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
Expand All @@ -230,7 +227,6 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
@Override
public PositionDeleteWriter<T> newPositionDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
Expand All @@ -239,7 +235,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(outputFile)
Avro.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand All @@ -254,7 +250,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(outputFile)
Parquet.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand All @@ -269,7 +265,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

case ORC:
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(outputFile)
ORC.writeDeletes(file)
.setAll(properties)
.metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
Expand Down
7 changes: 7 additions & 0 deletions data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.encryption.StandardKeyMetadata;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -281,6 +282,12 @@ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema dele
builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath));
}

if (deleteFile.keyMetadata() != null) {
StandardKeyMetadata keyMetadata = StandardKeyMetadata.parse(deleteFile.keyMetadata());
builder.withFileEncryptionKey(keyMetadata.encryptionKey());
builder.withAADPrefix(keyMetadata.aadPrefix());
}

return builder.build();

case ORC:
Expand Down
Loading