diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
index 899fd85a8bb80..5ad9605c7d78b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
@@ -400,11 +400,16 @@ trait SQLQueryTestHelper extends SQLConfHelper with Logging {
   protected def splitCommentsAndCodes(input: String): (Array[String], Array[String]) =
     input.split("\n").partition { line =>
       val newLine = line.trim
-      newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
+      newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") &&
+        newLine != "--DEBUG"
     }
 
-  protected def getQueries(code: Array[String], comments: Array[String],
-      allTestCases: Seq[TestCase]): Seq[String] = {
+  /**
+   * Parses queries from code lines and returns each query paired with a Boolean indicating
+   * whether it was preceded by a --DEBUG marker.
+   */
+  protected def getQueriesWithDebugFlag(code: Array[String], comments: Array[String],
+      allTestCases: Seq[TestCase]): Seq[(String, Boolean)] = {
     def splitWithSemicolon(seq: Seq[String]) = {
       seq.mkString("\n").split("(?<=[^\\\\]);")
     }
@@ -450,10 +455,22 @@ trait SQLQueryTestHelper extends SQLConfHelper with Logging {
       splitWithSemicolon(allCode.toImmutableArraySeq).toSeq
     }
 
-    // List of SQL queries to run
-    tempQueries.map(_.trim).filter(_ != "")
-      // Fix misplacement when comment is at the end of the query.
-      .map(_.split("\n").filterNot(_.startsWith("--")).mkString("\n")).map(_.trim).filter(_ != "")
+    // Detect --DEBUG markers before stripping comment lines from each query.
+    tempQueries.map(_.trim).filter(_ != "").map { query =>
+      val lines = query.split("\n")
+      val isDebug = lines.exists(_.trim == "--DEBUG")
+      // Match the marker on the trimmed line, as `isDebug` does, so indented markers are dropped.
+      val cleanedQuery = lines
+        .filterNot(line => line.startsWith("--") || line.trim == "--DEBUG")
+        .mkString("\n").trim
+      (cleanedQuery, isDebug)
+    }.filter(_._1 != "")
+  }
+
+  /** Returns the parsed queries without their --DEBUG flags. */
+  protected def getQueries(code: Array[String], comments: Array[String],
+      allTestCases: Seq[TestCase]): Seq[String] = {
+    getQueriesWithDebugFlag(code, comments, allTestCases).map(_._1)
   }
 
   protected def getSparkSettings(comments: Array[String]): Array[(String, String)] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index a57c72f5fc155..f4be4c38ce932 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import java.io.{File, PrintWriter, StringWriter}
 import java.net.URI
 import java.nio.file.Files
 import java.util.Locale
@@ -84,6 +84,16 @@ import org.apache.spark.util.Utils
  *     times, each time picks one config set from each dimension, until all the combinations are
  *     tried. For example, if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file
  *     will be run 6 times (cartesian product).
+ *  6. A line with --DEBUG (on its own line before a query) marks that query for debug mode.
+ *     When any --DEBUG marker is present, the test enters debug mode:
+ *     - Commands (CREATE TABLE, INSERT, DROP, SET, etc.) are always executed automatically.
+ *     - Only --DEBUG-marked non-command queries are executed; all others are skipped.
+ *     - For failed queries, the full error stacktrace is printed to the console.
+ *     - Query results are still compared against the golden file.
+ *     - The test always fails at the end with a reminder to remove --DEBUG markers.
+ *     To inspect the DataFrame interactively, set a breakpoint at the marked line in
+ *     `runDebugQueries`, then evaluate `localSparkSession.sql(sql)` in the debugger and inspect
+ *     the result (e.g., `df.queryExecution.analyzed`, `df.explain(true)`).
  *
  * For example:
  * {{{
@@ -92,6 +102,17 @@ import org.apache.spark.util.Utils
  *   select current_date;
  * }}}
  *
+ * To debug a specific query, add --DEBUG before it:
+ * {{{
+ *   CREATE TABLE t (id INT, val INT) USING parquet;
+ *   INSERT INTO t VALUES (1, 10), (2, 20);
+ *   -- this query is skipped in debug mode
+ *   SELECT count(*) FROM t;
+ *   -- this is the query I'm debugging
+ *   --DEBUG
+ *   SELECT sum(val) OVER (ORDER BY id) FROM t;
+ * }}}
+ *
  * The format for golden result files look roughly like:
  * {{{
  *   -- some header information
@@ -232,15 +253,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
   protected def runSqlTestCase(testCase: TestCase, listTestCases: Seq[TestCase]): Unit = {
     val input = Files.readString(new File(testCase.inputFile).toPath)
     val (comments, code) = splitCommentsAndCodes(input)
-    val queries = getQueries(code, comments, listTestCases)
+    val queriesWithDebug = getQueriesWithDebugFlag(code, comments, listTestCases)
     val settings = getSparkSettings(comments)
+    val debugMode = queriesWithDebug.exists(_._2) && !testCase.isInstanceOf[AnalyzerTest]
 
-    if (regenerateGoldenFiles) {
-      runQueries(queries, testCase, settings.toImmutableArraySeq)
+    if (debugMode && !regenerateGoldenFiles) {
+      runDebugQueries(queriesWithDebug, testCase, settings.toImmutableArraySeq)
+    } else if (regenerateGoldenFiles) {
+      runQueries(queriesWithDebug.map(_._1), testCase, settings.toImmutableArraySeq)
     } else {
       val configSets = getSparkConfigDimensions(comments)
       runQueriesWithSparkConfigDimensions(
-        queries, testCase, settings, configSets)
+        queriesWithDebug.map(_._1), testCase, settings, configSets)
     }
   }
 
@@ -287,12 +311,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
     }
   }
 
-  protected def runQueries(
-      queries: Seq[String],
+  /**
+   * Creates and configures a local SparkSession for running test queries.
+   * Handles UDF/UDTF registration, SQL config, and TPCDS table setup.
+   */
+  private def setupLocalSession(
       testCase: TestCase,
-      sparkConfigSet: Seq[(String, String)]): Unit = {
-    // Create a local SparkSession to have stronger isolation between different test cases.
-    // This does not isolate catalog changes.
+      sparkConfigSet: Seq[(String, String)]): SparkSession = {
     val localSparkSession = spark.newSession()
 
     testCase match {
@@ -337,6 +362,140 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
       }
     }
 
+    localSparkSession
+  }
+
+  /**
+   * Runs queries in debug mode. Only commands and --DEBUG-marked queries are executed.
+   * Failed debug queries print the full error stacktrace. Results are compared against
+   * the golden file. The test always fails at the end to prevent accidental commits.
+   *
+   * To inspect the DataFrame interactively, set a breakpoint at the marked line inside this
+   * method, then evaluate `localSparkSession.sql(sql)` in the debugger and inspect the result
+   * (e.g., `df.queryExecution.analyzed`, `df.explain(true)`).
+   *
+   * @param queriesWithDebug queries paired with a flag telling whether they carry --DEBUG
+   * @param testCase the test case whose input queries and golden file are used
+   * @param sparkConfigSet config key/value pairs forwarded to `setupLocalSession`
+   */
+  protected def runDebugQueries(
+      queriesWithDebug: Seq[(String, Boolean)],
+      testCase: TestCase,
+      sparkConfigSet: Seq[(String, String)]): Unit = {
+    val localSparkSession = setupLocalSession(testCase, sparkConfigSet)
+    val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT)
+    // Each golden-file entry spans three segments: SQL text, schema and output.
+    val segmentsPerQuery = 3
+
+    val goldenFileExists = new File(testCase.resultFile).exists()
+    val segments = if (goldenFileExists) {
+      Files.readString(new File(testCase.resultFile).toPath).split("-- !query.*\n")
+    } else {
+      Array.empty[String]
+    }
+
+    // Clean up in `finally`: a mismatch or a failing command below aborts the loop early.
+    try {
+      queriesWithDebug.zipWithIndex.foreach { case ((sql, isDebug), i) =>
+        // A statement that fails to parse is treated as a non-command.
+        val isCommand = try {
+          localSparkSession.sessionState.sqlParser.parsePlan(sql).isInstanceOf[Command]
+        } catch {
+          case _: ParseException => false
+        }
+
+        if (isCommand) {
+          localSparkSession.sql(sql).collect()
+        } else if (isDebug) {
+          // Capture exception stacktrace if the query fails.
+          var exceptionTrace: Option[String] = None
+          val (schema, result) = handleExceptions {
+            try {
+              // Set a breakpoint here and evaluate `localSparkSession.sql(sql)` to get the
+              // DataFrame for ad-hoc debugging (e.g., df.queryExecution.analyzed).
+              getNormalizedQueryExecutionResult(localSparkSession, sql)
+            } catch {
+              case e: Throwable =>
+                // Record the trace, then rethrow so `handleExceptions` sees the original error.
+                val sw = new StringWriter()
+                e.printStackTrace(new PrintWriter(sw))
+                exceptionTrace = Some(sw.toString)
+                throw e
+            }
+          }
+          val output = ExecutionOutput(
+            sql = sql,
+            schema = Some(schema),
+            output = normalizeTestResults(result.mkString("\n")))
+
+          // Build consolidated debug output.
+          val debugOutput = new StringBuilder()
+          debugOutput.append(s"\n=== DEBUG: Query #$i ===")
+          debugOutput.append(s"\nSQL: $sql")
+          exceptionTrace.foreach { trace =>
+            debugOutput.append(s"\n$trace")
+          }
+
+          // Compare against golden file and append result.
+          var mismatch: Option[String] = None
+          if (goldenFileExists &&
+              segments.length > segmentsPerQuery * i + segmentsPerQuery) {
+            val expected = ExecutionOutput(
+              segments(segmentsPerQuery * i + 1).trim,
+              Some(segments(segmentsPerQuery * i + 2).trim),
+              normalizeTestResults(segments(segmentsPerQuery * i + 3)))
+
+            if (expected.sql != output.sql) {
+              debugOutput.append("\nGolden answer: no matching entry " +
+                "(SQL text differs, likely a new or moved query)")
+            } else if (expected.schema != output.schema) {
+              mismatch = Some(s"Schema did not match for query #$i\n${expected.sql}")
+              debugOutput.append(s"\nGolden answer: schema mismatch")
+              debugOutput.append(s"\n Expected: ${expected.schema.getOrElse("")}")
+              debugOutput.append(s"\n Actual: ${output.schema.getOrElse("")}")
+            } else if (expected.output != output.output) {
+              mismatch = Some(s"Result did not match for query #$i\n${expected.sql}")
+              debugOutput.append(s"\nGolden answer: result mismatch")
+              debugOutput.append(s"\n Expected:\n${expected.output}")
+              debugOutput.append(s"\n Actual:\n${output.output}")
+            } else {
+              debugOutput.append("\nGolden answer: matches")
+            }
+          } else if (goldenFileExists) {
+            // The golden file exists but holds no segments for this query index.
+            debugOutput.append("\nGolden answer: no entry for this query in the golden file")
+          } else {
+            debugOutput.append("\nGolden answer: no golden file to compare")
+          }
+
+          // scalastyle:off println
+          println(debugOutput.toString)
+          // scalastyle:on println
+
+          mismatch.foreach(fail(_))
+        }
+      }
+    } finally {
+      if (requireTPCDSCases.contains(lowercaseTestCase)) {
+        tpcDSTableNamesToSchemas.foreach { case (name: String, _: String) =>
+          localSparkSession.sql(s"DROP TABLE IF EXISTS $name")
+        }
+      }
+    }
+
+    fail("Test is in debug mode. " +
+      "Remove --DEBUG markers from the input file before committing.")
+  }
+
+  protected def runQueries(
+      queries: Seq[String],
+      testCase: TestCase,
+      sparkConfigSet: Seq[(String, String)]): Unit = {
+    // Create a local SparkSession to have stronger isolation between different test cases.
+    // This does not isolate catalog changes.
+    val localSparkSession = setupLocalSession(testCase, sparkConfigSet)
+    val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT)
+
     // Run the SQL queries preparing them for comparison.
     val outputs: Seq[QueryTestOutput] = queries.map { sql =>
       testCase match {