Skip to content

Commit 5d2bf9e

Browse files
committed
[KYUUBI #7323] Support timeout at PlanOnlyMode
1 parent 69e8e95 commit 5d2bf9e

3 files changed

Lines changed: 76 additions & 17 deletions

File tree

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.kyuubi.engine.spark.operation
1919

2020
import java.lang.{Boolean => JBoolean}
21+
import java.util.concurrent.{TimeoutException, TimeUnit}
2122

2223
import com.fasterxml.jackson.databind.ObjectMapper
2324
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -33,11 +34,12 @@ import org.apache.kyuubi.KyuubiSQLException
3334
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
3435
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
3536
import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._
36-
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
37+
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OperationState, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
3738
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
3839
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
3940
import org.apache.kyuubi.operation.log.OperationLog
4041
import org.apache.kyuubi.session.Session
42+
import org.apache.kyuubi.util.ThreadUtils
4143
import org.apache.kyuubi.util.reflect.DynMethods
4244

4345
/**
@@ -47,6 +49,7 @@ class PlanOnlyStatement(
4749
session: Session,
4850
override val statement: String,
4951
mode: PlanOnlyMode,
52+
queryTimeout: Long,
5053
override protected val handle: OperationHandle)
5154
extends SparkOperation(session) {
5255

@@ -76,27 +79,48 @@ class PlanOnlyStatement(
7679

7780
override protected def runInternal(): Unit =
7881
try {
79-
withLocalProperties {
80-
SQLConf.withExistingConf(spark.sessionState.conf) {
81-
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
82-
parsed match {
83-
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
84-
result = spark.sql(statement)
85-
iter = new ArrayFetchIterator(result.collect())
86-
87-
case plan => style match {
88-
case PlainStyle => explainWithPlainStyle(plan)
89-
case JsonStyle => explainWithJsonStyle(plan)
90-
case UnknownStyle => unknownStyleError(style)
91-
case other => throw notSupportedStyleError(other, "Spark SQL")
92-
}
93-
}
82+
if (queryTimeout > 0) {
83+
val timeoutExecutor =
84+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
85+
val future = timeoutExecutor.submit(new Runnable {
86+
override def run(): Unit = doParsePlan()
87+
})
88+
try {
89+
future.get(queryTimeout, TimeUnit.SECONDS)
90+
} catch {
91+
case _: TimeoutException =>
92+
future.cancel(true)
93+
cleanup(OperationState.TIMEOUT)
94+
} finally {
95+
timeoutExecutor.shutdownNow()
9496
}
97+
} else {
98+
doParsePlan()
9599
}
96100
} catch {
97101
onError()
98102
}
99103

104+
private def doParsePlan(): Unit = {
105+
withLocalProperties {
106+
SQLConf.withExistingConf(spark.sessionState.conf) {
107+
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
108+
parsed match {
109+
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
110+
result = spark.sql(statement)
111+
iter = new ArrayFetchIterator(result.collect())
112+
113+
case plan => style match {
114+
case PlainStyle => explainWithPlainStyle(plan)
115+
case JsonStyle => explainWithJsonStyle(plan)
116+
case UnknownStyle => unknownStyleError(style)
117+
case other => throw notSupportedStyleError(other, "Spark SQL")
118+
}
119+
}
120+
}
121+
}
122+
}
123+
100124
private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
101125
mode match {
102126
case ParseMode =>

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
109109
opHandle)
110110
}
111111
case mode =>
112-
new PlanOnlyStatement(session, statement, mode, opHandle)
112+
new PlanOnlyStatement(session, statement, mode, queryTimeout, opHandle)
113113
}
114114
case OperationLanguages.SCALA =>
115115
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.kyuubi.engine.spark
1919

20+
import java.sql.SQLTimeoutException
21+
2022
import org.apache.kyuubi.WithKyuubiServer
2123
import org.apache.kyuubi.config.KyuubiConf
2224
import org.apache.kyuubi.config.KyuubiConf._
@@ -149,5 +151,38 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
149151
}
150152
}
151153

154+
test("KYUUBI-7323: Support timeout at PlanOnlyMode") {
155+
val tableName = "t_timeout"
156+
val sessionConfMap = Map("kyuubi.operation.plan.only.mode=analyze" -> "analyze")
157+
withSessionConf(sessionConfMap)(Map.empty)(Map.empty) {
158+
159+
withJdbcStatement(tableName) { statement =>
160+
statement.setQueryTimeout(2)
161+
val layers = 30
162+
163+
val cteBuilder = new StringBuilder()
164+
cteBuilder.append("WITH p AS (SELECT dt, tid, count(1) as pv, DENSE_RANK() " +
165+
"over(order by dt) as dt_rank FROM t GROUP BY 1, 2), ")
166+
cteBuilder.append("test_d1 AS (SELECT dt, tid FROM (SELECT dt, tid, row_number() " +
167+
"over(order by pv desc) as r FROM p WHERE dt_rank=1) WHERE r<=100)")
168+
169+
for (i <- 2 to layers) {
170+
cteBuilder.append(s", test_d$i AS ( " +
171+
s"SELECT dt, tid FROM (SELECT p.dt, p.tid, row_number() over(order by pv desc) as r " +
172+
s"FROM p LEFT JOIN test_d${i - 1} prev ON p.tid = prev.tid " +
173+
s"WHERE p.dt_rank=$i AND prev.tid IS NULL) WHERE r<=100 " +
174+
s"UNION ALL SELECT dt, tid FROM test_d${i - 1})")
175+
}
176+
177+
val complexSql = cteBuilder.toString() + s" SELECT * FROM test_d$layers"
178+
179+
val e = intercept[SQLTimeoutException] {
180+
statement.executeQuery(complexSql)
181+
}
182+
assert(e.getMessage.equals("Query timed out after 2 seconds"))
183+
}
184+
}
185+
}
186+
152187
override protected def jdbcUrl: String = getJdbcUrl
153188
}

0 commit comments

Comments
 (0)