diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index b48863e3c8f..728125305e2 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._ -import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle} +import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OperationState, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle} import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError} import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError} import org.apache.kyuubi.operation.log.OperationLog @@ -47,6 +47,7 @@ class PlanOnlyStatement( session: Session, override val statement: String, mode: PlanOnlyMode, + queryTimeout: Long, override protected val handle: OperationHandle) extends SparkOperation(session) { @@ -76,27 +77,38 @@ class PlanOnlyStatement( override protected def runInternal(): Unit = try { - withLocalProperties { - SQLConf.withExistingConf(spark.sessionState.conf) { - val parsed = spark.sessionState.sqlParser.parsePlan(statement) - parsed match { - case cmd if planExcludes.contains(cmd.getClass.getSimpleName) => - result = spark.sql(statement) - iter = new ArrayFetchIterator(result.collect()) - - case plan => style match { - case PlainStyle => explainWithPlainStyle(plan) - case JsonStyle => explainWithJsonStyle(plan) - case UnknownStyle => unknownStyleError(style) - case other => throw notSupportedStyleError(other, "Spark SQL") - } - } - } + if (queryTimeout > 0) { + session.sessionManager.operationManager.runWithTimeoutFallback( + () => doParsePlan(), + () => cleanup(OperationState.TIMEOUT), + queryTimeout) + } else { + doParsePlan() } } catch { onError() } + private def doParsePlan(): Unit = { + withLocalProperties { + SQLConf.withExistingConf(spark.sessionState.conf) { + val parsed = spark.sessionState.sqlParser.parsePlan(statement) + parsed match { + case cmd if planExcludes.contains(cmd.getClass.getSimpleName) => + result = spark.sql(statement) + iter = new ArrayFetchIterator(result.collect()) + + case plan => style match { + case PlainStyle => explainWithPlainStyle(plan) + case JsonStyle => explainWithJsonStyle(plan) + case UnknownStyle => unknownStyleError(style) + case other => throw notSupportedStyleError(other, "Spark SQL") + } + } + } + } + } + private def explainWithPlainStyle(plan: LogicalPlan): Unit = { mode match { case ParseMode => diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala index 5533d9c45e2..0ac8d8395a0 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala @@ -109,7 +109,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n opHandle) } case mode => - new PlanOnlyStatement(session, statement, mode, opHandle) + new PlanOnlyStatement(session, statement, mode, queryTimeout, opHandle) } case OperationLanguages.SCALA => val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark)) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala index bebaf269bdc..efc58040bac 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.operation -import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit} +import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeoutException, TimeUnit} import scala.collection.JavaConverters._ @@ -70,6 +70,22 @@ abstract class OperationManager(name: String) extends AbstractService(name) { super.stop() } + /** + * Submit a task to the internal timeout scheduler and run it with a timeout. + * If the task execution exceeds the specified timeout, the task will be cancelled + * and the provided onTimeout callback will be executed. + */ + def runWithTimeoutFallback(task: Runnable, onTimeout: Runnable, timeoutSeconds: Long): Unit = { + val future = timeoutScheduler.submit(task) + try { + future.get(timeoutSeconds, TimeUnit.SECONDS) + } catch { + case _: TimeoutException => + future.cancel(true) + onTimeout.run() + } + } + /** Schedule a timeout task using the internal scheduler */ def scheduleTimeout(action: Runnable, timeoutSeconds: Long): ScheduledFuture[_] = { timeoutScheduler.schedule(action, timeoutSeconds, TimeUnit.SECONDS)