Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.spark.operation

import java.lang.{Boolean => JBoolean}
import java.util.concurrent.{TimeoutException, TimeUnit}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
Expand All @@ -33,11 +34,12 @@ import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OperationState, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils
import org.apache.kyuubi.util.reflect.DynMethods

/**
Expand All @@ -47,6 +49,7 @@ class PlanOnlyStatement(
session: Session,
override val statement: String,
mode: PlanOnlyMode,
queryTimeout: Long,
override protected val handle: OperationHandle)
extends SparkOperation(session) {

Expand Down Expand Up @@ -76,27 +79,48 @@ class PlanOnlyStatement(

override protected def runInternal(): Unit =
try {
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
parsed match {
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())

case plan => style match {
case PlainStyle => explainWithPlainStyle(plan)
case JsonStyle => explainWithJsonStyle(plan)
case UnknownStyle => unknownStyleError(style)
case other => throw notSupportedStyleError(other, "Spark SQL")
}
}
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread", false)
Copy link
Copy Markdown
Member

@pan3793 pan3793 Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this creates a new thread for each operation, it's too expensive. let's follow the idea of #7121 to create a global ThreadScheduledExecutor shared by all operations.

Q: why plan-only stmt was not covered by that? I suppose it should be, though I didn't take a deeper look yet

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tes, this is expensive. I will add a new method

def runWithTimeoutFallback(task: Runnable,
                             onTimeout: Runnable,
                             timeoutSeconds: Long): Unit = {
    val future = timeoutScheduler.submit(task)
    try {
      future.get(timeoutSeconds, TimeUnit.SECONDS)
    } catch {
      case _: TimeoutException =>
        future.cancel(true)
        onTimeout.run()
    }
  }

at OperationManager to deal with this.

And I had also assumed that plan-only statements could respond to timeouts, but I discovered that they cannot.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our production environment, we discovered that in the current 1.9.x version, when setting a timeout in plan-only mode, although the session in the Kyuubi server has timed out and closed, the driver in the Spark engine continues to parse and execute the plan. This causes memory to remain unavailable.

val future = timeoutExecutor.submit(new Runnable {
override def run(): Unit = doParsePlan()
})
try {
future.get(queryTimeout, TimeUnit.SECONDS)
} catch {
case _: TimeoutException =>
future.cancel(true)
cleanup(OperationState.TIMEOUT)
} finally {
timeoutExecutor.shutdownNow()
}
} else {
doParsePlan()
}
} catch {
onError()
}

/**
 * Parses the statement and materializes the plan-only result.
 *
 * The statement is parsed with the session's SQL parser under the session's
 * SQLConf and local properties. Commands whose class simple name appears in
 * `planExcludes` are executed fully (via `spark.sql`) instead of being
 * explained; all other plans are rendered according to the configured style.
 */
private def doParsePlan(): Unit = {
withLocalProperties {
SQLConf.withExistingConf(spark.sessionState.conf) {
val parsed = spark.sessionState.sqlParser.parsePlan(statement)
parsed match {
// Excluded commands (e.g. session-state commands) are executed, not explained,
// and their rows are collected eagerly into the fetch iterator.
case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
result = spark.sql(statement)
iter = new ArrayFetchIterator(result.collect())

// Everything else is rendered as a plan in the requested output style.
case plan => style match {
case PlainStyle => explainWithPlainStyle(plan)
case JsonStyle => explainWithJsonStyle(plan)
case UnknownStyle => unknownStyleError(style)
case other => throw notSupportedStyleError(other, "Spark SQL")
}
}
}
}
}

private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
mode match {
case ParseMode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
opHandle)
}
case mode =>
new PlanOnlyStatement(session, statement, mode, opHandle)
new PlanOnlyStatement(session, statement, mode, queryTimeout, opHandle)
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.engine.spark

import java.sql.SQLTimeoutException

import org.apache.kyuubi.WithKyuubiServer
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
Expand Down Expand Up @@ -149,5 +151,38 @@ class SparkSqlEngineSuite extends WithKyuubiServer with HiveJDBCTestHelper {
}
}

test("KYUUBI-7323: Support timeout at PlanOnlyMode") {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kyuubi uses GitHub Issues instead of JIRA, use KYUUBI #XXXX instead of KYUUBI-XXXX for issue/PR reference.

Also, we don't require creating an issue before sending a PR, GitHub Issues and PRs share the sequence number, and the link can jump to each other, so XXXX can either be an issue number or a PR number. What really matters here, clear description of the context of your change - especially WHY, and TEST. WHAT is also important in most cases, but for cases where the code itself is self-explanatory, WHAT is optional.

Copy link
Copy Markdown
Member Author

@ruanwenjun ruanwenjun Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, please forgive me for mistakenly copying the template from another test method above.

And I removed this test case here, since I find that the added test method actually fails to verify whether the engine has truly cancel operation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has added the solution at issue.

However, I don't fully understand why interrupting the thread can stop the analysis. There might be thread state detection logic during plan analysis, but I haven't found the specific code.
During testing, I discovered that canceling the jobgroup via cleanup didn't halt the plan analy.

val tableName = "t_timeout"
val sessionConfMap = Map("kyuubi.operation.plan.only.mode=analyze" -> "analyze")
withSessionConf(sessionConfMap)(Map.empty)(Map.empty) {

withJdbcStatement(tableName) { statement =>
statement.setQueryTimeout(2)
val layers = 30

val cteBuilder = new StringBuilder()
cteBuilder.append("WITH p AS (SELECT dt, tid, count(1) as pv, DENSE_RANK() " +
"over(order by dt) as dt_rank FROM t GROUP BY 1, 2), ")
cteBuilder.append("test_d1 AS (SELECT dt, tid FROM (SELECT dt, tid, row_number() " +
"over(order by pv desc) as r FROM p WHERE dt_rank=1) WHERE r<=100)")

for (i <- 2 to layers) {
cteBuilder.append(s", test_d$i AS ( " +
s"SELECT dt, tid FROM (SELECT p.dt, p.tid, row_number() over(order by pv desc) as r " +
s"FROM p LEFT JOIN test_d${i - 1} prev ON p.tid = prev.tid " +
s"WHERE p.dt_rank=$i AND prev.tid IS NULL) WHERE r<=100 " +
s"UNION ALL SELECT dt, tid FROM test_d${i - 1})")
}

val complexSql = cteBuilder.toString() + s" SELECT * FROM test_d$layers"

val e = intercept[SQLTimeoutException] {
statement.executeQuery(complexSql)
}
assert(e.getMessage.equals("Query timed out after 2 seconds"))
}
}
}

override protected def jdbcUrl: String = getJdbcUrl
}
Loading