Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,37 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}

import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

object HiveConnectorUtils extends Logging {

  // Reflective handle to Spark's `Cast` constructor with the shape
  // (child: Expression, dataType: DataType, timeZoneId: Option[String]),
  // resolved once at class-load time and reused by `castExpression`.
  // NOTE(review): Spark 3.4+ adds an EvalMode parameter to Cast's primary
  // constructor — confirm DynConstructors still resolves a 3-argument
  // constructor on those versions, or add a 4-argument `.impl` alongside.
  private val castCtor: DynConstructors.Ctor[Expression] =
    DynConstructors.builder(classOf[Expression])
      .impl(
        classOf[Cast],
        classOf[Expression],
        classOf[DataType],
        classOf[Option[_]])
      .build[Expression]()

// SPARK-40054 (fixed in Spark 3.4.0), ensuring cross-version compatibility.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add fixed version alongside the JIRA ticket id.

def castExpression(
child: Expression,
dataType: DataType,
timeZoneId: Option[String] = None): Expression = {
castCtor.newInstance(child, dataType, timeZoneId)
}

def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
Try { // SPARK-43186: 3.5.0
DynConstructors.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kyuubi.spark.connector.hive

import java.net.URI
import java.util
import java.util.Locale

Expand All @@ -26,16 +27,19 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchPartitionsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal}
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
Expand All @@ -46,7 +50,7 @@ case class HiveTable(
sparkSession: SparkSession,
catalogTable: CatalogTable,
hiveTableCatalog: HiveTableCatalog)
extends Table with SupportsRead with SupportsWrite with Logging {
extends Table with SupportsRead with SupportsWrite with SupportsPartitionManagement with Logging {

lazy val dataSchema: StructType = catalogTable.dataSchema

Expand Down Expand Up @@ -112,4 +116,92 @@ case class HiveTable(
override def capabilities(): util.Set[TableCapability] = {
util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC)
}

override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
val spec = toPartitionSpec(ident)
val location = Option(properties.get(HiveTableProperties.LOCATION)).map(new URI(_))
val newPart = CatalogTablePartition(
spec,
catalogTable.storage.copy(locationUri = location),
properties.asScala.toMap)
hiveTableCatalog.externalCatalog.createPartitions(
catalogTable.database,
catalogTable.identifier.table,
Seq(newPart),
ignoreIfExists = false)
}

override def dropPartition(ident: InternalRow): Boolean = {
try {
hiveTableCatalog.externalCatalog.dropPartitions(
catalogTable.database,
catalogTable.identifier.table,
Seq(toPartitionSpec(ident)),
ignoreIfNotExists = false,
purge = false,
retainData = false)
true
} catch {
case _: NoSuchPartitionException => false
}
}

override def replacePartitionMetadata(
ident: InternalRow,
properties: util.Map[String, String]): Unit = {
throw new UnsupportedOperationException("Replace partition is not supported")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new UnsupportedOperationException("Replace partition is not supported")
throw new UnsupportedOperationException("Replace partition metadata is not supported")

}

override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
val spec = toPartitionSpec(ident)
val partition = hiveTableCatalog.externalCatalog.getPartition(
catalogTable.database,
catalogTable.identifier.table,
spec)
val metadata = new util.HashMap[String, String](partition.parameters.asJava)
partition.storage.locationUri.foreach { uri =>
metadata.put(HiveTableProperties.LOCATION, uri.toString)
}
metadata
}

override def listPartitionIdentifiers(
names: Array[String],
ident: InternalRow): Array[InternalRow] = {
val partialSpec = if (names.isEmpty) {
None
} else {
val fields = names.map(partitionSchema(_))
val schema = StructType(fields)
Some(toPartitionSpec(ident, schema))
}
hiveTableCatalog.externalCatalog.listPartitions(
catalogTable.database,
catalogTable.identifier.table,
partialSpec).map { part =>
val values = partitionSchema.map { field =>
val strValue = part.spec(field.name)
HiveConnectorUtils.castExpression(Literal(strValue), field.dataType).eval()
}
new GenericInternalRow(values.toArray)
}.toArray
}

private def toPartitionSpec(ident: InternalRow, schema: StructType): Map[String, String] = {
require(
schema.size == ident.numFields,
s"Schema size (${schema.size}) does not match numFields (${ident.numFields})")
schema.zipWithIndex.map { case (field, index) =>
val value = ident.get(index, field.dataType)
val fieldValue = HiveConnectorUtils.castExpression(
Literal(value, field.dataType),
StringType,
Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval().toString
field.name -> fieldValue
}.toMap
}

private def toPartitionSpec(ident: InternalRow): Map[String, String] = {
toPartitionSpec(ident, partitionSchema)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.hive

/** Property keys used in the connector's table/partition metadata maps. */
object HiveTableProperties {
  // Key carrying a partition's storage location URI in metadata maps.
  val LOCATION = "location"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.spark.connector.hive.command

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String

import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}

/**
 * Shared DDL tests for partition management: ADD/DROP PARTITION and
 * SHOW PARTITIONS (including partial partition-spec filtering).
 */
trait PartitionManagementSuite extends DDLCommandTestUtils {
  override protected def command: String = "PARTITION MANAGEMENT"

  test("create partition") {
    withNamespaceAndTable("ns", "tbl") { tableName =>
      sql(s"CREATE TABLE $tableName (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $tableName ADD PARTITION (year='2023', month='01')")
      checkAnswer(
        sql(s"SHOW PARTITIONS $tableName"),
        Seq(Row("year=2023/month=01")))
      // Re-adding the same partition must fail loudly.
      intercept[PartitionsAlreadyExistException] {
        sql(s"ALTER TABLE $tableName ADD PARTITION (year='2023', month='01')")
      }
    }
  }

  test("drop partition") {
    withNamespaceAndTable("ns", "tbl") { tableName =>
      sql(s"CREATE TABLE $tableName (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $tableName ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $tableName DROP PARTITION (year='2023', month='01')")
      checkAnswer(
        sql(s"SHOW PARTITIONS $tableName"),
        Seq.empty)
      // Dropping a non-existent partition must fail loudly.
      intercept[NoSuchPartitionsException] {
        sql(s"ALTER TABLE $tableName DROP PARTITION (year='9999', month='99')")
      }
    }
  }

  test("show partitions") {
    withNamespaceAndTable("ns", "tbl") { tableName =>
      sql(s"CREATE TABLE $tableName (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $tableName ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $tableName ADD PARTITION (year='2023', month='02')")
      checkAnswer(
        sql(s"SHOW PARTITIONS $tableName"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))

      // Full spec selects exactly one partition.
      checkAnswer(
        sql(s"SHOW PARTITIONS $tableName PARTITION (year='2023', month='01')"),
        Seq(Row("year=2023/month=01")))

      // Partial spec selects every matching partition.
      checkAnswer(
        sql(s"SHOW PARTITIONS $tableName PARTITION (year='2023')"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))
    }
  }
}

/** Runs the shared partition-management tests against the Hive V2 catalog. */
class PartitionManagementV2Suite extends PartitionManagementSuite {
  override protected def catalogVersion: String = "Hive V2"
  override protected def commandVersion: String = V2_COMMAND_VERSION

  test("create partition with location") {
    withNamespaceAndTable("ns", "tbl") { t =>
      import java.nio.file.Files
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      // Use a fresh temporary directory instead of a fixed path under /tmp,
      // so concurrent or repeated test runs cannot collide on one location.
      val partDir = Files.createTempDirectory("kyuubi_hive_part_loc_")
      val loc = partDir.toUri.toString
      try {
        sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01') LOCATION '$loc'")
        checkAnswer(
          sql(s"SHOW PARTITIONS $t"),
          Row("year=2023/month=01") :: Nil)
        val catalog = spark.sessionState.catalogManager
          .catalog(catalogName).asInstanceOf[TableCatalog]
        val partManagement = catalog.loadTable(Identifier.of(Array("ns"), "tbl"))
          .asInstanceOf[SupportsPartitionManagement]
        val partIdent = InternalRow.fromSeq(
          Seq(UTF8String.fromString("2023"), UTF8String.fromString("01")))
        val metadata = partManagement.loadPartitionMetadata(partIdent)
        assert(metadata.containsKey("location"))
        // The reported location must point at the directory we supplied.
        assert(metadata.get("location").contains(partDir.getFileName.toString))
      } finally {
        // Best-effort cleanup; File.delete is silent if the dir is non-empty.
        partDir.toFile.delete()
      }
    }
  }
}

/** Runs the shared partition-management tests against the builtin session catalog (V1 path). */
class PartitionManagementV1Suite extends PartitionManagementSuite {
  val SESSION_CATALOG_NAME: String = "spark_catalog"
  override protected val catalogName: String = SESSION_CATALOG_NAME
  override protected def catalogVersion: String = "V1"
  override protected def commandVersion: String = V1_COMMAND_VERSION
}
Loading