[SPARK-37328][SQL] Fix bug that OptimizeSkewedJoin may not work after it was moved from queryStageOptimizerRules to queryStagePreparationRules.#34602

Closed
Liulietong wants to merge 2 commits into apache:master from Liulietong:optimizeskewjoin-bugfix

Conversation

@Liulietong

What changes were proposed in this pull request?

Fix the issue that OptimizeSkewedJoin may not work.
Since OptimizeSkewedJoin was moved from queryStageOptimizerRules to queryStagePreparationRules, the place where it is applied has moved from newQueryStage() to reOptimize(). As a result, the plan it is applied to changed from the plan of the new stage that is about to be submitted to the whole Spark plan.
When the skewed join is not in the last stage, OptimizeSkewedJoin may not work because more than 2 shuffle stages are collected.

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New test.

@github-actions github-actions Bot added the SQL label Nov 15, 2021
@AmplabJenkins

Can one of the admins verify this patch?

@advancedxy
Contributor

ping @cloud-fan and @ulysses-you, could you take a look at this problem?

…ld because ensureRequirements with requiredDistribution may bring extra shuffle if we just return child of exchange.
@cloud-fan
Contributor

cloud-fan commented Nov 16, 2021

OptimizeSkewedJoin is supposed to only handle materialized shuffle stages, or did I miss something?

@Liulietong
Author

> OptimizeSkewedJoin is supposed to only handle materialized shuffle stages, or did I miss something?

I haven't changed that. The problem is that, since OptimizeSkewedJoin was moved from queryStageOptimizerRules to queryStagePreparationRules, it is now applied to the whole plan.
For example:

+- ShuffledHashJoin [value2#227L], [value3#233L], Inner
   :- Exchange hashpartitioning(value2#227L, 10), ENSURE_REQUIREMENTS, [id=#260]
   :  +- Project [key1#220L, value2#227L]
   :     +- ShuffledHashJoin [key1#220L], [key2#226L], Inner
   :        :- ShuffleQueryStage 0
   :        +- ShuffleQueryStage 1
   +- ShuffleQueryStage 2

We should apply OptimizeSkewedJoin to Project [key1#220L, value2#227L] rather than to the whole plan rooted at ShuffledHashJoin [value2#227L], [value3#233L], Inner.
So we need to find the top plan of the new stage that is about to be submitted, which will be Project [key1#220L, value2#227L] only if ShuffleQueryStage 0 and ShuffleQueryStage 1 are materialized.

@ulysses-you
Contributor

Can we remove the check if (shuffleStages.length == 2)?

@Liulietong
Author

> can we remove the check if (shuffleStages.length == 2) ?

Then it would apply not only to cases where just 2 tables are joined; many complex combinations would need to be considered, such as multiple table joins in the same stage.

@advancedxy
Contributor

> OptimizeSkewedJoin is supposed to only handle materialized shuffle stages, or did I miss something?

Hi @cloud-fan, OptimizeSkewedJoin is supposed to handle materialized shuffle stages only. The problem arises with multiple joins like the following:

ShuffledHashJoin1
   -- Exchange
        -- Project
             -- ShuffledHashJoin0 (has skew)
                  -- ShuffleQueryStage0
                  -- ShuffleQueryStage1
   -- ShuffleQueryStage2

ShuffledHashJoin0 has skew and should be optimized by the skew join rule. However, after OptimizeSkewedJoin was moved to queryStagePreparationRules, it takes the whole plan as input and detects 3 shuffle query stages, which OptimizeSkewedJoin doesn't support.
This is a regression of SPARK-33832.
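
To make the stage counting concrete, here is a minimal, self-contained Scala sketch. It is a toy model and not Spark's code: the case classes and the `collectShuffleStages` helper below are simplified stand-ins whose names are chosen for illustration only. It just shows that collecting shuffle stages from the Project subtree yields 2, while collecting from the whole plan yields 3, so a `length == 2` check fails on the whole plan.

```scala
object SkewStageCount {
  // Hypothetical toy plan nodes; these are NOT Spark's physical plan classes.
  sealed trait Plan { def children: Seq[Plan] }
  case class ShuffleQueryStage(id: Int) extends Plan { val children: Seq[Plan] = Nil }
  case class Exchange(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  case class Project(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  case class ShuffledHashJoin(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // Collect every shuffle query stage at or below the given node.
  def collectShuffleStages(plan: Plan): Seq[ShuffleQueryStage] = plan match {
    case s: ShuffleQueryStage => Seq(s)
    case other => other.children.flatMap(collectShuffleStages)
  }

  // The skewed join over two materialized stages.
  val join0: Plan = ShuffledHashJoin(ShuffleQueryStage(0), ShuffleQueryStage(1))
  // Top of the stage that is about to be submitted.
  val newStagePlan: Plan = Project(join0)
  // The whole plan, which also contains the outer join and a third stage.
  val wholePlan: Plan = ShuffledHashJoin(Exchange(newStagePlan), ShuffleQueryStage(2))

  val stagesInNewStage: Int = collectShuffleStages(newStagePlan).length
  val stagesInWholePlan: Int = collectShuffleStages(wholePlan).length

  def main(args: Array[String]): Unit =
    println(s"new stage: $stagesInNewStage, whole plan: $stagesInWholePlan")
}
```

With this model, a guard of the form `stages.length == 2` passes for `newStagePlan` and is skipped for `wholePlan`, which matches the behaviour described above.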

@ulysses-you
Contributor

ulysses-you commented Nov 18, 2021

> And it will not only work in cases where just 2 tables join, many complex combination need to be considered, such as multiple table joins in same stage.

Why can it not work in such a case? If multiple tables are joined in the same stage, the plan should be:

SHJ2
  SHJ1
    ShuffleStage
    ShuffleStage
  ShuffleStage

So we can still optimize SHJ1 by applying transformUp to this plan, if we allow introducing an extra shuffle.

It seems to me that the check if (shuffleStages.length == 2) is unnecessary. Also cc @JkSelf
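
A rough sketch of the transformUp idea, assuming a toy plan model: the `Plan`, `SHJ` and `ShuffleStage` types, the `skewHandled` flag and this `transformUp` helper are all hypothetical and are not Spark's API. The rewrite visits children first, so the inner join whose two children are both shuffle stages is matched even though the whole plan contains three stages.

```scala
object TransformUpSketch {
  // Hypothetical toy plan nodes, for illustration only.
  sealed trait Plan
  case object ShuffleStage extends Plan
  case class SHJ(left: Plan, right: Plan, skewHandled: Boolean = false) extends Plan

  // Bottom-up rewrite: children are rewritten first, then the rule is tried on the node.
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case j: SHJ =>
        j.copy(left = transformUp(j.left)(rule), right = transformUp(j.right)(rule))
      case leaf => leaf
    }
    // Nodes the rule does not match are returned unchanged.
    rule.applyOrElse(withNewChildren, identity[Plan])
  }

  // SHJ2 over (SHJ1 over two stages) and a third stage.
  val plan: Plan = SHJ(SHJ(ShuffleStage, ShuffleStage), ShuffleStage)

  // Mark only joins whose two direct children are shuffle stages.
  val optimized: Plan = transformUp(plan) {
    case j @ SHJ(ShuffleStage, ShuffleStage, _) => j.copy(skewHandled = true)
  }

  def main(args: Array[String]): Unit = println(optimized)
}
```

In this sketch only the inner join is marked; the outer join is left alone because its left child is a join rather than a shuffle stage. The sketch does not model the extra shuffle that the real rule might introduce.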

@advancedxy
Contributor

> It seems to me that the check if (shuffleStages.length == 2) is unnecessary. Also cc @JkSelf

@ulysses-you as far as I understand, the check is mostly there to reduce complexity.

Also cc @zhengruifeng https://github.com/apache/spark/pull/33893/files since your PR is about generalizing the skew join rule.

@Liulietong
Author

> > And it will not only work in cases where just 2 tables join, many complex combination need to be considered, such as multiple table joins in same stage.
>
> Why it can not work in such case ? if multiple table joins in same stage, the plan should be :
>
> SHJ2
>   SHJ1
>     ShuffleStage
>     ShuffleStage
>   ShuffleStage
>
> So we can still optimize the SHJ1 by transformUp this plan if we allow introduce extra shuffle.
>
> It seems to me that the check if (shuffleStages.length == 2) is unnecessary. Also cc @JkSelf

Yes, it will work in cases where multiple tables are joined in the same stage. But I don't think it's the best way to optimize multiple skewed joins, since extra shuffles will be introduced. In the worst case, N SHJs will introduce (N-1) shuffles.

@zhengruifeng
Contributor

zhengruifeng commented Dec 15, 2021

@advancedxy Sorry for the late reply, and thanks for pinging me.

I did a quick test with #33893

Unfortunately, #33893 failed to handle this case, because the whole plan, including Exchange nodes (which I did not expect when implementing #33893), was passed to the rule.

test code:

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", false)
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "100")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "100")

spark.range(0, 1000, 1, 10).selectExpr("id % 3 as key1", "id % 3 as value1").createOrReplaceTempView("skewData1")
spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key2", "id as value2").createOrReplaceTempView("skewData2")
spark.range(0, 1000, 1, 10).selectExpr("id % 1 as key3", "id as value3").createOrReplaceTempView("skewData3")


spark.sql("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3").write.mode("overwrite").parquet("/tmp/tmp1.parquet")

related log:

21/12/15 11:15:33 DEBUG SparkSqlParser: Parsing command: SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 JOIN skewData3 ON value2 = value3
21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:34 DEBUG OptimizeSkewedJoin: Optimizing Project #75: Do NOT support operators [Exchange, Exchange, Range, Exchange, Range, Exchange, Range]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #161: Do NOT support operators [Exchange]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: ShuffledJoins: [SortMergeJoin, SortMergeJoin]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #200: Do NOT support operators [Exchange]
21/12/15 11:15:35 DEBUG OptimizeSkewedJoin: Optimizing Project #247: ShuffledJoins: [SortMergeJoin]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffledJoins: [SortMergeJoin]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: ShuffleQueryStages: [3, 2]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Splittable ShuffleQueryStages: [3, 2]
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #3 in skew join, size info: median size: 21184, max size: 26854, min size: 18341, avg size: 21544
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Optimizing ShuffleQueryStage #2 in skew join, size info: median size: 1227, max size: 1308, min size: 1142, avg size: 1230
21/12/15 11:15:36 DEBUG OptimizeSkewedJoin: Optimizing Project #261: Totally 0 skew partitions found

update:

I tried changing if (shuffleStages.length == 2) to if (shuffleStages.length >= 2) on master; with that change, the two added test cases are optimized successfully.

@zhengruifeng
Contributor

This looks like a regression, but a simple change from if (shuffleStages.length == 2) to if (shuffleStages.length >= 2) should be enough to fix it. (I ran AdaptiveQueryExecSuite and the two test cases added in this PR, and they pass.)

As for #33893, I have also updated it to support this case. It supports multiple joins with union/agg/window nodes in a single stage and has been used in our production system for 3 months; you may want to give it a try.

@cloud-fan
Contributor

I think we can simply remove the check. Thanks for providing the test! I created a PR to remove the check and your test passed: #34974
