From fe425386b8a98c4416ff44e7f765bf7f100ab2cd Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Wed, 15 Apr 2026 13:11:08 +0000 Subject: [PATCH 01/19] [SPARK-XXXXX][SQL][TESTS] Move SQLTestUtilsBase content into QueryTestBase Move all methods from `SQLTestUtilsBase` into `QueryTestBase`, making `SQLTestUtilsBase` a thin alias that extends `QueryTestBase`. This is the first step toward consolidating the test utility class hierarchy by merging `SQLTestUtils` into `QueryTest`. Co-authored-by: Isaac --- .../org/apache/spark/sql/QueryTest.scala | 290 ++++++++++++++++- .../apache/spark/sql/test/SQLTestUtils.scala | 298 +----------------- 2 files changed, 289 insertions(+), 299 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index a2f4d5010b19d..1e28cae8dfe95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -17,27 +17,45 @@ package org.apache.spark.sql -import java.util.TimeZone +import java.io.File +import java.net.URI +import java.nio.file.Files +import java.util.{Locale, TimeZone, UUID} import java.util.regex.Pattern +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.language.implicitConversions -import org.scalatest.Assertions -import org.scalatest.Suite +import org.apache.hadoop.fs.Path +import org.scalatest.{Assertions, BeforeAndAfterAll, Suite} +import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.ExtendedAnalysisException +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ -import org.apache.spark.sql.execution.{QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.Utils -trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite => +trait QueryTestBase + extends Eventually + with BeforeAndAfterAll + with SQLTestData + with PlanTestBase { self: Suite => /** * Runs the plan and makes sure the answer contains all of the keywords. @@ -251,6 +269,268 @@ trait QueryTestBase extends PlanTestBase with SparkSessionProvider { self: Suite s"The physical plan has missing inputs:\n${qe.executedPlan}") } + protected def sparkContext = spark.sparkContext + + // Shorthand for running a query using our SparkSession + protected lazy val sql: String => DataFrame = spark.sql _ + + /** + * A helper object for importing SQL implicits. + * + * Note that the alternative of importing `spark.implicits._` is not possible here. + * This is because we create the `SparkSession` immediately before the first test is run, + * but the implicits import is needed in the constructor. + */ + protected object testImplicits + extends classic.SQLImplicits + with classic.ClassicConversions + with classic.ColumnConversions { + override protected def session: classic.SparkSession = + self.spark.asInstanceOf[classic.SparkSession] + override protected def converter: classic.ColumnNodeToExpressionConverter = + self.spark.asInstanceOf[classic.SparkSession].converter + } + + protected override def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { + SparkSession.setActiveSession(spark) + super.withSQLConf(pairs: _*)(f) + } + + /** + * Drops functions after calling `f`. A function is represented by (functionName, isTemporary). + */ + protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = { + try { + f + } catch { + case cause: Throwable => throw cause + } finally { + functions.foreach { case (functionName, isTemporary) => + val withTemporary = if (isTemporary) "TEMPORARY" else "" + spark.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName") + assert( + !spark.sessionState.catalog.functionExists(FunctionIdentifier(functionName)), + s"Function $functionName should have been dropped. But, it still exists.") + } + } + } + + /** + * Drops temporary view `viewNames` after calling `f`. + */ + protected def withTempView(viewNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + viewNames.foreach { viewName => + try spark.catalog.dropTempView(viewName) catch { + case _: NoSuchTableException => + } + } + } + } + + /** + * Drops global temporary view `viewNames` after calling `f`. + */ + protected def withGlobalTempView(viewNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + viewNames.foreach { viewName => + try spark.catalog.dropGlobalTempView(viewName) catch { + case _: NoSuchTableException => + } + } + } + } + + /** + * Drops table `tableName` after calling `f`. + */ + protected def withTable(tableNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + + /** + * Drops view `viewName` after calling `f`. + */ + protected def withView(viewNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f)( + viewNames.foreach { name => + spark.sql(s"DROP VIEW IF EXISTS $name") + } + ) + } + + /** + * Drops cache `cacheName` after calling `f`. + */ + protected def withCache(cacheNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + cacheNames.foreach { cacheName => + try uncacheTable(cacheName) catch { + case _: AnalysisException => + } + } + } + } + + // Blocking uncache table for tests + protected def uncacheTable(tableName: String): Unit = { + val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName) + val cascade = !spark.sessionState.catalog.isTempView(tableIdent) + spark.sharedState.cacheManager.uncacheQuery( + spark.table(tableName).asInstanceOf[classic.Dataset[_]], + cascade = cascade, + blocking = true) + } + + /** + * Creates a temporary database and switches current database to it before executing `f`. This + * database is dropped after `f` returns. + * + * Note that this method doesn't switch current database before executing `f`. + */ + protected def withTempDatabase(f: String => Unit): Unit = { + val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}" + + try { + spark.sql(s"CREATE DATABASE $dbName") + } catch { case cause: Throwable => + fail("Failed to create temporary database", cause) + } + + try f(dbName) finally { + if (spark.catalog.currentDatabase == dbName) { + spark.sql(s"USE $DEFAULT_DATABASE") + } + spark.sql(s"DROP DATABASE $dbName CASCADE") + } + } + + /** + * Drops database `dbName` after calling `f`. + */ + protected def withDatabase(dbNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + dbNames.foreach { name => + spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE") + } + spark.sql(s"USE $DEFAULT_DATABASE") + } + } + + /** + * Drops namespace `namespace` after calling `f`. + * + * Note that, if you switch current catalog/namespace in `f`, you should switch it back manually. + */ + protected def withNamespace(namespaces: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + namespaces.foreach { name => + spark.sql(s"DROP NAMESPACE IF EXISTS $name CASCADE") + } + } + } + + /** + * Restores the current catalog/database after calling `f`. + */ + protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = { + val curCatalog = sql("select current_catalog()").head().getString(0) + val curDatabase = sql("select current_database()").head().getString(0) + Utils.tryWithSafeFinally(f) { + spark.sql(s"USE $curCatalog.$curDatabase") + } + } + + /** + * Enables Locale `language` before executing `f`, then switches back to the default locale of + * JVM after `f` returns. + */ + protected def withLocale(language: String)(f: => Unit): Unit = { + val originalLocale = Locale.getDefault + try { + // Add Locale setting + Locale.setDefault(new Locale(language)) + f + } finally { + Locale.setDefault(originalLocale) + } + } + + /** + * Drops temporary variable `variableName` after calling `f`. + */ + protected def withSessionVariable(variableNames: String*)(f: => Unit): Unit = { + Utils.tryWithSafeFinally(f) { + variableNames.foreach { name => + spark.sql(s"DROP TEMPORARY VARIABLE IF EXISTS $name") + } + } + } + + /** + * Activates database `db` before executing `f`, then switches back to `default` database after + * `f` returns. + */ + protected def activateDatabase(db: String)(f: => Unit): Unit = { + spark.sessionState.catalogManager.setCurrentNamespace(Array(db)) + Utils.tryWithSafeFinally(f)( + spark.sessionState.catalogManager.setCurrentNamespace(Array("default"))) + } + + /** + * Strip Spark-side filtering in order to check if a datasource filters rows correctly. + */ + protected def stripSparkFilter(df: DataFrame): DataFrame = { + val schema = df.schema + val withoutFilters = df.queryExecution.executedPlan.transform { + case FilterExec(_, child) => child + } + + spark.asInstanceOf[classic.SparkSession] + .internalCreateDataFrame(withoutFilters.execute(), schema) + } + + /** + * Turn a logical plan into a `DataFrame`. This should be removed once we have an easier + * way to construct `DataFrame` directly out of local data without relying on implicits. + */ + protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): classic.DataFrame = { + classic.Dataset.ofRows(spark.asInstanceOf[classic.SparkSession], plan) + } + + /** + * This method is used to make the given path qualified, when a path + * does not contain a scheme, this path will not be changed after the default + * FileSystem is changed. + */ + def makeQualifiedPath(path: String): URI = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf()) + fs.makeQualified(hadoopPath).toUri + } + + /** + * Returns full path to the given file in the resource folder + */ + protected def testFile(fileName: String): String = { + Thread.currentThread().getContextClassLoader.getResource(fileName).toString + } + + /** + * Returns the size of the local directory except the metadata file and the temporary file. + */ + def getLocalDirSize(file: File): Long = { + assert(file.isDirectory) + Files.walk(file.toPath).iterator().asScala + .filter(p => java.nio.file.Files.isRegularFile(p) && + DataSourceUtils.isDataFile(p.getFileName.toString)) + .map(_.toFile.length).sum + } + } abstract class QueryTest extends SparkFunSuite with QueryTestBase { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 4a52677510858..b7aebd9fde3a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -18,32 +18,19 @@ package org.apache.spark.sql.test import java.io.File -import java.net.URI import java.nio.file.Files -import java.util.{Locale, UUID} import scala.concurrent.duration._ -import scala.jdk.CollectionConverters._ -import scala.language.implicitConversions import scala.util.control.NonFatal -import org.apache.hadoop.fs.Path import org.scalactic.source.Position -import org.scalatest.{BeforeAndAfterAll, Suite, Tag} -import org.scalatest.concurrent.Eventually +import org.scalatest.{Suite, Tag} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{classic, AnalysisException, DataFrame, Row, SparkSession} -import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE +import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.PlanTestBase -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.FilterExec import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution -import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.UninterruptibleThread @@ -214,286 +201,9 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with /** * Helper trait that can be extended by all external SQL test suites. - * - * This allows subclasses to plugin a custom `SQLContext`. - * To use implicit methods, import `testImplicits._` instead of through the `SQLContext`. - * - * Subclasses should *not* create `SQLContext`s in the test suite constructor, which is - * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. + * Now a thin alias for [[QueryTestBase]]. */ -private[sql] trait SQLTestUtilsBase - extends Eventually - with BeforeAndAfterAll - with SQLTestData - with PlanTestBase { self: Suite => - - protected def sparkContext = spark.sparkContext - - // Shorthand for running a query using our SparkSession - protected lazy val sql: String => DataFrame = spark.sql _ - - /** - * A helper object for importing SQL implicits. - * - * Note that the alternative of importing `spark.implicits._` is not possible here. - * This is because we create the `SparkSession` immediately before the first test is run, - * but the implicits import is needed in the constructor. - */ - protected object testImplicits - extends classic.SQLImplicits - with classic.ClassicConversions - with classic.ColumnConversions { - override protected def session: classic.SparkSession = - self.spark.asInstanceOf[classic.SparkSession] - override protected def converter: classic.ColumnNodeToExpressionConverter = - self.spark.asInstanceOf[classic.SparkSession].converter - } - - protected override def withSQLConf[T](pairs: (String, String)*)(f: => T): T = { - SparkSession.setActiveSession(spark) - super.withSQLConf(pairs: _*)(f) - } - - /** - * Drops functions after calling `f`. A function is represented by (functionName, isTemporary). - */ - protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = { - try { - f - } catch { - case cause: Throwable => throw cause - } finally { - // If the test failed part way, we don't want to mask the failure by failing to remove - // temp tables that never got created. - functions.foreach { case (functionName, isTemporary) => - val withTemporary = if (isTemporary) "TEMPORARY" else "" - spark.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName") - assert( - !spark.sessionState.catalog.functionExists(FunctionIdentifier(functionName)), - s"Function $functionName should have been dropped. But, it still exists.") - } - } - } - - /** - * Drops temporary view `viewNames` after calling `f`. - */ - protected def withTempView(viewNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - viewNames.foreach { viewName => - try spark.catalog.dropTempView(viewName) catch { - // If the test failed part way, we don't want to mask the failure by failing to remove - // temp views that never got created. - case _: NoSuchTableException => - } - } - } - } - - /** - * Drops global temporary view `viewNames` after calling `f`. - */ - protected def withGlobalTempView(viewNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - viewNames.foreach { viewName => - try spark.catalog.dropGlobalTempView(viewName) catch { - // If the test failed part way, we don't want to mask the failure by failing to remove - // global temp views that never got created. - case _: NoSuchTableException => - } - } - } - } - - /** - * Drops table `tableName` after calling `f`. - */ - protected def withTable(tableNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - tableNames.foreach { name => - spark.sql(s"DROP TABLE IF EXISTS $name") - } - } - } - - /** - * Drops view `viewName` after calling `f`. - */ - protected def withView(viewNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f)( - viewNames.foreach { name => - spark.sql(s"DROP VIEW IF EXISTS $name") - } - ) - } - - /** - * Drops cache `cacheName` after calling `f`. - */ - protected def withCache(cacheNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - cacheNames.foreach { cacheName => - try uncacheTable(cacheName) catch { - case _: AnalysisException => - } - } - } - } - - // Blocking uncache table for tests - protected def uncacheTable(tableName: String): Unit = { - val tableIdent = spark.sessionState.sqlParser.parseTableIdentifier(tableName) - val cascade = !spark.sessionState.catalog.isTempView(tableIdent) - spark.sharedState.cacheManager.uncacheQuery( - spark.table(tableName).asInstanceOf[classic.Dataset[_]], - cascade = cascade, - blocking = true) - } - - /** - * Creates a temporary database and switches current database to it before executing `f`. This - * database is dropped after `f` returns. - * - * Note that this method doesn't switch current database before executing `f`. - */ - protected def withTempDatabase(f: String => Unit): Unit = { - val dbName = s"db_${UUID.randomUUID().toString.replace('-', '_')}" - - try { - spark.sql(s"CREATE DATABASE $dbName") - } catch { case cause: Throwable => - fail("Failed to create temporary database", cause) - } - - try f(dbName) finally { - if (spark.catalog.currentDatabase == dbName) { - spark.sql(s"USE $DEFAULT_DATABASE") - } - spark.sql(s"DROP DATABASE $dbName CASCADE") - } - } - - /** - * Drops database `dbName` after calling `f`. - */ - protected def withDatabase(dbNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - dbNames.foreach { name => - spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE") - } - spark.sql(s"USE $DEFAULT_DATABASE") - } - } - - /** - * Drops namespace `namespace` after calling `f`. - * - * Note that, if you switch current catalog/namespace in `f`, you should switch it back manually. - */ - protected def withNamespace(namespaces: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - namespaces.foreach { name => - spark.sql(s"DROP NAMESPACE IF EXISTS $name CASCADE") - } - } - } - - /** - * Restores the current catalog/database after calling `f`. - */ - protected def withCurrentCatalogAndNamespace(f: => Unit): Unit = { - val curCatalog = sql("select current_catalog()").head().getString(0) - val curDatabase = sql("select current_database()").head().getString(0) - Utils.tryWithSafeFinally(f) { - spark.sql(s"USE $curCatalog.$curDatabase") - } - } - - /** - * Enables Locale `language` before executing `f`, then switches back to the default locale of JVM - * after `f` returns. - */ - protected def withLocale(language: String)(f: => Unit): Unit = { - val originalLocale = Locale.getDefault - try { - // Add Locale setting - Locale.setDefault(new Locale(language)) - f - } finally { - Locale.setDefault(originalLocale) - } - } - - /** - * Drops temporary variable `variableName` after calling `f`. - */ - protected def withSessionVariable(variableNames: String*)(f: => Unit): Unit = { - Utils.tryWithSafeFinally(f) { - variableNames.foreach { name => - spark.sql(s"DROP TEMPORARY VARIABLE IF EXISTS $name") - } - } - } - - /** - * Activates database `db` before executing `f`, then switches back to `default` database after - * `f` returns. - */ - protected def activateDatabase(db: String)(f: => Unit): Unit = { - spark.sessionState.catalogManager.setCurrentNamespace(Array(db)) - Utils.tryWithSafeFinally(f)( - spark.sessionState.catalogManager.setCurrentNamespace(Array("default"))) - } - - /** - * Strip Spark-side filtering in order to check if a datasource filters rows correctly. - */ - protected def stripSparkFilter(df: DataFrame): DataFrame = { - val schema = df.schema - val withoutFilters = df.queryExecution.executedPlan.transform { - case FilterExec(_, child) => child - } - - spark.asInstanceOf[classic.SparkSession] - .internalCreateDataFrame(withoutFilters.execute(), schema) - } - - /** - * Turn a logical plan into a `DataFrame`. This should be removed once we have an easier - * way to construct `DataFrame` directly out of local data without relying on implicits. - */ - protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): classic.DataFrame = { - classic.Dataset.ofRows(spark.asInstanceOf[classic.SparkSession], plan) - } - - - /** - * This method is used to make the given path qualified, when a path - * does not contain a scheme, this path will not be changed after the default - * FileSystem is changed. - */ - def makeQualifiedPath(path: String): URI = { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf()) - fs.makeQualified(hadoopPath).toUri - } - - /** - * Returns full path to the given file in the resource folder - */ - protected def testFile(fileName: String): String = { - Thread.currentThread().getContextClassLoader.getResource(fileName).toString - } - - /** - * Returns the size of the local directory except the metadata file and the temporary file. - */ - def getLocalDirSize(file: File): Long = { - assert(file.isDirectory) - Files.walk(file.toPath).iterator().asScala - .filter(p => Files.isRegularFile(p) && DataSourceUtils.isDataFile(p.getFileName.toString)) - .map(_.toFile.length).sum - } +private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => } private[sql] object SQLTestUtils { From 93f154a78d535c1c50ab5ef83b8f31718c5de6af Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 16 Apr 2026 06:16:49 +0000 Subject: [PATCH 02/19] [SPARK-XXXXX][SQL][TESTS] Fix unused imports after SQLTestUtilsBase merge Co-authored-by: Isaac --- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 2 -- .../scala/org/apache/spark/sql/connector/AlterTableTests.scala | 1 - .../src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 2 +- 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 1e28cae8dfe95..36adea7de6430 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,7 +23,6 @@ import java.nio.file.Files import java.util.{Locale, TimeZone, UUID} import java.util.regex.Pattern -import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.language.implicitConversions @@ -43,7 +42,6 @@ import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.DataSourceUtils -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index f48258ea6b85c..9722049e62865 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -21,7 +21,6 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row} -import org.apache.spark.sql.QueryTest.checkAnswer import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.util.quoteIdentifier diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index b7aebd9fde3a9..26a363f32dfd5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,7 +27,7 @@ import org.scalactic.source.Position import org.scalatest.{Suite, Tag} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution From f1ec729872d30b9f71cf29d7d3cc4bd8587a2dcf Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 16 Apr 2026 08:15:48 +0000 Subject: [PATCH 03/19] [SPARK-XXXXX][SQL][TESTS] Fix QueryTestBase resolution in SQLTestUtilsBase Use fully qualified name to avoid unused import warning while ensuring the type is resolved correctly from the sub-package. Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 26a363f32dfd5..ee27d84f06049 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -203,7 +203,8 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with * Helper trait that can be extended by all external SQL test suites. * Now a thin alias for [[QueryTestBase]]. */ -private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => +private[sql] trait SQLTestUtilsBase + extends org.apache.spark.sql.QueryTestBase { self: Suite => } private[sql] object SQLTestUtils { From 5d2fa2efc421e026596cbf807623b56122940c4f Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 16 Apr 2026 11:48:28 +0000 Subject: [PATCH 04/19] [SPARK-XXXXX][SQL][TESTS] Remove unused QueryTest import from SQLTestUtils Co-authored-by: Isaac --- .../src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index ee27d84f06049..2bc11fc85f2cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,7 +27,7 @@ import org.scalactic.source.Position import org.scalatest.{Suite, Tag} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution From f6031c7067971df3ceed220537fd68bf47c2d5a4 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Thu, 16 Apr 2026 12:11:14 +0000 Subject: [PATCH 05/19] [SPARK-XXXXX][SQL][TESTS] Remove unused spark.sql import from HivePlanTest Co-authored-by: Isaac --- .../scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala index 46079a0fb48d3..c60dc441b4f96 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala @@ -26,7 +26,6 @@ import org.apache.spark.tags.SlowHiveTest @SlowHiveTest class HivePlanTest extends QueryTest with TestHiveSingleton { - import spark.sql import spark.implicits._ test("udf constant folding") { From 50073ed0deb728a9deedefdd8f9cafad474996b0 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 00:54:33 +0000 Subject: [PATCH 06/19] [SPARK-XXXXX][SQL][TESTS] Move SQLTestUtils content into QueryTest Move all methods from `SQLTestUtils` into `QueryTest`, making `SQLTestUtils` a thin alias. Also move `compareAnswers` into `object QueryTest`. Co-authored-by: Isaac --- .../org/apache/spark/sql/QueryTest.scala | 194 +++++++++++++++++- .../apache/spark/sql/test/SQLTestUtils.scala | 185 +---------------- 2 files changed, 196 insertions(+), 183 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 36adea7de6430..9b1c1d4af1b7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,11 +23,14 @@ import java.nio.file.Files import java.util.{Locale, TimeZone, UUID} import java.util.regex.Pattern +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.language.implicitConversions +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.scalatest.{Assertions, BeforeAndAfterAll, Suite} +import org.scalactic.source.Position +import org.scalatest.{Assertions, BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite @@ -41,11 +44,15 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.UninterruptibleThread import org.apache.spark.util.Utils @@ -531,7 +538,156 @@ trait QueryTestBase } -abstract class QueryTest extends SparkFunSuite with QueryTestBase { +abstract class QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { + // Whether to materialize all test data before the first test is run + private var loadTestDataBeforeTests = false + + protected override def beforeAll(): Unit = { + super.beforeAll() + if (loadTestDataBeforeTests) { + loadTestData() + } + } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected override def withTempDir(f: File => Unit): Unit = { + super.withTempDir { dir => + f(dir) + waitForTasksToFinish() + } + } + + /** + * A helper function for turning off/on codegen. + */ + protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { + Seq("false", "true").foreach { codegenEnabled => + val isTurnOn = if (codegenEnabled == "true") "on" else "off" + test(s"$testName (whole-stage-codegen ${isTurnOn})") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { + f(codegenEnabled) + } + } + } + } + + /** + * Materialize the test data immediately after the `SQLContext` is set up. + * This is necessary if the data is accessed by name but not through direct reference. + */ + protected def setupTestData(): Unit = { + loadTestDataBeforeTests = true + } + + /** + * Disable stdout and stderr when running the test. To not output the logs to the console, + * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of + * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if + * we change System.out and System.err. + */ + protected def testQuietly(name: String)(f: => Unit): Unit = { + test(name) { + quietly { + f + } + } + } + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { + if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + testFun + } + } + } else { + super.test(testName, testTags: _*)(testFun) + } + } + + /** + * Run a test on a separate `UninterruptibleThread`. + */ + protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) + (body: => Unit): Unit = { + val timeoutMillis = 10000 + @transient var ex: Throwable = null + + def runOnThread(): Unit = { + val thread = new UninterruptibleThread(s"Testing thread for test $name") { + override def run(): Unit = { + try { + body + } catch { + case NonFatal(e) => + ex = e + } + } + } + thread.setDaemon(true) + thread.start() + thread.join(timeoutMillis) + if (thread.isAlive) { + thread.interrupt() + // If this interrupt does not work, then this thread is most likely running something that + // is not interruptible. There is not much point to wait for the thread to terminate, and + // we rather let the JVM terminate the thread on exit. + fail( + s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + + s" $timeoutMillis ms") + } else if (ex != null) { + throw ex + } + } + + if (quietly) { + testQuietly(name) { runOnThread() } + } else { + test(name) { runOnThread() } + } + } + + /** + * Copy file in jar's resource to a temp file, then pass it to `f`. + * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of + * path of jar's resource which starts with 'jar:file:/' + */ + protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { + val inputStream = + Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) + withTempDir { dir => + val tmpFile = new File(dir, "tmp") + Files.copy(inputStream, tmpFile.toPath) + f(tmpFile) + } + } + + /** + * Waits for all tasks on all executors to be finished. + */ + protected def waitForTasksToFinish(): Unit = { + eventually(timeout(10.seconds)) { + assert(spark.sparkContext.statusTracker + .getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } + + /** + * Creates the specified number of temporary directories, which is then passed to `f` and will be + * deleted after `f` returns. + */ + protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { + val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) + try f(files.toImmutableArraySeq) finally { + // wait for all tasks to finish before deleting files + waitForTasksToFinish() + files.foreach(Utils.deleteRecursively) + } + } protected def getCurrentClassCallSitePattern: String = { val cs = Thread.currentThread().getStackTrace()(2) @@ -768,6 +924,40 @@ object QueryTest extends Assertions { capturedQueryExecutions } + + def compareAnswers( + sparkAnswer: Seq[Row], + expectedAnswer: Seq[Row], + sort: Boolean): Option[String] = { + def prepareAnswer(answer: Seq[Row]): Seq[Row] = { + val converted: Seq[Row] = answer.map { s => + Row.fromSeq(s.toSeq.map { + case d: java.math.BigDecimal => BigDecimal(d) + case b: Array[Byte] => b.toSeq + case o => o + }) + } + if (sort) { + converted.sortBy(_.toString()) + } else { + converted + } + } + if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) { + val errorMessage = + s""" + | == Results == + | ${sideBySide( + s"== Expected Answer - ${expectedAnswer.size} ==" +: + prepareAnswer(expectedAnswer).map(_.toString()), + s"== Actual Answer - ${sparkAnswer.size} ==" +: + prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")} + """.stripMargin + Some(errorMessage) + } else { + None + } + } } class QueryTestSuite extends QueryTest with test.SharedSparkSession { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 2bc11fc85f2cc..aaae26e3b1420 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,191 +17,20 @@ package org.apache.spark.sql.test -import java.io.File -import java.nio.file.Files +import org.scalatest.Suite -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -import org.scalactic.source.Position -import org.scalatest.{Suite, Tag} - -import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.ArrayImplicits._ -import org.apache.spark.util.UninterruptibleThread -import org.apache.spark.util.Utils /** * Helper trait that should be extended by all SQL test suites within the Spark - * code base. - * - * This allows subclasses to plugin a custom `SparkSession`. It comes with test data - * prepared in advance as well as all implicit conversions used extensively by dataframes. - * To use implicit methods, import `testImplicits._` instead of through the `SparkSession`. - * - * Subclasses should *not* create `SparkSession`s in the test suite constructor, which is - * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. + * code base. Now a thin alias for [[org.apache.spark.sql.QueryTest]]. */ -private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with PlanTest { - // Whether to materialize all test data before the first test is run - private var loadTestDataBeforeTests = false - - protected override def beforeAll(): Unit = { - super.beforeAll() - if (loadTestDataBeforeTests) { - loadTestData() - } - } - - /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - */ - protected override def withTempDir(f: File => Unit): Unit = { - super.withTempDir { dir => - f(dir) - waitForTasksToFinish() - } - } - - /** - * A helper function for turning off/on codegen. - */ - protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { - Seq("false", "true").foreach { codegenEnabled => - val isTurnOn = if (codegenEnabled == "true") "on" else "off" - test(s"$testName (whole-stage-codegen ${isTurnOn})") { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { - f(codegenEnabled) - } - } - } - } - - /** - * Materialize the test data immediately after the `SQLContext` is set up. - * This is necessary if the data is accessed by name but not through direct reference. - */ - protected def setupTestData(): Unit = { - loadTestDataBeforeTests = true - } - - /** - * Disable stdout and stderr when running the test. To not output the logs to the console, - * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of - * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if - * we change System.out and System.err. - */ - protected def testQuietly(name: String)(f: => Unit): Unit = { - test(name) { - quietly { - f - } - } - } - - override protected def test(testName: String, testTags: Tag*)(testFun: => Any) - (implicit pos: Position): Unit = { - if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { - super.test(testName, testTags: _*) { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - testFun - } - } - } else { - super.test(testName, testTags: _*)(testFun) - } - } - - /** - * Run a test on a separate `UninterruptibleThread`. - */ - protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) - (body: => Unit): Unit = { - val timeoutMillis = 10000 - @transient var ex: Throwable = null - - def runOnThread(): Unit = { - val thread = new UninterruptibleThread(s"Testing thread for test $name") { - override def run(): Unit = { - try { - body - } catch { - case NonFatal(e) => - ex = e - } - } - } - thread.setDaemon(true) - thread.start() - thread.join(timeoutMillis) - if (thread.isAlive) { - thread.interrupt() - // If this interrupt does not work, then this thread is most likely running something that - // is not interruptible. There is not much point to wait for the thread to terminate, and - // we rather let the JVM terminate the thread on exit. - fail( - s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + - s" $timeoutMillis ms") - } else if (ex != null) { - throw ex - } - } - - if (quietly) { - testQuietly(name) { runOnThread() } - } else { - test(name) { runOnThread() } - } - } - - /** - * Copy file in jar's resource to a temp file, then pass it to `f`. - * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of - * path of jar's resource which starts with 'jar:file:/' - */ - protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { - val inputStream = - Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) - withTempDir { dir => - val tmpFile = new File(dir, "tmp") - Files.copy(inputStream, tmpFile.toPath) - f(tmpFile) - } - } - - /** - * Waits for all tasks on all executors to be finished. - */ - protected def waitForTasksToFinish(): Unit = { - eventually(timeout(10.seconds)) { - assert(spark.sparkContext.statusTracker - .getExecutorInfos.map(_.numRunningTasks()).sum == 0) - } - } - - /** - * Creates the specified number of temporary directories, which is then passed to `f` and will be - * deleted after `f` returns. - */ - protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { - val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) - try f(files.toImmutableArraySeq) finally { - // wait for all tasks to finish before deleting files - waitForTasksToFinish() - files.foreach(Utils.deleteRecursively) - } - } -} +private[sql] trait SQLTestUtils extends org.apache.spark.sql.QueryTest /** * Helper trait that can be extended by all external SQL test suites. - * Now a thin alias for [[QueryTestBase]]. + * Now a thin alias for [[org.apache.spark.sql.QueryTestBase]]. */ private[sql] trait SQLTestUtilsBase extends org.apache.spark.sql.QueryTestBase { self: Suite => @@ -214,12 +43,6 @@ private[sql] object SQLTestUtils { expectedAnswer: Seq[Row], sort: Boolean): Option[String] = { def prepareAnswer(answer: Seq[Row]): Seq[Row] = { - // Converts data to types that we can do equality comparison using Scala collections. - // For BigDecimal type, the Scala type has a better definition of equality test (similar to - // Java's java.math.BigDecimal.compareTo). - // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for - // equality test. - // This function is copied from Catalyst's QueryTest val converted: Seq[Row] = answer.map { s => Row.fromSeq(s.toSeq.map { case d: java.math.BigDecimal => BigDecimal(d) From 1875e3f527cd55aaaea69fa5be2bab6e2ce43b2d Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 01:19:13 +0000 Subject: [PATCH 07/19] [SPARK-XXXXX][SQL][TESTS] Fix SQLTestUtils to extend traits only, not abstract class SQLTestUtils must remain a trait extending only traits (QueryTestBase, PlanTest) rather than the abstract class QueryTest, because many test suites mix in SQLTestUtils with SparkFunSuite or SparkPlanTest directly. Co-authored-by: Isaac --- .../scala/org/apache/spark/sql/test/SQLTestUtils.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index aaae26e3b1420..5efce79da4877 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -24,9 +24,15 @@ import org.apache.spark.sql.catalyst.util._ /** * Helper trait that should be extended by all SQL test suites within the Spark - * code base. Now a thin alias for [[org.apache.spark.sql.QueryTest]]. + * code base. Now a thin alias for [[org.apache.spark.sql.QueryTestBase]] with + * [[org.apache.spark.sql.catalyst.plans.PlanTest]]. + * + * Note: This remains a trait (not extending the abstract class QueryTest) so that it can be + * mixed into classes that extend SparkFunSuite or SparkPlanTest directly. */ -private[sql] trait SQLTestUtils extends org.apache.spark.sql.QueryTest +private[sql] trait SQLTestUtils + extends org.apache.spark.sql.QueryTestBase + with org.apache.spark.sql.catalyst.plans.PlanTest /** * Helper trait that can be extended by all external SQL test suites. From d77d28a6290f6114ba46c4714cca4383110a116a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 01:35:55 +0000 Subject: [PATCH 08/19] [SPARK-XXXXX][SQL][TESTS] Revert SQLTestUtils merge into QueryTest SQLTestUtils cannot extend QueryTest (abstract class) because many test suites mix SQLTestUtils with SparkFunSuite or SparkPlanTest directly. Methods that call test() must stay in a class/trait that extends SparkFunSuite. Reverting to keep SQLTestUtils unchanged. This PR now only merges SQLTestUtilsBase into QueryTestBase. Co-authored-by: Isaac --- .../org/apache/spark/sql/QueryTest.scala | 193 +----------------- .../apache/spark/sql/test/SQLTestUtils.scala | 187 ++++++++++++++++- 2 files changed, 181 insertions(+), 199 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 9b1c1d4af1b7d..959424a3e613b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,14 +23,11 @@ import java.nio.file.Files import java.util.{Locale, TimeZone, UUID} import java.util.regex.Pattern -import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.language.implicitConversions -import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.scalactic.source.Position -import org.scalatest.{Assertions, BeforeAndAfterAll, Suite, Tag} +import org.scalatest.{Assertions, BeforeAndAfterAll, Suite} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite @@ -44,15 +41,11 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.util.ArrayImplicits._ -import org.apache.spark.util.UninterruptibleThread import org.apache.spark.util.Utils @@ -538,156 +531,7 @@ trait QueryTestBase } -abstract class QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { - // Whether to materialize all test data before the first test is run - private var loadTestDataBeforeTests = false - - protected override def beforeAll(): Unit = { - super.beforeAll() - if (loadTestDataBeforeTests) { - loadTestData() - } - } - - /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - */ - protected override def withTempDir(f: File => Unit): Unit = { - super.withTempDir { dir => - f(dir) - waitForTasksToFinish() - } - } - - /** - * A helper function for turning off/on codegen. - */ - protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { - Seq("false", "true").foreach { codegenEnabled => - val isTurnOn = if (codegenEnabled == "true") "on" else "off" - test(s"$testName (whole-stage-codegen ${isTurnOn})") { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { - f(codegenEnabled) - } - } - } - } - - /** - * Materialize the test data immediately after the `SQLContext` is set up. - * This is necessary if the data is accessed by name but not through direct reference. - */ - protected def setupTestData(): Unit = { - loadTestDataBeforeTests = true - } - - /** - * Disable stdout and stderr when running the test. To not output the logs to the console, - * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of - * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if - * we change System.out and System.err. - */ - protected def testQuietly(name: String)(f: => Unit): Unit = { - test(name) { - quietly { - f - } - } - } - - override protected def test(testName: String, testTags: Tag*)(testFun: => Any) - (implicit pos: Position): Unit = { - if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { - super.test(testName, testTags: _*) { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - testFun - } - } - } else { - super.test(testName, testTags: _*)(testFun) - } - } - - /** - * Run a test on a separate `UninterruptibleThread`. - */ - protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) - (body: => Unit): Unit = { - val timeoutMillis = 10000 - @transient var ex: Throwable = null - - def runOnThread(): Unit = { - val thread = new UninterruptibleThread(s"Testing thread for test $name") { - override def run(): Unit = { - try { - body - } catch { - case NonFatal(e) => - ex = e - } - } - } - thread.setDaemon(true) - thread.start() - thread.join(timeoutMillis) - if (thread.isAlive) { - thread.interrupt() - // If this interrupt does not work, then this thread is most likely running something that - // is not interruptible. There is not much point to wait for the thread to terminate, and - // we rather let the JVM terminate the thread on exit. - fail( - s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + - s" $timeoutMillis ms") - } else if (ex != null) { - throw ex - } - } - - if (quietly) { - testQuietly(name) { runOnThread() } - } else { - test(name) { runOnThread() } - } - } - - /** - * Copy file in jar's resource to a temp file, then pass it to `f`. - * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of - * path of jar's resource which starts with 'jar:file:/' - */ - protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { - val inputStream = - Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) - withTempDir { dir => - val tmpFile = new File(dir, "tmp") - Files.copy(inputStream, tmpFile.toPath) - f(tmpFile) - } - } - - /** - * Waits for all tasks on all executors to be finished. - */ - protected def waitForTasksToFinish(): Unit = { - eventually(timeout(10.seconds)) { - assert(spark.sparkContext.statusTracker - .getExecutorInfos.map(_.numRunningTasks()).sum == 0) - } - } - - /** - * Creates the specified number of temporary directories, which is then passed to `f` and will be - * deleted after `f` returns. - */ - protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { - val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) - try f(files.toImmutableArraySeq) finally { - // wait for all tasks to finish before deleting files - waitForTasksToFinish() - files.foreach(Utils.deleteRecursively) - } - } +abstract class QueryTest extends SparkFunSuite with QueryTestBase { protected def getCurrentClassCallSitePattern: String = { val cs = Thread.currentThread().getStackTrace()(2) @@ -925,39 +769,6 @@ object QueryTest extends Assertions { capturedQueryExecutions } - def compareAnswers( - sparkAnswer: Seq[Row], - expectedAnswer: Seq[Row], - sort: Boolean): Option[String] = { - def prepareAnswer(answer: Seq[Row]): Seq[Row] = { - val converted: Seq[Row] = answer.map { s => - Row.fromSeq(s.toSeq.map { - case d: java.math.BigDecimal => BigDecimal(d) - case b: Array[Byte] => b.toSeq - case o => o - }) - } - if (sort) { - converted.sortBy(_.toString()) - } else { - converted - } - } - if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) { - val errorMessage = - s""" - | == Results == - | ${sideBySide( - s"== Expected Answer - ${expectedAnswer.size} ==" +: - prepareAnswer(expectedAnswer).map(_.toString()), - s"== Actual Answer - ${sparkAnswer.size} ==" +: - prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")} - """.stripMargin - Some(errorMessage) - } else { - None - } - } } class QueryTestSuite extends QueryTest with test.SharedSparkSession { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 5efce79da4877..fb083ccd32365 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,22 +17,187 @@ package org.apache.spark.sql.test -import org.scalatest.Suite +import java.io.File +import java.nio.file.Files +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +import org.scalactic.source.Position +import org.scalatest.{Suite, Tag} + +import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.Utils /** * Helper trait that should be extended by all SQL test suites within the Spark - * code base. Now a thin alias for [[org.apache.spark.sql.QueryTestBase]] with - * [[org.apache.spark.sql.catalyst.plans.PlanTest]]. + * code base. + * + * This allows subclasses to plugin a custom `SparkSession`. It comes with test data + * prepared in advance as well as all implicit conversions used extensively by dataframes. + * To use implicit methods, import `testImplicits._` instead of through the `SparkSession`. * - * Note: This remains a trait (not extending the abstract class QueryTest) so that it can be - * mixed into classes that extend SparkFunSuite or SparkPlanTest directly. + * Subclasses should *not* create `SparkSession`s in the test suite constructor, which is + * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. */ -private[sql] trait SQLTestUtils - extends org.apache.spark.sql.QueryTestBase - with org.apache.spark.sql.catalyst.plans.PlanTest +private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with PlanTest { + // Whether to materialize all test data before the first test is run + private var loadTestDataBeforeTests = false + + protected override def beforeAll(): Unit = { + super.beforeAll() + if (loadTestDataBeforeTests) { + loadTestData() + } + } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected override def withTempDir(f: File => Unit): Unit = { + super.withTempDir { dir => + f(dir) + waitForTasksToFinish() + } + } + + /** + * A helper function for turning off/on codegen. + */ + protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { + Seq("false", "true").foreach { codegenEnabled => + val isTurnOn = if (codegenEnabled == "true") "on" else "off" + test(s"$testName (whole-stage-codegen ${isTurnOn})") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { + f(codegenEnabled) + } + } + } + } + + /** + * Materialize the test data immediately after the `SQLContext` is set up. + * This is necessary if the data is accessed by name but not through direct reference. + */ + protected def setupTestData(): Unit = { + loadTestDataBeforeTests = true + } + + /** + * Disable stdout and stderr when running the test. To not output the logs to the console, + * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of + * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if + * we change System.out and System.err. + */ + protected def testQuietly(name: String)(f: => Unit): Unit = { + test(name) { + quietly { + f + } + } + } + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { + if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + testFun + } + } + } else { + super.test(testName, testTags: _*)(testFun) + } + } + + /** + * Run a test on a separate `UninterruptibleThread`. + */ + protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) + (body: => Unit): Unit = { + val timeoutMillis = 10000 + @transient var ex: Throwable = null + + def runOnThread(): Unit = { + val thread = new UninterruptibleThread(s"Testing thread for test $name") { + override def run(): Unit = { + try { + body + } catch { + case NonFatal(e) => + ex = e + } + } + } + thread.setDaemon(true) + thread.start() + thread.join(timeoutMillis) + if (thread.isAlive) { + thread.interrupt() + // If this interrupt does not work, then this thread is most likely running something that + // is not interruptible. There is not much point to wait for the thread to terminate, and + // we rather let the JVM terminate the thread on exit. + fail( + s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + + s" $timeoutMillis ms") + } else if (ex != null) { + throw ex + } + } + + if (quietly) { + testQuietly(name) { runOnThread() } + } else { + test(name) { runOnThread() } + } + } + + /** + * Copy file in jar's resource to a temp file, then pass it to `f`. + * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of + * path of jar's resource which starts with 'jar:file:/' + */ + protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { + val inputStream = + Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) + withTempDir { dir => + val tmpFile = new File(dir, "tmp") + Files.copy(inputStream, tmpFile.toPath) + f(tmpFile) + } + } + + /** + * Waits for all tasks on all executors to be finished. + */ + protected def waitForTasksToFinish(): Unit = { + eventually(timeout(10.seconds)) { + assert(spark.sparkContext.statusTracker + .getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } + + /** + * Creates the specified number of temporary directories, which is then passed to `f` and will be + * deleted after `f` returns. + */ + protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { + val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) + try f(files.toImmutableArraySeq) finally { + // wait for all tasks to finish before deleting files + waitForTasksToFinish() + files.foreach(Utils.deleteRecursively) + } + } +} /** * Helper trait that can be extended by all external SQL test suites. @@ -49,6 +214,12 @@ private[sql] object SQLTestUtils { expectedAnswer: Seq[Row], sort: Boolean): Option[String] = { def prepareAnswer(answer: Seq[Row]): Seq[Row] = { + // Converts data to types that we can do equality comparison using Scala collections. + // For BigDecimal type, the Scala type has a better definition of equality test (similar to + // Java's java.math.BigDecimal.compareTo). + // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for + // equality test. + // This function is copied from Catalyst's QueryTest val converted: Seq[Row] = answer.map { s => Row.fromSeq(s.toSeq.map { case d: java.math.BigDecimal => BigDecimal(d) From 73b030b33b6abb331d0c374890349012a88f29a0 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 03:28:52 +0000 Subject: [PATCH 09/19] [SPARK-XXXXX][SQL][TESTS] Remove redundant self-type from SQLTestUtilsBase Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index fb083ccd32365..c921657f205b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -203,9 +203,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with * Helper trait that can be extended by all external SQL test suites. * Now a thin alias for [[org.apache.spark.sql.QueryTestBase]]. */ -private[sql] trait SQLTestUtilsBase - extends org.apache.spark.sql.QueryTestBase { self: Suite => -} +private[sql] trait SQLTestUtilsBase extends org.apache.spark.sql.QueryTestBase private[sql] object SQLTestUtils { From b0ef9b4809909535b1d2bd5dbe76020c58f1e0c5 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 03:42:37 +0000 Subject: [PATCH 10/19] [SPARK-XXXXX][SQL][TESTS] Use import instead of fully qualified name for QueryTestBase Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index c921657f205b3..76c74d1800c44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -27,7 +27,7 @@ import org.scalactic.source.Position import org.scalatest.{Suite, Tag} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row +import org.apache.spark.sql.{QueryTestBase, Row} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution @@ -203,7 +203,7 @@ private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with * Helper trait that can be extended by all external SQL test suites. * Now a thin alias for [[org.apache.spark.sql.QueryTestBase]]. */ -private[sql] trait SQLTestUtilsBase extends org.apache.spark.sql.QueryTestBase +private[sql] trait SQLTestUtilsBase extends QueryTestBase private[sql] object SQLTestUtils { From 3006495ae82a4406e05123ee0375075b3eeb4333 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 03:51:13 +0000 Subject: [PATCH 11/19] [SPARK-XXXXX][SQL][TESTS] Change QueryTest to trait and merge SQLTestUtils into it Change QueryTest from abstract class to trait, enabling SQLTestUtils to extend it directly. Move all SQLTestUtils methods into QueryTest. Both SQLTestUtils and SQLTestUtilsBase are now thin aliases. Co-authored-by: Isaac --- .../org/apache/spark/sql/QueryTest.scala | 157 ++++++++++++++- .../apache/spark/sql/test/SQLTestUtils.scala | 181 +----------------- 2 files changed, 159 insertions(+), 179 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 959424a3e613b..279eb64525c4c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,11 +23,14 @@ import java.nio.file.Files import java.util.{Locale, TimeZone, UUID} import java.util.regex.Pattern +import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.language.implicitConversions +import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.scalatest.{Assertions, BeforeAndAfterAll, Suite} +import org.scalactic.source.Position +import org.scalatest.{Assertions, BeforeAndAfterAll, Suite, Tag} import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite @@ -41,11 +44,15 @@ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.util.ArrayImplicits._ +import org.apache.spark.util.UninterruptibleThread import org.apache.spark.util.Utils @@ -531,7 +538,153 @@ trait QueryTestBase } -abstract class QueryTest extends SparkFunSuite with QueryTestBase { +trait QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { + // Whether to materialize all test data before the first test is run + private var loadTestDataBeforeTests = false + + protected override def beforeAll(): Unit = { + super.beforeAll() + if (loadTestDataBeforeTests) { + loadTestData() + } + } + + /** + * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` + * returns. + */ + protected override def withTempDir(f: File => Unit): Unit = { + super.withTempDir { dir => + f(dir) + waitForTasksToFinish() + } + } + + /** + * A helper function for turning off/on codegen. + */ + protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { + Seq("false", "true").foreach { codegenEnabled => + val isTurnOn = if (codegenEnabled == "true") "on" else "off" + test(s"$testName (whole-stage-codegen ${isTurnOn})") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { + f(codegenEnabled) + } + } + } + } + + /** + * Materialize the test data immediately after the `SQLContext` is set up. + * This is necessary if the data is accessed by name but not through direct reference. + */ + protected def setupTestData(): Unit = { + loadTestDataBeforeTests = true + } + + /** + * Disable stdout and stderr when running the test. To not output the logs to the console, + * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of + * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if + * we change System.out and System.err. + */ + protected def testQuietly(name: String)(f: => Unit): Unit = { + test(name) { + quietly { + f + } + } + } + + override protected def test(testName: String, testTags: Tag*)(testFun: => Any) + (implicit pos: Position): Unit = { + if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { + super.test(testName, testTags: _*) { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + testFun + } + } + } else { + super.test(testName, testTags: _*)(testFun) + } + } + + /** + * Run a test on a separate `UninterruptibleThread`. + */ + protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) + (body: => Unit): Unit = { + val timeoutMillis = 10000 + @transient var ex: Throwable = null + + def runOnThread(): Unit = { + val thread = new UninterruptibleThread(s"Testing thread for test $name") { + override def run(): Unit = { + try { + body + } catch { + case NonFatal(e) => + ex = e + } + } + } + thread.setDaemon(true) + thread.start() + thread.join(timeoutMillis) + if (thread.isAlive) { + thread.interrupt() + fail( + s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + + s" $timeoutMillis ms") + } else if (ex != null) { + throw ex + } + } + + if (quietly) { + testQuietly(name) { runOnThread() } + } else { + test(name) { runOnThread() } + } + } + + /** + * Copy file in jar's resource to a temp file, then pass it to `f`. + * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of + * path of jar's resource which starts with 'jar:file:/' + */ + protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { + val inputStream = + Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) + withTempDir { dir => + val tmpFile = new File(dir, "tmp") + Files.copy(inputStream, tmpFile.toPath) + f(tmpFile) + } + } + + /** + * Waits for all tasks on all executors to be finished. + */ + protected def waitForTasksToFinish(): Unit = { + eventually(timeout(10.seconds)) { + assert(spark.sparkContext.statusTracker + .getExecutorInfos.map(_.numRunningTasks()).sum == 0) + } + } + + /** + * Creates the specified number of temporary directories, which is then passed to `f` and will be + * deleted after `f` returns. + */ + protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { + val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) + try f(files.toImmutableArraySeq) finally { + // wait for all tasks to finish before deleting files + waitForTasksToFinish() + files.foreach(Utils.deleteRecursively) + } + } protected def getCurrentClassCallSitePattern: String = { val cs = Thread.currentThread().getStackTrace()(2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 76c74d1800c44..c920bfb80f226 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,191 +17,18 @@ package org.apache.spark.sql.test -import java.io.File -import java.nio.file.Files - -import scala.concurrent.duration._ -import scala.util.control.NonFatal - -import org.scalactic.source.Position -import org.scalatest.{Suite, Tag} - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{QueryTestBase, Row} -import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.ArrayImplicits._ -import org.apache.spark.util.UninterruptibleThread -import org.apache.spark.util.Utils /** * Helper trait that should be extended by all SQL test suites within the Spark - * code base. - * - * This allows subclasses to plugin a custom `SparkSession`. It comes with test data - * prepared in advance as well as all implicit conversions used extensively by dataframes. - * To use implicit methods, import `testImplicits._` instead of through the `SparkSession`. - * - * Subclasses should *not* create `SparkSession`s in the test suite constructor, which is - * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. + * code base. Now a thin alias for [[QueryTest]]. */ -private[sql] trait SQLTestUtils extends SparkFunSuite with SQLTestUtilsBase with PlanTest { - // Whether to materialize all test data before the first test is run - private var loadTestDataBeforeTests = false - - protected override def beforeAll(): Unit = { - super.beforeAll() - if (loadTestDataBeforeTests) { - loadTestData() - } - } - - /** - * Creates a temporary directory, which is then passed to `f` and will be deleted after `f` - * returns. - */ - protected override def withTempDir(f: File => Unit): Unit = { - super.withTempDir { dir => - f(dir) - waitForTasksToFinish() - } - } - - /** - * A helper function for turning off/on codegen. - */ - protected def testWithWholeStageCodegenOnAndOff(testName: String)(f: String => Unit): Unit = { - Seq("false", "true").foreach { codegenEnabled => - val isTurnOn = if (codegenEnabled == "true") "on" else "off" - test(s"$testName (whole-stage-codegen ${isTurnOn})") { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) { - f(codegenEnabled) - } - } - } - } - - /** - * Materialize the test data immediately after the `SQLContext` is set up. - * This is necessary if the data is accessed by name but not through direct reference. - */ - protected def setupTestData(): Unit = { - loadTestDataBeforeTests = true - } - - /** - * Disable stdout and stderr when running the test. To not output the logs to the console, - * ConsoleAppender's `follow` should be set to `true` so that it will honor reassignments of - * System.out or System.err. Otherwise, ConsoleAppender will still output to the console even if - * we change System.out and System.err. - */ - protected def testQuietly(name: String)(f: => Unit): Unit = { - test(name) { - quietly { - f - } - } - } - - override protected def test(testName: String, testTags: Tag*)(testFun: => Any) - (implicit pos: Position): Unit = { - if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) { - super.test(testName, testTags: _*) { - withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - testFun - } - } - } else { - super.test(testName, testTags: _*)(testFun) - } - } - - /** - * Run a test on a separate `UninterruptibleThread`. - */ - protected def testWithUninterruptibleThread(name: String, quietly: Boolean = false) - (body: => Unit): Unit = { - val timeoutMillis = 10000 - @transient var ex: Throwable = null - - def runOnThread(): Unit = { - val thread = new UninterruptibleThread(s"Testing thread for test $name") { - override def run(): Unit = { - try { - body - } catch { - case NonFatal(e) => - ex = e - } - } - } - thread.setDaemon(true) - thread.start() - thread.join(timeoutMillis) - if (thread.isAlive) { - thread.interrupt() - // If this interrupt does not work, then this thread is most likely running something that - // is not interruptible. There is not much point to wait for the thread to terminate, and - // we rather let the JVM terminate the thread on exit. - fail( - s"Test '$name' running on o.a.s.util.UninterruptibleThread timed out after" + - s" $timeoutMillis ms") - } else if (ex != null) { - throw ex - } - } - - if (quietly) { - testQuietly(name) { runOnThread() } - } else { - test(name) { runOnThread() } - } - } - - /** - * Copy file in jar's resource to a temp file, then pass it to `f`. - * This function is used to make `f` can use the path of temp file(e.g. file:/), instead of - * path of jar's resource which starts with 'jar:file:/' - */ - protected def withResourceTempPath(resourcePath: String)(f: File => Unit): Unit = { - val inputStream = - Thread.currentThread().getContextClassLoader.getResourceAsStream(resourcePath) - withTempDir { dir => - val tmpFile = new File(dir, "tmp") - Files.copy(inputStream, tmpFile.toPath) - f(tmpFile) - } - } - - /** - * Waits for all tasks on all executors to be finished. - */ - protected def waitForTasksToFinish(): Unit = { - eventually(timeout(10.seconds)) { - assert(spark.sparkContext.statusTracker - .getExecutorInfos.map(_.numRunningTasks()).sum == 0) - } - } - - /** - * Creates the specified number of temporary directories, which is then passed to `f` and will be - * deleted after `f` returns. - */ - protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit = { - val files = Array.fill[File](numPaths)(Utils.createTempDir().getCanonicalFile) - try f(files.toImmutableArraySeq) finally { - // wait for all tasks to finish before deleting files - waitForTasksToFinish() - files.foreach(Utils.deleteRecursively) - } - } -} +private[sql] trait SQLTestUtils extends QueryTest /** * Helper trait that can be extended by all external SQL test suites. - * Now a thin alias for [[org.apache.spark.sql.QueryTestBase]]. + * Now a thin alias for [[QueryTestBase]]. */ private[sql] trait SQLTestUtilsBase extends QueryTestBase From e726a7cd96e7a3fb88a87776f1bf9d022592dafb Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 04:04:27 +0000 Subject: [PATCH 12/19] [SPARK-XXXXX][SQL][TESTS] Fix import order and restore self-type on SQLTestUtilsBase Co-authored-by: Isaac --- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4 ++-- .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 279eb64525c4c..2107722305f55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -41,16 +41,16 @@ import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} -import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution +import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestData import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.storage.StorageLevel -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.UninterruptibleThread import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index c920bfb80f226..cb08358751292 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -30,7 +30,7 @@ private[sql] trait SQLTestUtils extends QueryTest * Helper trait that can be extended by all external SQL test suites. * Now a thin alias for [[QueryTestBase]]. */ -private[sql] trait SQLTestUtilsBase extends QueryTestBase +private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => } private[sql] object SQLTestUtils { From 84f2ae68aadc54985afbc8325542de27cdaf9d18 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 04:30:32 +0000 Subject: [PATCH 13/19] [SPARK-XXXXX][SQL][TESTS] Add missing Suite import in SQLTestUtils Co-authored-by: Isaac --- .../src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index cb08358751292..cbe7f4a2aae99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import org.scalatest.Suite + import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} import org.apache.spark.sql.catalyst.util._ From 29142a1b7a54af680ddc3814c28e4194eea60f7d Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 04:50:08 +0000 Subject: [PATCH 14/19] [SPARK-XXXXX][SQL][TESTS] Fix import order for PlanTest in QueryTest Co-authored-by: Isaac --- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 2107722305f55..3f240a50c719c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -39,9 +39,9 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.classic.ClassicConversions._ import org.apache.spark.sql.execution.{FilterExec, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution From 5f90ce348d4d8f7fbb9a314f5cf6857600ee8a92 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 06:33:16 +0000 Subject: [PATCH 15/19] [SPARK-XXXXX][SQL][TESTS] Fix stack trace resolution for trait-based QueryTest When QueryTest was an abstract class, getStackTrace()(2) reliably returned the caller's frame. As a trait with default methods, the stack layout may differ. Use dynamic stack walking to find the first frame outside QueryTest.scala instead of a hardcoded index. Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/QueryTest.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 3f240a50c719c..f0e0eb3c838c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -686,13 +686,22 @@ trait QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { } } + // Find the first caller frame outside of QueryTest and Thread. + // This is needed because QueryTest is a trait, and trait default methods may + // change the stack trace layout compared to abstract class methods. + private def callerStackFrame: StackTraceElement = { + Thread.currentThread().getStackTrace() + .find(e => e.getFileName != "QueryTest.scala" && e.getClassName != "java.lang.Thread") + .get + } + protected def getCurrentClassCallSitePattern: String = { - val cs = Thread.currentThread().getStackTrace()(2) + val cs = callerStackFrame s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } protected def getNextLineCallSitePattern(lines: Int = 1): String = { - val cs = Thread.currentThread().getStackTrace()(2) + val cs = callerStackFrame Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") } From ef759f0f109bc1589578cf33df139beab013b2eb Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 06:35:20 +0000 Subject: [PATCH 16/19] [SPARK-XXXXX][SQL][TESTS] Use method name lookup instead of hardcoded filename Find the current method's frame by name, then take the next frame as the caller. No hardcoded filenames or indices. Co-authored-by: Isaac --- .../scala/org/apache/spark/sql/QueryTest.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index f0e0eb3c838c1..6161ba003ab73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -686,22 +686,17 @@ trait QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { } } - // Find the first caller frame outside of QueryTest and Thread. - // This is needed because QueryTest is a trait, and trait default methods may - // change the stack trace layout compared to abstract class methods. - private def callerStackFrame: StackTraceElement = { - Thread.currentThread().getStackTrace() - .find(e => e.getFileName != "QueryTest.scala" && e.getClassName != "java.lang.Thread") - .get - } - protected def getCurrentClassCallSitePattern: String = { - val cs = callerStackFrame + val stack = Thread.currentThread().getStackTrace() + val idx = stack.indexWhere(_.getMethodName == "getCurrentClassCallSitePattern") + val cs = stack(idx + 1) s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } protected def getNextLineCallSitePattern(lines: Int = 1): String = { - val cs = callerStackFrame + val stack = Thread.currentThread().getStackTrace() + val idx = stack.indexWhere(_.getMethodName == "getNextLineCallSitePattern") + val cs = stack(idx + 1) Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") } From df22c241ae8ceaa06ecd63451bf99ccbd4ff7330 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 17 Apr 2026 07:35:17 +0000 Subject: [PATCH 17/19] [SPARK-XXXXX][SQL][TESTS] Fix stack trace lookup for trait default methods Use lastIndexWhere instead of indexWhere to find the method's frame, skipping any mixin forwarder frames that trait compilation may add. Co-authored-by: Isaac --- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 6161ba003ab73..94c926e643678 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -688,14 +688,14 @@ trait QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { protected def getCurrentClassCallSitePattern: String = { val stack = Thread.currentThread().getStackTrace() - val idx = stack.indexWhere(_.getMethodName == "getCurrentClassCallSitePattern") + val idx = stack.lastIndexWhere(_.getMethodName == "getCurrentClassCallSitePattern") val cs = stack(idx + 1) s"${cs.getClassName}\\..*\\(${cs.getFileName}:\\d+\\)" } protected def getNextLineCallSitePattern(lines: Int = 1): String = { val stack = Thread.currentThread().getStackTrace() - val idx = stack.indexWhere(_.getMethodName == "getNextLineCallSitePattern") + val idx = stack.lastIndexWhere(_.getMethodName == "getNextLineCallSitePattern") val cs = stack(idx + 1) Pattern.quote( s"${cs.getClassName}.${cs.getMethodName}(${cs.getFileName}:${cs.getLineNumber + lines})") From ecdc5e3d822260d535c86f4bb53f6a97d2499f5a Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 20 Apr 2026 09:41:30 +0000 Subject: [PATCH 18/19] [SPARK-55910][SQL][TESTS] Address review comments - Improve scaladoc for SQLTestUtils/SQLTestUtilsBase to explain backward compat intent - Lift the SparkSession usage guidance onto QueryTest trait - Remove redundant self-type on SQLTestUtilsBase - Remove redundant PlanTest import (already covered by wildcard) Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/QueryTest.scala | 11 ++++++++++- .../org/apache/spark/sql/test/SQLTestUtils.scala | 12 +++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 94c926e643678..eaa42c9719372 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.classic.ClassicConversions._ @@ -538,6 +537,16 @@ trait QueryTestBase } +/** + * Helper trait that should be extended by all SQL test suites within the Spark code base. + * + * This allows subclasses to plugin a custom `SparkSession`. It comes with test data + * prepared in advance as well as all implicit conversions used extensively by dataframes. + * To use implicit methods, import `testImplicits._` instead of through the `SparkSession`. + * + * Subclasses should *not* create `SparkSession`s in the test suite constructor, which is + * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM. + */ trait QueryTest extends SparkFunSuite with QueryTestBase with PlanTest { // Whether to materialize all test data before the first test is run private var loadTestDataBeforeTests = false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index cbe7f4a2aae99..6178c70b50f4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,22 +17,20 @@ package org.apache.spark.sql.test -import org.scalatest.Suite - import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} import org.apache.spark.sql.catalyst.util._ /** - * Helper trait that should be extended by all SQL test suites within the Spark - * code base. Now a thin alias for [[QueryTest]]. + * Kept as an empty alias of [[QueryTest]] for backward compatibility with existing subclasses. + * New test suites should extend [[QueryTest]] directly. */ private[sql] trait SQLTestUtils extends QueryTest /** - * Helper trait that can be extended by all external SQL test suites. - * Now a thin alias for [[QueryTestBase]]. + * Kept as an empty alias of [[QueryTestBase]] for backward compatibility with existing subclasses. + * New test suites should extend [[QueryTestBase]] directly. */ -private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => } +private[sql] trait SQLTestUtilsBase extends QueryTestBase private[sql] object SQLTestUtils { From 5d3654ffdec4dafc70ce3f2ef83e95c2207d067b Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Mon, 20 Apr 2026 10:37:50 +0000 Subject: [PATCH 19/19] [SPARK-55910][SQL][TESTS] Restore self-type on SQLTestUtilsBase Co-authored-by: Isaac --- .../test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 6178c70b50f4d..93d665b079e5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import org.scalatest.Suite + import org.apache.spark.sql.{QueryTest, QueryTestBase, Row} import org.apache.spark.sql.catalyst.util._ @@ -30,7 +32,7 @@ private[sql] trait SQLTestUtils extends QueryTest * Kept as an empty alias of [[QueryTestBase]] for backward compatibility with existing subclasses. * New test suites should extend [[QueryTestBase]] directly. */ -private[sql] trait SQLTestUtilsBase extends QueryTestBase +private[sql] trait SQLTestUtilsBase extends QueryTestBase { self: Suite => } private[sql] object SQLTestUtils {