[SPARK-37019][SQL] Add codegen support to array higher-order functions#34558
[SPARK-37019][SQL] Add codegen support to array higher-order functions#34558Kimahriman wants to merge 5 commits into
Conversation
|
Can one of the admins verify this patch? |
f46fb71 to
217960e
Compare
217960e to
ce082f3
Compare
ce082f3 to
d4a2f63
Compare
|
@viirya @HyukjinKwon @cloud-fan any thoughts or know who might have thoughts? |
d4a2f63 to
aaa4be4
Compare
|
@Kimahriman just out of curiosity, how much did the performance improve? |
|
It's hard to say because when I tested this out on my production jobs (actually still actively using it), I had several other changes too. I'm not sure if there are any benchmarks involving HOFs? Though it's highly dependent on what the lambda function is, and honestly that's one of the main benefits, the lambda functions themselves can be codegen'd instead of eval'd. I also have a larger goal to support subexpression elimination inside lambda functions, because that's where I've found our biggest problem is. #34727 is also part of that goal. |
aaa4be4 to
3da6342
Compare
3da6342 to
c3236c0
Compare
c3236c0 to
2e9f4d3
Compare
9cec788 to
8b898b0
Compare
8b898b0 to
194e457
Compare
194e457 to
1a52017
Compare
1a52017 to
b71b633
Compare
b71b633 to
a565a82
Compare
a565a82 to
92d9a9f
Compare
a565a82 to
92d9a9f
Compare
92d9a9f to
03c2dc6
Compare
03c2dc6 to
572b666
Compare
572b666 to
4a7dba9
Compare
4a7dba9 to
5b13cd2
Compare
jaceklaskowski
left a comment
There was a problem hiding this comment.
There seems to be a lot of repetition. Wish it could be avoided somehow but can't help though (beside nit-picking).
Thanks for the review! I tried to get as much common code in the parent classes as I could, can take another pass to see if anything jumps out for deduping |
dcfb17c to
a565a82
Compare
The subexpression elimination option is huge! Very exciting |
|
I added a simple benchmark, local results: |
705d950 to
f2e6135
Compare
f2e6135 to
2a651b2
Compare
|
2a651b2 to
a1e2c24
Compare
a1e2c24 to
35cd592
Compare
68a0a29 to
8cfefef
Compare
8cfefef to
de114f6
Compare
de114f6 to
2b13d01
Compare
|
@dongjoon-hyun This was submitted earlier than the one at #54864. We can switch to discussing it here instead. |
2b13d01 to
2cab349
Compare
| mergeCode.isNull, merge.nullable) | ||
|
|
||
| val finishAssignment = assignVar(accForFinishCode, finishAtomic, accForMergeCode.value, | ||
| accForMergeCode.isNull, merge.nullable) |
There was a problem hiding this comment.
finishAssignment is using merge.nullable to decide whether to propagate the accumulator null bit into the finish lambda. That loses a nullable zero on empty arrays when merge itself is non-nullable. For example, with aggregate(array(), CAST(NULL AS INT), (acc, x) -> coalesce(acc, 0) + x, acc -> acc IS NULL), interpreted eval passes NULL into finish and returns true, but the generated path leaves accForFinishCode.isNull at its default false and can return false. This should follow the accumulator nullability here, not merge.nullable.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Thanks, good catch. Fixed in 9f330488952 by making finishAssignment follow the accumulator lambda variable nullability instead of merge.nullable, so a nullable zero is still propagated into finish when the array is empty.
I also added the regression case from the comment and verified it with:
build/sbt 'catalyst/testOnly org.apache.spark.sql.catalyst.expressions.HigherOrderFunctionsSuite -- -z ArrayAggregate'Result: passed (2 tests, 0 failures).
| | $varAssignments | ||
| | ${functionCode.code} | ||
| | $resultAssignment | ||
| | if ((boolean)${functionCode.value}) { |
There was a problem hiding this comment.
[P1] Preserve null predicate semantics in generated ArrayFilter
filter treats a null predicate result as false, but this generated loop reads ${functionCode.value} without checking ${functionCode.isNull}. I reproduced this on the current head with a vectorized Parquet array column containing [true] and [null]: with CODEGEN_ONLY, filter(a, x -> x) returns [true] and [null] instead of [true] and []; with NO_CODEGEN, it returns the expected results. Please use a predicate equivalent to !functionCode.isNull && functionCode.value for both the tracker and count, and add a generated-code regression test.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Fixed in eecc672cb44.
Generated ArrayFilter now computes a local keep predicate as !functionCode.isNull && functionCode.value and uses that value for both the tracker array and the retained element count. I added a Catalyst regression with custom GenericArrayData that keeps isNullAt=true while exposing a stale typed boolean value, which fails against the old generated path.
Verified with:
build/sbt 'catalyst/testOnly org.apache.spark.sql.catalyst.expressions.HigherOrderFunctionsSuite -- -z "generated code"'Result: passed (2 tests, 0 failures).
| private def childrenToRecurse(expr: Expression): Seq[Expression] = expr match { | ||
| case _: CodegenFallback => Nil | ||
| case c: ConditionalExpression => c.alwaysEvaluatedInputs.map(skipForShortcut) | ||
| case h: HigherOrderFunction => h.arguments |
There was a problem hiding this comment.
[P1] Do not eagerly evaluate ArrayAggregate.zero for null input arrays
For ArrayAggregate, zero is not evaluated when the array argument is null, but recursing into all h.arguments allows subexpression elimination to generate expressions from zero outside that null guard. On the current head, with ANSI mode, CSE, and generated code enabled, aggregate(CAST(NULL AS ARRAY<INT>), (cast(id as int) / 0) + (cast(id as int) / 0), (acc, x) -> acc + x) throws DIVIDE_BY_ZERO; with CSE disabled, the same generated query returns null. Please recurse only into the always-evaluated input array for ArrayAggregate and add this regression. This is separate from the empty-array nullable-accumulator fix in 9f330488952.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Fixed in eecc672cb44.
I changed HOF CSE recursion to use an alwaysEvaluatedArguments hook. By default it returns arguments, but ArrayAggregate overrides it to return only the input array argument, so zero is no longer made CSE-visible before the null-array guard.
I added the ANSI/CSE regression from this comment and verified it fails with the old h.arguments recursion, then passes with the fix.
Verified with:
build/sbt 'sql/testOnly org.apache.spark.sql.DataFrameFunctionsSuite -- -z "aggregate function - null array does not evaluate zero expression through CSE"'Result: passed (1 test, 0 failures).
| s"InternalRow.copyValue(${functionCode.value})" | ||
| } | ||
| val resultNull = if (function.nullable) Some(functionCode.isNull.toString) else None | ||
| val resultAssignment = CodeGenerator.setArrayElement(arrayData, dataType.elementType, |
There was a problem hiding this comment.
[P1] Preserve null complex elements in generated array HOF output
This new output-writing path passes a null bit for nullable lambda results into CodeGenerator.setArrayElement, but that helper applies setNullAt only for primitive element types. I reproduced this on the current head with native vectorized ORC input containing array(CAST(NULL AS ARRAY<INT>), array(1)): the input reads back with a[0] IS NULL = true, and interpreted transform/filter return [true, true], but generated transform(a, x -> x)[0] IS NULL and filter(a, x -> x IS NULL)[0] IS NULL return [false, false]. Please make nullable non-primitive array writes honor isNull and add a generated nested-null regression test.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Fixed in eecc672cb44.
CodeGenerator.setArrayElement now honors the provided isNull bit for non-primitive element types as well, so nullable complex array writes call setNullAt instead of writing a stale complex value.
I moved the regression to Catalyst with custom GenericArrayData that simulates the vectorized-reader shape: isNullAt(0) reports null while the typed complex getter can still expose a stale nested value. The test covers both generated transform output and filter output.
Verified with:
build/sbt 'catalyst/testOnly org.apache.spark.sql.catalyst.expressions.HigherOrderFunctionsSuite -- -z "generated code"'Result: passed (2 tests, 0 failures).
| zeroCode.isNull, zero.nullable) | ||
|
|
||
| val mergeAssignment = assignVar(accForMergeCode, mergeAtomic, mergeCopy, | ||
| mergeCode.isNull, merge.nullable) |
There was a problem hiding this comment.
[P1] Clear ArrayAggregate accumulator null state on every generated assignment
accForMergeVar is always bound nullable in bindInternal, and withLambdaVars stores its isNull flag as mutable generated state. These assignments only update that flag when the source expression is nullable, which produces wrong results in both cases below:
- Within one row,
aggregate(array(CAST(id AS INT) + 1, CAST(id AS INT) + 2), CAST(NULL AS INT), (acc, x) -> coalesce(acc, 0) + x, acc -> coalesce(acc, -1))returns-1withCODEGEN_ONLYinstead of3; the same query returns3withNO_CODEGEN. - Across rows in one generated partition, after a row whose merge becomes null, an empty-array row with zero
0returns-1instead of0withCODEGEN_ONLY; the interpreted path returns0.
Please pass accForMergeVar.nullable to both initialAssignment and mergeAssignment and add generated-code regressions for these paths.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Fixed in 261147f456d.
initialAssignment and mergeAssignment now both use accForMergeVar.nullable, so the generated accumulator lambda variable null flag is updated on every assignment. That clears the stale-null state both after assigning a non-null zero and after a non-null merge result.
I added generated-code regressions for both paths from the comment:
- stale null state within a single aggregate loop
- stale null state carried across rows in the same generated partition
I verified both tests fail before the fix and pass after it.
Verified with:
build/sbt 'sql/testOnly org.apache.spark.sql.DataFrameFunctionsSuite -- -z "aggregate function - generated code clears accumulator null state"'Result: passed (2 tests, 0 failures).
| val initialization = CodeGenerator.createArrayData( | ||
| arrayData, dataType.elementType, numElements, s" $prettyName failed.") | ||
|
|
||
| val functionCode = function.genCode(ctx) |
There was a problem hiding this comment.
[P2] Rebind fallback lambda variables before code generation
The generated path uses the original lambda tree here, while interpreted evaluation deliberately uses functionsForEval to replace separately instantiated NamedLambdaVariables with the bound argument instance by exprId. As a result, a valid resolved lambda containing a CodegenFallback expression can succeed interpreted but fail under codegen: I reproduced ArrayTransform(array(1, 2, 3), LambdaFunction(CodegenFallbackExpr(detachedArg + 1), Seq(arg))), where detachedArg has the same exprId as arg but a different AtomicReference; checkEvaluation passes interpreted evaluation and then throws NullPointerException in GeneratedClass$SpecificMutableProjection.apply. Please generate the rebound function tree (and keep fallback atomic state synchronized) so codegen preserves the existing lambda-binding semantics.
[ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
There was a problem hiding this comment.
Fixed in fe004a5435e.
The generated HOF paths now mirror interpreted evaluation by generating from the rebound lambda body (functionForEval / functionsForEval) instead of the original lambda tree. This preserves the existing exprId-based lambda variable rebinding semantics while keeping the change scoped to HOF codegen.
I added a Catalyst regression that constructs a resolved ArrayTransform lambda where a CodegenFallbackExpr references a detached NamedLambdaVariable with the same exprId as the bound argument. The test failed before the fix with the generated-path NPE described in this comment, and passes after the fix.
Verified with:
build/sbt 'catalyst/testOnly org.apache.spark.sql.catalyst.expressions.HigherOrderFunctionsSuite -- -z "ArrayTransform codegen rebinds fallback lambda variables by expression ID"'
build/sbt 'catalyst/testOnly org.apache.spark.sql.catalyst.expressions.HigherOrderFunctionsSuite -- -z "ArrayTransform"'Results: focused regression passed (1 test), ArrayTransform slice passed (3 tests).
There was a problem hiding this comment.
Summary
Adds code generation support for the array higher-order functions transform, filter, exists, forall, and aggregate.
Prior state and problem.
These functions currently create an interpreted island inside otherwise generated SQL execution. Even when the surrounding projection or predicate is part of whole-stage code generation, Spark still evaluates the lambda body through Expression.eval for every array element.
That cost compounds quickly: the query pays it once per row and again once per element, and complex lambda bodies amplify it further. In CPU profiles, the overhead appears under ArrayExists.eval / ArrayExists.nullSafeEval and the lambda's child expressions, such as CaseWhen, string manipulation, and nested boolean predicates.
Generating these functions is therefore a worthwhile general optimization, not a workload-specific rewrite. The challenge is correctness: the interpreted implementation already defines subtle behavior for NULL elements, nullable predicate results, short-circuiting, fallback expressions, and mutable aggregate state. The generated path must preserve all of it.
Design approach.
The PR moves both the array traversal and ordinary lambda evaluation into the generated Java code produced for the surrounding expression. Instead of entering Expression.eval once per array element, generated execution now loads an element, binds it as the lambda argument, evaluates the lambda body, and consumes the result in one generated loop.
Conceptually, this turns:
exists(array, x -> predicate(x))into a loop that evaluatespredicate(x)and exits on the firsttrue.transform(array, x -> f(x))into a loop that produces one output value per input element.aggregate(array, zero, (acc, x) -> merge(acc, x), finish)into generated mutable accumulator state followed by one finish evaluation.
The important part is that this is not restricted to lambdas whose entire expression tree supports code generation. If a lambda contains a CodegenFallback descendant, the generated loop still updates the existing lambda backing reference before that descendant executes. Normal expressions read generated locals directly; fallback expressions see the same current element or accumulator value through the interpreted interface.
This preserves the existing execution model while removing interpreted evaluation from the common path.
Key design decisions made by this PR.
The first key decision is to generate the rebound lambda tree rather than the original lambda expression. Higher-order functions already support logically identical NamedLambdaVariable instances that share an exprId without sharing an object reference. Interpreted execution handles that through functionForEval / functionsForEval. Generated execution must do the same; otherwise a fallback expression can read a detached variable and produce a generated-only failure. The current revision correctly uses the rebound expression tree.
The second decision is to treat NULL behavior as part of the generated contract, rather than assuming the generated value is meaningful whenever a loop iteration runs. For filter, a NULL predicate means that the input element is not retained. For complex array elements, a NULL position must remain NULL even if the physical array container can expose an underlying payload at that ordinal. For exists and forall, generated execution preserves both short-circuit behavior and three-valued boolean semantics.
The third decision concerns aggregate, whose generated implementation carries mutable accumulator state across iterations and potentially across reused projection invocations. The zero value is evaluated only after confirming that the input array is non-NULL. Every accumulator assignment updates both its value and null state. The finish lambda receives that final pair, including for an empty array where the accumulator remains the initial zero value. This is necessary to prevent stale generated state from changing results.
Finally, filter intentionally uses two passes. It cannot allocate a compact output array until it knows how many elements pass the predicate, but evaluating an arbitrary predicate twice would be incorrect and potentially expensive. Recording keep decisions during the first pass, then copying selected elements during the second pass, preserves single evaluation while producing the correctly sized output.
Implementation sketch.
Each generated higher-order function follows the same broad flow.
First, it evaluates the input array once and retains the existing NULL short-circuit behavior. A NULL input array returns NULL immediately. In the aggregate case, this also ensures that neither the zero expression nor the finish expression is evaluated for that row.
Second, for a non-NULL array, it enters a generated element loop. Each iteration loads the current element and updates the generated lambda-variable state. When fallback evaluation remains anywhere inside the lambda body, the same binding step updates the fallback-visible backing reference, so generated and interpreted descendants observe one consistent current value.
Third, each function applies its own result rule:
transformwrites one nullable result value into a new array for each input element.filterrecords predicate decisions once and then copies only retained input elements into a compact result array.existsstops when it findstrue, while remembering NULL results where three-valued logic requires a NULL final answer.forallstops when it findsfalse, with equivalent NULL tracking.aggregateinitializes accumulator state fromzero, replaces it with each merge result, and evaluatesfinishagainst the final state.
The implementation keeps shared mechanics, such as lambda binding and fallback-visible state synchronization, in common helpers while leaving operation-specific control flow in each HOF implementation.
Suggested improvements.
The correctness gaps found in earlier revisions are addressed in the current patch:
- generated
filternow treats NULL predicate results as false; - NULL complex array elements are preserved correctly;
aggregateno longer evaluateszerofor a NULL input array;- nullable aggregate accumulator state is propagated correctly through merge and finish evaluation, including empty arrays and reused generated projections;
- generated evaluation now uses the rebound lambda expression tree, preserving fallback lambda-variable semantics.
The added tests exercise the corresponding semantic boundaries, including the generated-only failure case where a CodegenFallback expression refers to a separately instantiated lambda variable with the same exprId.
I re-checked the generated paths for transform, filter, exists, forall, and aggregate after these fixes and did not find another actionable correctness issue.
Approving.
|
@Kimahriman can you also fix the CI failures? cc @cloud-fan @peter-toth @viirya also for reviews |
fe004a5 to
c47fa7f
Compare
|
Rebased to master to re-trigger CI |
What changes were proposed in this pull request?
This PR adds codegen support to array based higher order functions except ArraySort. This is my first time playing around with codegen, so definitely looking for any feedback.
A few notes:
Why are the changes needed?
To improve performance of array higher-order function operations, letting the children be codegen'd and participate in WholeStageCodegen
Does this PR introduce any user-facing change?
No, only performance improvements.
How was this patch tested?
Existing unit tests, let me know if there's other codegen-specific unit tests I should add.