4 changes: 4 additions & 0 deletions README.md
@@ -109,6 +109,10 @@ and others would not be possible without your help.

Ready? [Getting Started](https://kyuubi.readthedocs.io/en/master/quick_start/) with Kyuubi.

## Security & Guard

- [Dangerous Join Watchdog](./docs/watchdog/dangerous-join.md)

## [Contributing](./CONTRIBUTING.md)

## Project & Community Status
12 changes: 12 additions & 0 deletions docs/configuration/settings.md
@@ -580,6 +580,18 @@ jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.execu

Please refer to the Spark official online documentation for [SET Command](https://spark.apache.org/docs/latest/sql-ref-syntax-aux-conf-mgmt-set.html)

### Dangerous Join Watchdog

You can enable dangerous join detection in the Spark SQL extension with the following settings:

| Name | Default | Description |
|------------------------------------------------|---------|------------------------------------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled`        | `false` | Enable dangerous join detection                                                     |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold to identify oversized broadcast fallback |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs warning diagnostics, `REJECT` throws exception with error code `41101` |

Please see [Dangerous Join Watchdog](../watchdog/dangerous-join.md) for rules and examples.

## Flink Configurations

### Via flink-conf.yaml
Expand Down
3 changes: 2 additions & 1 deletion docs/deployment/index.rst
@@ -27,6 +27,7 @@ Basics
:glob:

kyuubi_on_kubernetes
settings
hive_metastore
high_availability_guide
migration-guide
@@ -42,4 +43,4 @@ Engines
engine_on_kubernetes
engine_share_level
engine_lifecycle
spark/index
spark/index
34 changes: 34 additions & 0 deletions docs/deployment/settings.md
@@ -0,0 +1,34 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Deployment Settings for Dangerous Join Watchdog

## Spark SQL Extensions

```properties
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension
```

`KyuubiSparkSQLExtension` already injects the dangerous join interceptor, so do not also register `org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension`; registering both would run the rule twice.

## Dangerous Join Configurations

| Name | Default | Description |
|------------------------------------------------|---------|-----------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled`        | `false` | Enable dangerous join watchdog                             |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Broadcast threshold coefficient |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` only logs diagnostics, `REJECT` throws error 41101 |

For detailed rules and examples, see [Dangerous Join Watchdog](../watchdog/dangerous-join.md).
57 changes: 30 additions & 27 deletions docs/extensions/engines/spark/rules.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/index.rst
@@ -181,6 +181,7 @@ What's Next

quick_start/index
configuration/settings
watchdog/dangerous-join
deployment/index
Security <security/index>
monitor/index
105 changes: 105 additions & 0 deletions docs/watchdog/dangerous-join.md
@@ -0,0 +1,105 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Dangerous Join Watchdog

Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.

## Background

In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.

## Risk Rules

### Equi-Join

- Rule 1: An equi-join is marked dangerous when it degrades to a Cartesian pattern.
- Rule 2: An equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.

### Non-Equi Join

- Rule 1: A non-equi join is marked dangerous when both sides exceed the broadcast threshold and the plan effectively carries Cartesian risk.
- Rule 2: A non-equi join is marked dangerous when no build side can be selected and the plan falls back to a broadcast nested loop join (BNLJ) pattern.
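
A query shape that typically matches the Cartesian rule is a join whose predicate collapses to a constant, leaving no usable join key. This is an illustrative example only; the table names are made up:

```sql
-- The ON clause compares two literals, so no join key constrains the
-- result and the join degrades to a Cartesian product of orders x customers.
SELECT *
FROM orders o
JOIN customers c
  ON 1 = 1;
```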

## Configurations

| Name | Default | Meaning |
|------------------------------------------------|---------|---------------------------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled`        | `false` | Enable or disable dangerous join detection                                 |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |
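The `broadcastRatio` decision can be sketched as follows. This is an illustrative snippet, not the interceptor's actual code; the function name and signature are made up:

```scala
// Illustrative sketch (not the actual interceptor code): a build side is
// flagged as an oversized broadcast when its estimated size exceeds
// broadcastRatio * spark.sql.autoBroadcastJoinThreshold.
def oversizedBroadcast(
    buildSideBytes: BigInt,
    broadcastThresholdBytes: Long,
    broadcastRatio: Double): Boolean = {
  buildSideBytes > BigInt((broadcastThresholdBytes * broadcastRatio).toLong)
}
```

With the defaults above, a 15 MiB build side against Spark's default 10 MiB threshold (cutoff 10 MiB * 0.8 = 8 MiB) would be flagged.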

## Usage

1. Put the Kyuubi Spark extension jar on the Spark classpath.
2. Configure SQL extensions:

```properties
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension
```

   `KyuubiSparkSQLExtension` already injects `DangerousJoinInterceptor`, so do not also register `KyuubiDangerousJoinExtension`; doing so would inject the rule twice and duplicate warnings and counter increments.

3. Configure action:

```properties
kyuubi.watchdog.dangerousJoin.action=WARN
```

or

```properties
kyuubi.watchdog.dangerousJoin.action=REJECT
```

## Sample WARN Log

When action is `WARN`, Kyuubi writes a structured JSON payload. Note that the `sql` field carries the logical plan string built from `plan.toString()`, not the original SQL text:

```text
KYUUBI_LOG_KEY={"sql":"Project [*]\n+- LogicalRDD [id#0, value#1], false","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8}
```

## Sample REJECT Error

When action is `REJECT`, query submission fails with:

```text
errorCode=41101
Query rejected due to dangerous join strategy: {...details...}
```

## Disable or Tune

- Disable watchdog:

```properties
kyuubi.watchdog.dangerousJoin.enabled=false
```

- Increase tolerance:

```properties
kyuubi.watchdog.dangerousJoin.broadcastRatio=0.95
```

## FAQ

### What if `spark.sql.adaptive.enabled=true`?

The Dangerous Join Watchdog runs in the planner strategy phase and evaluates pre-execution plan statistics.
AQE may still re-optimize plans at runtime, but watchdog decisions are made before query execution starts.
@@ -123,6 +123,30 @@ object KyuubiSQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional

val DANGEROUS_JOIN_ENABLED =
buildConf("kyuubi.watchdog.dangerousJoin.enabled")
.doc("Enable dangerous join condition detection.")
.version("1.11.0")
.booleanConf
.createWithDefault(false)

val DANGEROUS_JOIN_BROADCAST_RATIO =
buildConf("kyuubi.watchdog.dangerousJoin.broadcastRatio")
.doc("The threshold ratio to mark oversized broadcast fallback.")
.version("1.11.0")
.doubleConf
.checkValue(v => v > 0 && v <= 1, "must be in (0, 1]")
.createWithDefault(0.8)

val DANGEROUS_JOIN_ACTION =
buildConf("kyuubi.watchdog.dangerousJoin.action")
.doc("Action when dangerous join is detected, one of WARN and REJECT.")
.version("1.11.0")
.stringConf
.transform(_.toUpperCase(java.util.Locale.ROOT))
.checkValues(Set("WARN", "REJECT"))
.createWithDefault("WARN")

val DROP_IGNORE_NONEXISTENT =
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
.doc("Do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}

import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
import org.apache.kyuubi.sql.watchdog.{DangerousJoinInterceptor, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}

// scalastyle:off line.size.limit
/**
Expand All @@ -39,6 +39,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
// watchdog extension
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
extensions.injectPlannerStrategy(MaxScanStrategy)
extensions.injectPlannerStrategy(DangerousJoinInterceptor(_))

extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.watchdog

import scala.collection.mutable.ArrayBuffer

object DangerousJoinCounter {
case class Entry(
sqlText: String,
joinType: String,
reason: String,
leftSize: BigInt,
rightSize: BigInt,
broadcastThreshold: Long,
broadcastRatio: Double) {
def toJson: String = {
val pairs = Seq(
"sql" -> escape(sqlText),
"joinType" -> escape(joinType),
"reason" -> escape(reason),
"leftSize" -> leftSize.toString,
"rightSize" -> rightSize.toString,
"broadcastThreshold" -> broadcastThreshold.toString,
"broadcastRatio" -> broadcastRatio.toString)
pairs.map { case (k, v) =>
if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") {
s""""$k":$v"""
} else {
s""""$k":"$v""""
}
}.mkString("{", ",", "}")
}
}

// Bound the buffer to avoid unbounded memory growth in a long-running driver.
private val maxEntries = 1000

private val entries = ArrayBuffer.empty[Entry]

def add(entry: Entry): Unit = synchronized {
  if (entries.size >= maxEntries) {
    // Evict the oldest entry to keep the buffer bounded.
    entries.remove(0)
  }
entries += entry
}

def count: Int = synchronized {
entries.size
}

def latest: Option[Entry] = synchronized {
entries.lastOption
}

def snapshot: Seq[Entry] = synchronized {
entries.toSeq
}

def reset(): Unit = synchronized {
entries.clear()
}

private def escape(raw: String): String = {
raw
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
}
}
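
The escaping applied by `Entry.toJson` can be exercised standalone. This self-contained copy of the `escape` helper mirrors the replacement order above:

```scala
// Mirrors DangerousJoinCounter.escape: the backslash must be replaced first,
// otherwise the backslashes introduced for quotes and control characters
// would themselves be double-escaped.
def escape(raw: String): String =
  raw.replace("\\", "\\\\")
    .replace("\"", "\\\"")
    .replace("\n", "\\n")
    .replace("\r", "\\r")
    .replace("\t", "\\t")
```

For example, `escape("a\"b\nc")` produces the seven characters `a\"b\nc`, which embed safely inside a JSON string literal.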