Skip to content

Commit 3a529eb

Browse files
maomaodev authored and pan3793 committed
[KYUUBI #7377] KSHC create database respects catalog-level warehouse
### Why are the changes needed?

When creating a database using KSHC, the database location does not respect `spark.sql.catalog.<catalog>.hive.metastore.warehouse.dir`. Instead, it always falls back to `spark.sql.warehouse.dir`.

### How was this patch tested?

UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #7378 from maomaodev/kyuubi-7377.

Closes #7377

7eb12dd [lifumao] inline vars
62e1be0 [lifumao] KSHC create database respects catalog-level warehouse
5d12669 [lifumao] KSHC create database respects catalog-level warehouse

Authored-by: lifumao <lifumao@tencent.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent ed5d8b6 commit 3a529eb

File tree

2 files changed

+71
-5
lines changed

2 files changed

+71
-5
lines changed

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -477,7 +477,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
477477
namespace match {
478478
case Array(db) if !catalog.databaseExists(db) =>
479479
catalog.createDatabase(
480-
toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
480+
toCatalogDatabase(db, metadata, defaultLocation = Some(getCatalogDefaultDBPath(db))),
481481
ignoreIfExists = false)
482482

483483
case Array(_) =>
@@ -488,6 +488,25 @@ class HiveTableCatalog(sparkSession: SparkSession)
488488
}
489489
}
490490

491+
/**
492+
* Returns the default database path with catalog-level warehouse configuration precedence.
493+
*
494+
* This method resolves the database path using the following priority order:
495+
* 1. Catalog-level `spark.sql.catalog.<catalog>.hive.metastore.warehouse.dir`
496+
* 2. Global-level `spark.sql.warehouse.dir` (Underlying)
497+
*
498+
* @param db database name
499+
* @return qualified URI path for the database
500+
*/
501+
private def getCatalogDefaultDBPath(db: String): URI = {
502+
Option(catalogOptions.get("hive.metastore.warehouse.dir")).filter(_.nonEmpty) match {
503+
case Some(dir) =>
504+
CatalogUtils.makeQualifiedDBObjectPath(catalog.getDefaultDBPath(db), dir, hadoopConf)
505+
case None =>
506+
catalog.getDefaultDBPath(db)
507+
}
508+
}
509+
491510
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
492511
withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
493512
namespace match {

extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala

Lines changed: 51 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
2929
import org.apache.spark.sql.AnalysisException
3030
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
3131
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
32-
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
32+
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog}
3333
import org.apache.spark.sql.connector.expressions.Transform
3434
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
3535
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -54,12 +54,13 @@ class HiveCatalogSuite extends KyuubiHiveTest {
5454

5555
var catalog: HiveTableCatalog = _
5656

57-
private def newCatalog(): HiveTableCatalog = {
57+
private def newCatalog(extraOptions: Map[String, String] = Map.empty): HiveTableCatalog = {
5858
val catalog = new HiveTableCatalog
5959
val catalogName = "hive"
6060
val properties = Maps.newHashMap[String, String]()
6161
properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
6262
properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
63+
extraOptions.foreach { case (k, v) => properties.put(k, v) }
6364
catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
6465
catalog
6566
}
@@ -71,8 +72,15 @@ class HiveCatalogSuite extends KyuubiHiveTest {
7172
}
7273

7374
def makeQualifiedPathWithWarehouse(path: String): URI = {
74-
val p = new Path(catalog.conf.warehousePath, path)
75-
val fs = p.getFileSystem(catalog.hadoopConfiguration())
75+
makeQualifiedPathWithWarehouse(path, catalog.conf.warehousePath, catalog)
76+
}
77+
78+
private def makeQualifiedPathWithWarehouse(
79+
path: String,
80+
warehousePath: String,
81+
targetCatalog: HiveTableCatalog): URI = {
82+
val p = new Path(warehousePath, path)
83+
val fs = p.getFileSystem(targetCatalog.hadoopConfiguration())
7684
fs.makeQualified(p).toUri
7785
}
7886

@@ -357,6 +365,45 @@ class HiveCatalogSuite extends KyuubiHiveTest {
357365
catalog.dropNamespace(testNs, cascade = false)
358366
}
359367

368+
test("createNamespace location: use global-level warehouse dir") {
369+
val ns = Array("ns_default_path")
370+
try {
371+
catalog.createNamespace(ns, emptyProps)
372+
val location = catalog.loadNamespaceMetadata(ns)
373+
.asScala(SupportsNamespaces.PROP_LOCATION)
374+
375+
val expectedUri = makeQualifiedPathWithWarehouse(s"${ns.head}.db")
376+
assert(new URI(location) === expectedUri)
377+
} finally {
378+
catalog.dropNamespace(ns, cascade = true)
379+
}
380+
}
381+
382+
test("createNamespace location: use catalog-level warehouse dir") {
383+
withTempDir { tmpDir =>
384+
val customWarehouseDir = tmpDir.getCanonicalPath
385+
val customCatalog = newCatalog(Map("hive.metastore.warehouse.dir" -> customWarehouseDir))
386+
val ns = Array("ns_custom_path")
387+
try {
388+
customCatalog.createNamespace(ns, emptyProps)
389+
val location = customCatalog.loadNamespaceMetadata(ns)
390+
.asScala(SupportsNamespaces.PROP_LOCATION)
391+
392+
val expectedUri =
393+
makeQualifiedPathWithWarehouse(s"${ns.head}.db", customWarehouseDir, customCatalog)
394+
assert(new URI(location) === expectedUri)
395+
396+
val defaultUri = makeQualifiedPathWithWarehouse(
397+
s"${ns.head}.db",
398+
customCatalog.conf.warehousePath,
399+
customCatalog)
400+
assert(new URI(location) !== defaultUri)
401+
} finally {
402+
customCatalog.dropNamespace(ns, cascade = true)
403+
}
404+
}
405+
}
406+
360407
test("Support Parquet/Orc provider is splitable") {
361408
val parquet_table = Identifier.of(testNs, "parquet_table")
362409
val parProps: util.Map[String, String] = new util.HashMap[String, String]()

0 commit comments

Comments (0)