Skip to content

Commit e79518d

Browse files
committed
Support SupportsPartitionManagement for HiveTable
1 parent 2037881 commit e79518d

3 files changed

Lines changed: 226 additions & 4 deletions

File tree

extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kyuubi.spark.connector.hive
1919

20+
import java.net.URI
2021
import java.util
2122
import java.util.Locale
2223

@@ -26,16 +27,19 @@ import scala.collection.mutable
2627
import org.apache.hadoop.fs.Path
2728
import org.apache.spark.internal.Logging
2829
import org.apache.spark.sql.SparkSession
29-
import org.apache.spark.sql.catalyst.catalog.CatalogTable
30-
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
30+
import org.apache.spark.sql.catalyst.InternalRow
31+
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
32+
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
33+
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericInternalRow, Literal}
34+
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
3135
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
3236
import org.apache.spark.sql.connector.expressions.Transform
3337
import org.apache.spark.sql.connector.read.ScanBuilder
3438
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
3539
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
3640
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
3741
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.{BucketSpecHelper, LogicalExpressions}
38-
import org.apache.spark.sql.types.StructType
42+
import org.apache.spark.sql.types.{StringType, StructType}
3943
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4044

4145
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET}
@@ -46,7 +50,7 @@ case class HiveTable(
4650
sparkSession: SparkSession,
4751
catalogTable: CatalogTable,
4852
hiveTableCatalog: HiveTableCatalog)
49-
extends Table with SupportsRead with SupportsWrite with Logging {
53+
extends Table with SupportsRead with SupportsWrite with SupportsPartitionManagement with Logging {
5054

5155
lazy val dataSchema: StructType = catalogTable.dataSchema
5256

@@ -112,4 +116,92 @@ case class HiveTable(
112116
override def capabilities(): util.Set[TableCapability] = {
113117
util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC)
114118
}
119+
120+
/**
 * Creates a single Hive partition for this table.
 *
 * The V2 partition identifier row is rendered into a Hive-style spec
 * (column name -> string value). If the caller supplied a "location"
 * property, it overrides the table's default storage location for this
 * partition; all supplied properties are also stored as partition parameters.
 * Delegates to the external catalog with `ignoreIfExists = false`, so an
 * already-existing partition surfaces as an exception to the caller.
 */
override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
  val partitionSpec = toPartitionSpec(ident)
  val customLocation = Option(properties.get(HiveTableProperties.LOCATION)).map(new URI(_))
  val partitionStorage = catalogTable.storage.copy(locationUri = customLocation)
  val partition = CatalogTablePartition(partitionSpec, partitionStorage, properties.asScala.toMap)
  hiveTableCatalog.externalCatalog.createPartitions(
    catalogTable.database,
    catalogTable.identifier.table,
    Seq(partition),
    ignoreIfExists = false)
}
133+
134+
/**
 * Drops the partition identified by `ident`.
 *
 * @return true if the partition was dropped, false if it did not exist.
 */
override def dropPartition(ident: InternalRow): Boolean = {
  try {
    hiveTableCatalog.externalCatalog.dropPartitions(
      catalogTable.database,
      catalogTable.identifier.table,
      Seq(toPartitionSpec(ident)),
      ignoreIfNotExists = false,
      purge = false,
      retainData = false)
    true
  } catch {
    // HiveExternalCatalog.dropPartitions with ignoreIfNotExists = false reports
    // missing partitions with the *plural* NoSuchPartitionsException, which the
    // singular catch alone would let escape instead of returning false. Catch
    // both forms so the SupportsPartitionManagement contract holds. The plural
    // type is fully qualified to avoid requiring an import change.
    case _: NoSuchPartitionException => false
    case _: org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException => false
  }
}
148+
149+
/**
 * Replacing partition metadata in place is not supported by this connector;
 * always fails with UnsupportedOperationException.
 */
override def replacePartitionMetadata(
    ident: InternalRow,
    properties: util.Map[String, String]): Unit =
  throw new UnsupportedOperationException("Replace partition is not supported")
154+
155+
/**
 * Loads the metadata of the partition identified by `ident`.
 *
 * Returns the partition's Hive parameters as a mutable Java map; when the
 * partition carries a custom storage location, it is exposed under the
 * "location" key as well.
 */
override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
  val partition = hiveTableCatalog.externalCatalog.getPartition(
    catalogTable.database,
    catalogTable.identifier.table,
    toPartitionSpec(ident))
  val metadata = new util.HashMap[String, String](partition.parameters.asJava)
  partition.storage.locationUri.foreach { location =>
    metadata.put(HiveTableProperties.LOCATION, location.toString)
  }
  metadata
}
167+
168+
/**
 * Lists partition identifiers, optionally filtered by a partial spec.
 *
 * @param names the partition column names present in `ident`; empty means
 *              list all partitions.
 * @param ident the values for `names`, used as a partial partition spec.
 * @return one InternalRow per matching partition, with values cast from the
 *         metastore's string representation back to the partition schema types.
 */
override def listPartitionIdentifiers(
    names: Array[String],
    ident: InternalRow): Array[InternalRow] = {
  val partialSpec = if (names.isEmpty) {
    None
  } else {
    val fields = names.map(partitionSchema(_))
    val schema = StructType(fields)
    Some(toPartitionSpec(ident, schema))
  }
  // Use the session time zone when casting string partition values back to
  // their declared types, mirroring the Cast in toPartitionSpec so that
  // date/timestamp partition columns round-trip consistently.
  val timeZoneId = Some(sparkSession.sessionState.conf.sessionLocalTimeZone)
  hiveTableCatalog.externalCatalog.listPartitions(
    catalogTable.database,
    catalogTable.identifier.table,
    partialSpec).map { part =>
    val values = partitionSchema.map { field =>
      val strValue = part.spec(field.name)
      Cast(Literal(strValue), field.dataType, timeZoneId).eval()
    }
    new GenericInternalRow(values.toArray)
  }.toArray
}
189+
190+
private def toPartitionSpec(ident: InternalRow, schema: StructType): Map[String, String] = {
191+
require(
192+
schema.size == ident.numFields,
193+
s"Schema size (${schema.size}) does not match numFields (${ident.numFields})")
194+
schema.zipWithIndex.map { case (field, index) =>
195+
val value = ident.get(index, field.dataType)
196+
val filedValue = Cast(
197+
Literal(value, field.dataType),
198+
StringType,
199+
Some(sparkSession.sessionState.conf.sessionLocalTimeZone)).eval().toString
200+
field.name -> filedValue
201+
}.toMap
202+
}
203+
204+
/** Converts a partition identifier row using this table's full partition schema. */
private def toPartitionSpec(ident: InternalRow): Map[String, String] =
  toPartitionSpec(ident, partitionSchema)
115207
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.hive
19+
20+
/**
 * Reserved table/partition property keys understood by the Kyuubi Hive connector.
 */
object HiveTableProperties {
  /** Property key carrying a custom storage location for a table or partition. */
  val LOCATION: String = "location"
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.spark.connector.hive.command
19+
20+
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
23+
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
24+
import org.apache.spark.unsafe.types.UTF8String
25+
26+
import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.{V1_COMMAND_VERSION, V2_COMMAND_VERSION}
27+
28+
/**
 * Shared DDL tests exercising ALTER TABLE ADD/DROP PARTITION and
 * SHOW PARTITIONS, run against both the V1 and V2 Hive catalog paths.
 */
trait PartitionManagementSuite extends DDLCommandTestUtils {
  override protected def command: String = "PARTITION MANAGEMENT"

  test("create partition") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq(Row("year=2023/month=01")))
      // Re-adding the identical partition must fail loudly.
      intercept[PartitionsAlreadyExistException] {
        sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      }
    }
  }

  test("drop partition") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $t DROP PARTITION (year='2023', month='01')")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq.empty)
      // Dropping a partition that never existed must fail loudly.
      intercept[NoSuchPartitionsException] {
        sql(s"ALTER TABLE $t DROP PARTITION (year='9999', month='99')")
      }
    }
  }

  test("show partitions") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01')")
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='02')")
      // Unfiltered listing returns every partition.
      checkAnswer(
        sql(s"SHOW PARTITIONS $t"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))

      // Full-spec filter narrows to a single partition.
      checkAnswer(
        sql(s"SHOW PARTITIONS $t PARTITION (year='2023', month='01')"),
        Seq(Row("year=2023/month=01")))

      // Partial-spec filter matches all partitions under that prefix.
      checkAnswer(
        sql(s"SHOW PARTITIONS $t PARTITION (year='2023')"),
        Seq(Row("year=2023/month=01"), Row("year=2023/month=02")))
    }
  }
}
77+
78+
/** Runs the shared partition-management tests through the V2 Hive catalog. */
class PartitionManagementV2Suite extends PartitionManagementSuite {
  override protected def catalogVersion: String = "Hive V2"
  override protected def commandVersion: String = V2_COMMAND_VERSION

  test("create partition with location") {
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id string, year string, month string) PARTITIONED BY (year, month)")
      val loc = "file:///tmp/kyuubi/hive_catalog_part_loc"
      sql(s"ALTER TABLE $t ADD PARTITION (year='2023', month='01') LOCATION '$loc'")
      checkAnswer(sql(s"SHOW PARTITIONS $t"), Seq(Row("year=2023/month=01")))
      // Go below the SQL layer to verify the custom location survived the
      // round trip through the partition metadata API.
      val tableCatalog = spark.sessionState.catalogManager
        .catalog(catalogName).asInstanceOf[TableCatalog]
      val partitionedTable = tableCatalog.loadTable(Identifier.of(Array("ns"), "tbl"))
        .asInstanceOf[SupportsPartitionManagement]
      val partitionIdent = InternalRow.fromSeq(
        Seq(UTF8String.fromString("2023"), UTF8String.fromString("01")))
      val metadata = partitionedTable.loadPartitionMetadata(partitionIdent)
      assert(metadata.containsKey("location"))
      assert(metadata.get("location").contains("hive_catalog_part_loc"))
    }
  }
}
102+
103+
/** Runs the shared partition-management tests through the V1 session catalog. */
class PartitionManagementV1Suite extends PartitionManagementSuite {
  // The built-in session catalog name used when no V2 catalog is configured.
  val SESSION_CATALOG_NAME: String = "spark_catalog"
  override protected val catalogName: String = SESSION_CATALOG_NAME
  override protected def catalogVersion: String = "V1"
  override protected def commandVersion: String = V1_COMMAND_VERSION
}

0 commit comments

Comments
 (0)