
[SPARK-40193][SQL][FOLLOWUP] Restrict cached-side If wrapping to original cached range #55500

Closed
cloud-fan wants to merge 2 commits into apache:master from cloud-fan:cloud-fan/SPARK-40193-filter-propagation-fix

@cloud-fan
Contributor

What changes were proposed in this pull request?

Follow-up to #55298 (SPARK-40193). Two related cleanups to PlanMerger's filter propagation:

  1. Correctness fix in mergeNamedExpressions. The loop that wraps unmatched cached expressions with the cached plan's filter now iterates only over the original cached range [0, cachedPlanExpressions.length), not over all of mergedExpressions. The previous loop also touched new-plan entries that had been appended earlier in the same call and were already wrapped with the new plan's filter.

  2. Tighten the (np: Filter, cp) / (np, cp: Filter) cases in tryMergePlans. Drop the structurally unreachable branches that appended cpFilter.toSeq / npFilter.map(_._1).toSeq to the new Project, along with the corresponding symmetricFilterPropagationEnabled escape in the guard. In both cases the recursion keeps the non-Filter side unchanged, so no deeper case can expose a Filter on that side: the child result always has cpFilter = None / npFilter = None. Matching None makes this invariant explicit and removes dead code that would have produced a Project with duplicate attributes if it were ever reached.

Why are the changes needed?

For (1): with symmetric filter propagation enabled (spark.sql.optimizer.mergeSubplans.symmetricFilterPropagation.enabled = true) and non-attribute Project expressions on both sides of the merge, the cached-side loop double-wrapped new-plan-appended expressions with If(cpFilter, If(npFilter, expr, null), null) and replaced the slot in mergedExpressions with a new Alias (fresh exprId). The newNPMapping built earlier in the same call still pointed at the single-wrap alias's attribute, so the parent Aggregate was rewritten to reference an attribute that was no longer in the merged Project's output. The resulting plan failed analysis with MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION.
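A toy model may make this failure mode easier to see. The sketch below is not Catalyst code: Expr, Ref, Cond, IfOrNull, Col, MergeResult and WrapRangeSketch.merge are hypothetical stand-ins, every cached expression is assumed to be unmatched, and a column's id plays the role of an alias exprId. It only illustrates how widening the wrap loop from the cached range to all merged slots double-wraps the appended entry and leaves the new-plan mapping pointing at an id that is no longer produced.

```scala
// Hypothetical toy model, NOT the Catalyst API.
sealed trait Expr
final case class Ref(name: String) extends Expr
final case class Cond(sql: String) extends Expr
// Stands in for If(cond, value, null).
final case class IfOrNull(cond: Expr, value: Expr) extends Expr
// An output column with an identity; re-aliasing assigns a fresh id.
final case class Col(expr: Expr, id: Int)

final case class MergeResult(columns: Vector[Col], newPlanMapping: Map[String, Int]) {
  // True if every id the parent was rewritten to reference is still produced.
  def mappingResolves: Boolean =
    newPlanMapping.values.forall(id => columns.exists(_.id == id))
}

object WrapRangeSketch {
  def merge(
      cached: Seq[Expr],
      unmatchedNew: Seq[(String, Expr)],
      cpFilter: Expr,
      npFilter: Expr,
      restrictToCachedRange: Boolean): MergeResult = {
    var nextId = 0
    def fresh(): Int = { nextId += 1; nextId }

    val cachedCols = cached.map(e => Col(e, fresh()))
    // Append unmatched new-plan expressions wrapped with the new plan's filter,
    // and record which column id the parent should now reference.
    val appended = unmatchedNew.map { case (name, e) =>
      name -> Col(IfOrNull(npFilter, e), fresh())
    }
    val mapping = appended.map { case (name, col) => name -> col.id }.toMap
    val all = (cachedCols ++ appended.map(_._2)).toVector

    // Wrap with the cached plan's filter: only [0, cached.length) when fixed,
    // every slot when buggy. Each re-wrap yields a column with a fresh id.
    val end = if (restrictToCachedRange) cached.length else all.length
    val wrapped = all.zipWithIndex.map { case (c, i) =>
      if (i < end) Col(IfOrNull(cpFilter, c.expr), fresh()) else c
    }
    MergeResult(wrapped, mapping)
  }
}
```

With restrictToCachedRange = false, the appended y column becomes IfOrNull(cp, IfOrNull(np, y)) under a fresh id, so mappingResolves is false, mirroring the missing-attribute error; with true, the mapping still resolves.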

Minimal reproducer (fails on master before this PR):

withSQLConf(SQLConf.MERGE_SUBPLANS_SYMMETRIC_FILTER_PROPAGATION_ENABLED.key -> "true") {
  val subquery1 = ScalarSubquery(
    testRelation.where($"a" > 1).select(($"a" * 2).as("x")).groupBy()(sum($"x").as("sum_x")))
  val subquery2 = ScalarSubquery(
    testRelation.where($"a" < 1).select(($"a" + 1).as("y")).groupBy()(max($"y").as("max_y")))
  val df = testRelation.select(subquery1, subquery2).analyze
  MergeSubplans(df)  // analyzer error: Resolved attribute(s) "y" missing from "x", "y", ...
}

For (2): the branches in question are unreachable by case analysis and the appended cpFilter.toSeq / npFilter.map(_._1).toSeq would duplicate an attribute already present in mergedChild.output. Removing them makes the reachable contract explicit.
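A minimal sketch of the duplicate-attribute concern, assuming that column names stand in for attributes and that the propagated filter attribute is already part of the merged child's output, as stated above. DeadBranchSketch and its methods are hypothetical and do not mirror tryMergePlans' real signatures.

```scala
// Hypothetical simplification, NOT the PlanMerger API.
object DeadBranchSketch {
  // Old shape: append whatever filter attribute the child result reported.
  def oldProjectList(childOutput: Seq[String], cpFilter: Option[String]): Seq[String] =
    childOutput ++ cpFilter.toSeq

  // New shape: only a child result with no filter on the unchanged side is accepted.
  def newProjectList(childOutput: Seq[String], cpFilter: Option[String]): Option[Seq[String]] =
    cpFilter match {
      case None    => Some(childOutput)
      case Some(_) => None // structurally unreachable per the case analysis
    }

  def hasDuplicates(names: Seq[String]): Boolean = names.distinct.size != names.size
}
```

If the old branch were ever reached with a filter attribute that the child already outputs, the resulting project list would contain that name twice.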

Does this PR introduce any user-facing change?

No. The bug was only observable as an analyzer failure, and only when spark.sql.optimizer.mergeSubplans.symmetricFilterPropagation.enabled (which defaults to false) was enabled together with subqueries whose merge path exercises non-attribute Project expressions on both sides. Otherwise, behavior is unchanged from master.

How was this patch tested?

  • New unit test MergeSubplansSuite: "SPARK-40193: Merge non-grouping subqueries with different filter conditions and non-attribute Project expressions on both sides" — fails on master without the fix (analysis error), passes with the fix.
  • Full MergeSubplansSuite (42 tests) and PlanMergeSuite (12 tests) continue to pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.5

This pull request and its description were written by Isaac.

…inal cached range

Fix `PlanMerger.mergeNamedExpressions` to wrap only the original cached
expressions with the cached plan's filter. The loop previously iterated over
all of `mergedExpressions`, including new-plan entries that were appended
earlier in the same call and already wrapped with the new plan's filter;
re-wrapping them with the cached plan's filter produced double-wrapped
`If(cpFilter, If(npFilter, expr, null), null)` expressions, stale
`newNPMapping` targets, and analysis failures (missing attribute).

Also tighten the `(np: Filter, cp)` and `(np, cp: Filter)` cases in
`tryMergePlans` to match only the structurally reachable child results
(`cpFilter`/`npFilter` always `None` because the recursion keeps the
non-Filter side unchanged), and drop the associated dead-code appends.

Co-authored-by: Isaac
Replace `.collect` with `.map` + explicit `assert` in the `(np: Filter, cp)`
and `(np, cp: Filter)` cases so a future refactor that surfaces a filter on
the unchanged side fails loudly instead of silently dropping the merge.
Inline the single-use `cachedPlanLength` local.

Co-authored-by: Isaac
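The difference between the two styles in the second commit can be sketched with plain Option values. ChildResult, viaCollect and viaMapAssert are hypothetical simplifications that assume a child merge result is returned as an Option; they are not the PlanMerger API.

```scala
// Hypothetical simplification, NOT the PlanMerger API.
object FailLoudlySketch {
  final case class ChildResult(output: Seq[String], cpFilter: Option[String])

  // .collect: a result that does not match the pattern silently becomes None,
  // so the merge is quietly abandoned.
  def viaCollect(r: Option[ChildResult]): Option[Seq[String]] =
    r.collect { case ChildResult(out, None) => out }

  // .map + assert: an unexpected filter on the unchanged side throws instead.
  def viaMapAssert(r: Option[ChildResult]): Option[Seq[String]] =
    r.map { res =>
      assert(res.cpFilter.isEmpty, "unexpected filter on the unchanged side")
      res.output
    }
}
```

Scala's Predef.assert throws java.lang.AssertionError unless assertions are elided at compile time, so the violated invariant surfaces as an error rather than as a missed merge.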
@peter-toth
Contributor

peter-toth commented Apr 23, 2026

@cloud-fan, I opened a code cleanup and simplification PR yesterday: #55482.

It also handles item 1 of this PR, but the new test from this PR should probably be added there.
As for item 2, the analysis doesn't take into account that the set of supported nodes might change in the future, so when we handle (np: Filter, cp) and (np, cp: Filter) with symmetricFilterPropagationEnabled enabled, we should be prepared for a non-empty cpFilter / npFilter. The current code is more future-proof IMO.

@cloud-fan
Contributor Author

@peter-toth Ah, I didn't see that PR. Feel free to take the test from here!

cloud-fan closed this on Apr 23, 2026