[SPARK-56599][SQL] Add scan narrowing for column-level UPDATEs in DSv2#55518
[SPARK-56599][SQL] Add scan narrowing for column-level UPDATEs in DSv2#55518anuragmantri wants to merge 5 commits into
Conversation
fb14c34 to
ae635f4
Compare
| val required = | ||
| AttributeSet(dataAttrs) ++ AttributeSet(Seq(cond)) ++ AttributeSet(rowIdAttrs) | ||
| val narrowOutput = relation.output.filter(required.contains) | ||
| relation.copy(table = table, output = dedupAttrs(narrowOutput ++ rowIdAttrs ++ metadataAttrs)) |
There was a problem hiding this comment.
Can an attribute in required be missing from relation.output?
rowIdAttrs seems to be added 2 times.
If we already have a dedupAttrs() then probably doesn't make sense build AttributeSets.
There was a problem hiding this comment.
Can an attribute in required be missing from relation.output?
No. dataAttrs come from the connector's requiredDataAttributes() which are resolved against relation (via V2ExpressionUtils.resolveRefs), so they're guaranteed to be present. The condition's referenced columns are also table columns from the user's WHERE clause. rowIdAttrs and metadataAttrs can be absent from relation.output (they're resolved separately), but they're not part of the filter. They're appended unconditionally afterward via dedupAttrs(narrowOutput ++ rowIdAttrs ++ metadataAttrs)
rowIdAttrs seems to be added 2 times. If we already have dedupAttrs() then probably doesn't make sense to build AttributeSets.
Agreed. I fixed it.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Could you resolve the conflicts, @anuragmantri ?
ae635f4 to
a99bb2d
Compare
Thanks. I rebased and fixed the conflicts. |
| return new NamedReference[0]; | ||
| } | ||
|
|
||
|
|
There was a problem hiding this comment.
nit. Remove redundant empty line.
| * including the columns being updated. If {@link #requiredDataAttributes()} returns an empty | ||
| * array, Spark sends only the non-identity assigned columns (heuristic path). | ||
| * | ||
| * @since 4.2.0 |
| * <p> | ||
| * When empty (the default), Spark falls back to sending only the non-identity assigned columns. | ||
| * | ||
| * @since 4.2.0 |
| val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty()) | ||
| val updatedCols = assignments.collect { | ||
| case Assignment(key: AttributeReference, value) | ||
| if !isIdentityAssignment(key, value) => |
There was a problem hiding this comment.
One liner doesn't violate the line-length rule, does it?
- case Assignment(key: AttributeReference, value)
- if !isIdentityAssignment(key, value) =>
+ case Assignment(key: AttributeReference, value) if !isIdentityAssignment(key, value) =>| // | ||
| // When dataAttrs is non-empty, the relation output is narrowed to include only columns | ||
| // required for a column-update write. When dataAttrs is empty, the full relation.output is | ||
| // preserved. |
There was a problem hiding this comment.
For function description, please follow the community style like the other code path.
/**
* ...
*/
| // When the connector supports column updates and declares required data attributes, | ||
| // the read relation is narrowed at analysis time so that | ||
| // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for the scan. | ||
| // Otherwise the full relation output is used. |
There was a problem hiding this comment.
For function description, please follow the community style like the other code path.
/**
* ...
*/
| WriteDelta(writeRelation, cond, rowDeltaPlan, relation, projections, groupFilterCond) | ||
| } | ||
|
|
||
| // Builds the row delta projection for the column update path. |
There was a problem hiding this comment.
For function description, please follow the community style like the other code path.
/**
* ...
*/
| dataAttrsResolved(inRowAttrs) | ||
| } | ||
|
|
||
| // Validates the narrow-write-schema row projection output. |
There was a problem hiding this comment.
For function description, please follow the community style like the other code path.
/**
* ...
*/
| table.skipSchemaResolution || areCompatible(inRowAttrs, outRowAttrs) | ||
| table.skipSchemaResolution || | ||
| areCompatible(inRowAttrs, outRowAttrs) || | ||
| dataAttrsResolved(inRowAttrs) |
There was a problem hiding this comment.
nit. Please minimize the change of existing code as much as possible like the following.
table.skipSchemaResolution || areCompatible(inRowAttrs, outRowAttrs) ||
dataAttrsResolved(inRowAttrs)
| * is ignored and the full table row is sent (the default behavior). | ||
| * <p> | ||
| * When non-empty, the returned columns become the write schema in declared order. | ||
| * The connector must declare all columns it wants to receive, including the columns being |
There was a problem hiding this comment.
This is very strong assumption, but it seems that this PR didn't have a protection. May I ask if we have some kind of assertion or a test coverage for this?
There was a problem hiding this comment.
Each column the connector returns passes through V2ExpressionUtils.resolveRefs which throws AnalysisException if the column is non existent.
I added a test test("column-update: requiredDataAttributes throws AnalysisException for invalid column")
| // | ||
| // ColumnPruning observes exactly these references and narrows the physical scan accordingly. | ||
| // Connectors that need additional columns in the scan (e.g., partition columns for | ||
| // distribution) should declare them in requiredDataAttributes(). |
There was a problem hiding this comment.
IIUC, for the correctness, we need to throw AnalysisException if requiredDataAttributes is invalid.
There was a problem hiding this comment.
Each column the connector returns passes through V2ExpressionUtils.resolveRefs which throws AnalysisException if the column is non existent.
I added a test test("column-update: requiredDataAttributes throws AnalysisException for invalid column")
| // Connectors that need additional columns in the scan (e.g., partition columns for | ||
| // distribution) should declare them in requiredDataAttributes(). | ||
| // | ||
| // Note: AlignUpdateAssignments guarantees all assignment keys are top-level |
There was a problem hiding this comment.
Do we have a test coverage for this, AlignUpdateAssignments contract?
There was a problem hiding this comment.
I added a new test test("column-update: nested struct field update narrows to the root struct column") that updates an inner field in a struct, the AlignUpdateAssignment returns only the root key.
| * whether pk is already in the updated columns list and, if not, add it to | ||
| * requiredDataAttributes(). | ||
| * | ||
| * @since 4.2.0 |
| // build a plan to replace read groups in the table | ||
| val writeRelation = relation.copy(table = operationTable) | ||
| val projections = buildReplaceDataProjections(query, relation.output, metadataAttrs) | ||
| val query = updatedAndRemainingRowsPlan |
There was a problem hiding this comment.
This looks like duplications: Let's use one variable instead of mixing two variables, updatedAndRemainingRowsPlan and query.
There was a problem hiding this comment.
Done, used a single variable
| // GroupBasedRowLevelOperationScanPlanning needs explicit column declarations to narrow. | ||
| val rowAttrs: Seq[Attribute] = if (isNarrow) connectorDataAttrs else relation.output | ||
|
|
||
| (readRelation, rowAttrs) |
There was a problem hiding this comment.
Please return metadataAttrs too to avoid the following recomputation in the caller-side.
val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
There was a problem hiding this comment.
I changed this to return metadataAttrs too.
| // | ||
| // Works for both the full-scan and narrow-scan CoW paths. In the narrow case, | ||
| // readRelation.output is already restricted by buildCoWReadSetup, so projecting | ||
| // all plan.output gives the correct narrow write schema. |
There was a problem hiding this comment.
Use function description style.
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| default boolean supportsColumnUpdates() { |
There was a problem hiding this comment.
Given the scope of this PR, shall we mention that DELETE and MERGE ignores this method?
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| default NamedReference[] requiredDataAttributes() { |
There was a problem hiding this comment.
Given the scope of this PR, shall we mention that DELETE and MERGE ignores this method?
There was a problem hiding this comment.
Even though the scope of this PR is UPDATE only, we'd like this API to work for MERGE as well (DELETE doesn't benefit since it doesn't write data columns). I'm still assessing what it takes and will add a section in the SPIP on how it could be implemented.
Happy to add a "currently only consulted for UPDATE" note in the Javadoc for now and remove it when MERGE support lands.
|
I finished the first round review, @anuragmantri . |
There was a problem hiding this comment.
Thanks for the review @dongjoon-hyun. I addressed your comments and cleaned up some AI generated comments which were redundant.
| return new NamedReference[0]; | ||
| } | ||
|
|
||
|
|
| * including the columns being updated. If {@link #requiredDataAttributes()} returns an empty | ||
| * array, Spark sends only the non-identity assigned columns (heuristic path). | ||
| * | ||
| * @since 4.2.0 |
| * is ignored and the full table row is sent (the default behavior). | ||
| * <p> | ||
| * When non-empty, the returned columns become the write schema in declared order. | ||
| * The connector must declare all columns it wants to receive, including the columns being |
There was a problem hiding this comment.
Each column the connector returns passes through V2ExpressionUtils.resolveRefs which throws AnalysisException if the column is non existent.
I added a test test("column-update: requiredDataAttributes throws AnalysisException for invalid column")
| * whether pk is already in the updated columns list and, if not, add it to | ||
| * requiredDataAttributes(). | ||
| * | ||
| * @since 4.2.0 |
| // | ||
| // When dataAttrs is non-empty, the relation output is narrowed to include only columns | ||
| // required for a column-update write. When dataAttrs is empty, the full relation.output is | ||
| // preserved. |
| // Connectors that need additional columns in the scan (e.g., partition columns for | ||
| // distribution) should declare them in requiredDataAttributes(). | ||
| // | ||
| // Note: AlignUpdateAssignments guarantees all assignment keys are top-level |
There was a problem hiding this comment.
I added a new test test("column-update: nested struct field update narrows to the root struct column") that updates an inner field in a struct, the AlignUpdateAssignment returns only the root key.
| // | ||
| // ColumnPruning observes exactly these references and narrows the physical scan accordingly. | ||
| // Connectors that need additional columns in the scan (e.g., partition columns for | ||
| // distribution) should declare them in requiredDataAttributes(). |
There was a problem hiding this comment.
Each column the connector returns passes through V2ExpressionUtils.resolveRefs which throws AnalysisException if the column is non existent.
I added a test test("column-update: requiredDataAttributes throws AnalysisException for invalid column")
| dataAttrsResolved(inRowAttrs) | ||
| } | ||
|
|
||
| // Validates the narrow-write-schema row projection output. |
| table.skipSchemaResolution || areCompatible(inRowAttrs, outRowAttrs) | ||
| table.skipSchemaResolution || | ||
| areCompatible(inRowAttrs, outRowAttrs) || | ||
| dataAttrsResolved(inRowAttrs) |
| * | ||
| * @since 4.2.0 | ||
| */ | ||
| default NamedReference[] requiredDataAttributes() { |
There was a problem hiding this comment.
Even though the scope of this PR is UPDATE only, we'd like this API to work for MERGE as well (DELETE doesn't benefit since it doesn't write data columns). I'm still assessing what it takes and will add a section in the SPIP on how it could be implemented.
Happy to add a "currently only consulted for UPDATE" note in the Javadoc for now and remove it when MERGE support lands.
| .getOrElse { | ||
| throw new AnalysisException( | ||
| errorClass = "_LEGACY_ERROR_TEMP_3075", | ||
| messageParameters = Map( | ||
| "tableAttr" -> tableAttr.toString, | ||
| "scanAttrs" -> scanAttrs.mkString(","))) | ||
| } | ||
| } |
There was a problem hiding this comment.
I believe this is safe because condition-referenced columns are guaranteed to be in the scan. Please correct me if I'm wrong.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Thank you for updating, @anuragmantri .
BTW, I cannot find the vote for the mentioned SPIP. Does pass the vote officially, @anuragmantri ? For SPIP, we need an official vote result to move forward including merging something, don't we? (cc @huaxingao as the Shepherd of SPARK-56599 JIRA issue)
What changes were proposed in this pull request?
For SPIP: SPARK-56599
cc @aokolnychyi too because RowLevelOperation.java has been never changed since being added 4 years ago via the following.
|
Thanks for the review @dongjoon-hyun. For the SPIP, we are waiting for a few more maintainers to also review the design as well as the PR before going for a vote. |
e806004 to
4060cbf
Compare
What changes were proposed in this pull request?
For SPIP: SPARK-56599
This PR adds three new default methods to the DSv2 connector API to enable scan and write-schema narrowing for column-level UPDATEs:
updatedColumns()on RowLevelOperationInfo — Spark informs the connector which columns are being assigned (non-identity only) before the operation isbuilt.
requiredDataAttributes()on RowLevelOperation — the connector declares the exact set of data columns it needs in the write schema, symmetric withrequiredMetadataAttributes().supportsColumnUpdates()on RowLevelOperation — explicit opt-in for receiving a partial row instead of the full table row.When a connector opts in, Spark removes identity assignments from the write plan's Project node, unblocking ColumnPruning to narrow the physical scan automatically (MOR path). For CoW, scan narrowing is done at analysis time via
buildRelationWithAttrs()since GroupBasedRowLevelOperationScanPlanning reads DataSourceV2Relation.output before ColumnPruning fires.All three methods have default implementations that preserve today's full-row behavior. No existing connector is affected.
Why are the changes needed?
Today, Spark's analyzer generates identity assignments for every column during UPDATE alignment. These are used to build a Project that references all columns , preventing Optimizer from narrowing the scan. The cost scales as O(table width) regardless of how many columns are being updated.
This is especially wasteful for columnar formats like Parquet/Iceberg and is a blocker for efficient column-level update implementations in connectors (see the Efficient Column Updates Proposal in Iceberg).
Does this PR introduce any user-facing change?
Yes. Three new default methods are added to the public DSv2 connector API:
RowLevelOperation.supportsColumnUpdates()RowLevelOperation.requiredDataAttributes()RowLevelOperationInfo.updatedColumns()All are opt-in with backward-compatible defaults. Existing connectors see no change.
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
I used Claude Code to generate code and tests and manually reviewed the generated code.