diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4bb235b811d0..5182155ec8a1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -111,7 +112,10 @@ public FlinkCatalog( long cacheExpirationIntervalMs) { super(catalogName, defaultDatabase); this.catalogLoader = catalogLoader; - this.catalogProps = catalogProps; + this.catalogProps = + catalogProps.entrySet().stream() + .filter(e -> !GlobalConfiguration.isSensitive(e.getKey())) + .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); this.baseNamespace = baseNamespace; this.cacheEnabled = cacheEnabled; diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index f7848a5d22ef..5bfdf877ff9d 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -37,6 +37,7 @@ import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.util.ArrayUtils; import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; @@ -202,17 +203,146 @@ public void testCreateTableLikeInDiffIcebergCatalog() throws TableNotExistExcept String catalog2 = catalogName + "2"; sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config)); - sql("CREATE DATABASE %s", catalog2 + ".testdb"); - sql("CREATE TABLE %s LIKE tl", catalog2 + ".testdb.tl2"); + try { + sql("CREATE DATABASE %s.testdb", catalog2); + sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2); + + CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2"); + assertThat(catalogTable.getUnresolvedSchema()) + .isEqualTo( + org.apache.flink.table.api.Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .build()); + } finally { + sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2); + sql("DROP DATABASE IF EXISTS %s.testdb", catalog2); + dropCatalog(catalog2, true); + } + } - CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2"); - assertThat(catalogTable.getUnresolvedSchema()) - .isEqualTo( - org.apache.flink.table.api.Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .build()); + @TestTemplate + public void testCreateTableLikeCopiesTableProperties() { + sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')"); + sql("CREATE TABLE tl2 LIKE tl"); + + Table source = table("tl"); + Table copy = table("tl2"); + assertThat(copy.properties()).isEqualTo(source.properties()); + } + + @TestTemplate + public void testCreateTableLikeInDiffIcebergCatalogCopiesTableProperties() { + sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')"); + + String catalog2 = catalogName + "2"; + sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config)); + try { + sql("CREATE DATABASE %s.testdb", catalog2); + sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2); + + Table source = table("tl"); + Table copy = + validationCatalog.loadTable( + TableIdentifier.of( + ArrayUtils.concat(baseNamespace.levels(), new String[] {"testdb", "tl2"}))); + assertThat(copy.properties()).isEqualTo(source.properties()); + } finally { + sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2); + sql("DROP DATABASE IF EXISTS %s.testdb", catalog2); + dropCatalog(catalog2, true); + } + } - dropCatalog(catalog2, true); + @TestTemplate + public void testShowCreateTableRetractsSensitiveCatalogProperties() { + String otherCatalog = catalogName + "_other"; + String sensitiveKey = "my.password"; + String sensitiveValue = "super-secret-value"; + String benignKey = "my.harmless.option"; + String benignValue = "safe-to-show"; + + Map leakyConfig = Maps.newHashMap(config); + leakyConfig.put(sensitiveKey, sensitiveValue); + leakyConfig.put(benignKey, benignValue); + + sql("CREATE CATALOG %s WITH %s", otherCatalog, toWithClause(leakyConfig)); + try { + sql("CREATE DATABASE %s.testdb", otherCatalog); + sql("CREATE TABLE %s.testdb.tl(id BIGINT)", otherCatalog); + + String showCreate = + (String) + Iterables.getOnlyElement(sql("SHOW CREATE TABLE %s.testdb.tl", otherCatalog)) + .getField(0); + + assertThat(showCreate) + .as("SHOW CREATE TABLE should still expose the synthetic src-catalog options blob") + .contains(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY) + .contains(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) + .as("Benign catalog property should still be present") + .contains(benignValue) + .as("Sensitive catalog property value must not be there") + .doesNotContain(sensitiveValue); + } finally { + sql("DROP TABLE IF EXISTS %s.testdb.tl", otherCatalog); + sql("DROP DATABASE IF EXISTS %s.testdb", otherCatalog); + dropCatalog(otherCatalog, true); + } + } + + @TestTemplate + public void testCreateTableLikeInFlinkCatalogFiltersSensitiveCatalogProperties() + throws TableNotExistException { + String sourceCatalog = catalogName + "_with_secrets"; + String sensitiveKey = "my.password"; + String sensitiveValue = "super-secret-value"; + String benignKey = "my.harmless.option"; + String benignValue = "safe-to-show"; + + Map sourceCatalogConfig = Maps.newHashMap(config); + sourceCatalogConfig.put(sensitiveKey, sensitiveValue); + sourceCatalogConfig.put(benignKey, benignValue); + + sql("CREATE CATALOG %s WITH %s", sourceCatalog, toWithClause(sourceCatalogConfig)); + try { + sql("CREATE DATABASE %s.testdb", sourceCatalog); + sql("CREATE TABLE %s.testdb.tl(id BIGINT)", sourceCatalog); + + // CREATE TABLE LIKE from the Iceberg-backed source catalog into Flink's default + // (non-Iceberg) in-memory catalog. The target catalog will persist the table options + // map verbatim, so anything FlinkCatalog#getTable injects will be visible there. + sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE %s.testdb.tl", sourceCatalog); + + CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2"); + Map options = catalogTable.getOptions(); + + assertThat(options) + .as("Connector and src-catalog props are required for the LIKE-target to work") + .containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY) + .containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); + + String srcCatalogProps = options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY); + assertThat(srcCatalogProps) + .as("Benign source-catalog property should still be propagated") + .contains(benignValue) + .as("Sensitive source-catalog property value must not leak into the target catalog") + .doesNotContain(sensitiveValue); + + // Belt-and-braces: also check the rendered DDL on the target side. + String showCreate = + (String) + Iterables.getOnlyElement( + sql("SHOW CREATE TABLE `default_catalog`.`default_database`.tl2")) + .getField(0); + assertThat(showCreate) + .as("SHOW CREATE TABLE on the target catalog must not contain the secret") + .doesNotContain(sensitiveValue); + } finally { + sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2"); + sql("DROP TABLE IF EXISTS %s.testdb.tl", sourceCatalog); + sql("DROP DATABASE IF EXISTS %s.testdb", sourceCatalog); + dropCatalog(sourceCatalog, true); + } } @TestTemplate @@ -220,27 +350,30 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { sql("CREATE TABLE tl(id BIGINT)"); sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl"); - - CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2"); - assertThat(catalogTable.getUnresolvedSchema()) - .isEqualTo( - org.apache.flink.table.api.Schema.newBuilder() - .column("id", DataTypes.BIGINT()) - .build()); - - // `type` option is filtered out by Flink - // https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378 - Map filteredOptions = Maps.newHashMap(config); - filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key()); - - String srcCatalogProps = - FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions); - Map options = catalogTable.getOptions(); - assertThat(options) - .containsEntry( - FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, - FlinkDynamicTableFactory.FACTORY_IDENTIFIER) - .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + try { + CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2"); + assertThat(catalogTable.getUnresolvedSchema()) + .isEqualTo( + org.apache.flink.table.api.Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .build()); + + // `type` option is filtered out by Flink + // https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378 + Map filteredOptions = Maps.newHashMap(config); + filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key()); + + String srcCatalogProps = + FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions); + Map options = catalogTable.getOptions(); + assertThat(options) + .containsEntry( + FlinkCreateTableOptions.CONNECTOR_PROPS_KEY, + FlinkDynamicTableFactory.FACTORY_IDENTIFIER) + .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); + } finally { + sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2"); + } } @TestTemplate