Add dangerous join condition watchdog to prevent Cartesian and oversi…#7368
Add dangerous join condition watchdog to prevent Cartesian and oversi…#7368MonsterChenzhuo wants to merge 1 commit intoapache:masterfrom
Conversation
…zed-broadcast risks
There was a problem hiding this comment.
Pull request overview
This PR introduces a “Dangerous Join” watchdog for Kyuubi Spark SQL extensions to detect risky join planning patterns (e.g., Cartesian risk, oversized broadcast fallback) and optionally warn or reject queries, along with docs and tests across supported Spark versions.
Changes:
- Add a planner-strategy interceptor, counter, extension hook, and SQLException type for dangerous-join detection (Spark 3.3/3.4/3.5/4.0/4.1 modules).
- Register the interceptor and expose new SQL confs for enablement, broadcast ratio, and action (WARN/REJECT).
- Add documentation pages and integration/unit tests for WARN/REJECT behaviors.
Reviewed changes
Copilot reviewed 47 out of 47 changed files in this pull request and generated 27 comments.
Show a summary per file
| File | Description |
|---|---|
| extensions/spark/kyuubi-extension-spark-4-1/src/test/scala/org/apache/spark/sql/KyuubiDangerousJoinIT.scala | Adds IT coverage for WARN/REJECT behaviors (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala | Adds unit coverage for interceptor counting/markers (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinExtension.scala | Adds standalone SparkSessionExtensions hook (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinException.scala | Adds SQLException for REJECT action (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala | Adds planner strategy to detect risky joins (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala | Adds counter/entry storage + JSON payload builder (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala | Registers dangerous-join strategy in the main extension (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | Adds dangerous-join confs (Spark 4.1). |
| extensions/spark/kyuubi-extension-spark-4-0/src/test/scala/org/apache/spark/sql/KyuubiDangerousJoinIT.scala | Adds IT coverage for WARN/REJECT behaviors (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala | Adds unit coverage for interceptor counting/markers (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinExtension.scala | Adds standalone SparkSessionExtensions hook (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinException.scala | Adds SQLException for REJECT action (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala | Adds planner strategy to detect risky joins (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala | Adds counter/entry storage + JSON payload builder (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala | Registers dangerous-join strategy in the main extension (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | Adds dangerous-join confs (Spark 4.0). |
| extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/KyuubiDangerousJoinIT.scala | Adds IT coverage for WARN/REJECT behaviors (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala | Adds unit coverage for interceptor counting/markers (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinExtension.scala | Adds standalone SparkSessionExtensions hook (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinException.scala | Adds SQLException for REJECT action (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala | Adds planner strategy to detect risky joins (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala | Adds counter/entry storage + JSON payload builder (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala | Registers dangerous-join strategy in the main extension (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | Adds dangerous-join confs (Spark 3.5). |
| extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/KyuubiDangerousJoinIT.scala | Adds IT coverage for WARN/REJECT behaviors (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala | Adds unit coverage for interceptor counting/markers (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinExtension.scala | Adds standalone SparkSessionExtensions hook (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinException.scala | Adds SQLException for REJECT action (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala | Adds planner strategy to detect risky joins (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala | Adds counter/entry storage + JSON payload builder (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala | Registers dangerous-join strategy in the main extension (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | Adds dangerous-join confs (Spark 3.4). |
| extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/KyuubiDangerousJoinIT.scala | Adds IT coverage for WARN/REJECT behaviors (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala | Adds unit coverage for interceptor counting/markers (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinExtension.scala | Adds standalone SparkSessionExtensions hook (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/KyuubiDangerousJoinException.scala | Adds SQLException for REJECT action (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala | Adds planner strategy to detect risky joins (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala | Adds counter/entry storage + JSON payload builder (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala | Registers dangerous-join strategy in the main extension (Spark 3.3). |
| extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | Adds dangerous-join confs (Spark 3.3). |
| docs/watchdog/dangerous-join.md | New watchdog documentation page (rules/config/usage). |
| docs/index.rst | Adds watchdog doc to top-level TOC. |
| docs/extensions/engines/spark/rules.md | Documents new dangerous-join configs in Spark rules list. |
| docs/deployment/settings.md | Adds deployment-focused setup guide for the watchdog. |
| docs/deployment/index.rst | Adds new deployment settings page to TOC. |
| docs/configuration/settings.md | Adds dangerous-join confs to configuration reference. |
| README.md | Adds README link to the new watchdog docs. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| When action is `WARN`, Kyuubi writes a structured JSON payload: | ||
|
|
||
| ```text | ||
| KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8} |
There was a problem hiding this comment.
The emitted sql field in the JSON payload is built from plan.toString() in the interceptor (logical plan tree string), not the original SQL text. The sample log currently shows "sql":"SELECT ...", which is misleading; please either adjust the payload to include real SQL text (if available) or update the docs/sample to reflect what is actually logged.
| When action is `WARN`, Kyuubi writes a structured JSON payload: | |
| ```text | |
| KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8} | |
| When action is `WARN`, Kyuubi writes a structured JSON payload (the `sql` field contains the logical plan string from `plan.toString()`): | |
| ```text | |
| KYUUBI_LOG_KEY={"sql":"Project [*]\n+- LogicalRDD [id#0, value#1], false","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8} |
| | Name | Default | Meaning | | ||
| |------------------------------------------------|---------|---------------------------------------------------------------------------| | ||
| | `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable or disable dangerous join detection | | ||
| | `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision | | ||
| | `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission | |
There was a problem hiding this comment.
The configuration table lists kyuubi.watchdog.dangerousJoin.enabled default as true, but in this PR most Spark extension modules set the default to false (e.g., spark-3-3/3-4/3-5/4-0). Please reconcile the documented defaults with the actual defaults (and keep them consistent across Spark versions).
|
|
||
| | Name | Default | Description | | ||
| |------------------------------------------------|---------|-----------------------------------------------------------| | ||
| | `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join watchdog | |
There was a problem hiding this comment.
docs/deployment/settings.md lists kyuubi.watchdog.dangerousJoin.enabled default as true, but this PR sets it to false in several Spark extension modules. Please update either the code default or this documentation so operators see the correct default behavior.
| | `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join watchdog | | |
| | `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable dangerous join watchdog | |
| private val entries = ArrayBuffer.empty[Entry] | ||
|
|
||
| def add(entry: Entry): Unit = synchronized { | ||
| entries += entry | ||
| } |
There was a problem hiding this comment.
DangerousJoinCounter stores every detected entry in a global ArrayBuffer with no bounds or eviction. In a long-running Spark driver (Kyuubi gateway), this can grow unbounded and eventually cause memory pressure/OOM. Consider removing storage entirely for production, or keep only a bounded ring buffer / latest entry and expose counters via metrics instead.
| private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { | ||
| if (threshold <= 0) { | ||
| return None | ||
| } |
There was a problem hiding this comment.
The early return when autoBroadcastJoinThreshold <= 0 disables all detection, including Cartesian/BNLJ risks that are not dependent on broadcast thresholds. Detection should still run for non-broadcast-related rules even when broadcast is disabled.
| private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { | ||
| if (threshold <= 0) { | ||
| return None | ||
| } |
There was a problem hiding this comment.
The early return when autoBroadcastJoinThreshold <= 0 disables all dangerous-join detection, including Cartesian / nested-loop cases that are unrelated to broadcast thresholds. If the goal is to guard against Cartesian and BNLJ, detection should still run when broadcast is disabled; only the broadcast-size based rule should depend on this threshold.
| val DANGEROUS_JOIN_ENABLED = | ||
| buildConf("kyuubi.watchdog.dangerousJoin.enabled") | ||
| .doc("Enable dangerous join condition detection.") | ||
| .version("1.10.0") | ||
| .booleanConf | ||
| .createWithDefault(true) | ||
|
|
There was a problem hiding this comment.
In this Spark 4.1 extension, kyuubi.watchdog.dangerousJoin.enabled defaults to true, while other Spark-extension modules in this PR default it to false. Enabling this by default changes behavior (extra WARN logs / possible REJECT) and makes defaults inconsistent across Spark versions; please align the default across modules and update docs accordingly.
| if (leftSize > threshold && rightSize > threshold) { | ||
| Some("Cartesian") | ||
| } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { | ||
| Some("SecondBNLJ") | ||
| } else { |
There was a problem hiding this comment.
cannotSelectBuildSide is the same predicate as the prior Cartesian check, so the SecondBNLJ branch can never be reached. Please fix the condition(s) so SecondBNLJ can be detected distinctly or remove the dead code path.
|
|
||
| private val entries = ArrayBuffer.empty[Entry] | ||
|
|
||
| def add(entry: Entry): Unit = synchronized { |
There was a problem hiding this comment.
DangerousJoinCounter stores all entries in a global mutable buffer without bounds/eviction. In a long-running driver this can grow indefinitely and cause memory pressure. Consider bounding the buffer or removing storage in production and relying on metrics/logging instead.
| private val entries = ArrayBuffer.empty[Entry] | |
| def add(entry: Entry): Unit = synchronized { | |
| // Limit the number of stored entries to avoid unbounded memory growth. | |
| private val MaxEntries = 1000 | |
| private val entries = ArrayBuffer.empty[Entry] | |
| def add(entry: Entry): Unit = synchronized { | |
| if (entries.size >= MaxEntries) { | |
| // Evict the oldest entry to keep the buffer bounded. | |
| entries.remove(0) | |
| } |
| if (leftSize > threshold && rightSize > threshold) { | ||
| Some("Cartesian") | ||
| } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { | ||
| Some("SecondBNLJ") | ||
| } else { |
There was a problem hiding this comment.
cannotSelectBuildSide is the same predicate as the prior Cartesian check, so the SecondBNLJ branch can never be reached. Please fix the condition(s) so SecondBNLJ can be detected distinctly or remove the dead code path.
|
We don't oppose accepting AI-generated code, but when sending generated code, you take the responsibility in the same way as for the code you have manually typed. Take efforts to read and review the code before sending - otherwise it is disrespectful to maintainers. This AI-generated patch obviously was not self-reviewed before being sent out publicly, such behavior likely negatively affects your reputation as an engineer. |
Why are the changes needed?
Users often lack a clear understanding of how Spark executes different join strategies internally. As a result, they may write SQL with improper or risky join conditions, which can lead to severe performance degradation, excessive resource consumption, or even Out Of Memoryerrors on the engine.
How was this patch tested?
KyuubiDangerousJoinIT
Was this patch authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (GPT-5.3)