Skip to content

Commit fde836a

Browse files
committed
Remove unused test case
1 parent 5d2bf9e commit fde836a

File tree

3 files changed

+21
-52
lines changed

3 files changed

+21
-52
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.kyuubi.engine.spark.operation
1919

2020
import java.lang.{Boolean => JBoolean}
21-
import java.util.concurrent.{TimeoutException, TimeUnit}
2221

2322
import com.fasterxml.jackson.databind.ObjectMapper
2423
import com.fasterxml.jackson.module.scala.DefaultScalaModule
@@ -39,7 +38,6 @@ import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownM
3938
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
4039
import org.apache.kyuubi.operation.log.OperationLog
4140
import org.apache.kyuubi.session.Session
42-
import org.apache.kyuubi.util.ThreadUtils
4341
import org.apache.kyuubi.util.reflect.DynMethods
4442

4543
/**
@@ -80,20 +78,10 @@ class PlanOnlyStatement(
8078
override protected def runInternal(): Unit =
8179
try {
8280
if (queryTimeout > 0) {
83-
val timeoutExecutor =
84-
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
85-
val future = timeoutExecutor.submit(new Runnable {
86-
override def run(): Unit = doParsePlan()
87-
})
88-
try {
89-
future.get(queryTimeout, TimeUnit.SECONDS)
90-
} catch {
91-
case _: TimeoutException =>
92-
future.cancel(true)
93-
cleanup(OperationState.TIMEOUT)
94-
} finally {
95-
timeoutExecutor.shutdownNow()
96-
}
81+
session.sessionManager.operationManager.runWithTimeoutFallback(
82+
() => doParsePlan(),
83+
() => cleanup(OperationState.TIMEOUT),
84+
queryTimeout)
9785
} else {
9886
doParsePlan()
9987
}

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.kyuubi.operation
1919

20-
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
20+
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeoutException, TimeUnit}
2121

2222
import scala.collection.JavaConverters._
2323

@@ -70,6 +70,22 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
7070
super.stop()
7171
}
7272

73+
/**
74+
* Submit a task to the internal timeout scheduler and run it with a timeout.
75+
* If the task execution exceeds the specified timeout, the task will be cancelled
76+
* and the provided onTimeout callback will be executed.
77+
*/
78+
def runWithTimeoutFallback(task: Runnable, onTimeout: Runnable, timeoutSeconds: Long): Unit = {
79+
val future = timeoutScheduler.submit(task)
80+
try {
81+
future.get(timeoutSeconds, TimeUnit.SECONDS)
82+
} catch {
83+
case _: TimeoutException =>
84+
future.cancel(true)
85+
onTimeout.run()
86+
}
87+
}
88+
7389
/** Schedule a timeout task using the internal scheduler */
7490
def scheduleTimeout(action: Runnable, timeoutSeconds: Long): ScheduledFuture[_] = {
7591
timeoutScheduler.schedule(action, timeoutSeconds, TimeUnit.SECONDS)

kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkSqlEngineSuite.scala

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark
1919

20-
import java.sql.SQLTimeoutException
21-
2220
import org.apache.kyuubi.WithKyuubiServer
2321
import org.apache.kyuubi.config.KyuubiConf
2422
import org.apache.kyuubi.config.KyuubiConf._
@@ -151,38 +149,5 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
151149
}
152150
}
153151

154-
test("KYUUBI-7323: Support timeout at PlanOnlyMode") {
155-
val tableName = "t_timeout"
156-
val sessionConfMap = Map("kyuubi.operation.plan.only.mode=analyze" -> "analyze")
157-
withSessionConf(sessionConfMap)(Map.empty)(Map.empty) {
158-
159-
withJdbcStatement(tableName) { statement =>
160-
statement.setQueryTimeout(2)
161-
val layers = 30
162-
163-
val cteBuilder = new StringBuilder()
164-
cteBuilder.append("WITH p AS (SELECT dt, tid, count(1) as pv, DENSE_RANK() " +
165-
"over(order by dt) as dt_rank FROM t GROUP BY 1, 2), ")
166-
cteBuilder.append("test_d1 AS (SELECT dt, tid FROM (SELECT dt, tid, row_number() " +
167-
"over(order by pv desc) as r FROM p WHERE dt_rank=1) WHERE r<=100)")
168-
169-
for (i <- 2 to layers) {
170-
cteBuilder.append(s", test_d$i AS ( " +
171-
s"SELECT dt, tid FROM (SELECT p.dt, p.tid, row_number() over(order by pv desc) as r " +
172-
s"FROM p LEFT JOIN test_d${i - 1} prev ON p.tid = prev.tid " +
173-
s"WHERE p.dt_rank=$i AND prev.tid IS NULL) WHERE r<=100 " +
174-
s"UNION ALL SELECT dt, tid FROM test_d${i - 1})")
175-
}
176-
177-
val complexSql = cteBuilder.toString() + s" SELECT * FROM test_d$layers"
178-
179-
val e = intercept[SQLTimeoutException] {
180-
statement.executeQuery(complexSql)
181-
}
182-
assert(e.getMessage.equals("Query timed out after 2 seconds"))
183-
}
184-
}
185-
}
186-
187152
override protected def jdbcUrl: String = getJdbcUrl
188153
}

0 commit comments

Comments
 (0)