Skip to content

Commit 59dff17

Browse files
author
zhuoyuchen
committed
Add dangerous join condition watchdog to prevent Cartesian and oversized-broadcast risks
1 parent 7969f85 commit 59dff17

47 files changed

Lines changed: 2253 additions & 33 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ and others would not be possible without your help.
109109

110110
Ready? [Getting Started](https://kyuubi.readthedocs.io/en/master/quick_start/) with Kyuubi.
111111

112+
## Security & Guard
113+
114+
- [Dangerous Join Watchdog](./docs/watchdog/dangerous-join.md)
115+
112116
## [Contributing](./CONTRIBUTING.md)
113117

114118
## Project & Community Status

docs/configuration/settings.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,18 @@ jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.execu
580580

581581
Please refer to the Spark official online documentation for [SET Command](https://spark.apache.org/docs/latest/sql-ref-syntax-aux-conf-mgmt-set.html)
582582

583+
### Dangerous Join Watchdog
584+
585+
You can enable dangerous join detection for Spark SQL extension with:
586+
587+
| Name | Default | Description |
588+
|------------------------------------------------|---------|------------------------------------------------------------------------------------|
589+
| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join detection |
590+
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold to identify oversized broadcast fallback |
591+
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs warning diagnostics, `REJECT` throws exception with error code `41101` |
592+
593+
Please see [Dangerous Join Watchdog](../watchdog/dangerous-join.md) for rules and examples.
594+
583595
## Flink Configurations
584596

585597
### Via flink-conf.yaml

docs/deployment/index.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Basics
2727
:glob:
2828

2929
kyuubi_on_kubernetes
30+
settings
3031
hive_metastore
3132
high_availability_guide
3233
migration-guide
@@ -42,4 +43,4 @@ Engines
4243
engine_on_kubernetes
4344
engine_share_level
4445
engine_lifecycle
45-
spark/index
46+
spark/index

docs/deployment/settings.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<!--
2+
- Licensed to the Apache Software Foundation (ASF) under one or more
3+
- contributor license agreements. See the NOTICE file distributed with
4+
- this work for additional information regarding copyright ownership.
5+
- The ASF licenses this file to You under the Apache License, Version 2.0
6+
- (the "License"); you may not use this file except in compliance with
7+
- the License. You may obtain a copy of the License at
8+
-
9+
- http://www.apache.org/licenses/LICENSE-2.0
10+
-
11+
- Unless required by applicable law or agreed to in writing, software
12+
- distributed under the License is distributed on an "AS IS" BASIS,
13+
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
- See the License for the specific language governing permissions and
15+
- limitations under the License.
16+
-->
17+
18+
# Deployment Settings for Dangerous Join Watchdog
19+
20+
## Spark SQL Extensions
21+
22+
```properties
23+
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
24+
```
25+
26+
## Dangerous Join Configurations
27+
28+
| Name | Default | Description |
29+
|------------------------------------------------|---------|-----------------------------------------------------------|
30+
| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join watchdog |
31+
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Broadcast threshold coefficient |
32+
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` only logs diagnostics, `REJECT` throws error 41101 |
33+
34+
For detailed rules and examples, see [Dangerous Join Watchdog](../watchdog/dangerous-join.md).

docs/extensions/engines/spark/rules.md

Lines changed: 30 additions & 27 deletions
Large diffs are not rendered by default.

docs/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ What's Next
181181

182182
quick_start/index
183183
configuration/settings
184+
watchdog/dangerous-join
184185
deployment/index
185186
Security <security/index>
186187
monitor/index

docs/watchdog/dangerous-join.md

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
<!--
2+
- Licensed to the Apache Software Foundation (ASF) under one or more
3+
- contributor license agreements. See the NOTICE file distributed with
4+
- this work for additional information regarding copyright ownership.
5+
- The ASF licenses this file to You under the Apache License, Version 2.0
6+
- (the "License"); you may not use this file except in compliance with
7+
- the License. You may obtain a copy of the License at
8+
-
9+
- http://www.apache.org/licenses/LICENSE-2.0
10+
-
11+
- Unless required by applicable law or agreed to in writing, software
12+
- distributed under the License is distributed on an "AS IS" BASIS,
13+
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
- See the License for the specific language governing permissions and
15+
- limitations under the License.
16+
-->
17+
18+
# Dangerous Join Watchdog
19+
20+
Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
21+
It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.
22+
23+
## Background
24+
25+
In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
26+
The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.
27+
28+
## Risk Rules
29+
30+
### Equi-Join
31+
32+
- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern.
33+
- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.
34+
35+
### Non-Equi Join
36+
37+
- Rule 1: Non-equi join is marked dangerous when both sides exceed broadcast threshold and effectively become Cartesian risk.
38+
- Rule 2: Non-equi join is marked dangerous when build side is not selectable and the plan falls back to a second BNLJ pattern.
39+
40+
## Configurations
41+
42+
| Name | Default | Meaning |
43+
|------------------------------------------------|---------|---------------------------------------------------------------------------|
44+
| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable or disable dangerous join detection |
45+
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision |
46+
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |
47+
48+
## Usage
49+
50+
1. Put Kyuubi Spark extension jar into Spark classpath.
51+
2. Configure SQL extensions:
52+
53+
```properties
54+
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
55+
```
56+
57+
3. Configure action:
58+
59+
```properties
60+
kyuubi.watchdog.dangerousJoin.action=WARN
61+
```
62+
63+
or
64+
65+
```properties
66+
kyuubi.watchdog.dangerousJoin.action=REJECT
67+
```
68+
69+
## Sample WARN Log
70+
71+
When action is `WARN`, Kyuubi writes a structured JSON payload:
72+
73+
```text
74+
KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8}
75+
```
76+
77+
## Sample REJECT Error
78+
79+
When action is `REJECT`, query submission fails with:
80+
81+
```text
82+
errorCode=41101
83+
Query rejected due to dangerous join strategy: {...details...}
84+
```
85+
86+
## Disable or Tune
87+
88+
- Disable watchdog:
89+
90+
```properties
91+
kyuubi.watchdog.dangerousJoin.enabled=false
92+
```
93+
94+
- Increase tolerance:
95+
96+
```properties
97+
kyuubi.watchdog.dangerousJoin.broadcastRatio=0.95
98+
```
99+
100+
## FAQ
101+
102+
### What if `spark.sql.adaptive.enabled=true`?
103+
104+
Dangerous Join Watchdog runs in planner strategy phase and evaluates pre-execution plan statistics.
105+
AQE may still optimize runtime plans, but watchdog decisions are made before query execution starts.

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,30 @@ object KyuubiSQLConf {
123123
.bytesConf(ByteUnit.BYTE)
124124
.createOptional
125125

126+
val DANGEROUS_JOIN_ENABLED =
127+
buildConf("kyuubi.watchdog.dangerousJoin.enabled")
128+
.doc("Enable dangerous join condition detection.")
129+
.version("1.11.0")
130+
.booleanConf
131+
.createWithDefault(false)
132+
133+
val DANGEROUS_JOIN_BROADCAST_RATIO =
134+
buildConf("kyuubi.watchdog.dangerousJoin.broadcastRatio")
135+
.doc("The threshold ratio to mark oversized broadcast fallback.")
136+
.version("1.11.0")
137+
.doubleConf
138+
.checkValue(v => v > 0 && v <= 1, "must be in (0, 1]")
139+
.createWithDefault(0.8)
140+
141+
val DANGEROUS_JOIN_ACTION =
142+
buildConf("kyuubi.watchdog.dangerousJoin.action")
143+
.doc("Action when dangerous join is detected, one of WARN and REJECT.")
144+
.version("1.11.0")
145+
.stringConf
146+
.transform(_.toUpperCase(java.util.Locale.ROOT))
147+
.checkValues(Set("WARN", "REJECT"))
148+
.createWithDefault("WARN")
149+
126150
val DROP_IGNORE_NONEXISTENT =
127151
buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
128152
.doc("Do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +

extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/KyuubiSparkSQLExtension.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql
1919

2020
import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}
2121

22-
import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
22+
import org.apache.kyuubi.sql.watchdog.{DangerousJoinInterceptor, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
2323

2424
// scalastyle:off line.size.limit
2525
/**
@@ -39,6 +39,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
3939
// watchdog extension
4040
extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
4141
extensions.injectPlannerStrategy(MaxScanStrategy)
42+
extensions.injectPlannerStrategy(DangerousJoinInterceptor(_))
4243

4344
extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
4445
extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.sql.watchdog
19+
20+
import scala.collection.mutable.ArrayBuffer
21+
22+
object DangerousJoinCounter {
23+
case class Entry(
24+
sqlText: String,
25+
joinType: String,
26+
reason: String,
27+
leftSize: BigInt,
28+
rightSize: BigInt,
29+
broadcastThreshold: Long,
30+
broadcastRatio: Double) {
31+
def toJson: String = {
32+
val pairs = Seq(
33+
"sql" -> escape(sqlText),
34+
"joinType" -> escape(joinType),
35+
"reason" -> escape(reason),
36+
"leftSize" -> leftSize.toString,
37+
"rightSize" -> rightSize.toString,
38+
"broadcastThreshold" -> broadcastThreshold.toString,
39+
"broadcastRatio" -> broadcastRatio.toString)
40+
pairs.map { case (k, v) =>
41+
if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") {
42+
s""""$k":$v"""
43+
} else {
44+
s""""$k":"$v""""
45+
}
46+
}.mkString("{", ",", "}")
47+
}
48+
}
49+
50+
private val entries = ArrayBuffer.empty[Entry]
51+
52+
def add(entry: Entry): Unit = synchronized {
53+
entries += entry
54+
}
55+
56+
def count: Int = synchronized {
57+
entries.size
58+
}
59+
60+
def latest: Option[Entry] = synchronized {
61+
entries.lastOption
62+
}
63+
64+
def snapshot: Seq[Entry] = synchronized {
65+
entries.toSeq
66+
}
67+
68+
def reset(): Unit = synchronized {
69+
entries.clear()
70+
}
71+
72+
private def escape(raw: String): String = {
73+
raw
74+
.replace("\\", "\\\\")
75+
.replace("\"", "\\\"")
76+
.replace("\n", "\\n")
77+
.replace("\r", "\\r")
78+
.replace("\t", "\\t")
79+
}
80+
}

0 commit comments

Comments
 (0)