Skip to content

[SPARK-36638][SQL] Generalize OptimizeSkewedJoin#33893

Closed
zhengruifeng wants to merge 12 commits into
apache:masterfrom
zhengruifeng:generalize_skew_join
Closed

[SPARK-36638][SQL] Generalize OptimizeSkewedJoin#33893
zhengruifeng wants to merge 12 commits into
apache:masterfrom
zhengruifeng:generalize_skew_join

Conversation

@zhengruifeng
Copy link
Copy Markdown
Contributor

@zhengruifeng zhengruifeng commented Sep 1, 2021

What changes were proposed in this pull request?

This PR aims to generalize OptimizeSkewedJoin to support all patterns that can be handled by current split-duplicate strategy:

1, find the splittable shuffle query stages by the semantics of internal nodes;

2, for each splittable shuffle query stage, check whether skew partitions exists, if true, split them into specs;

3, handle Combinatorial Explosion: for each skew partition, check whether the combination number is too large, if so, re-split the stages to keep a reasonable number of combinations. For example, for partition 0, stage A/B/C are split into 100/100/100 specs, respectively. Then there are 1M combinations, which is too large, and will cause performance regression.

4, attach new specs to shuffle query stages;

Why are the changes needed?

to Generalize OptimizeSkewedJoin

Does this PR introduce any user-facing change?

one additional config added

How was this patch tested?

existing testsuites, added testsuites, some cases on our productive system

@github-actions github-actions Bot added the SQL label Sep 1, 2021
@zhengruifeng
Copy link
Copy Markdown
Contributor Author

added test("General Skew Join: 3-table join")

== Physical Plan ==
AdaptiveSparkPlan (52)
+- == Final Plan ==
   CollectLimit (33)
   +- * HashAggregate (32)
      +- AQEShuffleRead (31)
         +- ShuffleQueryStage (30)
            +- Exchange (29)
               +- * HashAggregate (28)
                  +- * Project (27)
                     +- * SortMergeJoin(skew=true) LeftOuter (26)
                        :- * Project (15)
                        :  +- * SortMergeJoin(skew=true) Inner (14)
                        :     :- Window(skew=true) (7)
                        :     :  +- * Sort (6)
                        :     :     +- AQEShuffleRead (5)
                        :     :        +- ShuffleQueryStage (4)
                        :     :           +- Exchange (3)
                        :     :              +- * Project (2)
                        :     :                 +- * Range (1)
                        :     +- * Sort (13)
                        :        +- AQEShuffleRead (12)
                        :           +- ShuffleQueryStage (11)
                        :              +- Exchange (10)
                        :                 +- * Project (9)
                        :                    +- * Range (8)
                        +- * Sort (25)
                           +- * HashAggregate(skew=true) (24)
                              +- AQEShuffleRead (23)
                                 +- ShuffleQueryStage (22)
                                    +- Exchange (21)
                                       +- * HashAggregate (20)
                                          +- ShuffleQueryStage (19)
                                             +- Exchange (18)
                                                +- * Project (17)
                                                   +- * Range (16)

image

@zhengruifeng
Copy link
Copy Markdown
Contributor Author

added test("General Skew Join: 3-table join UNION 2-table join")

== Physical Plan ==
AdaptiveSparkPlan (68)
+- == Final Plan ==
   CollectLimit (43)
   +- * HashAggregate (42)
      +- AQEShuffleRead (41)
         +- ShuffleQueryStage (40)
            +- Exchange (39)
               +- * HashAggregate (38)
                  +- Union (37)
                     :- * Project (24)
                     :  +- * SortMergeJoin(skew=true) LeftOuter (23)
                     :     :- * Project (14)
                     :     :  +- * SortMergeJoin(skew=true) Inner (13)
                     :     :     :- * Sort (6)
                     :     :     :  +- AQEShuffleRead (5)
                     :     :     :     +- ShuffleQueryStage (4)
                     :     :     :        +- Exchange (3)
                     :     :     :           +- * Project (2)
                     :     :     :              +- * Range (1)
                     :     :     +- * Sort (12)
                     :     :        +- AQEShuffleRead (11)
                     :     :           +- ShuffleQueryStage (10)
                     :     :              +- Exchange (9)
                     :     :                 +- * Project (8)
                     :     :                    +- * Range (7)
                     :     +- * Sort (22)
                     :        +- * HashAggregate(skew=true) (21)
                     :           +- AQEShuffleRead (20)
                     :              +- ShuffleQueryStage (19)
                     :                 +- Exchange (18)
                     :                    +- * HashAggregate (17)
                     :                       +- * Project (16)
                     :                          +- * Range (15)
                     +- * Project (36)
                        +- * SortMergeJoin(skew=true) LeftOuter (35)
                           :- * Sort (28)
                           :  +- AQEShuffleRead (27)
                           :     +- ShuffleQueryStage (26)
                           :        +- ReusedExchange (25)
                           +- * Sort (34)
                              +- AQEShuffleRead (33)
                                 +- ShuffleQueryStage (32)
                                    +- Exchange (31)
                                       +- * Project (30)
                                          +- * Range (29)

image

@zhengruifeng
Copy link
Copy Markdown
Contributor Author

added test("General Skew Join: 5-table join")

== Physical Plan ==
AdaptiveSparkPlan (68)
+- == Final Plan ==
   CollectLimit (45)
   +- * HashAggregate (44)
      +- AQEShuffleRead (43)
         +- ShuffleQueryStage (42)
            +- Exchange (41)
               +- * HashAggregate (40)
                  +- * Project (39)
                     +- * SortMergeJoin(skew=true) Inner (38)
                        :- * Project (22)
                        :  +- * SortMergeJoin(skew=true) Cross (21)
                        :     :- * Project (14)
                        :     :  +- * SortMergeJoin(skew=true) LeftOuter (13)
                        :     :     :- * Sort (6)
                        :     :     :  +- AQEShuffleRead (5)
                        :     :     :     +- ShuffleQueryStage (4)
                        :     :     :        +- Exchange (3)
                        :     :     :           +- * Project (2)
                        :     :     :              +- * Range (1)
                        :     :     +- * Sort (12)
                        :     :        +- AQEShuffleRead (11)
                        :     :           +- ShuffleQueryStage (10)
                        :     :              +- Exchange (9)
                        :     :                 +- * Project (8)
                        :     :                    +- * Range (7)
                        :     +- * Sort (20)
                        :        +- AQEShuffleRead (19)
                        :           +- ShuffleQueryStage (18)
                        :              +- Exchange (17)
                        :                 +- * Project (16)
                        :                    +- * Range (15)
                        +- * Sort (37)
                           +- * Project (36)
                              +- * ShuffledHashJoin(skew=true) Inner BuildLeft (35)
                                 :- * HashAggregate(skew=true) (29)
                                 :  +- AQEShuffleRead (28)
                                 :     +- ShuffleQueryStage (27)
                                 :        +- Exchange (26)
                                 :           +- * HashAggregate (25)
                                 :              +- * Project (24)
                                 :                 +- * Range (23)
                                 +- AQEShuffleRead (34)
                                    +- ShuffleQueryStage (33)
                                       +- Exchange (32)
                                          +- * Project (31)
                                             +- * Range (30)

image

@zhengruifeng
Copy link
Copy Markdown
Contributor Author

friendly ping @cloud-fan

@zhengruifeng
Copy link
Copy Markdown
Contributor Author

friendly ping @JkSelf @cloud-fan @yaooqinn @ulysses-you . Could you please take a look in your spare time? Thanks!

@zhengruifeng zhengruifeng force-pushed the generalize_skew_join branch 2 times, most recently from 545bf9b to cd3c449 Compare September 13, 2021 12:38
@zhengruifeng
Copy link
Copy Markdown
Contributor Author

When developing this method, I used some tests like #34108 to check correctness. It should be helpful for reviewing.

Copy link
Copy Markdown
Contributor Author

@zhengruifeng zhengruifeng Feb 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this skew test case newly added in #34908 can be optimized by this PR without extra shuffle:

Master this PR
master_skew_case general_skew_case

@zhengruifeng
Copy link
Copy Markdown
Contributor Author

retest this please

@zhengruifeng zhengruifeng force-pushed the generalize_skew_join branch from 35d2212 to 6305685 Compare March 23, 2022 01:23
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@apache apache deleted a comment from SparkQA Mar 23, 2022
@sigmod
Copy link
Copy Markdown
Contributor

sigmod commented Mar 23, 2022

cc @maryannxue

zhengruifeng and others added 12 commits May 4, 2022 22:27
drop last str arg

format sql in ut

support both ShuffleQueryStageExec and BroadcastQueryStageExec as leaves

nit

reslove conflicts

narrow valid operator whitelist

move agg stringArgs to subclasses

add doc && resolve conflicts

del sample node

resolve conflicts

nit
nit
@zhengruifeng zhengruifeng force-pushed the generalize_skew_join branch from 6305685 to 0d250b0 Compare May 4, 2022 15:07
@github-actions
Copy link
Copy Markdown

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions Bot added the Stale label Sep 30, 2022
@github-actions github-actions Bot closed this Oct 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants