[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56045

Open
anew wants to merge 3 commits into apache:master from anew:autocdc-python-api

Contributor

anew commented May 21, 2026

What changes were proposed in this pull request?

Adds create_auto_cdc_flow to the SDP Python API. For now, this will only support SCD Type 1. Parameters:

  • name: the name of the flow
  • target: the target table
  • source: the source dataset with the change events
  • keys: the unique key per row
  • sequence_by: a sequence id to establish time order
  • apply_as_deletes: a boolean expression indicating whether an event represents a delete
  • apply_as_truncates: a boolean expression indicating whether an event represents a truncation
  • column_list: a list of columns to include in the target table
  • except_column_list: a list of columns to exclude from the target table
  • stored_as_scd_type: the SCD type, must be 1
  • ignore_null_updates_column_list: a list of columns for which to ignore null values
  • ignore_null_updates_except_column_list: a list of columns for which not to ignore null values
  • source_code_location: the location in the Python source code that defines this flow
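
To make the parameter list above concrete, here is a minimal sketch of how a call might be shaped. It deliberately does not import PySpark: `AutoCdcFlowSketch` and `create_auto_cdc_flow_sketch` are hypothetical stand-ins, not the classes or functions added by this PR. The `None` defaults, the plain-string arguments, and the `ValueError` for unsupported SCD types are assumptions, and the ignore-null and source_code_location parameters are left out for brevity.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass(frozen=True)
class AutoCdcFlowSketch:
    """Hypothetical stand-in for the flow definition; not the PR's actual class."""

    name: str
    target: str
    source: str
    keys: List[str]
    sequence_by: str
    apply_as_deletes: Optional[str] = None
    apply_as_truncates: Optional[str] = None
    column_list: Optional[List[str]] = None
    except_column_list: Optional[List[str]] = None
    stored_as_scd_type: Optional[Union[int, str]] = None  # assumed default


def create_auto_cdc_flow_sketch(**kwargs) -> AutoCdcFlowSketch:
    """Build the stand-in flow and reject anything other than SCD Type 1."""
    flow = AutoCdcFlowSketch(**kwargs)
    scd_type = flow.stored_as_scd_type
    if scd_type is not None and str(scd_type) != "1":
        # Assumption: the real API may raise a different error type.
        raise ValueError("Only SCD Type 1 is supported.")
    return flow


# Example call: upsert by "id", order events by "ts", treat DELETE rows as deletes,
# and keep the bookkeeping columns out of the target table.
flow = create_auto_cdc_flow_sketch(
    name="customers_cdc",
    target="customers",
    source="customers_changes",
    keys=["id"],
    sequence_by="ts",
    apply_as_deletes="op = 'DELETE'",
    except_column_list=["op", "ts"],
)
```

The table, column, and flow names in the example call are made up for illustration.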

Why are the changes needed?

See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/

Does this PR introduce any user-facing change?

Yes, it introduces a new method in the SDP Python API.

How was this patch tested?

Unit tests were added, using a local graph registry.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

anew added 2 commits May 21, 2026 18:12
- Remove spaces around = in keyword arguments (PEP 8)
- Fix type hint: List[Union[str, Column]] -> Union[List[str], List[Column]]
- Reorder imports and collapse unnecessary line continuations

Co-authored-by: Isaac
Contributor

AnishMahto left a comment

Only real comment is to drop ignore null API for now. LGTM.

assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")

def test_create_auto_cdc_flow(self):
    from pyspark.sql.connect.functions.builtin import col, expr
Contributor

non-blocking nit: can just lift imports out of each individual test

self.assertEqual(sink_obj.options["key1"], "value1")
assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")

def test_create_auto_cdc_flow(self):
Contributor

non-blocking nit: This test can just be collapsed with test_create_auto_cdc_flow_with_all_args

Contributor Author

I guess this tests the behavior with minimal required arguments. But I'd say it should validate that the defaults are correct. Adding that.

Comment on lines +543 to +544
ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None,
ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None,
Contributor

Let's add these APIs later when ignore null execution support is actually built.

Contributor Author

are we not building that?

Contributor

We will eventually (hopefully soon!), but I'm generally in favor of only adding the API once the feature is built. Otherwise we'll just be throwing a not-supported exception anyway if the user tries to specify an ignore null column selection.

keys: Union[List[str], List[Column]],
sequence_by: Union[str, Column],
apply_as_deletes: Optional[Union[str, Column]] = None,
apply_as_truncates: Optional[Union[str, Column]] = None,
Contributor

Just a heads up, there's a good chance we're not going to get apply_as_truncates functionality merged in for the 4.2 cut.

I'll most likely drop this argument when I connect these APIs to the graph registration context on the Spark Connect backend, and then add it back for Spark 4.3+.

No action needed on your side, just giving the heads up.

- Move inline imports to module level
- Fix assertNone -> assertIsNone
- Fix assertEqual(stored_as_scd_type, "1") -> assertIsNone for default case
- Add missing assertions for optional fields in test_create_auto_cdc_flow

Co-authored-by: Isaac
Member

szehon-ho left a comment

A few nits on validation, tests, imports, and docstring casing.

"""
Create an Auto CDC flow into the target table from the Change Data Capture (CDC) source.
Target table must have already been created using create_streaming_table function. Only one
of column_list and except_column_list can be specified.
Member

Doc says mutual exclusion and non-empty keys, but nothing enforces it. Validate after normalization (like other SDP APIs) so users get a clear client error.
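
For illustration, the two checks requested here could look roughly like the sketch below. `validate_auto_cdc_args` is a hypothetical helper name, and raising a plain `ValueError` is an assumption; the actual change may use PySpark's own error classes and message templates, as other SDP APIs do.

```python
from typing import Optional, Sequence


def validate_auto_cdc_args(
    keys: Sequence[object],
    column_list: Optional[Sequence[object]] = None,
    except_column_list: Optional[Sequence[object]] = None,
) -> None:
    """Raise ValueError for argument combinations the docstring rules out.

    Meant to run after string arguments have been normalized to columns,
    so both input styles go through the same checks.
    """
    if len(keys) == 0:
        raise ValueError("At least one key must be provided.")
    if column_list is not None and except_column_list is not None:
        raise ValueError(
            "Only one of column_list and except_column_list can be specified."
        )


# A valid combination passes silently.
validate_auto_cdc_args(keys=["id"], column_list=["id", "name"])
```

Failing on the client keeps the error close to the user's call site instead of surfacing it later from the server.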

def _normalize_column_list(
    column_list: Union[List[str], List[Column]],
) -> List[Column]:
    return [F.col(c) if isinstance(c, str) else c for c in column_list]
Member

Add tests for string args (keys=["id"], sequence_by="ts", etc.), not only Connect col/expr.
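
A rough sketch of what such string-argument tests might check, written so it runs without a Spark session. `FakeColumn` and `normalize_column_list` are hypothetical stand-ins for `pyspark.sql.Column` and the PR's `_normalize_column_list`; the real tests would exercise the actual helper and the registry.

```python
import unittest
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column so the sketch needs no Spark."""

    name: str


def normalize_column_list(
    column_list: Union[List[str], List[FakeColumn]],
) -> List[FakeColumn]:
    # Same shape as _normalize_column_list: wrap strings, pass columns through.
    return [FakeColumn(c) if isinstance(c, str) else c for c in column_list]


class NormalizeColumnListTest(unittest.TestCase):
    def test_string_keys_are_wrapped(self):
        self.assertEqual(normalize_column_list(["id"]), [FakeColumn("id")])

    def test_column_keys_pass_through_unchanged(self):
        key = FakeColumn("id")
        self.assertIs(normalize_column_list([key])[0], key)
```

The point of covering both paths is that a string key and an equivalent column key should register the same flow.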

command.pipeline_command.define_flow.CopyFrom(inner_command)
self._client.execute_command(command)

def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
Member

After server implements AUTO_CDC_FLOW_DETAILS, add a Connect registry test; note in PR that Connect still throws today.

@@ -17,8 +17,13 @@
from pathlib import Path
Member

Nit: import shuffle only — consider keeping prior order to shrink diff.

table. These keys also identify records in the target table, e.g., if there exists a record \
for given keys and the CDC source has an UPSERT operation for the same keys, we will update \
the existing record. At least one key must be provided. This should be a list of column \
identifiers without qualifiers, expressed as either Python strings or Pyspark Columns.
Member

Nit: Pyspark -> PySpark in this docstring (573, 575, 581–585).

3 participants