Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -111,7 +112,10 @@ public FlinkCatalog(
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.catalogProps =
catalogProps.entrySet().stream()
.filter(e -> !GlobalConfiguration.isSensitive(e.getKey()))
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.util.ArrayUtils;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -201,39 +202,171 @@ public void testCreateTableLikeInDiffIcebergCatalog() throws TableNotExistExcept

String catalog2 = catalogName + "2";
sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config));
sql("CREATE DATABASE %s", catalog2 + ".testdb");
sql("CREATE TABLE %s LIKE tl", catalog2 + ".testdb.tl2");
try {
sql("CREATE DATABASE %s.testdb", catalog2);
sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2);

CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2);
sql("DROP DATABASE IF EXISTS %s.testdb", catalog2);
dropCatalog(catalog2, true);
}
}

CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());
@TestTemplate
public void testCreateTableLikeCopiesTableProperties() {
sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')");
sql("CREATE TABLE tl2 LIKE tl");

Table source = table("tl");
Table copy = table("tl2");
assertThat(copy.properties()).isEqualTo(source.properties());
}

@TestTemplate
public void testCreateTableLikeInDiffIcebergCatalogCopiesTableProperties() {
sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')");

String catalog2 = catalogName + "2";
sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config));
try {
sql("CREATE DATABASE %s.testdb", catalog2);
sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2);

Table source = table("tl");
Table copy =
validationCatalog.loadTable(
TableIdentifier.of(
ArrayUtils.concat(baseNamespace.levels(), new String[] {"testdb", "tl2"})));
assertThat(copy.properties()).isEqualTo(source.properties());
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2);
sql("DROP DATABASE IF EXISTS %s.testdb", catalog2);
dropCatalog(catalog2, true);
}
}

dropCatalog(catalog2, true);
@TestTemplate
public void testShowCreateTableRetractsSensitiveCatalogProperties() {
String otherCatalog = catalogName + "_other";
String sensitiveKey = "my.password";
String sensitiveValue = "super-secret-value";
String benignKey = "my.harmless.option";
String benignValue = "safe-to-show";

Map<String, String> leakyConfig = Maps.newHashMap(config);
leakyConfig.put(sensitiveKey, sensitiveValue);
leakyConfig.put(benignKey, benignValue);

sql("CREATE CATALOG %s WITH %s", otherCatalog, toWithClause(leakyConfig));
try {
sql("CREATE DATABASE %s.testdb", otherCatalog);
sql("CREATE TABLE %s.testdb.tl(id BIGINT)", otherCatalog);

String showCreate =
(String)
Iterables.getOnlyElement(sql("SHOW CREATE TABLE %s.testdb.tl", otherCatalog))
.getField(0);

assertThat(showCreate)
.as("SHOW CREATE TABLE should still expose the synthetic src-catalog options blob")
.contains(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)
.contains(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
.as("Benign catalog property should still be present")
.contains(benignValue)
.as("Sensitive catalog property value must not be there")
.doesNotContain(sensitiveValue);
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl", otherCatalog);
sql("DROP DATABASE IF EXISTS %s.testdb", otherCatalog);
dropCatalog(otherCatalog, true);
}
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalogFiltersSensitiveCatalogProperties()
throws TableNotExistException {
String sourceCatalog = catalogName + "_with_secrets";
String sensitiveKey = "my.password";
String sensitiveValue = "super-secret-value";
String benignKey = "my.harmless.option";
String benignValue = "safe-to-show";

Map<String, String> sourceCatalogConfig = Maps.newHashMap(config);
sourceCatalogConfig.put(sensitiveKey, sensitiveValue);
sourceCatalogConfig.put(benignKey, benignValue);

sql("CREATE CATALOG %s WITH %s", sourceCatalog, toWithClause(sourceCatalogConfig));
try {
sql("CREATE DATABASE %s.testdb", sourceCatalog);
sql("CREATE TABLE %s.testdb.tl(id BIGINT)", sourceCatalog);

// CREATE TABLE LIKE from the Iceberg-backed source catalog into Flink's default
// (non-Iceberg) in-memory catalog. The target catalog will persist the table options
// map verbatim, so anything FlinkCatalog#getTable injects will be visible there.
sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE %s.testdb.tl", sourceCatalog);

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
Map<String, String> options = catalogTable.getOptions();

assertThat(options)
.as("Connector and src-catalog props are required for the LIKE-target to work")
.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);

String srcCatalogProps = options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
assertThat(srcCatalogProps)
.as("Benign source-catalog property should still be propagated")
.contains(benignValue)
.as("Sensitive source-catalog property value must not leak into the target catalog")
.doesNotContain(sensitiveValue);

// Belt-and-braces: also check the rendered DDL on the target side.
String showCreate =
(String)
Iterables.getOnlyElement(
sql("SHOW CREATE TABLE `default_catalog`.`default_database`.tl2"))
.getField(0);
assertThat(showCreate)
.as("SHOW CREATE TABLE on the target catalog must not contain the secret")
.doesNotContain(sensitiveValue);
} finally {
sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2");
sql("DROP TABLE IF EXISTS %s.testdb.tl", sourceCatalog);
sql("DROP DATABASE IF EXISTS %s.testdb", sourceCatalog);
dropCatalog(sourceCatalog, true);
}
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");

sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());

String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config);
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
try {
CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());

String srcCatalogProps = FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", config);
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
} finally {
sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2");
}
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -111,7 +112,10 @@ public FlinkCatalog(
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.catalogProps =
catalogProps.entrySet().stream()
.filter(e -> !GlobalConfiguration.isSensitive(e.getKey()))
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Expand Down
Loading