-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-55756][SQL][TESTS] Add --DEBUG directive for golden file test framework #54554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql | ||
|
|
||
| import java.io.File | ||
| import java.io.{File, PrintWriter, StringWriter} | ||
| import java.net.URI | ||
| import java.nio.file.Files | ||
| import java.util.Locale | ||
|
|
@@ -84,6 +84,16 @@ import org.apache.spark.util.Utils | |
| * times, each time picks one config set from each dimension, until all the combinations are | ||
| * tried. For example, if dimension 1 has 2 lines, dimension 2 has 3 lines, this testing file | ||
| * will be run 6 times (cartesian product). | ||
| * 6. A line with --DEBUG (on its own line before a query) marks that query for debug mode. | ||
| * When any --DEBUG marker is present, the test enters debug mode: | ||
| * - Commands (CREATE TABLE, INSERT, DROP, SET, etc.) are always executed automatically. | ||
| * - Only --DEBUG-marked non-command queries are executed; all others are skipped. | ||
| * - For failed queries, the full error stacktrace is printed to the console. | ||
| * - Query results are still compared against the golden file. | ||
| * - The test always fails at the end with a reminder to remove --DEBUG markers. | ||
| * To inspect the DataFrame interactively, set a breakpoint in `runDebugQueries` at the | ||
| * line where `localSparkSession.sql(sql)` is called, then evaluate the DataFrame in the | ||
| * debugger (e.g., `df.queryExecution.analyzed`, `df.explain(true)`). | ||
| * | ||
| * For example: | ||
| * {{{ | ||
|
|
@@ -92,6 +102,17 @@ import org.apache.spark.util.Utils | |
| * select current_date; | ||
| * }}} | ||
| * | ||
| * To debug a specific query, add --DEBUG before it: | ||
| * {{{ | ||
| * CREATE TABLE t (id INT, val INT) USING parquet; | ||
| * INSERT INTO t VALUES (1, 10), (2, 20); | ||
| * -- this query is skipped in debug mode | ||
| * SELECT count(*) FROM t; | ||
| * -- this is the query I'm debugging | ||
| * --DEBUG | ||
| * SELECT sum(val) OVER (ORDER BY id) FROM t; | ||
| * }}} | ||
| * | ||
| * The format for golden result files look roughly like: | ||
| * {{{ | ||
| * -- some header information | ||
|
|
@@ -232,15 +253,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper | |
| protected def runSqlTestCase(testCase: TestCase, listTestCases: Seq[TestCase]): Unit = { | ||
| val input = Files.readString(new File(testCase.inputFile).toPath) | ||
| val (comments, code) = splitCommentsAndCodes(input) | ||
| val queries = getQueries(code, comments, listTestCases) | ||
| val queriesWithDebug = getQueriesWithDebugFlag(code, comments, listTestCases) | ||
| val settings = getSparkSettings(comments) | ||
| val debugMode = queriesWithDebug.exists(_._2) && !testCase.isInstanceOf[AnalyzerTest] | ||
|
|
||
| if (regenerateGoldenFiles) { | ||
| runQueries(queries, testCase, settings.toImmutableArraySeq) | ||
| if (debugMode && !regenerateGoldenFiles) { | ||
| runDebugQueries(queriesWithDebug, testCase, settings.toImmutableArraySeq) | ||
| } else if (regenerateGoldenFiles) { | ||
| runQueries(queriesWithDebug.map(_._1), testCase, settings.toImmutableArraySeq) | ||
| } else { | ||
| val configSets = getSparkConfigDimensions(comments) | ||
| runQueriesWithSparkConfigDimensions( | ||
| queries, testCase, settings, configSets) | ||
| queriesWithDebug.map(_._1), testCase, settings, configSets) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -287,12 +311,13 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper | |
| } | ||
| } | ||
|
|
||
| protected def runQueries( | ||
| queries: Seq[String], | ||
| /** | ||
| * Creates and configures a local SparkSession for running test queries. | ||
| * Handles UDF/UDTF registration, SQL config, and TPCDS table setup. | ||
| */ | ||
| private def setupLocalSession( | ||
| testCase: TestCase, | ||
| sparkConfigSet: Seq[(String, String)]): Unit = { | ||
| // Create a local SparkSession to have stronger isolation between different test cases. | ||
| // This does not isolate catalog changes. | ||
| sparkConfigSet: Seq[(String, String)]): SparkSession = { | ||
| val localSparkSession = spark.newSession() | ||
|
|
||
| testCase match { | ||
|
|
@@ -337,6 +362,127 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper | |
| } | ||
| } | ||
|
|
||
| localSparkSession | ||
| } | ||
|
|
||
| /** | ||
| * Runs queries in debug mode. Only commands and --DEBUG-marked queries are executed. | ||
| * Failed debug queries print the full error stacktrace. Results are compared against | ||
| * the golden file. The test always fails at the end to prevent accidental commits. | ||
| * | ||
| * To inspect the DataFrame interactively, set a breakpoint at the line where | ||
| * `localSparkSession.sql(sql)` is called inside this method, then evaluate the DataFrame | ||
| * in the debugger (e.g., `df.queryExecution.analyzed`, `df.explain(true)`). | ||
| */ | ||
| protected def runDebugQueries( | ||
| queriesWithDebug: Seq[(String, Boolean)], | ||
| testCase: TestCase, | ||
| sparkConfigSet: Seq[(String, String)]): Unit = { | ||
| val localSparkSession = setupLocalSession(testCase, sparkConfigSet) | ||
| val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT) | ||
| val segmentsPerQuery = 3 | ||
|
|
||
| val goldenFileExists = new File(testCase.resultFile).exists() | ||
| val segments = if (goldenFileExists) { | ||
| Files.readString(new File(testCase.resultFile).toPath).split("-- !query.*\n") | ||
| } else { | ||
| Array.empty[String] | ||
| } | ||
|
|
||
| queriesWithDebug.zipWithIndex.foreach { case ((sql, isDebug), i) => | ||
| val isCommand = try { | ||
| localSparkSession.sessionState.sqlParser.parsePlan(sql).isInstanceOf[Command] | ||
| } catch { | ||
| case _: ParseException => false | ||
| } | ||
|
|
||
| if (isCommand) { | ||
| localSparkSession.sql(sql).collect() | ||
| } else if (isDebug) { | ||
| // Capture exception stacktrace if the query fails. | ||
| var exceptionTrace: Option[String] = None | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we always do print the stack, not just in debug mode (when we hit a non-expected exception?)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same reason as above, when we run all queries, this is not useful. |
||
| val (schema, result) = handleExceptions { | ||
| try { | ||
| // Set a breakpoint here and evaluate `localSparkSession.sql(sql)` to get the | ||
| // DataFrame for ad-hoc debugging (e.g., df.queryExecution.analyzed). | ||
| getNormalizedQueryExecutionResult(localSparkSession, sql) | ||
| } catch { | ||
| case e: Throwable => | ||
| val sw = new StringWriter() | ||
| e.printStackTrace(new PrintWriter(sw)) | ||
| exceptionTrace = Some(sw.toString) | ||
| throw e | ||
| } | ||
| } | ||
| val output = ExecutionOutput( | ||
| sql = sql, | ||
| schema = Some(schema), | ||
| output = normalizeTestResults(result.mkString("\n"))) | ||
|
|
||
| // Build consolidated debug output. | ||
| val debugOutput = new StringBuilder() | ||
| debugOutput.append(s"\n=== DEBUG: Query #$i ===") | ||
| debugOutput.append(s"\nSQL: $sql") | ||
| exceptionTrace.foreach { trace => | ||
| debugOutput.append(s"\n$trace") | ||
| } | ||
|
|
||
| // Compare against golden file and append result. | ||
| var mismatch: Option[String] = None | ||
| if (goldenFileExists && | ||
| segments.length > segmentsPerQuery * i + segmentsPerQuery) { | ||
| val expected = ExecutionOutput( | ||
| segments(segmentsPerQuery * i + 1).trim, | ||
| Some(segments(segmentsPerQuery * i + 2).trim), | ||
| normalizeTestResults(segments(segmentsPerQuery * i + 3))) | ||
|
|
||
| if (expected.sql != output.sql) { | ||
| debugOutput.append("\nGolden answer: no matching entry " + | ||
| "(SQL text differs, likely a new or moved query)") | ||
| } else if (expected.schema != output.schema) { | ||
| mismatch = Some(s"Schema did not match for query #$i\n${expected.sql}") | ||
| debugOutput.append(s"\nGolden answer: schema mismatch") | ||
| debugOutput.append(s"\n Expected: ${expected.schema.getOrElse("")}") | ||
| debugOutput.append(s"\n Actual: ${output.schema.getOrElse("")}") | ||
| } else if (expected.output != output.output) { | ||
| mismatch = Some(s"Result did not match for query #$i\n${expected.sql}") | ||
| debugOutput.append(s"\nGolden answer: result mismatch") | ||
| debugOutput.append(s"\n Expected:\n${expected.output}") | ||
| debugOutput.append(s"\n Actual:\n${output.output}") | ||
| } else { | ||
| debugOutput.append("\nGolden answer: matches") | ||
| } | ||
| } else { | ||
| debugOutput.append("\nGolden answer: no golden file to compare") | ||
| } | ||
|
|
||
| // scalastyle:off println | ||
| println(debugOutput.toString) | ||
| // scalastyle:on println | ||
|
|
||
| mismatch.foreach(fail(_)) | ||
| } | ||
| } | ||
|
|
||
| if (requireTPCDSCases.contains(lowercaseTestCase)) { | ||
| tpcDSTableNamesToSchemas.foreach { case (name: String, _: String) => | ||
| localSparkSession.sql(s"DROP TABLE IF EXISTS $name") | ||
| } | ||
| } | ||
|
|
||
| fail("Test is in debug mode. " + | ||
| "Remove --DEBUG markers from the input file before committing.") | ||
| } | ||
|
|
||
| protected def runQueries( | ||
| queries: Seq[String], | ||
| testCase: TestCase, | ||
| sparkConfigSet: Seq[(String, String)]): Unit = { | ||
| // Create a local SparkSession to have stronger isolation between different test cases. | ||
| // This does not isolate catalog changes. | ||
| val localSparkSession = setupLocalSession(testCase, sparkConfigSet) | ||
| val lowercaseTestCase = testCase.name.toLowerCase(Locale.ROOT) | ||
|
|
||
| // Run the SQL queries preparing them for comparison. | ||
| val outputs: Seq[QueryTestOutput] = queries.map { sql => | ||
| testCase match { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to have this available in all case, not just --debug mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's less useful in non-debug mode as all queries run, so I didn't put it in the classdoc. People can find the DataFrame in
runQueriessimilarly.