diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 94e53cc1ab69..7d448ff42872 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -41,6 +41,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -61,6 +62,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AccessDeniedException; import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; import software.amazon.awssdk.services.glue.model.CreateTableRequest; import software.amazon.awssdk.services.glue.model.Database; @@ -477,6 +479,11 @@ public void createNamespace(Namespace namespace, Map metadata) { } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { throw new AlreadyExistsException( "Cannot create namespace %s because it already exists in Glue", namespace); + } catch (AccessDeniedException e) { + throw new ForbiddenException( + e, + "Cannot create namespace %s because Glue cannot access the requested resources", + namespace); } } diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java index 82f7e84d563b..5f35b033ab3d 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestGlueCatalog.java @@ -44,6 +44,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AccessDeniedException; import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; import software.amazon.awssdk.services.glue.model.CreateTableRequest; @@ -449,6 +450,19 @@ public void testCreateNamespaceBadName() { } } + @Test + public void testCreateNamespaceAccessDenied() { + Mockito.doThrow( + AccessDeniedException.builder() + .message("User is not authorized to perform: glue:CreateDatabase") + .build()) + .when(glue) + .createDatabase(Mockito.any(CreateDatabaseRequest.class)); + assertThatThrownBy(() -> glueCatalog.createNamespace(Namespace.of("db"))) + .isInstanceOf(org.apache.iceberg.exceptions.ForbiddenException.class) + .hasMessageContaining("cannot access the requested resources"); + } + @Test public void testListAllNamespaces() { Mockito.doReturn( diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java index afb68f170136..df58b60ae90d 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java @@ -31,7 +31,6 @@ import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.types.Type; @@ -134,14 +133,16 @@ static void createNamespaceIfNotExist(Catalog catalog, Namespace identifierNames return; } + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; String[] levels = identifierNamespace.levels(); for (int index = 0; index < levels.length; index++) { Namespace namespace = Namespace.of(Arrays.copyOfRange(levels, 0, index + 1)); - try { - ((SupportsNamespaces) catalog).createNamespace(namespace); - } catch (AlreadyExistsException | ForbiddenException ex) { - // Ignoring the error as forcefully creating the namespace even if it exists - // to avoid double namespaceExists() check. + if (!nsCatalog.namespaceExists(namespace)) { + try { + nsCatalog.createNamespace(namespace); + } catch (AlreadyExistsException ex) { + LOG.warn("Namespace {} was created concurrently", namespace, ex); + } } } } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java index 2b18d4b24a1f..6617199ebd64 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestIcebergWriterFactory.java @@ -19,8 +19,11 @@ package org.apache.iceberg.connect.data; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -36,12 +39,15 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.TableSinkConfig; +import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.StringType; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; @@ -96,4 +102,42 @@ public void testAutoCreateTable(boolean partitioned) { assertThat(capturedArguments.get(1)).isEqualTo(Namespace.of("foo1", "foo2")); assertThat(capturedArguments.get(2)).isEqualTo(Namespace.of("foo1", "foo2", "foo3")); } + + @Test + public void testCreateNamespacePropagatesForbiddenException() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(((SupportsNamespaces) catalog).namespaceExists(any())).thenReturn(false); + doThrow(new ForbiddenException("access denied")) + .when((SupportsNamespaces) catalog) + .createNamespace(any()); + + assertThatThrownBy( + () -> IcebergWriterFactory.createNamespaceIfNotExist(catalog, Namespace.of("db"))) + .isInstanceOf(ForbiddenException.class) + .hasMessage("access denied"); + } + + @Test + public void testCreateNamespacePropagatesNotAuthorizedException() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(((SupportsNamespaces) catalog).namespaceExists(any())).thenReturn(false); + doThrow(new NotAuthorizedException("not authorized")) + .when((SupportsNamespaces) catalog) + .createNamespace(any()); + + assertThatThrownBy( + () -> IcebergWriterFactory.createNamespaceIfNotExist(catalog, Namespace.of("db"))) + .isInstanceOf(NotAuthorizedException.class) + .hasMessage("not authorized"); + } + + @Test + public void testCreateNamespaceSkipsCreateWhenExists() { + Catalog catalog = mock(Catalog.class, withSettings().extraInterfaces(SupportsNamespaces.class)); + when(((SupportsNamespaces) catalog).namespaceExists(any())).thenReturn(true); + + IcebergWriterFactory.createNamespaceIfNotExist(catalog, Namespace.of("db")); + + verify((SupportsNamespaces) catalog, never()).createNamespace(any()); + } }