diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index 30caca4b05c5..eaedc9eae6f9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -14,9 +14,11 @@ package io.trino.plugin.iceberg.catalog.rest; import com.google.common.cache.Cache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; +import io.airlift.units.Duration; import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergFileSystemFactory; @@ -33,14 +35,18 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTCatalogProperties; import org.apache.iceberg.rest.RESTSessionCatalog; import org.apache.iceberg.rest.RESTUtil; +import java.net.URI; import java.util.Map; +import java.util.Optional; import java.util.Set; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.iceberg.CatalogProperties.AUTH_SESSION_TIMEOUT_MS; import static org.apache.iceberg.rest.auth.OAuth2Properties.CREDENTIAL; import static org.apache.iceberg.rest.auth.OAuth2Properties.TOKEN; @@ -51,12 +57,18 @@ public class TrinoIcebergRestCatalogFactory private final ForwardingFileIoFactory fileIoFactory; private final CatalogName catalogName; private final String trinoVersion; + private final URI serverUri; + private final Optional prefix; + private final Optional warehouse; private final boolean nestedNamespaceEnabled; private final Security security; private final SessionType sessionType; + private final Optional connectionTimeout; + private final Optional socketTimeout; + private final Duration sessionTimeout; + private final boolean vendedCredentialsEnabled; private final boolean viewEndpointsEnabled; private final SecurityProperties securityProperties; - private final IcebergRestCatalogPropertiesProvider catalogPropertiesProvider; private final boolean uniqueTableLocation; private final TypeManager typeManager; private final boolean caseInsensitiveNameMatching; @@ -73,7 +85,6 @@ public TrinoIcebergRestCatalogFactory( CatalogName catalogName, IcebergRestCatalogConfig restConfig, SecurityProperties securityProperties, - IcebergRestCatalogPropertiesProvider catalogPropertiesProvider, IcebergConfig icebergConfig, TypeManager typeManager, NodeVersion nodeVersion) @@ -83,12 +94,18 @@ public TrinoIcebergRestCatalogFactory( this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); requireNonNull(restConfig, "restConfig is null"); + this.serverUri = restConfig.getBaseUri(); + this.prefix = restConfig.getPrefix(); + this.warehouse = restConfig.getWarehouse(); this.nestedNamespaceEnabled = restConfig.isNestedNamespaceEnabled(); this.security = restConfig.getSecurity(); this.sessionType = restConfig.getSessionType(); + this.connectionTimeout = restConfig.getConnectionTimeout(); + this.socketTimeout = restConfig.getSocketTimeout(); + this.sessionTimeout = restConfig.getSessionTimeout(); + this.vendedCredentialsEnabled = restConfig.isVendedCredentialsEnabled(); this.viewEndpointsEnabled = restConfig.isViewEndpointsEnabled(); this.securityProperties = requireNonNull(securityProperties, "securityProperties is null"); - this.catalogPropertiesProvider = requireNonNull(catalogPropertiesProvider, "catalogPropertiesProvider is null"); requireNonNull(icebergConfig, "icebergConfig is null"); this.uniqueTableLocation = icebergConfig.isUniqueTableLocation(); this.typeManager = requireNonNull(typeManager, "typeManager is null"); @@ -109,6 +126,21 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) // Creation of the RESTSessionCatalog is lazy due to required network calls // for authorization and config route if (icebergCatalog == null) { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(CatalogProperties.URI, serverUri.toString()); + warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location)); + prefix.ifPresent(prefix -> properties.put("prefix", prefix)); + properties.put(RESTCatalogProperties.VIEW_ENDPOINTS_SUPPORTED, Boolean.toString(viewEndpointsEnabled)); + properties.put("trino-version", trinoVersion); + properties.put(AUTH_SESSION_TIMEOUT_MS, String.valueOf(sessionTimeout.toMillis())); + connectionTimeout.ifPresent(duration -> properties.put("rest.client.connection-timeout-ms", String.valueOf(duration.toMillis()))); + socketTimeout.ifPresent(duration -> properties.put("rest.client.socket-timeout-ms", String.valueOf(duration.toMillis()))); + properties.putAll(securityProperties.get()); + + if (vendedCredentialsEnabled) { + properties.put("header.X-Iceberg-Access-Delegation", "vended-credentials"); + } + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( config -> HTTPClient.builder(config) .uri(config.get(CatalogProperties.URI)) @@ -117,10 +149,15 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) (context, config) -> { ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null) ? ((ConnectorIdentity) context.wrappedIdentity()) - : ConnectorIdentity.ofUser("fake"); - return fileIoFactory.create(fileSystemFactory.create(currentIdentity, config), true, config); + : ConnectorIdentity.ofUser("trino"); + return fileIoFactory.create( + fileSystemFactory.create(currentIdentity, config), + true, + config, + fileSystemFactory, + currentIdentity); }); - icebergCatalogInstance.initialize(catalogName.toString(), catalogPropertiesProvider.catalogProperties()); + icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow()); icebergCatalog = icebergCatalogInstance; } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index a9a448f9db66..600b4a59aa39 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -204,7 +204,9 @@ private List collectNamespaces(ConnectorSession session, Namespace paren { try { return restSessionCatalog.listNamespaces(convert(session), parentNamespace).stream() - .flatMap(childNamespace -> collectNamespaceIfExists(session, childNamespace).stream()) + .flatMap(childNamespace -> Stream.concat( + Stream.of(childNamespace.toString()), + collectNamespaces(session, childNamespace).stream())) .collect(toImmutableList()); } catch (RESTException e) { @@ -212,19 +214,6 @@ private List collectNamespaces(ConnectorSession session, Namespace paren } } - private List collectNamespaceIfExists(ConnectorSession session, Namespace namespace) - { - try { - return Stream.concat( - Stream.of(namespace.toString()), - collectNamespaces(session, namespace).stream()) - .collect(toImmutableList()); - } - catch (NoSuchNamespaceException e) { - return ImmutableList.of(); - } - } - @Override public void dropNamespace(ConnectorSession session, String namespace) { @@ -892,7 +881,7 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) return switch (sessionType) { case NONE -> new SessionContext(randomUUID().toString(), null, credentials, ImmutableMap.of(), session.getIdentity()); case USER -> { - String sessionId = format("%s-%s-%s", session.getUser(), session.getQueryId(), session.getSource().orElse("default")); + String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default")); Map properties = ImmutableMap.of( "user", session.getUser(), @@ -1067,17 +1056,7 @@ private List listNamespaces(ConnectorSession session, Namespace paren catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to list namespaces", e); } - return childNamespaces.stream().flatMap(childNamespace -> listNamespaceIfExists(session, childNamespace).stream()).toList(); - } - - private List listNamespaceIfExists(ConnectorSession session, Namespace namespace) - { - try { - return Stream.concat(Stream.of(namespace), listNamespaces(session, namespace).stream()).toList(); - } - catch (NoSuchNamespaceException e) { - return ImmutableList.of(); - } + return childNamespaces.stream().flatMap(childNamespace -> Stream.concat(Stream.of(childNamespace), listNamespaces(session, childNamespace).stream())).toList(); } private static Namespace toTrinoNamespace(Namespace namespace) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java index dd92a6cc532a..3789d5ef894f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIo.java @@ -14,12 +14,15 @@ package io.trino.plugin.iceberg.fileio; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.plugin.iceberg.IcebergFileSystemFactory; import io.trino.spi.TrinoException; +import io.trino.spi.security.ConnectorIdentity; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; @@ -27,10 +30,14 @@ import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsStorageCredentials; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -46,7 +53,7 @@ import static java.util.stream.Collectors.joining; public class ForwardingFileIo - implements SupportsBulkOperations + implements SupportsBulkOperations, SupportsStorageCredentials { private static final int DELETE_BATCH_SIZE = 1000; private static final int BATCH_DELETE_PATHS_MESSAGE_LIMIT = 5; @@ -55,6 +62,11 @@ public class ForwardingFileIo private final Map properties; private final boolean useFileSizeFromMetadata; private final ExecutorService deleteExecutor; + private final IcebergFileSystemFactory storageCredentialFileSystemFactory; + private final ConnectorIdentity storageCredentialIdentity; + + private volatile List storageCredentials = ImmutableList.of(); + private volatile Map prefixedFileSystems = ImmutableMap.of(); @VisibleForTesting public ForwardingFileIo(TrinoFileSystem fileSystem, boolean useFileSizeFromMetadata) @@ -63,22 +75,36 @@ public ForwardingFileIo(TrinoFileSystem fileSystem, boolean useFileSizeFromMetad } public ForwardingFileIo(TrinoFileSystem fileSystem, Map properties, boolean useFileSizeFromMetadata, ExecutorService deleteExecutor) + { + this(fileSystem, properties, useFileSizeFromMetadata, deleteExecutor, null, null); + } + + public ForwardingFileIo( + TrinoFileSystem fileSystem, + Map properties, + boolean useFileSizeFromMetadata, + ExecutorService deleteExecutor, + IcebergFileSystemFactory storageCredentialFileSystemFactory, + ConnectorIdentity storageCredentialIdentity) { this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.deleteExecutor = requireNonNull(deleteExecutor, "executorService is null"); this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null")); this.useFileSizeFromMetadata = useFileSizeFromMetadata; + this.storageCredentialFileSystemFactory = storageCredentialFileSystemFactory; + this.storageCredentialIdentity = storageCredentialIdentity; } @Override public InputFile newInputFile(String path) { - return new ForwardingInputFile(fileSystem.newInputFile(Location.of(path))); + return new ForwardingInputFile(fileSystemForPath(path).newInputFile(Location.of(path))); } @Override public InputFile newInputFile(String path, long length) { + TrinoFileSystem fileSystem = fileSystemForPath(path); if (!useFileSizeFromMetadata) { return new ForwardingInputFile(fileSystem.newInputFile(Location.of(path))); } @@ -89,14 +115,14 @@ public InputFile newInputFile(String path, long length) @Override public OutputFile newOutputFile(String path) { - return new ForwardingOutputFile(fileSystem, Location.of(path)); + return new ForwardingOutputFile(fileSystemForPath(path), Location.of(path)); } @Override public void deleteFile(String path) { try { - fileSystem.deleteFile(Location.of(path)); + fileSystemForPath(path).deleteFile(Location.of(path)); } catch (IOException e) { throw new UncheckedIOException("Failed to delete file: " + path, e); @@ -159,7 +185,15 @@ public InputFile newInputFile(ManifestListFile manifestList) private void deleteBatch(List filesToDelete) { try { - fileSystem.deleteFiles(filesToDelete.stream().map(Location::of).toList()); + Map> locationsByFileSystem = new IdentityHashMap<>(); + for (String path : filesToDelete) { + TrinoFileSystem fileSystem = fileSystemForPath(path); + locationsByFileSystem.computeIfAbsent(fileSystem, ignored -> new ArrayList<>()) + .add(Location.of(path)); + } + for (Map.Entry> entry : locationsByFileSystem.entrySet()) { + entry.getKey().deleteFiles(entry.getValue()); + } } catch (IOException e) { throw new UncheckedIOException( @@ -179,6 +213,51 @@ public Map properties() return properties; } + @Override + public void setCredentials(List credentials) + { + storageCredentials = ImmutableList.copyOf(requireNonNull(credentials, "credentials is null")); + rebuildPrefixedFileSystems(); + } + + @Override + public List credentials() + { + return storageCredentials; + } + + private TrinoFileSystem fileSystemForPath(String path) + { + TrinoFileSystem matchingFileSystem = fileSystem; + int matchingPrefixLength = -1; + for (Map.Entry prefixedFileSystem : prefixedFileSystems.entrySet()) { + String prefix = prefixedFileSystem.getKey(); + if (path.startsWith(prefix) && prefix.length() > matchingPrefixLength) { + matchingPrefixLength = prefix.length(); + matchingFileSystem = prefixedFileSystem.getValue(); + } + } + return matchingFileSystem; + } + + private void rebuildPrefixedFileSystems() + { + if (storageCredentials.isEmpty() || storageCredentialFileSystemFactory == null || storageCredentialIdentity == null) { + prefixedFileSystems = ImmutableMap.of(); + return; + } + + ImmutableMap.Builder rebuiltFileSystems = ImmutableMap.builder(); + for (StorageCredential storageCredential : storageCredentials) { + Map mergedProperties = ImmutableMap.builder() + .putAll(properties) + .putAll(storageCredential.config()) + .buildKeepingLast(); + rebuiltFileSystems.put(storageCredential.prefix(), storageCredentialFileSystemFactory.create(storageCredentialIdentity, mergedProperties)); + } + prefixedFileSystems = rebuiltFileSystems.buildKeepingLast(); + } + @Override public void initialize(Map properties) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIoFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIoFactory.java index 5d19b479c84a..7ba2fbd51dfd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIoFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/fileio/ForwardingFileIoFactory.java @@ -17,6 +17,8 @@ import com.google.inject.Inject; import io.trino.filesystem.TrinoFileSystem; import io.trino.plugin.iceberg.ForIcebergFileDelete; +import io.trino.plugin.iceberg.IcebergFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; import org.apache.iceberg.io.FileIO; import java.util.Map; @@ -48,4 +50,20 @@ public FileIO create(TrinoFileSystem fileSystem, boolean useFileSizeFromMetadata { return new ForwardingFileIo(fileSystem, properties, useFileSizeFromMetadata, deleteExecutor); } + + public FileIO create( + TrinoFileSystem fileSystem, + boolean useFileSizeFromMetadata, + Map properties, + IcebergFileSystemFactory storageCredentialFileSystemFactory, + ConnectorIdentity storageCredentialIdentity) + { + return new ForwardingFileIo( + fileSystem, + properties, + useFileSizeFromMetadata, + deleteExecutor, + storageCredentialFileSystemFactory, + storageCredentialIdentity); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3VendingRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3VendingRestCatalogConnectorSmokeTest.java index 0d48d3eb58bf..52d92dcfab10 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3VendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergS3VendingRestCatalogConnectorSmokeTest.java @@ -30,6 +30,7 @@ import io.trino.testing.minio.MinioClient; import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.RESTSessionCatalog; @@ -46,6 +47,9 @@ import java.io.IOException; import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; @@ -313,6 +317,33 @@ public void testDropTableWithNonExistentTableLocation() .hasMessageMatching("Failed to load table: (.*)"); } + @Test + public void testRestFixtureVendsCredentialsViaConfigMap() + throws Exception + { + String tableName = "config_credentials_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT 1 value", 1); + try { + String schemaName = getSession().getSchema().orElseThrow(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://" + restCatalogBackendContainer.getRestCatalogEndpoint() + "/v1/namespaces/" + schemaName + "/tables/" + tableName)) + .GET() + .build(); + + HttpResponse response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode()).isEqualTo(200); + assertThat(response.body()) + .contains("\"config\"") + .contains("\"" + S3FileIOProperties.ACCESS_KEY_ID + "\"") + .contains("\"" + S3FileIOProperties.SECRET_ACCESS_KEY + "\"") + .contains("\"" + S3FileIOProperties.SESSION_TOKEN + "\"") + .doesNotContain("\"storage-credentials\""); + } + finally { + assertUpdate("DROP TABLE " + tableName); + } + } + @Override protected boolean isFileSorted(Location path, String sortColumnName) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergStorageCredentialsRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergStorageCredentialsRestCatalog.java new file mode 100644 index 000000000000..57eebce4f630 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergStorageCredentialsRestCatalog.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.rest; + +import com.google.common.collect.ImmutableMap; +import io.airlift.http.server.testing.TestingHttpServer; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.apache.iceberg.aws.s3.S3FileIOProperties; +import org.apache.iceberg.rest.DelegatingRestSessionCatalog; +import org.apache.iceberg.rest.credentials.ImmutableCredential; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.iceberg.catalog.rest.RestCatalogTestUtils.backendCatalog; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestIcebergStorageCredentialsRestCatalog + extends AbstractTestQueryFramework +{ + private Path warehouseLocation; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + warehouseLocation = Files.createTempDirectory(null); + closeAfterClass(() -> deleteRecursively(warehouseLocation, ALLOW_INSECURE)); + + DelegatingRestSessionCatalog delegatingCatalog = DelegatingRestSessionCatalog.builder() + .delegate(backendCatalog(warehouseLocation)) + .addAllCredentials(List.of( + ImmutableCredential.builder() + .prefix("file://") + .config(ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "test-vended-access-key", + S3FileIOProperties.SECRET_ACCESS_KEY, "test-vended-secret-key", + S3FileIOProperties.SESSION_TOKEN, "test-vended-session-token")) + .build())) + .build(); + + TestingHttpServer testServer = delegatingCatalog.testServer(); + testServer.start(); + closeAfterClass(testServer::stop); + + return IcebergQueryRunner.builder() + .setBaseDataDir(Optional.of(warehouseLocation)) + .setIcebergProperties( + ImmutableMap.builder() + .put("iceberg.catalog.type", "rest") + .put("iceberg.rest-catalog.uri", testServer.getBaseUrl().toString()) + .put("iceberg.rest-catalog.vended-credentials-enabled", "true") + .buildOrThrow()) + .build(); + } + + @Test + public void testCreateAndReadTableWithStorageCredentials() + { + assertUpdate("CREATE SCHEMA test_storage_creds"); + assertUpdate("CREATE TABLE test_storage_creds.test_table (id INTEGER, name VARCHAR)"); + assertUpdate("INSERT INTO test_storage_creds.test_table VALUES (1, 'alice'), (2, 'bob')", 2); + + assertThat(query("SELECT * FROM test_storage_creds.test_table")) + .matches("VALUES (1, VARCHAR 'alice'), (2, VARCHAR 'bob')"); + + assertUpdate("DROP TABLE test_storage_creds.test_table"); + assertUpdate("DROP SCHEMA test_storage_creds"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/fileio/TestForwardingFileIo.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/fileio/TestForwardingFileIo.java index 47c33066c469..907046e9478c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/fileio/TestForwardingFileIo.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/fileio/TestForwardingFileIo.java @@ -13,17 +13,27 @@ */ package io.trino.plugin.iceberg.fileio; +import com.google.common.collect.ImmutableMap; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.local.LocalFileSystemFactory; +import io.trino.plugin.iceberg.IcebergFileSystemFactory; +import io.trino.spi.security.ConnectorIdentity; +import org.apache.iceberg.aws.s3.S3FileIOProperties; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.io.SupportsStorageCredentials; import org.junit.jupiter.api.Test; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.trino.testing.InterfaceTestUtils.assertAllMethodsOverridden; import static io.trino.testing.TestingConnectorSession.SESSION; import static org.assertj.core.api.Assertions.assertThat; @@ -35,6 +45,7 @@ public void testEverythingImplemented() { assertAllMethodsOverridden(FileIO.class, ForwardingFileIo.class); assertAllMethodsOverridden(SupportsBulkOperations.class, ForwardingFileIo.class); + assertAllMethodsOverridden(SupportsStorageCredentials.class, ForwardingFileIo.class); } @Test @@ -61,4 +72,153 @@ public void testUseFileSizeFromMetadata() } deleteRecursively(tempDir, ALLOW_INSECURE); } + + @Test + public void testStorageCredentialsRemainSeparateFromProperties() + throws Exception + { + Path tempDir = Files.createTempDirectory("test_forwarding_fileio_storage_creds"); + List> seenProperties = new ArrayList<>(); + LocalFileSystemFactory localFactory = new LocalFileSystemFactory(tempDir); + IcebergFileSystemFactory capturingFactory = (identity, fileIoProperties) -> { + seenProperties.add(ImmutableMap.copyOf(fileIoProperties)); + return localFactory.create(SESSION); + }; + + try (ForwardingFileIo fileIo = new ForwardingFileIo( + localFactory.create(SESSION), + ImmutableMap.of("base.key", "base-value"), + true, + newDirectExecutorService(), + capturingFactory, + ConnectorIdentity.ofUser("test-user"))) { + fileIo.setCredentials(List.of( + StorageCredential.create("s3://bucket-a/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-a", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-a", + S3FileIOProperties.SESSION_TOKEN, "token-a")), + StorageCredential.create("s3://bucket-a/warehouse/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-b", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-b", + S3FileIOProperties.SESSION_TOKEN, "token-b")))); + + assertThat(fileIo.properties()).containsExactlyEntriesOf(ImmutableMap.of("base.key", "base-value")); + assertThat(fileIo.credentials()).hasSize(2); + assertThat(seenProperties).hasSize(2); + assertThat(seenProperties).contains(ImmutableMap.of( + "base.key", "base-value", + S3FileIOProperties.ACCESS_KEY_ID, "access-a", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-a", + S3FileIOProperties.SESSION_TOKEN, "token-a")); + assertThat(seenProperties).contains(ImmutableMap.of( + "base.key", "base-value", + S3FileIOProperties.ACCESS_KEY_ID, "access-b", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-b", + S3FileIOProperties.SESSION_TOKEN, "token-b")); + } + finally { + deleteRecursively(tempDir, ALLOW_INSECURE); + } + } + + @Test + public void testSetCredentialsAfterInitializeRebuildsPrefixFileSystems() + throws Exception + { + Path tempDir = Files.createTempDirectory("test_forwarding_fileio_refresh_creds"); + List> seenProperties = new ArrayList<>(); + LocalFileSystemFactory localFactory = new LocalFileSystemFactory(tempDir); + IcebergFileSystemFactory capturingFactory = (identity, fileIoProperties) -> { + seenProperties.add(ImmutableMap.copyOf(fileIoProperties)); + return localFactory.create(SESSION); + }; + + try (ForwardingFileIo fileIo = new ForwardingFileIo( + localFactory.create(SESSION), + ImmutableMap.of("base.key", "base-value"), + true, + newDirectExecutorService(), + capturingFactory, + ConnectorIdentity.ofUser("test-user"))) { + fileIo.setCredentials(List.of( + StorageCredential.create("s3://bucket-a/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-a", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-a", + S3FileIOProperties.SESSION_TOKEN, "token-a")))); + assertThat(seenProperties).hasSize(1); + assertThat(seenProperties.get(0)).containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "access-a"); + + fileIo.setCredentials(List.of( + StorageCredential.create("s3://bucket-b/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-b", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-b", + S3FileIOProperties.SESSION_TOKEN, "token-b")))); + + assertThat(seenProperties).hasSize(2); + assertThat(seenProperties.get(1)).containsEntry(S3FileIOProperties.ACCESS_KEY_ID, "access-b"); + } + finally { + deleteRecursively(tempDir, ALLOW_INSECURE); + } + } + + @Test + public void testLongestPrefixRoutingAndFallback() + throws Exception + { + Path tempDir = Files.createTempDirectory("test_forwarding_fileio_prefix_routing"); + Path baseRoot = tempDir.resolve("base"); + Path prefixARoot = tempDir.resolve("prefix-a"); + Path prefixBRoot = tempDir.resolve("prefix-b"); + Files.createDirectories(baseRoot); + Files.createDirectories(prefixARoot); + Files.createDirectories(prefixBRoot); + + Files.createDirectories(baseRoot.resolve("other")); + Files.writeString(baseRoot.resolve("other/fallback.txt"), "ccc"); + Files.createDirectories(prefixARoot.resolve("bucket")); + Files.writeString(prefixARoot.resolve("bucket/data.txt"), "a"); + Files.createDirectories(prefixBRoot.resolve("bucket/warehouse")); + Files.writeString(prefixBRoot.resolve("bucket/warehouse/data.txt"), "bb"); + + LocalFileSystemFactory baseFactory = new LocalFileSystemFactory(baseRoot); + LocalFileSystemFactory prefixAFactory = new LocalFileSystemFactory(prefixARoot); + LocalFileSystemFactory prefixBFactory = new LocalFileSystemFactory(prefixBRoot); + IcebergFileSystemFactory routingFactory = (identity, fileIoProperties) -> { + String accessKey = fileIoProperties.get(S3FileIOProperties.ACCESS_KEY_ID); + if ("access-a".equals(accessKey)) { + return prefixAFactory.create(SESSION); + } + if ("access-b".equals(accessKey)) { + return prefixBFactory.create(SESSION); + } + throw new IllegalArgumentException("Unexpected access key: " + accessKey); + }; + + try (ForwardingFileIo fileIo = new ForwardingFileIo( + baseFactory.create(SESSION), + ImmutableMap.of("base.key", "base-value"), + true, + newDirectExecutorService(), + routingFactory, + ConnectorIdentity.ofUser("test-user"))) { + fileIo.setCredentials(List.of( + StorageCredential.create("file:///bucket/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-a", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-a", + S3FileIOProperties.SESSION_TOKEN, "token-a")), + StorageCredential.create("file:///bucket/warehouse/", ImmutableMap.of( + S3FileIOProperties.ACCESS_KEY_ID, "access-b", + S3FileIOProperties.SECRET_ACCESS_KEY, "secret-b", + S3FileIOProperties.SESSION_TOKEN, "token-b")))); + + // Longest matching prefix should select the more specific credential filesystem. + assertThat(fileIo.newInputFile("file:///bucket/warehouse/data.txt").getLength()).isEqualTo(2); + assertThat(fileIo.newInputFile("file:///bucket/data.txt").getLength()).isEqualTo(1); + assertThat(fileIo.newInputFile("file:///other/fallback.txt").getLength()).isEqualTo(3); + } + finally { + deleteRecursively(tempDir, ALLOW_INSECURE); + } + } } diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java index 16a2037943bf..f94f42059806 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java @@ -19,9 +19,15 @@ import io.airlift.http.server.testing.TestingHttpServer; import io.airlift.node.NodeInfo; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.credentials.Credential; +import org.apache.iceberg.rest.responses.LoadTableResponse; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; import static java.util.Objects.requireNonNull; @@ -81,6 +87,7 @@ public static Builder builder() public static class Builder { private Catalog delegate; + private final List credentials = new ArrayList<>(); public Builder delegate(Catalog delegate) { @@ -88,11 +95,54 @@ public Builder delegate(Catalog delegate) return this; } + public Builder addAllCredentials(List credentials) + { + this.credentials.addAll(credentials); + return this; + } + public DelegatingRestSessionCatalog build() { requireNonNull(delegate, "Delegate must be set"); - return new DelegatingRestSessionCatalog(new RESTCatalogAdapter(delegate), delegate); + RESTCatalogAdapter adapter = credentials.isEmpty() + ? new RESTCatalogAdapter(delegate) + : new CredentialInjectingAdapter(delegate, credentials); + return new DelegatingRestSessionCatalog(adapter, delegate); + } + } + + private static class CredentialInjectingAdapter + extends RESTCatalogAdapter + { + private final List credentials; + + CredentialInjectingAdapter(Catalog catalog, List credentials) + { + super(catalog); + this.credentials = List.copyOf(credentials); + } + + @Override + public T handleRequest( + Route route, + Map vars, + HTTPRequest request, + Class responseType, + Consumer> responseHeaders) + { + T response = super.handleRequest(route, vars, request, responseType, responseHeaders); + if (response instanceof LoadTableResponse loadTableResponse) { + LoadTableResponse.Builder builder = LoadTableResponse.builder() + .withTableMetadata(loadTableResponse.tableMetadata()) + .addAllConfig(loadTableResponse.config()) + .addAllCredentials(loadTableResponse.credentials()) + .addAllCredentials(credentials); + @SuppressWarnings("unchecked") + T result = (T) builder.build(); + return result; + } + return response; } } } diff --git a/pom.xml b/pom.xml index 8a2f751a911d..bece09c77683 100644 --- a/pom.xml +++ b/pom.xml @@ -2365,6 +2365,13 @@ + + + apache-snapshots + https://repository.apache.org/content/repositories/snapshots + + +