Skip to content

Commit 73882e5

Browse files
yuqipan3793
authored and committed
[KYUUBI #7318] Allow specifying the executionMode when executing the tpcds benchmark
### Why are the changes needed?

The underlying system supports specifying the executionMode when running the TPCDS benchmark, but this option is not exposed to users through parameters.

### How was this patch tested?

Manual test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #7318 from ychris78/tpcde_execution_mode.

Closes #7318

f61162a [yuqi] reformat code
76b9578 [Cheng Pan] Apply suggestion from @pan3793
bb4fb17 [yuqi] reformat code
55f72bc [yuqi] refine
f305694 [yuqi] Allow specifying the executionMode when executing the tpcds benchmark

Lead-authored-by: yuqi <yuqi@bestpay.com.cn>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent dfd9b49 commit 73882e5

5 files changed

Lines changed: 55 additions & 33 deletions

File tree

dev/kyuubi-tpcds/README.md

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,16 @@ $SPARK_HOME/bin/spark-submit \
4848

4949
Support options:
5050

51-
| key | default | description |
52-
|-------------|------------------------|-------------------------------------------------------------------------------|
53-
| db | none(required) | the TPC-DS database |
54-
| benchmark | tpcds-v2.4-benchmark | the name of application |
55-
| iterations | 3 | the number of iterations to run |
56-
| breakdown | false | whether to record breakdown results of an execution |
57-
| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref |
58-
| include | none(optional) | name of the queries to run, use comma to split multiple names, e.g. q1,q2 |
59-
| exclude | none(optional) | name of the queries to exclude, use comma to split multiple names, e.g. q2,q4 |
51+
| key | default | description |
52+
|----------------|-------------------------|----------------------------------------------------------------------------------------------------------------------------|
53+
| db | none(required) | the TPC-DS database |
54+
| benchmark | tpcds-v2.4-benchmark | the name of application |
55+
| iterations | 3 | the number of iterations to run |
56+
| breakdown | false | whether to record breakdown results of an execution |
57+
| results-dir | /spark/sql/performance | dir to store benchmark results, e.g. hdfs://hdfs-nn:9870/pref |
58+
| include | none(optional) | name of the queries to run, use comma to split multiple names, e.g. q1,q2 |
59+
| exclude | none(optional) | name of the queries to exclude, use comma to split multiple names, e.g. q2,q4 |
60+
| execution-mode | collect | how a given Spark benchmark should be run, only the following four modes are supported: collect,foreach,saveToParquet,hash |
6061

6162
Example: the following command to benchmark TPC-DS sf10 with exists database `tpcds_sf10`.
6263

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/Benchmark.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ abstract class Benchmark(
7979
variations: Seq[Variation[_]] = Seq(Variation("StandardRun", Seq("true")) { _ => {} }),
8080
tags: Map[String, String] = Map.empty,
8181
timeout: Long = 0L,
82-
resultsDir: String = resultsLocation,
82+
resultPath: String = resultsLocation,
8383
forkThread: Boolean = true): ExperimentStatus = {
8484

8585
new ExperimentStatus(
@@ -89,7 +89,7 @@ abstract class Benchmark(
8989
variations,
9090
tags,
9191
timeout,
92-
resultsDir,
92+
resultPath,
9393
sparkSession,
9494
currentConfiguration,
9595
forkThread = forkThread)
@@ -140,7 +140,7 @@ object Benchmark {
140140
variations: Seq[Variation[_]],
141141
tags: Map[String, String],
142142
timeout: Long,
143-
resultsDir: String,
143+
val resultPath: String,
144144
sparkSession: SparkSession,
145145
currentConfiguration: BenchmarkConfiguration,
146146
forkThread: Boolean = true) {
@@ -169,7 +169,6 @@ object Benchmark {
169169
}
170170

171171
val timestamp: Long = System.currentTimeMillis()
172-
val resultPath = s"$resultsDir/timestamp=$timestamp"
173172
val combinations: Seq[List[Int]] =
174173
cartesianProduct(variations.map(l => l.options.indices.toList).toList)
175174
val resultsFuture: Future[Unit] = Future {

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/RunBenchmark.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ case class RunConfig(
3030
breakdown: Boolean = false,
3131
resultsDir: String = "/spark/sql/performance",
3232
include: Set[String] = Set.empty,
33-
exclude: Set[String] = Set.empty)
33+
exclude: Set[String] = Set.empty,
34+
executionMode: String = "collect")
3435

3536
// scalastyle:off
3637
/**
@@ -73,6 +74,9 @@ object RunBenchmark {
7374
c.copy(exclude = x.split(",").map(_.trim).filter(_.nonEmpty).toSet)
7475
}
7576
.text("name of the queries to exclude, use comma to split multiple names, e.g. q2,q4")
77+
opt[String]("execution-mode")
78+
.action((x, c) => c.copy(executionMode = x))
79+
.text("how a given Spark benchmark should be run, only the following four modes are supported: collect,foreach,saveToParquet,hash")
7680
help("help")
7781
.text("prints this usage text")
7882
}
@@ -90,7 +94,9 @@ object RunBenchmark {
9094
val sparkSession = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
9195
import sparkSession.implicits._
9296

93-
sparkSession.conf.set("spark.sql.perf.results", config.resultsDir)
97+
val timestamp: Long = System.currentTimeMillis()
98+
sparkSession.conf.set("spark.sql.perf.results", s"${config.resultsDir}/timestamp=$timestamp")
99+
sparkSession.conf.set("spark.sql.benchmark.executionMode", config.executionMode)
94100

95101
val benchmark = new TPCDS(sparkSession)
96102

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS.scala

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,44 @@
1717

1818
package org.apache.kyuubi.tpcds.benchmark
1919

20+
import scala.io.{Codec, Source}
21+
2022
import org.apache.spark.sql.SparkSession
2123

24+
import org.apache.kyuubi.tpcds.benchmark.ExecutionMode._
25+
2226
/**
2327
* TPC-DS benchmark's dataset.
2428
*/
2529
class TPCDS(@transient sparkSession: SparkSession)
2630
extends Benchmark(sparkSession)
2731
with TPCDS_2_4_Queries
28-
with Serializable {}
32+
with Serializable {
33+
34+
override val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
35+
val in = getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
36+
val queryContent =
37+
try {
38+
Source.fromInputStream(in)(Codec.UTF8).mkString
39+
} finally {
40+
in.close()
41+
}
42+
43+
val modeName: String = sparkSession.conf.get("spark.sql.benchmark.executionMode")
44+
val resultsLocation: String = sparkSession.conf.get("spark.sql.perf.results")
45+
46+
val executionMode: ExecutionMode = modeName match {
47+
case "collect" => CollectResults
48+
case "foreach" => ForeachResults
49+
case "hash" => HashResults
50+
case "saveToParquet" => WriteParquet(s"${resultsLocation}_query_results")
51+
case _ =>
52+
throw new IllegalArgumentException(s"Unsupported mode: $modeName")
53+
}
54+
Query(
55+
queryName + "-v2.4",
56+
queryContent,
57+
description = "TPC-DS 2.4 Query",
58+
executionMode = executionMode)
59+
}
60+
}

dev/kyuubi-tpcds/src/main/scala/org/apache/kyuubi/tpcds/benchmark/TPCDS_2_4_Queries.scala

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,11 @@
1717

1818
package org.apache.kyuubi.tpcds.benchmark
1919

20-
import scala.io.{Codec, Source}
21-
2220
/**
2321
* This implements the official TPCDS v2.4 queries with only cosmetic modifications.
2422
*/
2523
trait TPCDS_2_4_Queries extends Benchmark {
2624

27-
import ExecutionMode._
28-
2925
val queryNames = Seq(
3026
"q1",
3127
"q2",
@@ -132,18 +128,6 @@ trait TPCDS_2_4_Queries extends Benchmark {
132128
"q99",
133129
"ss_max")
134130

135-
val tpcds2_4Queries: Seq[Query] = queryNames.map { queryName =>
136-
val in = getClass.getClassLoader.getResourceAsStream(s"tpcds_2_4/$queryName.sql")
137-
val queryContent: String = Source.fromInputStream(in)(Codec.UTF8).mkString
138-
in.close()
139-
140-
Query(
141-
queryName + "-v2.4",
142-
queryContent,
143-
description = "TPC-DS 2.4 Query",
144-
executionMode = CollectResults)
145-
}
131+
val tpcds2_4Queries: Seq[Query]
146132

147-
val tpcds2_4QueriesMap: Map[String, Query] =
148-
tpcds2_4Queries.map(q => q.name.split("-").get(0) -> q).toMap
149133
}

0 commit comments

Comments
 (0)