Skip to content

[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124

Open
AnishMahto wants to merge 5 commits into
apache:masterfrom
AnishMahto:SPARK-56957-register-autocdc-flow-from-pipelineshandler
Open

[SPARK-57080][SDP] Register AutoCDC Flows from PipelinesHandler#56124
AnishMahto wants to merge 5 commits into
apache:masterfrom
AnishMahto:SPARK-56957-register-autocdc-flow-from-pipelineshandler

Conversation

@AnishMahto
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

In the PipelinesHandler, register an AutoCdcFlow when a DefineFlow proto is received with AUTO_CDC_FLOW_DETAILS.

This is the final step in connecting a spark connect client to the spark connect server's SDP engine for AutoCDC flows. With these changes, a user should be able to run their SDP with AutoCDC flows using the pipelines CLI runner.

Why are the changes needed?

Allows spark connect clients to actually register and execute AutoCDC flows within their pipelines.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Test graph registration and construction through Python client in PythonPipelineSuite.

Was this patch authored or co-authored using generative AI tooling?

Co-authored.

Generated-by: Claude-Opus-4.7-thinking-xhigh

@AnishMahto
Copy link
Copy Markdown
Contributor Author

@szehon-ho @gengliangwang

Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Took a careful pass through this PR (focus on the SDP Connect wiring). Overall this is a focused, well-scoped change that follows existing UntypedFlow registration patterns — happy to see AutoCDC reach end-to-end through Connect. Core implementation is sound and test coverage at the registration/mapping layer is solid.

A few correctness, error-handling, and coverage items I'd like to see addressed before merge — all inline below. The most important is the source identifier handling (comment #1): the proto/Python API don't constrain it, but the current server-side wrapping silently mishandles multi-part names. My recommendation is to restrict and document, rather than partially-support.

Summary of comments:

  1. (PipelinesHandler) Restrict source to single-part and document it (also: update proto comment, Python docstring, add negative tests). Most important item.
  2. (PipelinesHandler) .sql-based column-name extraction is fragile for non-attribute expressions; extract from UnresolvedAttribute directly.
  3. (PipelinesHandler) Use AnalysisException + structured error class instead of IllegalArgumentException for the column_list / except_column_list conflict.
  4. (PipelinesHandler) Nit: drop fully-qualified scala.collection.immutable.Seq.
  5. (PipelinesHandler) Add a TODO listing the proto fields that aren't honored yet (apply_as_truncates, ignore_null_updates_*).
  6. (PythonPipelineSuite) Duplicate SCD-type coverage.
  7. (PythonPipelineSuite) Additional negative-/end-to-end tests.

Housekeeping (non-blocking):

  • PR title references SPARK-57080 but branch is SPARK-56957-.... Worth aligning.
  • The fork's linter check is currently failing — please check before merge.

Generated-by: Claude-Opus-4.7-thinking-xhigh

Copy link
Copy Markdown
Member

@gengliangwang gengliangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Focused PR — this is the last piece of wiring for AutoCDC flows through Spark Connect (the proto fields, Python client, and server-side AutoCdcFlow model already exist; this connects them). Implementation follows the existing UntypedFlow registration shape.

@szehon-ho's prior review already covers the substantive items (single-part source enforcement, .sql-based column extraction, structured AnalysisException, un-honored proto fields, test gaps). I agree with those findings and won't restate them inline. One additional comment below on a documentation-accuracy point that overlaps his comment on the column_list / except_column_list throw and may be useful regardless of how that comment is resolved.

One meta note on the scala.collection.immutable.Seq(...) style nit (separate thread): I'd hold off on "just use Seq(...)". The file imports scala.collection.Seq at line 20, so plain Seq(...) here resolves to scala.collection.Seq and won't satisfy UnresolvedRelation.multipartIdentifier: scala.collection.immutable.Seq[String]. The FQN is deliberate.

Generated-by: Claude-Opus-4.7

Copy link
Copy Markdown
Member

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough response to the first round — all the substantive items (source identifier handling, column extraction, structured AnalysisException for the column-list conflict, TODOs for un-honored proto fields, end-to-end resolution test) are addressed cleanly. Approving on the strength of those.

Leaving a few inline follow-ups, all non-blocking — feel free to fold in here or in a follow-up:

  1. Duplicate SCD-type test could be replaced by a SCD_TYPE_2 negative test (the only currently-uncovered arm).
  2. The two single-part-source resolution tests overlap; either drop one or make one exercise pipeline-level defaults that differ from session defaults.
  3. buildAutoCdcFlow doesn't guard against an unset proto source / sequence_by (Python always populates, but a non-Python client wouldn't).
  4. (Soft / discussion only) SQLSTATE for AUTOCDC_NON_COLUMN_IDENTIFIER — happy to defer since the neighbor AUTOCDC_MULTIPART_COLUMN_IDENTIFIER uses the same code.

One housekeeping item that's worth confirming before merge but isn't a code review point: Run / Linters, licenses, and dependencies (Scala linter step) is still red on d517ddff. The fork's annotation API doesn't surface a line, so a local ./dev/lint-scala is probably the fastest path. Also please confirm SPARK-57092 / SPARK-57093 exist as real JIRAs (the TODO references).

Generated-by: Claude-Opus-4.7

Comment thread common/utils/src/main/resources/error/error-conditions.json Outdated
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM if all comments from @szehon-ho are addressed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants