37 changes: 37 additions & 0 deletions docs/src/main/sphinx/connector/hive.md
@@ -417,6 +417,43 @@ limitations and differences:
- `GRANT privilege ON SCHEMA schema` is not supported. Schema ownership can be
changed with `ALTER SCHEMA schema SET AUTHORIZATION user`

(hive-parquet-encryption)=
## Parquet encryption

The Hive connector supports reading Parquet files encrypted with Parquet
Modular Encryption (PME). Decryption keys can be provided via environment
variables. Writing encrypted Parquet files is not supported.

:::{list-table} Parquet encryption properties
:widths: 35, 50, 15
:header-rows: 1

* - Property name
- Description
- Default
* - `pme.environment-key-retriever.enabled`
- Enable the key retriever that reads decryption keys from
environment variables.
- `false`
* - `pme.aad-prefix`
- AAD prefix used when decrypting Parquet files. Must match the prefix used
when the files were written, if one was set.
-
* - `pme.check-footer-integrity`
- Verify the footer signature of files written with a plaintext footer.
- `true`
:::
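
For example, a minimal catalog configuration enabling environment-based key
retrieval might look as follows (the `etc/catalog/hive.properties` location is
the conventional one, shown here for illustration):

```properties
# etc/catalog/hive.properties
pme.environment-key-retriever.enabled=true
pme.check-footer-integrity=true
```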

When `pme.environment-key-retriever.enabled` is set to `true`, provide keys
with the following environment variables:

- `pme.environment-key-retriever.footer-keys`
- `pme.environment-key-retriever.column-keys`

Each variable accepts either a single base64-encoded key or a comma-separated
list of `id:key` pairs (with base64-encoded keys), where `id` must match the
key metadata embedded in the Parquet file.
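
For example, a single footer key and two column keys could be supplied as
follows (the key material is an illustrative base64 placeholder, and the ids
`key1` and `key2` are assumed to match the key metadata written into the file):

```text
pme.environment-key-retriever.footer-keys=AAECAwQFBgcICQoLDA0ODw==
pme.environment-key-retriever.column-keys=key1:EBESExQVFhcYGRobHB0eHw==,key2:ICEiIyQlJicoKSorLC0uLw==
```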

(hive-sql-support)=
## SQL support

12 changes: 12 additions & 0 deletions lib/trino-parquet/pom.xml
@@ -101,6 +101,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
@@ -193,6 +199,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
lib/trino-parquet/src/main/java/io/trino/parquet/reader/BloomFilterStore.java
@@ -16,10 +16,15 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.BasicSliceInput;
import io.airlift.slice.Slice;
import io.trino.parquet.crypto.AesCipherUtils;
import io.trino.parquet.crypto.ColumnDecryptionContext;
import io.trino.parquet.crypto.FileDecryptionContext;
import io.trino.parquet.crypto.ModuleType;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ColumnChunkMetadata;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
@@ -48,21 +53,27 @@ public class BloomFilterStore

private final ParquetDataSource dataSource;
private final Map<ColumnPath, Long> bloomFilterOffsets;
private final Map<ColumnPath, ColumnChunkMetadata> columnChunks;
private final Optional<FileDecryptionContext> decryptionContext;

public BloomFilterStore(ParquetDataSource dataSource, BlockMetadata block, Set<ColumnPath> columnsFiltered)
public BloomFilterStore(ParquetDataSource dataSource, BlockMetadata block, Set<ColumnPath> columnsFiltered, Optional<FileDecryptionContext> decryptionContext)
{
this.dataSource = requireNonNull(dataSource, "dataSource is null");
requireNonNull(block, "block is null");
requireNonNull(columnsFiltered, "columnsFiltered is null");
this.decryptionContext = requireNonNull(decryptionContext, "decryptionContext is null");

ImmutableMap.Builder<ColumnPath, Long> bloomFilterOffsetBuilder = ImmutableMap.builder();
ImmutableMap.Builder<ColumnPath, ColumnChunkMetadata> chunkBuilder = ImmutableMap.builder();
for (ColumnChunkMetadata column : block.columns()) {
ColumnPath path = column.getPath();
if (hasBloomFilter(column) && columnsFiltered.contains(path)) {
bloomFilterOffsetBuilder.put(path, column.getBloomFilterOffset());
chunkBuilder.put(path, column);
}
}
this.bloomFilterOffsets = bloomFilterOffsetBuilder.buildOrThrow();
this.columnChunks = chunkBuilder.buildOrThrow();
}

public Optional<BloomFilter> getBloomFilter(ColumnPath columnPath)
@@ -74,9 +85,24 @@ public Optional<BloomFilter> getBloomFilter(ColumnPath columnPath)
if (columnBloomFilterOffset == null) {
return Optional.empty();
}
BasicSliceInput headerSliceInput = dataSource.readFully(columnBloomFilterOffset, MAX_HEADER_LENGTH).getInput();
bloomFilterHeader = Util.readBloomFilterHeader(headerSliceInput);
bloomFilterDataOffset = columnBloomFilterOffset + headerSliceInput.position();
// If the column is encrypted, decrypt the header using the metadata decryptor
Optional<ColumnDecryptionContext> columnContext = decryptionContext.flatMap(context -> context.getColumnDecryptionContext(columnPath));
if (columnContext.isPresent()) {
// Read encrypted header module: SIZE(4) + NONCE + CIPHERTEXT + TAG
int encryptedSize = BytesUtils.readIntLittleEndian(dataSource.readFully(columnBloomFilterOffset, AesCipherUtils.SIZE_LENGTH).getBytes(), 0);
Slice module = dataSource.readFully(columnBloomFilterOffset, AesCipherUtils.SIZE_LENGTH + encryptedSize);
BasicSliceInput in = module.getInput();
ColumnChunkMetadata chunk = requireNonNull(columnChunks.get(columnPath), "missing chunk metadata");
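// Per the PME spec, the module AAD is the file AAD plus the module type and the
// row-group and column ordinals; bloom filter modules have no page ordinal, hence -1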
byte[] aad = AesCipherUtils.createModuleAAD(columnContext.get().fileAad(), ModuleType.BloomFilterHeader, chunk.getRowGroupOrdinal(), chunk.getColumnOrdinal(), -1);
bloomFilterHeader = Util.readBloomFilterHeader(in, columnContext.get().metadataDecryptor(), aad);
// after read, position() == 4 + encrypted data length
bloomFilterDataOffset = columnBloomFilterOffset + in.position();
}
else {
BasicSliceInput headerSliceInput = dataSource.readFully(columnBloomFilterOffset, MAX_HEADER_LENGTH).getInput();
bloomFilterHeader = Util.readBloomFilterHeader(headerSliceInput);
bloomFilterDataOffset = columnBloomFilterOffset + headerSliceInput.position();
}
}
catch (IOException exception) {
throw new UncheckedIOException("Failed to read Bloom filter header", exception);
@@ -87,9 +113,23 @@ public Optional<BloomFilter> getBloomFilter(ColumnPath columnPath)
}

try {
Slice bloomFilterData = dataSource.readFully(bloomFilterDataOffset, bloomFilterHeader.getNumBytes());
verify(bloomFilterData.length() > 0, "Read empty bloom filter %s", bloomFilterHeader);
return Optional.of(new BlockSplitBloomFilter(bloomFilterData.getBytes()));
Optional<ColumnDecryptionContext> columnContext = decryptionContext.flatMap(context -> context.getColumnDecryptionContext(columnPath));
if (columnContext.isPresent()) {
// Read the whole bitset module: SIZE + NONCE + CIPHERTEXT + TAG
int encryptedSize = BytesUtils.readIntLittleEndian(dataSource.readFully(bloomFilterDataOffset, AesCipherUtils.SIZE_LENGTH).getBytes(), 0);
Slice module = dataSource.readFully(bloomFilterDataOffset, AesCipherUtils.SIZE_LENGTH + encryptedSize);
ColumnChunkMetadata chunk = requireNonNull(columnChunks.get(columnPath), "missing chunk metadata");
byte[] aad = AesCipherUtils.createModuleAAD(columnContext.get().fileAad(), ModuleType.BloomFilterBitset, chunk.getRowGroupOrdinal(), chunk.getColumnOrdinal(), -1);
byte[] plain = columnContext.get().metadataDecryptor().decrypt(module.getBytes(), aad);
verify(plain.length == bloomFilterHeader.getNumBytes(), "Decrypted bloom filter length mismatch: expected %s, got %s", bloomFilterHeader.getNumBytes(), plain.length);
return Optional.of(new BlockSplitBloomFilter(plain));
}
else {
// Plaintext bitset
Slice bloomFilterData = dataSource.readFully(bloomFilterDataOffset, bloomFilterHeader.getNumBytes());
verify(bloomFilterData.length() > 0, "Read empty bloom filter %s", bloomFilterHeader);
return Optional.of(new BlockSplitBloomFilter(bloomFilterData.getBytes()));
}
}
catch (IOException exception) {
throw new UncheckedIOException("Failed to read Bloom filter data", exception);
@@ -100,7 +140,8 @@ public static Optional<BloomFilterStore> getBloomFilterStore(
ParquetDataSource dataSource,
BlockMetadata blockMetadata,
TupleDomain<ColumnDescriptor> parquetTupleDomain,
ParquetReaderOptions options)
ParquetReaderOptions options,
Optional<FileDecryptionContext> decryptionContext)
{
if (!options.useBloomFilter() || parquetTupleDomain.isAll() || parquetTupleDomain.isNone()) {
return Optional.empty();
@@ -117,7 +158,7 @@ public static Optional<BloomFilterStore> getBloomFilterStore(
.map(column -> ColumnPath.get(column.getPath()))
.collect(toImmutableSet());

return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths));
return Optional.of(new BloomFilterStore(dataSource, blockMetadata, columnsFilteredPaths, decryptionContext));
}

public static boolean hasBloomFilter(ColumnChunkMetadata columnMetaData)
lib/trino-parquet/src/main/java/io/trino/parquet/DataPage.java
@@ -21,12 +21,14 @@ public abstract sealed class DataPage
{
protected final int valueCount;
private final OptionalLong firstRowIndex;
private final int pageIndex;

public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex)
public DataPage(int uncompressedSize, int valueCount, OptionalLong firstRowIndex, int pageIndex)
{
super(uncompressedSize);
this.valueCount = valueCount;
this.firstRowIndex = firstRowIndex;
this.pageIndex = pageIndex;
}

/**
@@ -41,4 +43,9 @@ public int getValueCount()
{
return valueCount;
}

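/**
 * Ordinal of this page within its column chunk, e.g. for use as the page
 * ordinal when computing module AADs for encrypted pages.
 */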
public int getPageIndex()
{
return pageIndex;
}
}
lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV1.java
@@ -35,15 +35,17 @@ public DataPageV1(
OptionalLong firstRowIndex,
ParquetEncoding repetitionLevelEncoding,
ParquetEncoding definitionLevelEncoding,
ParquetEncoding valuesEncoding)
ParquetEncoding valuesEncoding,
int pageIndex)
{
super(uncompressedSize, valueCount, firstRowIndex);
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
this.slice = requireNonNull(slice, "slice is null");
this.repetitionLevelEncoding = repetitionLevelEncoding;
this.definitionLevelEncoding = definitionLevelEncoding;
this.valuesEncoding = valuesEncoding;
}

@Override
public Slice getSlice()
{
return slice;
lib/trino-parquet/src/main/java/io/trino/parquet/DataPageV2.java
@@ -44,9 +44,10 @@ public DataPageV2(
int uncompressedSize,
OptionalLong firstRowIndex,
Statistics<?> statistics,
boolean isCompressed)
boolean isCompressed,
int pageIndex)
{
super(uncompressedSize, valueCount, firstRowIndex);
super(uncompressedSize, valueCount, firstRowIndex, pageIndex);
this.rowCount = rowCount;
this.nullCount = nullCount;
this.repetitionLevels = requireNonNull(repetitionLevels, "repetitionLevels slice is null");
@@ -82,6 +83,7 @@ public ParquetEncoding getDataEncoding()
return dataEncoding;
}

@Override
public Slice getSlice()
{
return slice;
lib/trino-parquet/src/main/java/io/trino/parquet/DictionaryPage.java
@@ -43,6 +43,7 @@ public DictionaryPage(Slice slice, int uncompressedSize, int dictionarySize, ParquetEncoding encoding)
encoding);
}

@Override
public Slice getSlice()
{
return slice;
4 changes: 4 additions & 0 deletions lib/trino-parquet/src/main/java/io/trino/parquet/Page.java
@@ -13,6 +13,8 @@
*/
package io.trino.parquet;

import io.airlift.slice.Slice;

public abstract class Page
{
protected final int uncompressedSize;
@@ -26,4 +28,6 @@ public int getUncompressedSize()
{
return uncompressedSize;
}

public abstract Slice getSlice();
}
lib/trino-parquet/src/main/java/io/trino/parquet/ParquetValidationUtils.java
@@ -14,6 +14,7 @@
package io.trino.parquet;

import com.google.errorprone.annotations.FormatMethod;
import io.trino.parquet.crypto.ParquetCryptoException;

public final class ParquetValidationUtils
{
@@ -27,4 +28,12 @@ public static void validateParquet(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
throw new ParquetCorruptionException(dataSourceId, formatString, args);
}
}

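/**
 * Variant of {@code validateParquet} that throws {@link ParquetCryptoException},
 * keeping decryption failures distinguishable from file corruption.
 */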
@FormatMethod
public static void validateParquetCrypto(boolean condition, ParquetDataSourceId dataSourceId, String formatString, Object... args)
{
if (!condition) {
throw new ParquetCryptoException(dataSourceId, formatString, args);
}
}
}