Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OperationState, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
Expand All @@ -47,6 +47,7 @@ class PlanOnlyStatement(
session: Session,
override val statement: String,
mode: PlanOnlyMode,
queryTimeout: Long,
override protected val handle: OperationHandle)
extends SparkOperation(session) {

Expand Down Expand Up @@ -76,27 +77,38 @@ class PlanOnlyStatement(

override protected def runInternal(): Unit =
try {
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())

case plan => style match {
case PlainStyle => explainWithPlainStyle(plan)
case JsonStyle => explainWithJsonStyle(plan)
case UnknownStyle => unknownStyleError(style)
case other => throw notSupportedStyleError(other, "Spark SQL")
}
}
}
if (queryTimeout > 0) {
session.sessionManager.operationManager.runWithTimeoutFallback(
() => doParsePlan(),
() => cleanup(OperationState.TIMEOUT),
queryTimeout)
} else {
doParsePlan()
}
} catch {
onError()
}

private def doParsePlan(): Unit = {
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())

case plan => style match {
case PlainStyle => explainWithPlainStyle(plan)
case JsonStyle => explainWithJsonStyle(plan)
case UnknownStyle => unknownStyleError(style)
case other => throw notSupportedStyleError(other, "Spark SQL")
}
}
}
}
}

private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
mode match {
case ParseMode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
opHandle)
}
case mode =>
new PlanOnlyStatement(session, statement, mode, opHandle)
new PlanOnlyStatement(session, statement, mode, queryTimeout, opHandle)
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeoutException, TimeUnit}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -70,6 +70,22 @@ abstract class OperationManager(name: String) extends AbstractService(name) {
super.stop()
}

/**
* Submit a task to the internal timeout scheduler and run it with a timeout.
* If the task execution exceeds the specified timeout, the task will be cancelled
* and the provided onTimeout callback will be executed.
*/
def runWithTimeoutFallback(task: Runnable, onTimeout: Runnable, timeoutSeconds: Long): Unit = {
val future = timeoutScheduler.submit(task)
try {
future.get(timeoutSeconds, TimeUnit.SECONDS)
} catch {
case _: TimeoutException =>
future.cancel(true)
onTimeout.run()
}
}

/** Schedule a timeout task using the internal scheduler */
def scheduleTimeout(action: Runnable, timeoutSeconds: Long): ScheduledFuture[_] = {
timeoutScheduler.schedule(action, timeoutSeconds, TimeUnit.SECONDS)
Expand Down
Loading