[SPARK-19471] AggregationIterator does not initialize the generated result projection before using it#16820
yangw1234 wants to merge 3 commits into
ok to test
@yangw1234 Could you also check whether we need to do this for whole-stage code generation? And you really need to add tests.
Test build #72445 has finished for PR 16820 at commit
@hvanhovell Thanks for your review. Whole-stage code generation seems fine, and a unit test has been added.
Test build #72450 has finished for PR 16820 at commit
Test build #72451 has finished for PR 16820 at commit
private def assertNoExceptions(c: Column): Unit = {
  for (wholeStage <- Seq(true, false)) {
    withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
      spark.range(0, 5).toDF("a").agg(sum("a")).withColumn("v", c).collect()
This test also passes without your fix. I think you need to reference a Nondeterministic expression in the aggregate.
Could you also make sure that we test all aggregation paths:
- HashAggregate
- ObjectHashAggregate
- SortAggregate
@yangw1234 Could you address the comment by @hvanhovell? Thanks!
@gatorsmile Sorry, I totally forgot about this PR. I will try to address the comment this week (I need a little time to re-familiarize myself with the context).
Sorry, I could not find time to finish this PR recently, so I am closing it for now. If you need this fix, please feel free to build on it and finish it.
…ted result projection before using it

## What changes were proposed in this pull request?

Recently, we have also encountered such NPE issues in our production environment as described in:
https://issues.apache.org/jira/browse/SPARK-19471

This issue can be reproduced by the following examples:

    val df = spark.createDataFrame(Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4))).toDF("x", "y")

    // HashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
    df.groupBy("x").agg(rand(), sum("y")).show()

    // ObjectHashAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false
    df.groupBy("x").agg(rand(), collect_list("y")).show()

    // SortAggregate, SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key=false && SQLConf.USE_OBJECT_HASH_AGG.key=false
    df.groupBy("x").agg(rand(), collect_list("y")).show()

This PR is based on PR-16820 (apache#16820) with test cases for all aggregation paths. We want to push it forward.

> When AggregationIterator generates result projection, it does not call the initialize method of the Projection class. This will cause a runtime NullPointerException when the projection involves nondeterministic expressions.

## How was this patch tested?

unit test
verified in production environment

Author: donnyzone <wellfengzhu@gmail.com>

Closes apache#18920 from DonnyZone/Branch-spark-19471.
What changes were proposed in this pull request?
When AggregationIterator generates the result projection, it does not call the initialize method of the Projection class. This causes a runtime NullPointerException when the projection involves nondeterministic expressions.
This problem was introduced by #15567.
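The failure mode described above can be illustrated with a minimal, Spark-free sketch. It is only a model: `RandProjection`, `InitializeBeforeUse`, and the seed value are hypothetical names chosen for illustration, not Spark classes or the actual generated code. It assumes the projection keeps per-partition state (here a random number generator) that stays null until `initialize` is called.

```scala
import scala.util.Try

// Hypothetical stand-in for a generated projection that contains rand(seed).
// It models the behaviour described in this PR; it is not Spark's own class.
final class RandProjection(seed: Long) {
  // Per-partition state; stays null until initialize() runs.
  private var rng: java.util.Random = null

  // Sets up the partition-specific state the projection depends on.
  def initialize(partitionIndex: Int): Unit = {
    rng = new java.util.Random(seed + partitionIndex)
  }

  // Appends a random column to an aggregated value.
  def apply(sum: Long): (Long, Double) = (sum, rng.nextDouble())
}

object InitializeBeforeUse {
  // Buggy path: the projection is used right after construction.
  def projectWithoutInit(sum: Long): Try[(Long, Double)] =
    Try(new RandProjection(42L).apply(sum))

  // Fixed path: initialize with the partition index before the first use.
  def projectWithInit(sum: Long, partitionIndex: Int): Try[(Long, Double)] = Try {
    val projection = new RandProjection(42L)
    projection.initialize(partitionIndex)
    projection.apply(sum)
  }

  def main(args: Array[String]): Unit = {
    println(projectWithoutInit(10L).isFailure) // uninitialized state is dereferenced
    println(projectWithInit(10L, partitionIndex = 0).isSuccess)
  }
}
```

In this model the fix is one extra call between constructing the projection and applying it, which mirrors the change this PR proposes for AggregationIterator.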
How was this patch tested?
unit test