From a07a5949ed9206ab6f5cc001db0920a7b33602ad Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 28 Feb 2026 13:32:10 +0800 Subject: [PATCH] [SPARK-XXXXX][SQL][TESTS] Add --DEBUG directive for golden file test framework Add a `--DEBUG` marker that can be placed before any query in a SQL test input file to enable selective debugging. When any `--DEBUG` marker is present, the test enters debug mode: - Commands (CREATE TABLE, INSERT, SET, etc.) are always executed for setup. - Only `--DEBUG`-marked non-command queries are executed; others are skipped. - Failed queries print the full error stacktrace to the console. - Results are still compared against the golden file. - The test always fails at the end as a safety net to prevent accidental commits with `--DEBUG` markers. This significantly improves the debugging experience for golden file tests by allowing developers to focus on specific failing queries without running the entire test file, while getting full error details in the console output. --- .../apache/spark/sql/SQLQueryTestHelper.scala | 27 ++- .../apache/spark/sql/SQLQueryTestSuite.scala | 166 ++++++++++++++++-- 2 files changed, 176 insertions(+), 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala index 899fd85a8bb80..5ad9605c7d78b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala @@ -400,11 +400,16 @@ trait SQLQueryTestHelper extends SQLConfHelper with Logging { protected def splitCommentsAndCodes(input: String): (Array[String], Array[String]) = input.split("\n").partition { line => val newLine = line.trim - newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") + newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") && + newLine != "--DEBUG" } - protected def getQueries(code: Array[String], comments: Array[String], - allTestCases: Seq[TestCase]): Seq[String] = { + /** + * Parses queries from code lines and returns each query paired with a Boolean indicating + * whether it was preceded by a --DEBUG marker. + */ + protected def getQueriesWithDebugFlag(code: Array[String], comments: Array[String], + allTestCases: Seq[TestCase]): Seq[(String, Boolean)] = { def splitWithSemicolon(seq: Seq[String]) = { seq.mkString("\n").split("(?<=[^\\\\]);") } @@ -450,10 +455,18 @@ trait SQLQueryTestHelper extends SQLConfHelper with Logging { splitWithSemicolon(allCode.toImmutableArraySeq).toSeq } - // List of SQL queries to run - tempQueries.map(_.trim).filter(_ != "") - // Fix misplacement when comment is at the end of the query. - .map(_.split("\n").filterNot(_.startsWith("--")).mkString("\n")).map(_.trim).filter(_ != "") + // Detect --DEBUG markers before stripping comment lines from each query. + tempQueries.map(_.trim).filter(_ != "").map { query => + val lines = query.split("\n") + val isDebug = lines.exists(_.trim == "--DEBUG") + val cleanedQuery = lines.filterNot(_.startsWith("--")).mkString("\n").trim + (cleanedQuery, isDebug) + }.filter(_._1 != "") + } + + protected def getQueries(code: Array[String], comments: Array[String], + allTestCases: Seq[TestCase]): Seq[String] = { + getQueriesWithDebugFlag(code, comments, allTestCases).map(_._1) } protected def getSparkSettings(comments: Array[String]): Array[(String, String)] = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index a57c72f5fc155..f4be4c38ce932 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import java.io.File +import java.io.{File, PrintWriter, StringWriter} import java.net.URI import java.nio.file.Files import java.util.Locale @@ -84,6 +84,16 @@ import org.apache.spark.util.Utils * times, each time picks one config set from each dimension, until all the combinations are * tried. For example, if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file * will be run 6 times (cartesian product). + * 6. A line with --DEBUG (on its own line before a query) marks that query for debug mode. + * When any --DEBUG marker is present, the test enters debug mode: + * - Commands (CREATE TABLE, INSERT, DROP, SET, etc.) are always executed automatically. + * - Only --DEBUG-marked non-command queries are executed; all others are skipped. + * - For failed queries, the full error stacktrace is printed to the console. + * - Query results are still compared against the golden file. + * - The test always fails at the end with a reminder to remove --DEBUG markers. + * To inspect the DataFrame interactively, set a breakpoint in `runDebugQueries` at the + * line where `localSparkSession.sql(sql)` is called, then evaluate the DataFrame in the + * debugger (e.g., `df.queryExecution.analyzed`, `df.explain(true)`). * * For example: * {{{ @@ -92,6 +102,17 @@ import org.apache.spark.util.Utils * select current_date; * }}} * + * To debug a specific query, add --DEBUG before it: + * {{{ + * CREATE TABLE t (id INT, val INT) USING parquet; + * INSERT INTO t VALUES (1, 10), (2, 20); + * -- this query is skipped in debug mode + * SELECT count(*) FROM t; + * -- this is the query I'm debugging + * --DEBUG + * SELECT sum(val) OVER (ORDER BY id) FROM t; + * }}} + * * The format for golden result files look roughly like: * {{{ * -- some header information @@ -232,15 +253,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper protected def runSqlTestCase(testCase: TestCase, listTestCases: Seq[TestCase]): Unit = { val input = Files.readString(new File(testCase.inputFile).toPath) val (comments, code) = splitCommentsAndCodes(input) - val queries = getQueries(code, comments, listTestCases) + val queriesWithDebug = getQueriesWithDebugFlag(code, comments, listTestCases) val settings = getSparkSettings(comments) + val debugMode = queriesWithDebug.exists(_._2) && !testCase.isInstanceOf[AnalyzerTest] - if (regenerateGoldenFiles) { - runQueries(queries, testCase, settings.toImmutableArraySeq) + if (debugMode && !regenerateGoldenFiles) { + runDebugQueries(queriesWithDebug, testCase, settings.toImmutableArraySeq) + } else if (regenerateGoldenFiles) { + runQueries(queriesWithDebug.map(_._1), testCase, settings.toImmutableArraySeq) } else { val configSets = getSparkConfigDimensions(comments) runQueriesWithSparkConfigDimensions( - queries, testCase, settings, configSets) + queriesWithDebug.map(_._1), testCase, settings, configSets) } } @@ -287,12 +311,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper } } - protected def runQueries( - queries: Seq[String], + /** + * Creates and configures a local SparkSession for running test queries. + * Handles UDF/UDTF registration, SQL config, and TPCDS table setup. + */ + private def setupLocalSession( testCase: TestCase, - sparkConfigSet: Seq[(String, String)]): Unit = { - // Create a local SparkSession to have stronger isolation between different test cases. - // This does not isolate catalog changes. + sparkConfigSet: Seq[(String, String)]): SparkSession = { val localSparkSession = spark.newSession() testCase match { @@ -337,6 +362,127 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper } } + localSparkSession + } + + /** + * Runs queries in debug mode. Only commands and --DEBUG-marked queries are executed. + * Failed debug queries print the full error stacktrace. Results are compared against + * the golden file. The test always fails at the end to prevent accidental commits. + * + * To inspect the DataFrame interactively, set a breakpoint at the line where + * `localSparkSession.sql(sql)` is called inside this method, then evaluate the DataFrame + * in the debugger (e.g., `df.queryExecution.analyzed`, `df.explain(true)`). + */ + protected def runDebugQueries( + queriesWithDebug: Seq[(String, Boolean)], + testCase: TestCase, + sparkConfigSet: Seq[(String, String)]): Unit = { + val localSparkSession = setupLocalSession(testCase, sparkConfigSet) + val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT) + val segmentsPerQuery = 3 + + val goldenFileExists = new File(testCase.resultFile).exists() + val segments = if (goldenFileExists) { + Files.readString(new File(testCase.resultFile).toPath).split("-- !query.*\n") + } else { + Array.empty[String] + } + + queriesWithDebug.zipWithIndex.foreach { case ((sql, isDebug), i) => + val isCommand = try { + localSparkSession.sessionState.sqlParser.parsePlan(sql).isInstanceOf[Command] + } catch { + case _: ParseException => false + } + + if (isCommand) { + localSparkSession.sql(sql).collect() + } else if (isDebug) { + // Capture exception stacktrace if the query fails. + var exceptionTrace: Option[String] = None + val (schema, result) = handleExceptions { + try { + // Set a breakpoint here and evaluate `localSparkSession.sql(sql)` to get the + // DataFrame for ad-hoc debugging (e.g., df.queryExecution.analyzed). + getNormalizedQueryExecutionResult(localSparkSession, sql) + } catch { + case e: Throwable => + val sw = new StringWriter() + e.printStackTrace(new PrintWriter(sw)) + exceptionTrace = Some(sw.toString) + throw e + } + } + val output = ExecutionOutput( + sql = sql, + schema = Some(schema), + output = normalizeTestResults(result.mkString("\n"))) + + // Build consolidated debug output. + val debugOutput = new StringBuilder() + debugOutput.append(s"\n=== DEBUG: Query #$i ===") + debugOutput.append(s"\nSQL: $sql") + exceptionTrace.foreach { trace => + debugOutput.append(s"\n$trace") + } + + // Compare against golden file and append result. + var mismatch: Option[String] = None + if (goldenFileExists && + segments.length > segmentsPerQuery * i + segmentsPerQuery) { + val expected = ExecutionOutput( + segments(segmentsPerQuery * i + 1).trim, + Some(segments(segmentsPerQuery * i + 2).trim), + normalizeTestResults(segments(segmentsPerQuery * i + 3))) + + if (expected.sql != output.sql) { + debugOutput.append("\nGolden answer: no matching entry " + + "(SQL text differs, likely a new or moved query)") + } else if (expected.schema != output.schema) { + mismatch = Some(s"Schema did not match for query #$i\n${expected.sql}") + debugOutput.append(s"\nGolden answer: schema mismatch") + debugOutput.append(s"\n Expected: ${expected.schema.getOrElse("")}") + debugOutput.append(s"\n Actual: ${output.schema.getOrElse("")}") + } else if (expected.output != output.output) { + mismatch = Some(s"Result did not match for query #$i\n${expected.sql}") + debugOutput.append(s"\nGolden answer: result mismatch") + debugOutput.append(s"\n Expected:\n${expected.output}") + debugOutput.append(s"\n Actual:\n${output.output}") + } else { + debugOutput.append("\nGolden answer: matches") + } + } else { + debugOutput.append("\nGolden answer: no golden file to compare") + } + + // scalastyle:off println + println(debugOutput.toString) + // scalastyle:on println + + mismatch.foreach(fail(_)) + } + } + + if (requireTPCDSCases.contains(lowercaseTestCase)) { + tpcDSTableNamesToSchemas.foreach { case (name: String, _: String) => + localSparkSession.sql(s"DROP TABLE IF EXISTS $name") + } + } + + fail("Test is in debug mode. " + + "Remove --DEBUG markers from the input file before committing.") + } + + protected def runQueries( + queries: Seq[String], + testCase: TestCase, + sparkConfigSet: Seq[(String, String)]): Unit = { + // Create a local SparkSession to have stronger isolation between different test cases. + // This does not isolate catalog changes. + val localSparkSession = setupLocalSession(testCase, sparkConfigSet) + val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT) + // Run the SQL queries preparing them for comparison. val outputs: Seq[QueryTestOutput] = queries.map { sql => testCase match {