diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
index 46c4affef94a..30c454f9517b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java
@@ -14,14 +14,15 @@
package io.trino.plugin.hive;
import com.google.inject.Binder;
-import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.hive.avro.AvroFileWriterFactory;
import io.trino.plugin.hive.avro.AvroPageSourceFactory;
+import io.trino.plugin.hive.crypto.ParquetEncryptionModule;
import io.trino.plugin.hive.esri.EsriPageSourceFactory;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.fs.DirectoryLister;
@@ -61,10 +62,10 @@
import static org.weakref.jmx.guice.ExportBinder.newExporter;
public class HiveModule
- implements Module
+ extends AbstractConfigurationAwareModule
{
@Override
- public void configure(Binder binder)
+ public void setup(Binder binder)
{
configBinder(binder).bindConfig(HiveConfig.class);
configBinder(binder).bindConfig(HiveMetastoreConfig.class);
@@ -136,6 +137,7 @@ public void configure(Binder binder)
fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON);
binder.install(new HiveExecutorModule());
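+ // ParquetEncryptionModule reads ParquetEncryptionConfig, so it is installed through
+ // AbstractConfigurationAwareModule.install(), which propagates the configuration factory
+ // to the child module (this is why HiveModule no longer implements Module directly)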
+ install(new ParquetEncryptionModule());
}
@Provides
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/EnvironmentDecryptionKeyRetriever.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/EnvironmentDecryptionKeyRetriever.java
new file mode 100644
index 000000000000..ce5e0e0d50a3
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/EnvironmentDecryptionKeyRetriever.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Reads keys from two environment variables.
+ *
+ * pme.environment-key-retriever.footer-keys = <single-key> | id1:key1,id2:key2 ...
+ * pme.environment-key-retriever.column-keys = <single-key> | id1:key1,id2:key2 ...
+ *
+ * - If the value contains ':' we treat it as a map (comma-separated {@code id:key}).
+ * The {@code id} must match the {@code keyMetadata} supplied by Parquet.
+ * - Otherwise it is a single default key, independent of {@code keyMetadata}.
+ * - Keys must be valid Base64; malformed values cause an {@code IllegalArgumentException}.
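+ *
+ * Illustrative example (hypothetical Base64 key values):
+ * pme.environment-key-retriever.footer-keys = AAECAwQFBgcICQoLDA0ODw==
+ * pme.environment-key-retriever.column-keys = id1:AAECAwQFBgcICQoLDA0ODw==,id2:Dw4NDAsKCQgHBgUEAwIBAA==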
+ */
+public final class EnvironmentDecryptionKeyRetriever
+ implements DecryptionKeyRetriever
+{
+ private static final String FOOTER_VARIABLE_NAME = "pme.environment-key-retriever.footer-keys";
+ private static final String COLUMN_VARIABLE_NAME = "pme.environment-key-retriever.column-keys";
+
+ private final KeySource footerKeys;
+ private final KeySource columnKeys;
+
+ @Inject
+ public EnvironmentDecryptionKeyRetriever()
+ {
+ this(parseEnvironmentVariable(FOOTER_VARIABLE_NAME), parseEnvironmentVariable(COLUMN_VARIABLE_NAME));
+ }
+
+ @VisibleForTesting
+ EnvironmentDecryptionKeyRetriever(String footerValue, String columnValue)
+ {
+ this(parseValue(footerValue, FOOTER_VARIABLE_NAME), parseValue(columnValue, COLUMN_VARIABLE_NAME));
+ }
+
+ private EnvironmentDecryptionKeyRetriever(KeySource footerKeys, KeySource columnKeys)
+ {
+ this.footerKeys = footerKeys;
+ this.columnKeys = columnKeys;
+ }
+
+ @Override
+ public Optional<byte[]> getColumnKey(ColumnPath columnPath, Optional<byte[]> keyMetadata)
+ {
+ return columnKeys.resolve(keyMetadata);
+ }
+
+ @Override
+ public Optional<byte[]> getFooterKey(Optional<byte[]> keyMetadata)
+ {
+ return footerKeys.resolve(keyMetadata);
+ }
+
+ private static KeySource parseEnvironmentVariable(String variable)
+ {
+ return parseValue(System.getenv(variable), variable);
+ }
+
+ private static KeySource parseValue(String value, String variable)
+ {
+ if (value == null || value.isBlank()) {
+ return KeySource.empty();
+ }
+ if (value.contains(":")) {
+ // map mode
+ ImmutableMap.Builder<ByteBuffer, byte[]> map = ImmutableMap.builder();
+ for (String entry : value.split("\\s*,\\s*")) {
+ checkArgument(!entry.isBlank(), "Empty entry in %s", variable);
+ String[] parts = entry.split(":", 2);
+ checkArgument(parts.length == 2, "Malformed entry in %s: %s", variable, entry);
+ map.put(ByteBuffer.wrap(parts[0].getBytes(StandardCharsets.UTF_8)), decodeKey(parts[1]));
+ }
+ return new KeySource(Optional.empty(), map.buildOrThrow());
+ }
+ // single key mode
+ return new KeySource(Optional.of(decodeKey(value)), ImmutableMap.of());
+ }
+
+ private static byte[] decodeKey(String token)
+ {
+ return Base64.getDecoder().decode(token);
+ }
+
+ /**
+ * Container for either a single default key or a map keyed by key-metadata id
+ */
+ private record KeySource(Optional<byte[]> singleKey, Map<ByteBuffer, byte[]> keyedKeys)
+ {
+ static KeySource empty()
+ {
+ return new KeySource(Optional.empty(), Collections.emptyMap());
+ }
+
+ Optional<byte[]> resolve(Optional<byte[]> keyMetadata)
+ {
+ if (singleKey.isPresent()) {
+ // no map → keyMetadata irrelevant
+ return singleKey;
+ }
+ return keyMetadata.map(bytes -> keyedKeys.get(ByteBuffer.wrap(bytes)));
+ }
+ }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionConfig.java
new file mode 100644
index 000000000000..5126d8daf317
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionConfig.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import java.util.Optional;
+
+public class ParquetEncryptionConfig
+{
+ private boolean environmentKeyRetrieverEnabled;
+ private Optional<String> aadPrefix = Optional.empty();
+ private boolean checkFooterIntegrity = true;
+
+ @Config("pme.environment-key-retriever.enabled")
+ @ConfigDescription("Enable the key retriever that retrieves keys from the environment variable")
+ public ParquetEncryptionConfig setEnvironmentKeyRetrieverEnabled(boolean enabled)
+ {
+ this.environmentKeyRetrieverEnabled = enabled;
+ return this;
+ }
+
+ public boolean isEnvironmentKeyRetrieverEnabled()
+ {
+ return environmentKeyRetrieverEnabled;
+ }
+
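+ // Note: the AAD prefix supplied to the reader must match the prefix used when the
+ // file was written, otherwise decryption fails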
+ @Config("pme.aad-prefix")
+ @ConfigDescription("AAD prefix used to decode Parquet files")
+ public ParquetEncryptionConfig setAadPrefix(String prefix)
+ {
+ this.aadPrefix = Optional.ofNullable(prefix);
+ return this;
+ }
+
+ public Optional<String> getAadPrefix()
+ {
+ return aadPrefix;
+ }
+
+ @Config("pme.check-footer-integrity")
+ @ConfigDescription("Validate signature for plaintext footer files")
+ public ParquetEncryptionConfig setCheckFooterIntegrity(boolean checkFooterIntegrity)
+ {
+ this.checkFooterIntegrity = checkFooterIntegrity;
+ return this;
+ }
+
+ public boolean isCheckFooterIntegrity()
+ {
+ return checkFooterIntegrity;
+ }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionModule.java
new file mode 100644
index 000000000000..c16bad2c38c5
--- /dev/null
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/crypto/ParquetEncryptionModule.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
+import io.trino.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+public class ParquetEncryptionModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ ParquetEncryptionConfig config = buildConfigObject(ParquetEncryptionConfig.class);
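+ // additional DecryptionKeyRetriever implementations may be contributed to this set
+ // binder by other modules; the test factory in TestingHiveConnectorFactory does exactly that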
+ Multibinder<DecryptionKeyRetriever> retrieverBinder =
+ Multibinder.newSetBinder(binder, DecryptionKeyRetriever.class);
+ if (config.isEnvironmentKeyRetrieverEnabled()) {
+ retrieverBinder.addBinding().to(EnvironmentDecryptionKeyRetriever.class).in(Scopes.SINGLETON);
+ }
+ }
+
+ @Provides
+ @Singleton
+ public Optional<FileDecryptionProperties> fileDecryptionProperties(
+ ParquetEncryptionConfig config,
+ Set<DecryptionKeyRetriever> retrievers)
+ {
+ if (retrievers.isEmpty()) {
+ return Optional.empty();
+ }
+
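+ // combine every contributed retriever; the first delegate that returns a key wins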
+ DecryptionKeyRetriever aggregate = new CompositeDecryptionKeyRetriever(List.copyOf(retrievers));
+
+ FileDecryptionProperties.Builder builder = FileDecryptionProperties.builder()
+ .withKeyRetriever(aggregate)
+ .withCheckFooterIntegrity(config.isCheckFooterIntegrity());
+
+ config.getAadPrefix()
+ .map(string -> string.getBytes(StandardCharsets.UTF_8))
+ .ifPresent(builder::withAadPrefix);
+
+ return Optional.of(builder.build());
+ }
+
+ private static class CompositeDecryptionKeyRetriever
+ implements DecryptionKeyRetriever
+ {
+ private final List<DecryptionKeyRetriever> delegates;
+
+ CompositeDecryptionKeyRetriever(List<DecryptionKeyRetriever> delegates)
+ {
+ this.delegates = List.copyOf(delegates);
+ }
+
+ @Override
+ public Optional<byte[]> getColumnKey(ColumnPath path, Optional<byte[]> meta)
+ {
+ return delegates.stream()
+ .map(delegate -> delegate.getColumnKey(path, meta))
+ .filter(Optional::isPresent)
+ .findFirst()
+ .orElse(Optional.empty());
+ }
+
+ @Override
+ public Optional<byte[]> getFooterKey(Optional<byte[]> meta)
+ {
+ return delegates.stream()
+ .map(delegate -> delegate.getFooterKey(meta))
+ .filter(Optional::isPresent)
+ .findFirst()
+ .orElse(Optional.empty());
+ }
+ }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
index a8df03061c89..33c144f2e12b 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java
@@ -33,6 +33,7 @@
import io.trino.parquet.ParquetDataSourceId;
import io.trino.parquet.ParquetReaderOptions;
import io.trino.parquet.ParquetWriteValidation;
+import io.trino.parquet.crypto.FileDecryptionProperties;
import io.trino.parquet.metadata.FileMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.predicate.TupleDomainParquetPredicate;
@@ -134,6 +135,7 @@ public class ParquetPageSourceFactory
private final TrinoFileSystemFactory fileSystemFactory;
private final FileFormatDataSourceStats stats;
+ private final Optional<FileDecryptionProperties> fileDecryptionProperties;
private final ParquetReaderOptions options;
private final DateTimeZone timeZone;
private final int domainCompactionThreshold;
@@ -142,11 +144,13 @@ public class ParquetPageSourceFactory
public ParquetPageSourceFactory(
TrinoFileSystemFactory fileSystemFactory,
FileFormatDataSourceStats stats,
+ Optional<FileDecryptionProperties> fileDecryptionProperties,
ParquetReaderConfig config,
HiveConfig hiveConfig)
{
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.stats = requireNonNull(stats, "stats is null");
+ this.fileDecryptionProperties = requireNonNull(fileDecryptionProperties, "fileDecryptionProperties is null");
options = config.toParquetReaderOptions();
timeZone = hiveConfig.getParquetDateTimeZone();
domainCompactionThreshold = hiveConfig.getDomainCompactionThreshold();
@@ -201,6 +205,7 @@ public Optional<ConnectorPageSource> createPageSource(
.withVectorizedDecodingEnabled(isParquetVectorizedDecodingEnabled(session))
.build(),
Optional.empty(),
+ fileDecryptionProperties,
domainCompactionThreshold,
OptionalLong.of(estimatedFileSize)));
}
@@ -219,6 +224,7 @@ public static ConnectorPageSource createPageSource(
FileFormatDataSourceStats stats,
ParquetReaderOptions options,
Optional<ParquetWriteValidation> parquetWriteValidation,
+ Optional<FileDecryptionProperties> fileDecryptionProperties,
int domainCompactionThreshold,
OptionalLong estimatedFileSize)
{
@@ -230,7 +236,7 @@ public static ConnectorPageSource createPageSource(
AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
dataSource = createDataSource(inputFile, estimatedFileSize, options, memoryContext, stats);
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.of(options.getMaxFooterReadSize()), parquetWriteValidation);
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.of(options.getMaxFooterReadSize()), parquetWriteValidation, fileDecryptionProperties);
FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
fileSchema = fileMetaData.getSchema();
@@ -285,7 +291,8 @@ public static ConnectorPageSource createPageSource(
// We avoid using disjuncts of parquetPredicate for page pruning in ParquetReader as currently column indexes
// are not present in the Parquet files which are read with disjunct predicates.
parquetPredicates.size() == 1 ? Optional.of(parquetPredicates.getFirst()) : Optional.empty(),
- parquetWriteValidation);
+ parquetWriteValidation,
+ parquetMetadata.getDecryptionContext());
return createParquetPageSource(columns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
}
catch (Exception e) {
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
index 2eb36d67a8e5..f319a9932f27 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java
@@ -23,6 +23,7 @@
import io.trino.metastore.Database;
import io.trino.metastore.HiveMetastore;
import io.trino.metastore.HiveMetastoreFactory;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
import io.trino.plugin.tpcds.TpcdsPlugin;
import io.trino.plugin.tpch.ColumnNaming;
import io.trino.plugin.tpch.DecimalTypeMapping;
@@ -105,6 +106,7 @@ public static class Builder<SELF extends Builder<?>>
private boolean createTpchSchemas = true;
private ColumnNaming tpchColumnNaming = SIMPLIFIED;
private DecimalTypeMapping tpchDecimalTypeMapping = DOUBLE;
+ private Optional<DecryptionKeyRetriever> decryptionKeyRetriever = Optional.empty();
protected Builder()
{
@@ -196,6 +198,13 @@ public SELF setTpchDecimalTypeMapping(DecimalTypeMapping tpchDecimalTypeMapping)
return self();
}
+ @CanIgnoreReturnValue
+ public SELF setDecryptionKeyRetriever(DecryptionKeyRetriever decryptionKeyRetriever)
+ {
+ this.decryptionKeyRetriever = Optional.of(requireNonNull(decryptionKeyRetriever, "decryptionKeyRetriever is null"));
+ return self();
+ }
+
@Override
public DistributedQueryRunner build()
throws Exception
@@ -227,7 +236,7 @@ public DistributedQueryRunner build()
hiveProperties.put("fs.hadoop.enabled", "true");
}
- queryRunner.installPlugin(new TestingHivePlugin(dataDir, metastore));
+ queryRunner.installPlugin(new TestingHivePlugin(dataDir, metastore, decryptionKeyRetriever));
Map<String, String> hiveProperties = new HashMap<>();
if (!skipTimezoneSetup) {
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
index 85a5a62edde9..0b551e13a6a0 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveTestUtils.java
@@ -178,7 +178,7 @@ public static Set<HivePageSourceFactory> getDefaultHivePageSourceFactories(Trino
.add(new AvroPageSourceFactory(fileSystemFactory))
.add(new RcFilePageSourceFactory(fileSystemFactory, hiveConfig))
.add(new OrcPageSourceFactory(new OrcReaderConfig(), fileSystemFactory, stats, hiveConfig))
- .add(new ParquetPageSourceFactory(fileSystemFactory, stats, new ParquetReaderConfig(), hiveConfig))
+ .add(new ParquetPageSourceFactory(fileSystemFactory, stats, Optional.empty(), new ParquetReaderConfig(), hiveConfig))
.add(new ProtobufSequenceFilePageSourceFactory(fileSystemFactory, hiveConfig))
.build();
}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
index 255a2c728a6d..8f79bf1acfef 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
@@ -20,6 +20,7 @@
import io.airlift.units.Duration;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.TimeZone;
@@ -128,6 +129,7 @@ public void testDefaults()
@Test
public void testExplicitPropertyMappings()
+ throws IOException
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("hive.single-statement-writes", "true")
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
index cc7e10059471..0960d6b29fcd 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveFileFormats.java
@@ -554,7 +554,7 @@ public void testParquetPageSource(int rowCount, long fileSizePadding)
.withSession(PARQUET_SESSION)
.withRowsCount(rowCount)
.withFileSizePadding(fileSizePadding)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
@Test(dataProvider = "validRowAndFileSizePadding")
@@ -568,7 +568,7 @@ public void testParquetPageSourceGzip(int rowCount, long fileSizePadding)
.withCompressionCodec(HiveCompressionCodec.GZIP)
.withFileSizePadding(fileSizePadding)
.withRowsCount(rowCount)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
@Test(dataProvider = "rowCount")
@@ -583,7 +583,7 @@ public void testParquetWriter(int rowCount)
.withColumns(testColumns)
.withRowsCount(rowCount)
.withFileWriterFactory(fileSystemFactory -> new ParquetFileWriterFactory(fileSystemFactory, new NodeVersion("test-version"), TESTING_TYPE_MANAGER, new HiveConfig(), STATS))
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
@Test(dataProvider = "rowCount")
@@ -601,7 +601,7 @@ public void testParquetPageSourceSchemaEvolution(int rowCount)
.withReadColumns(readColumns)
.withSession(PARQUET_SESSION)
.withRowsCount(rowCount)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
// test the name-based access
readColumns = writeColumns.reversed();
@@ -609,7 +609,7 @@ public void testParquetPageSourceSchemaEvolution(int rowCount)
.withWriteColumns(writeColumns)
.withReadColumns(readColumns)
.withSession(PARQUET_SESSION_USE_NAME)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
@Test(dataProvider = "rowCount")
@@ -627,7 +627,7 @@ public void testParquetCaseSensitivity(int rowCount)
.withSession(getHiveSession(createParquetHiveConfig(true), new ParquetWriterConfig().setValidationPercentage(0)))
.withRowsCount(rowCount)
.withFileWriterFactory(fileSystemFactory -> new ParquetFileWriterFactory(fileSystemFactory, new NodeVersion("test-version"), TESTING_TYPE_MANAGER, new HiveConfig(), STATS))
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
private static List<TestColumn> getTestColumnsSupportedByParquet()
@@ -670,7 +670,7 @@ public void testTruncateVarcharColumn()
.withWriteColumns(ImmutableList.of(writeColumn))
.withReadColumns(ImmutableList.of(readColumn))
.withSession(PARQUET_SESSION)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
assertThatFileFormat(AVRO)
.withWriteColumns(ImmutableList.of(writeColumn))
@@ -736,14 +736,14 @@ public void testParquetProjectedColumns(int rowCount)
.withReadColumns(readColumns)
.withRowsCount(rowCount)
.withSession(PARQUET_SESSION)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
assertThatFileFormat(PARQUET)
.withWriteColumns(writeColumns)
.withReadColumns(readColumns)
.withRowsCount(rowCount)
.withSession(PARQUET_SESSION_USE_NAME)
- .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()));
+ .isReadableByPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()));
}
@Test(dataProvider = "rowCount")
@@ -944,7 +944,7 @@ public void testFailForLongVarcharPartitionColumn()
assertThatFileFormat(PARQUET)
.withColumns(columns)
.withSession(PARQUET_SESSION)
- .isFailingForPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, new ParquetReaderConfig(), new HiveConfig()), expectedErrorCode, expectedMessage);
+ .isFailingForPageSource(fileSystemFactory -> new ParquetPageSourceFactory(fileSystemFactory, STATS, Optional.empty(), new ParquetReaderConfig(), new HiveConfig()), expectedErrorCode, expectedMessage);
}
private static void testPageSourceFactory(
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
index c0e03b111e10..e26a56ed33e7 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHiveConnectorFactory.java
@@ -15,9 +15,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Module;
+import com.google.inject.multibindings.Multibinder;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.local.LocalFileSystemFactory;
import io.trino.metastore.HiveMetastore;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
@@ -40,11 +42,11 @@ public class TestingHiveConnectorFactory
public TestingHiveConnectorFactory(Path localFileSystemRootPath)
{
- this(localFileSystemRootPath, Optional.empty());
+ this(localFileSystemRootPath, Optional.empty(), Optional.empty());
}
@Deprecated
- public TestingHiveConnectorFactory(Path localFileSystemRootPath, Optional<HiveMetastore> metastore)
+ public TestingHiveConnectorFactory(Path localFileSystemRootPath, Optional<HiveMetastore> metastore, Optional<DecryptionKeyRetriever> decryptionKeyRetriever)
{
this.metastore = requireNonNull(metastore, "metastore is null");
@@ -53,6 +55,12 @@ public TestingHiveConnectorFactory(Path localFileSystemRootPath, Optional<HiveMetastore> metastore)
config -> config.setCatalogDirectory("local:///"));
+
+ decryptionKeyRetriever.ifPresent(retriever -> {
+ Multibinder<DecryptionKeyRetriever> retrieverBinder =
+ Multibinder.newSetBinder(binder, DecryptionKeyRetriever.class);
+ retrieverBinder.addBinding().toInstance(retriever);
+ });
};
}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
index 41f492a99405..b472f9355893 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestingHivePlugin.java
@@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import io.trino.metastore.HiveMetastore;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
import io.trino.spi.Plugin;
import io.trino.spi.connector.ConnectorFactory;
@@ -28,28 +29,30 @@ public class TestingHivePlugin
{
private final Path localFileSystemRootPath;
private final Optional<HiveMetastore> metastore;
+ private final Optional<DecryptionKeyRetriever> decryptionKeyRetriever;
public TestingHivePlugin(Path localFileSystemRootPath)
{
- this(localFileSystemRootPath, Optional.empty());
+ this(localFileSystemRootPath, Optional.empty(), Optional.empty());
}
@Deprecated
public TestingHivePlugin(Path localFileSystemRootPath, HiveMetastore metastore)
{
- this(localFileSystemRootPath, Optional.of(metastore));
+ this(localFileSystemRootPath, Optional.of(metastore), Optional.empty());
}
@Deprecated
- public TestingHivePlugin(Path localFileSystemRootPath, Optional<HiveMetastore> metastore)
+ public TestingHivePlugin(Path localFileSystemRootPath, Optional<HiveMetastore> metastore, Optional<DecryptionKeyRetriever> decryptionKeyRetriever)
{
this.localFileSystemRootPath = requireNonNull(localFileSystemRootPath, "localFileSystemRootPath is null");
this.metastore = requireNonNull(metastore, "metastore is null");
+ this.decryptionKeyRetriever = requireNonNull(decryptionKeyRetriever, "decryptionKeyRetriever is null");
}
@Override
public Iterable<ConnectorFactory> getConnectorFactories()
{
- return ImmutableList.of(new TestingHiveConnectorFactory(localFileSystemRootPath, metastore));
+ return ImmutableList.of(new TestingHiveConnectorFactory(localFileSystemRootPath, metastore, decryptionKeyRetriever));
}
}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestEnvironmentDecryptionKeyRetriever.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestEnvironmentDecryptionKeyRetriever.java
new file mode 100644
index 000000000000..66537a8e478c
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestEnvironmentDecryptionKeyRetriever.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.junit.jupiter.api.Test;
+
+import java.util.Base64;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public final class TestEnvironmentDecryptionKeyRetriever
+{
+ private static final ColumnPath AGE = ColumnPath.fromDotString("age");
+
+ @Test
+ public void defaultEmpty()
+ {
+ EnvironmentDecryptionKeyRetriever retriever = new EnvironmentDecryptionKeyRetriever(null, null);
+
+ assertThat(retriever.getFooterKey(Optional.empty())).isEmpty();
+ assertThat(retriever.getColumnKey(AGE, Optional.empty())).isEmpty();
+ }
+
+ @Test
+ public void singleKeyMode()
+ {
+ EnvironmentDecryptionKeyRetriever retriever = new EnvironmentDecryptionKeyRetriever(b64("foot"), b64("colKey"));
+
+ assertThat(retriever.getFooterKey(Optional.of("ignored".getBytes(UTF_8)))).contains("foot".getBytes(UTF_8));
+ assertThat(retriever.getColumnKey(AGE, Optional.empty())).contains("colKey".getBytes(UTF_8));
+ }
+
+ @Test
+ public void mapModeUsesMetadata()
+ {
+ // footer: id1→k1 , id2→k2
+ String footerValue = String.join(",",
+ "id1:" + b64("k1"), "id2:" + b64("k2"));
+ // column: meta→ageKey
+ String columnValue = "meta:" + b64("ageKey");
+
+ EnvironmentDecryptionKeyRetriever retriever = new EnvironmentDecryptionKeyRetriever(footerValue, columnValue);
+
+ assertThat(retriever.getFooterKey(Optional.of("id2".getBytes(UTF_8)))).contains("k2".getBytes(UTF_8));
+ assertThat(retriever.getColumnKey(AGE, Optional.of("meta".getBytes(UTF_8)))).contains("ageKey".getBytes(UTF_8));
+
+ // unknown metadata → empty
+ assertThat(retriever.getFooterKey(Optional.of("zzz".getBytes(UTF_8)))).isEmpty();
+ assertThat(retriever.getColumnKey(AGE, Optional.empty())).isEmpty();
+ }
+
+ private static String b64(String string)
+ {
+ return Base64.getEncoder().encodeToString(string.getBytes(UTF_8));
+ }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestHiveParquetEncryption.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestHiveParquetEncryption.java
new file mode 100644
index 000000000000..a90e652285eb
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestHiveParquetEncryption.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
+import io.trino.parquet.ParquetDataSource;
+import io.trino.parquet.ParquetReaderOptions;
+import io.trino.parquet.crypto.DecryptionKeyRetriever;
+import io.trino.parquet.crypto.FileDecryptionProperties;
+import io.trino.parquet.metadata.ColumnChunkMetadata;
+import io.trino.parquet.metadata.ParquetMetadata;
+import io.trino.parquet.reader.FileParquetDataSource;
+import io.trino.parquet.reader.MetadataReader;
+import io.trino.plugin.hive.HiveQueryRunner;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.MaterializedResult;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.nio.file.Files;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+/**
+ * End-to-end PME flow: Parquet writer → Hive connector → Trino query.
+ */
+// ExampleParquetWriter is not thread-safe
+@TestInstance(PER_CLASS)
+@Execution(ExecutionMode.SAME_THREAD)
+public class TestHiveParquetEncryption
+ extends AbstractTestQueryFramework
+{
+ private static final byte[] FOOTER_KEY = "footKeyIs16Byte?".getBytes(UTF_8);
+
+ // Keys per column (different on purpose)
+ private static final byte[] COLUMN_KEY_AGE = "colKeyIs16ByteA?".getBytes(UTF_8);
+ private static final byte[] COLUMN_KEY_ID = "colKeyIs16ByteB?".getBytes(UTF_8);
+
+ private static final ColumnPath AGE_PATH = ColumnPath.fromDotString("age");
+ private static final ColumnPath ID_PATH = ColumnPath.fromDotString("id");
+
+ /**
+ * kept so we can reference the warehouse path later
+ */
+ private java.nio.file.Path warehouseDir;
+
+ @Override
+ protected DistributedQueryRunner createQueryRunner()
+ throws Exception
+ {
+ warehouseDir = Files.createTempDirectory("pme_hive");
+
+ Map<String, String> properties = ImmutableMap.<String, String>builder()
+ .put("hive.metastore", "file")
+ .put("hive.metastore.catalog.dir", warehouseDir.toUri().toString())
+ .put("pme.environment-key-retriever.enabled", "false")
+ .buildOrThrow();
+
+ // Bind retriever that knows ONLY the footer key + age column key
+ return HiveQueryRunner.builder()
+ .setHiveProperties(properties)
+ .setDecryptionKeyRetriever(new TestingParquetEncryptionModule(FOOTER_KEY, Optional.of(COLUMN_KEY_AGE), Optional.empty()))
+ .build();
+ }
+
+ @Test
+ public void testEncryptedParquetRead()
+ throws Exception
+ {
+ // 1) write encrypted file inside warehouse
+ java.nio.file.Path dataDir = Files.createDirectories(warehouseDir.resolve("pme_data"));
+ java.nio.file.Path parquetFile = dataDir.resolve("data.parquet");
+ writeEncryptedFile(parquetFile);
+
+ // 2) create external table
+ String location = Location.of(String.valueOf(dataDir.toUri())).toString();
+ assertUpdate("""
+ CREATE TABLE enc_age(age INT)
+ WITH (external_location = '%s', format = 'PARQUET')
+ """.formatted(location));
+
+ // 3) verify results
+ MaterializedResult result = computeActual("SELECT COUNT(*), MIN(age), MAX(age) FROM enc_age");
+ assertThat(result.getMaterializedRows().get(0).getField(0)).isEqualTo(100L); // count
+ assertThat(result.getMaterializedRows().get(0).getField(1)).isEqualTo(0); // min
+ assertThat(result.getMaterializedRows().get(0).getField(2)).isEqualTo(99); // max
+ }
+
+ @Test
+ public void testTwoColumnFileOnlyAgeKeyProvided()
+ throws Exception
+ {
+ // 1) write a two-column file (both columns encrypted with different keys)
+ java.nio.file.Path dataDir = Files.createDirectories(warehouseDir.resolve("pme_two_cols"));
+ java.nio.file.Path parquetFile = dataDir.resolve("data.parquet");
+ writeTwoColumnEncryptedFile(parquetFile);
+
+ // 2) create external table with both columns
+ String location = Location.of(String.valueOf(dataDir.toUri())).toString();
+ assertUpdate("""
+ CREATE TABLE enc_two(id INT, age INT)
+ WITH (external_location = '%s', format = 'PARQUET')
+ """.formatted(location));
+
+ // 3) Selecting ONLY the accessible column (age) must succeed
+ MaterializedResult ok = computeActual("SELECT COUNT(*), MIN(age), MAX(age) FROM enc_two");
+ assertThat(ok.getMaterializedRows().get(0).getField(0)).isEqualTo(100L);
+ assertThat(ok.getMaterializedRows().get(0).getField(1)).isEqualTo(0);
+ assertThat(ok.getMaterializedRows().get(0).getField(2)).isEqualTo(99);
+
+ // 4) Selecting the inaccessible column (id) must fail (no column key)
+ // We match on a broad error text to avoid coupling to a specific message.
+ assertQueryFails("SELECT MIN(id) FROM enc_two", "(?s).*User does not have access to column.*");
+ }
+
+ @Test
+ public void testEncryptedDictionaryPruningTwoColumns()
+ throws Exception
+ {
+ // 1) write an encrypted, dictionary-encoded two-column file
+ // Values are 0..10 except the “missing” ones; RG min/max is [0,10] so min/max alone can’t prune.
+ java.nio.file.Path dataDir = Files.createDirectories(warehouseDir.resolve("pme_dict2_enc"));
+ java.nio.file.Path parquetFile = dataDir.resolve("data.parquet");
+ int missingAge = 7;
+ int missingId = 3;
+ writeTwoColumnEncryptedDictionaryFile(parquetFile, missingAge, missingId);
+
+ // 2) create external table
+ String location = Location.of(String.valueOf(dataDir.toUri())).toString();
+ assertUpdate("""
+ CREATE TABLE enc_dict2(id INT, age INT)
+ WITH (external_location = '%s', format = 'PARQUET')
+ """.formatted(location));
+
+ // 3) Predicate on the accessible column (age) – dictionary (encrypted) gets read & prunes to zero
+ assertThat(computeActual("SELECT count(*) FROM enc_dict2 WHERE age = " + missingAge).getOnlyValue())
+ .isEqualTo(0L);
+
+ // sanity: present value on age returns rows
+ assertThat(((Number) computeActual("SELECT count(*) FROM enc_dict2 WHERE age = 5").getOnlyValue()).longValue())
+ .isGreaterThan(0L);
+
+ // 4) Predicate on the inaccessible column (id) – should fail (no column key)
+ assertQueryFails("SELECT count(*) FROM enc_dict2 WHERE id = " + missingId, "(?s).*access.*column.*id.*");
+
+ assertQuerySucceeds("DROP TABLE enc_dict2");
+ }
+
+ // ───────────────────────── writer ─────────────────────────
+ private static void writeEncryptedFile(java.nio.file.Path path)
+ throws Exception
+ {
+ var schema = MessageTypeParser.parseMessageType(
+ "message doc { required int32 age; }");
+
+ // This test purposely reuses one demo key.
+ // With Parquet Modular Encryption (AES-GCM/CTR), reusing a key for lots of data
+ // weakens security.
+ ColumnEncryptionProperties columnProperties = ColumnEncryptionProperties.builder(AGE_PATH)
+ .withKey(COLUMN_KEY_AGE)
+ .build();
+
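+ // AES_GCM_CTR_V1 protects footer and metadata modules with AES-GCM but encrypts
+ // page data with unauthenticated AES-CTR; AES_GCM_V1 would authenticate pages too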
+ FileEncryptionProperties encodingProperties = FileEncryptionProperties.builder(FOOTER_KEY)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withEncryptedColumns(Map.of(AGE_PATH, columnProperties))
+ .build();
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new org.apache.hadoop.fs.Path(path.toString()))
+ .withType(schema)
+ .withConf(new Configuration())
+ .withEncryption(encodingProperties)
+ .withWriteMode(OVERWRITE)
+ .build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ for (int i = 0; i < 100; i++) {
+ writer.write(factory.newGroup().append("age", i));
+ }
+ }
+ }
+
+ private static void writeTwoColumnEncryptedFile(java.nio.file.Path path)
+ throws Exception
+ {
+ var schema = MessageTypeParser.parseMessageType("message doc { required int32 age; required int32 id; }");
+
+ ColumnEncryptionProperties idProperties = ColumnEncryptionProperties.builder(ID_PATH).withKey(COLUMN_KEY_ID).build();
+ ColumnEncryptionProperties ageProperties = ColumnEncryptionProperties.builder(AGE_PATH).withKey(COLUMN_KEY_AGE).build();
+
+ FileEncryptionProperties encodingProperties = FileEncryptionProperties.builder(FOOTER_KEY)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withEncryptedColumns(Map.of(AGE_PATH, ageProperties, ID_PATH, idProperties))
+ .build();
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new org.apache.hadoop.fs.Path(path.toString()))
+ .withType(schema)
+ .withConf(new Configuration())
+ .withEncryption(encodingProperties)
+ .withWriteMode(OVERWRITE)
+ .build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ for (int i = 0; i < 100; i++) {
+ writer.write(factory.newGroup().append("id", 100 - i).append("age", i));
+ }
+ }
+ }
+
+ /**
+ * Two-column writer:
+ * - both columns encrypted (different keys),
+ * - tiny page size to encourage dictionary encoding,
+ * - each column skips one value in 0..10 so dictionary-based pruning can eliminate the row-group.
+ * Reader in this test only has the AGE key (ID key is absent).
+ */
+ private static void writeTwoColumnEncryptedDictionaryFile(java.nio.file.Path path, int missingAge, int missingId)
+ throws Exception
+ {
+ var schema = MessageTypeParser.parseMessageType("message doc { required int32 age; required int32 id; }");
+
+ ColumnEncryptionProperties idProperties = ColumnEncryptionProperties.builder(ID_PATH).withKey(COLUMN_KEY_ID).build();
+ ColumnEncryptionProperties ageProperties = ColumnEncryptionProperties.builder(AGE_PATH).withKey(COLUMN_KEY_AGE).build();
+
+ FileEncryptionProperties encodingProperties = FileEncryptionProperties.builder(FOOTER_KEY)
+ .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+ .withEncryptedColumns(Map.of(AGE_PATH, ageProperties, ID_PATH, idProperties))
+ .build();
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new org.apache.hadoop.fs.Path(path.toString()))
+ .withType(schema)
+ .withConf(new Configuration())
+ .withEncryption(encodingProperties)
+ .withWriteMode(OVERWRITE)
+ .withPageSize(1024) // small pages -> dictionary likely
+ .build()) {
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ for (int i = 0; i < 5000; i++) {
+ int id = i % 11;
+ if (id == missingId) {
+ id = (id + 1) % 11; // skip one value for id
+ }
+ int age = i % 11;
+ if (age == missingAge) {
+ age = (age + 1) % 11; // skip one value for age
+ }
+ writer.write(factory.newGroup().append("id", id).append("age", age));
+ }
+ }
+
+ // Verify both columns actually have dictionary pages
+ try (ParquetDataSource source = new FileParquetDataSource(path.toFile(), ParquetReaderOptions.defaultOptions())) {
+ FileDecryptionProperties dec = FileDecryptionProperties.builder()
+ .withKeyRetriever(new TestHiveParquetEncryption.TestingParquetEncryptionModule(
+ FOOTER_KEY, Optional.of(COLUMN_KEY_AGE), Optional.of(COLUMN_KEY_ID)))
+ .build();
+ ParquetMetadata metadata = MetadataReader.readFooter(source, Optional.empty(), Optional.empty(), Optional.of(dec));
+
+ ColumnChunkMetadata ageChunk = metadata.getBlocks().getFirst().columns().stream()
+ .filter(column -> column.getPath().equals(AGE_PATH))
+ .findFirst().orElseThrow();
+ ColumnChunkMetadata idChunk = metadata.getBlocks().getFirst().columns().stream()
+ .filter(column -> column.getPath().equals(ID_PATH))
+ .findFirst().orElseThrow();
+
+ assertThat(ageChunk.getDictionaryPageOffset()).isGreaterThan(0);
+ assertThat(idChunk.getDictionaryPageOffset()).isGreaterThan(0);
+ assertThat(ageChunk.getEncodings()).anyMatch(org.apache.parquet.column.Encoding::usesDictionary);
+ assertThat(idChunk.getEncodings()).anyMatch(org.apache.parquet.column.Encoding::usesDictionary);
+ }
+ }
+
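+ /**
+ * Path-based test retriever: resolves keys by column name and ignores {@code keyMetadata}.
+ */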
+ public static class TestingParquetEncryptionModule
+ implements DecryptionKeyRetriever
+ {
+ private final byte[] footerKey;
+ private final Optional<byte[]> ageKey;
+ private final Optional<byte[]> idKey;
+
+ public TestingParquetEncryptionModule(byte[] footerKey, Optional<byte[]> ageKey, Optional<byte[]> idKey)
+ {
+ this.footerKey = requireNonNull(footerKey, "footerKey is null");
+ this.ageKey = requireNonNull(ageKey, "ageKey is null");
+ this.idKey = requireNonNull(idKey, "idKey is null");
+ }
+
+ @Override
+ public Optional<byte[]> getColumnKey(ColumnPath columnPath, Optional<byte[]> keyMetadata)
+ {
+ String path = columnPath.toDotString();
+ if ("age".equals(path)) {
+ return ageKey;
+ }
+ if ("id".equals(path)) {
+ return idKey;
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<byte[]> getFooterKey(Optional<byte[]> keyMetadata)
+ {
+ return Optional.of(footerKey);
+ }
+ }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestParquetEncryptionConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestParquetEncryptionConfig.java
new file mode 100644
index 000000000000..641863e261d7
--- /dev/null
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/crypto/TestParquetEncryptionConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hive.crypto;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+
+public class TestParquetEncryptionConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(ParquetEncryptionConfig.class)
+ .setEnvironmentKeyRetrieverEnabled(false)
+ .setAadPrefix(null)
+ .setCheckFooterIntegrity(true));
+ }
+
+ @Test
+ public void testExplicitPropertyMappings()
+ {
+ Map<String, String> properties = ImmutableMap.<String, String>builder()
+ .put("pme.environment-key-retriever.enabled", "true")
+ .put("pme.aad-prefix", "tenant‑42")
+ .put("pme.check-footer-integrity", "false")
+ .buildOrThrow();
+
+ ParquetEncryptionConfig expected = new ParquetEncryptionConfig()
+ .setEnvironmentKeyRetrieverEnabled(true)
+ .setAadPrefix("tenant‑42")
+ .setCheckFooterIntegrity(false);
+
+ assertFullMapping(properties, expected);
+ }
+}
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java
index d93a46589c29..e4ad205c98ac 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/ParquetUtil.java
@@ -80,6 +80,7 @@ private static ConnectorPageSource createPageSource(ConnectorSession session, Fi
HivePageSourceFactory hivePageSourceFactory = new ParquetPageSourceFactory(
fileSystemFactory,
new FileFormatDataSourceStats(),
+ Optional.empty(),
new ParquetReaderConfig(),
hiveConfig);
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java
index 0f5795aa03c5..fe049c0acfd0 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java
@@ -308,10 +308,10 @@ private static BloomFilterStore generateBloomFilterStore(ParquetTester.TempFile
TrinoInputFile inputFile = new LocalInputFile(tempFile.getFile());
TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, ParquetReaderOptions.defaultOptions(), new FileFormatDataSourceStats());
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource);
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
ColumnChunkMetadata columnChunkMetaData = getOnlyElement(getOnlyElement(parquetMetadata.getBlocks()).columns());
- return new BloomFilterStore(dataSource, getOnlyElement(parquetMetadata.getBlocks()), Set.of(columnChunkMetaData.getPath()));
+ return new BloomFilterStore(dataSource, getOnlyElement(parquetMetadata.getBlocks()), Set.of(columnChunkMetaData.getPath()), Optional.empty());
}
private static class BloomFilterTypeTestCase
diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index 532b0568fdd2..8851add79312 100644
--- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -230,7 +230,7 @@ private static ConnectorPageSource createPageSource(
try {
AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
dataSource = createDataSource(inputFile, OptionalLong.of(hudiSplit.fileSize()), options, memoryContext, dataSourceStats);
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize());
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize(), Optional.empty());
FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
@@ -271,7 +271,8 @@ private static ConnectorPageSource createPageSource(
options,
exception -> handleException(dataSourceId, exception),
Optional.of(parquetPredicate),
- Optional.empty());
+ Optional.empty(),
+ parquetMetadata.getDecryptionContext());
return createParquetPageSource(columns, fileSchema, messageColumn, useColumnNames, parquetReaderProvider);
}
catch (IOException | RuntimeException e) {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
index aad2add5ec40..c256d6c2c4e8 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java
@@ -916,7 +916,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource(
ParquetDataSource dataSource = null;
try {
dataSource = createDataSource(inputFile, OptionalLong.of(fileSize), options, memoryContext, fileFormatDataSourceStats);
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize());
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize(), Optional.empty());
FileMetadata fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) {
@@ -1027,6 +1027,7 @@ else if (!parquetIdToFieldName.containsKey(column.getBaseColumn().getId())) {
options,
exception -> handleException(dataSourceId, exception),
Optional.empty(),
+ Optional.empty(),
Optional.empty());
ConnectorPageSource pageSource = new ParquetPageSource(parquetReader);
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
index 1c7ba6ac43e3..b947b6cb179c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java
@@ -13,9 +13,11 @@
*/
package io.trino.plugin.iceberg;
+import com.google.common.collect.ImmutableList;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.parquet.ParquetDataSourceId;
+import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
@@ -29,6 +31,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -36,7 +40,6 @@
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static io.trino.plugin.iceberg.util.ParquetUtil.footerMetrics;
-import static io.trino.plugin.iceberg.util.ParquetUtil.getSplitOffsets;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -83,7 +86,7 @@ public FileMetrics getFileMetrics()
{
ParquetMetadata parquetMetadata;
try {
- parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()));
+ parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()), Optional.empty());
return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata)));
}
catch (IOException | UncheckedIOException e) {
@@ -126,4 +129,16 @@ public long getValidationCpuNanos()
{
return parquetFileWriter.getValidationCpuNanos();
}
+
+ private static List<Long> getSplitOffsets(ParquetMetadata metadata)
+ throws IOException
+ {
+ List<BlockMetadata> blocks = metadata.getBlocks();
+ List<Long> splitOffsets = new ArrayList<>(blocks.size());
+ for (BlockMetadata blockMetaData : blocks) {
+ splitOffsets.add(blockMetaData.columns().getFirst().getStartingPos());
+ }
+ Collections.sort(splitOffsets);
+ return ImmutableList.copyOf(splitOffsets);
+ }
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java
index 630a5ca4f867..06e9d93b4bba 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java
@@ -154,7 +154,7 @@ private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metrics
{
ParquetReaderOptions options = ParquetReaderOptions.defaultOptions();
try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, ParquetReaderOptions.defaultOptions(), new FileFormatDataSourceStats())) {
- ParquetMetadata metadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize());
+ ParquetMetadata metadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize(), Optional.empty());
return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping);
}
catch (IOException e) {
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java
index 98f50940b419..d25f3aaf6365 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java
@@ -14,8 +14,6 @@
package io.trino.plugin.iceberg.util;
-import com.google.common.collect.ImmutableList;
-import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ColumnChunkMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
@@ -41,13 +39,12 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
+import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -70,7 +67,7 @@ public final class ParquetUtil
private ParquetUtil() {}
public static Metrics footerMetrics(ParquetMetadata metadata, Stream<FieldMetrics<?>> fieldMetrics, MetricsConfig metricsConfig)
- throws ParquetCorruptionException
+ throws IOException
{
return footerMetrics(metadata, fieldMetrics, metricsConfig, null);
}
@@ -80,7 +77,7 @@ public static Metrics footerMetrics(
Stream<FieldMetrics<?>> fieldMetrics,
MetricsConfig metricsConfig,
NameMapping nameMapping)
- throws ParquetCorruptionException
+ throws IOException
{
requireNonNull(fieldMetrics, "fieldMetrics should not be null");
@@ -158,18 +155,6 @@ public static Metrics footerMetrics(
toBufferMap(fileSchema, upperBounds));
}
- public static List<Long> getSplitOffsets(ParquetMetadata metadata)
- throws ParquetCorruptionException
- {
- List<BlockMetadata> blocks = metadata.getBlocks();
- List<Long> splitOffsets = new ArrayList<>(blocks.size());
- for (BlockMetadata blockMetaData : blocks) {
- splitOffsets.add(blockMetaData.getStartingPos());
- }
- Collections.sort(splitOffsets);
- return ImmutableList.copyOf(splitOffsets);
- }
-
private static void updateFromFieldMetrics(
Map<Integer, FieldMetrics<?>> idToFieldMetricsMap,
MetricsConfig metricsConfig,
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
index 826a3d01eeb0..63a0f3205633 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java
@@ -267,7 +267,7 @@ public static Map getMetadataFileAndUpdatedMillis(TrinoFileSystem
public static ParquetMetadata getParquetFileMetadata(TrinoInputFile inputFile)
{
try (TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, ParquetReaderOptions.defaultOptions(), new FileFormatDataSourceStats())) {
- return MetadataReader.readFooter(dataSource);
+ return MetadataReader.readFooter(dataSource, Optional.empty());
}
catch (IOException e) {
throw new UncheckedIOException(e);
diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
index 2892a81d0116..13e97b466d58 100644
--- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
+++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
@@ -40,6 +40,7 @@
import io.trino.plugin.hive.TransactionalMetadataFactory;
import io.trino.plugin.hive.avro.AvroFileWriterFactory;
import io.trino.plugin.hive.avro.AvroPageSourceFactory;
+import io.trino.plugin.hive.crypto.ParquetEncryptionModule;
import io.trino.plugin.hive.fs.CachingDirectoryLister;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
@@ -136,5 +137,6 @@ protected void setup(Binder binder)
fileWriterFactoryBinder.addBinding().to(ParquetFileWriterFactory.class).in(Scopes.SINGLETON);
binder.install(new HiveExecutorModule());
+ install(new ParquetEncryptionModule());
}
}
diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
index bab0f514bb4f..510777c3dbda 100644
--- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
+++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftPageSourceProvider.java
@@ -104,7 +104,7 @@ private ParquetReader parquetReader(TrinoInputFile inputFile, List
{
ParquetReaderOptions options = ParquetReaderOptions.defaultOptions();
TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats);
- ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize());
+ ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, options.getMaxFooterReadSize(), Optional.empty());
MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
MessageColumnIO messageColumn = getColumnIO(fileSchema, fileSchema);
Map<List<String>, ColumnDescriptor> descriptorsByPath = getDescriptors(fileSchema, fileSchema);
@@ -127,6 +127,7 @@ private ParquetReader parquetReader(TrinoInputFile inputFile, List
options,
RedshiftParquetPageSource::handleException,
Optional.empty(),
+ Optional.empty(),
Optional.empty());
}