-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1 #56045
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,12 +14,12 @@ | |
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| # | ||
| from typing import Callable, Dict, List, Optional, Union, overload | ||
| from typing import Callable, Dict, List, Literal, Optional, Union, overload | ||
|
|
||
| from pyspark.errors import PySparkTypeError | ||
| from pyspark.pipelines.graph_element_registry import get_active_graph_element_registry | ||
| from pyspark.pipelines.type_error_utils import validate_optional_list_of_str_arg | ||
| from pyspark.pipelines.flow import Flow, QueryFunction | ||
| from pyspark.pipelines.flow import AutoCdcFlow, Flow, QueryFunction | ||
| from pyspark.pipelines.source_code_location import ( | ||
| get_caller_source_code_location, | ||
| ) | ||
|
|
@@ -29,6 +29,8 @@ | |
| TemporaryView, | ||
| Sink, | ||
| ) | ||
| from pyspark.sql import Column | ||
| from pyspark.sql import functions as F | ||
| from pyspark.sql.types import StructType | ||
|
|
||
|
|
||
|
|
@@ -525,3 +527,136 @@ def create_sink( | |
| comment=None, | ||
| ) | ||
| get_active_graph_element_registry().register_output(sink) | ||
|
|
||
|
|
||
| def create_auto_cdc_flow( | ||
| target: str, | ||
| source: str, | ||
| keys: Union[List[str], List[Column]], | ||
| sequence_by: Union[str, Column], | ||
| apply_as_deletes: Optional[Union[str, Column]] = None, | ||
| apply_as_truncates: Optional[Union[str, Column]] = None, | ||
| column_list: Optional[Union[List[str], List[Column]]] = None, | ||
| except_column_list: Optional[Union[List[str], List[Column]]] = None, | ||
| stored_as_scd_type: Optional[Literal[1, "1"]] = None, | ||
| name: Optional[str] = None, | ||
| ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None, | ||
| ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None, | ||
|
Comment on lines
+543
to
+544
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add these API later when ignore null execution support is actually built.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we not building that?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will eventually (hopefully soon!), but I'm generally in favor of only adding the API once the feature is built. Otherwise we'll just be throwing a not support exception anyway if the user tries specifying an ignore null column selection. |
||
| ) -> None: | ||
| """ | ||
| Create an Auto CDC flow into the target table from the Change Data Capture (CDC) source. | ||
| Target table must have already been created using create_streaming_table function. Only one | ||
| of column_list and except_column_list can be specified. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doc says mutual exclusion and non-empty |
||
|
|
||
| Example: | ||
| create_auto_cdc_flow( | ||
| target = "target", | ||
| source = "source", | ||
| keys = ["key"], | ||
| sequence_by = "sequence_expr", | ||
| ignore_null_updates_column_list = ["value"], | ||
| column_list = ["key", "value"], | ||
| ) | ||
|
|
||
| Note that for keys, sequence_by, column_list, except_column_list, | ||
| ignore_null_updates_column_list, and ignore_null_updates_except_column_list the arguments | ||
| have to be column identifiers without qualifiers, e.g. they cannot be | ||
| col("sourceTable.keyId"). | ||
|
|
||
| :param target: The name of the target table that receives the Auto CDC flow. | ||
| :param source: The name of the CDC source to stream from. | ||
| :param keys: The column or combination of columns that uniquely identify a row in the source \ | ||
| data. This is used to identify which CDC events apply to specific records in the target \ | ||
| table. These keys also identify records in the target table, e.g., if there exists a record \ | ||
| for given keys and the CDC source has an UPSERT operation for the same keys, we will update \ | ||
| the existing record. At least one key must be provided. This should be a list of column \ | ||
| identifiers without qualifiers, expressed as either Python strings or Pyspark Columns. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: |
||
| :param sequence_by: An expression that we use to order the source data. This can be expressed \ | ||
| as either a Python string or Pyspark Expression. | ||
| :param apply_as_deletes: Delete condition for the merged operation. This should be a string of \ | ||
| expression e.g. "operation = 'DELETE'" | ||
| :param apply_as_truncates: Truncate condition for the merged operation. This should be a string \ | ||
| expression e.g. "operation = 'TRUNCATE'" | ||
| :param column_list: Columns that will be included in the output table. This should be a list \ | ||
| of column identifiers without qualifiers, expressed as either Python strings or Pyspark \ | ||
| Column. Only one of column_list and except_column_list can be specified. | ||
| :param except_column_list: Columns that will be excluded in the output table. This should be a \ | ||
| list of column identifiers without qualifiers, expressed as either Python strings or Pyspark \ | ||
| Column. Only one of column_list and except_column_list can be specified. When this is \ | ||
| specified, all columns in the dataframe of the target table except those in this list will \ | ||
| be in the output table. | ||
| :param stored_as_scd_type: The SCD type for the target table. Only 1 (or "1") is supported. \ | ||
| When not specified the server default applies. | ||
| :param name: The name of the flow for this create_auto_cdc_flow command. When unspecified this \ | ||
| will build a "default flow" with name equal to the target name. | ||
| :param ignore_null_updates_column_list: Subset of columns to ignore null values in during \ | ||
| updates. When a source row has a null for one of these columns, the existing value in the \ | ||
| target is preserved. Only one of ignore_null_updates_column_list and \ | ||
| ignore_null_updates_except_column_list can be specified. | ||
| :param ignore_null_updates_except_column_list: Columns excluded from null-update ignoring. \ | ||
| All other columns will have null values ignored during updates. Only one of \ | ||
| ignore_null_updates_column_list and ignore_null_updates_except_column_list can be specified. | ||
| """ | ||
| keys = _normalize_column_list(keys) | ||
|
|
||
| column_list = _normalize_optional_column_list(column_list) | ||
| except_column_list = _normalize_optional_column_list(except_column_list) | ||
| ignore_null_updates_column_list = _normalize_optional_column_list( | ||
| ignore_null_updates_column_list | ||
| ) | ||
| ignore_null_updates_except_column_list = _normalize_optional_column_list( | ||
| ignore_null_updates_except_column_list | ||
| ) | ||
|
|
||
| if isinstance(sequence_by, str): | ||
| sequence_by = F.expr(sequence_by) | ||
|
|
||
| if isinstance(apply_as_deletes, str): | ||
| apply_as_deletes = F.expr(apply_as_deletes) | ||
|
|
||
| if isinstance(apply_as_truncates, str): | ||
| apply_as_truncates = F.expr(apply_as_truncates) | ||
|
|
||
| if stored_as_scd_type is not None and str(stored_as_scd_type) != "1": | ||
| raise PySparkTypeError( | ||
| errorClass="NOT_EXPECTED_TYPE", | ||
| messageParameters={ | ||
| "arg_name": "stored_as_scd_type", | ||
| "expected_type": "Literal[1, '1']", | ||
| "arg_type": type(stored_as_scd_type).__name__, | ||
| }, | ||
| ) | ||
|
|
||
| source_code_location = get_caller_source_code_location(stacklevel=1) | ||
|
|
||
| flow = AutoCdcFlow( | ||
| name=name, | ||
| target=target, | ||
| source=source, | ||
| keys=keys, | ||
| sequence_by=sequence_by, | ||
| apply_as_deletes=apply_as_deletes, | ||
| apply_as_truncates=apply_as_truncates, | ||
| column_list=column_list, | ||
| except_column_list=except_column_list, | ||
| stored_as_scd_type=stored_as_scd_type, | ||
| ignore_null_updates_column_list=ignore_null_updates_column_list, | ||
| ignore_null_updates_except_column_list=ignore_null_updates_except_column_list, | ||
| source_code_location=source_code_location, | ||
| ) | ||
|
|
||
| get_active_graph_element_registry().register_auto_cdc_flow(flow) | ||
|
|
||
|
|
||
| def _normalize_optional_column_list( | ||
| column_list: Optional[Union[List[str], List[Column]]], | ||
| ) -> Optional[List[Column]]: | ||
| if column_list is None: | ||
| return None | ||
| return _normalize_column_list(column_list) | ||
|
|
||
|
|
||
| def _normalize_column_list( | ||
| column_list: Union[List[str], List[Column]], | ||
| ) -> List[Column]: | ||
| return [F.col(c) if isinstance(c, str) else c for c in column_list] | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add tests for string args ( |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,13 @@ | |
| from pathlib import Path | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: import shuffle only — consider keeping prior order to shrink diff. |
||
|
|
||
| from pyspark.errors import PySparkTypeError | ||
| from pyspark.sql import SparkSession | ||
| from pyspark.sql import SparkSession, Column | ||
| from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame | ||
| from pyspark.sql.connect.types import pyspark_types_to_proto_types | ||
| from pyspark.sql.types import StructType | ||
| from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context | ||
| from pyspark.pipelines.flow import AutoCdcFlow, Flow | ||
| from pyspark.pipelines.graph_element_registry import GraphElementRegistry | ||
| from pyspark.pipelines.output import ( | ||
| Output, | ||
| MaterializedView, | ||
|
|
@@ -27,14 +32,10 @@ | |
| StreamingTable, | ||
| TemporaryView, | ||
| ) | ||
| from pyspark.pipelines.flow import Flow | ||
| from pyspark.pipelines.graph_element_registry import GraphElementRegistry | ||
| from pyspark.pipelines.source_code_location import SourceCodeLocation | ||
| from pyspark.sql.connect.types import pyspark_types_to_proto_types | ||
| from pyspark.sql.types import StructType | ||
| from typing import Any, cast | ||
| from typing import Any, List, Optional, cast | ||
|
|
||
| import pyspark.sql.connect.proto as pb2 | ||
| from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context | ||
|
|
||
|
|
||
| class SparkConnectGraphElementRegistry(GraphElementRegistry): | ||
|
|
@@ -133,6 +134,47 @@ def register_flow(self, flow: Flow) -> None: | |
| command.pipeline_command.define_flow.CopyFrom(inner_command) | ||
| self._client.execute_command(command) | ||
|
|
||
| def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After server implements |
||
| from pyspark.sql.connect.column import Column as ConnectColumn | ||
|
|
||
| def to_plan(col: Column) -> Any: | ||
| return cast(ConnectColumn, col).to_plan(self._client) | ||
|
|
||
| def to_plans(cols: Optional[List[Column]]) -> list: | ||
| return [] if cols is None else [to_plan(c) for c in cols] | ||
|
|
||
| auto_cdc_details = pb2.PipelineCommand.DefineFlow.AutoCdcFlowDetails( | ||
| source=flow.source, | ||
| keys=to_plans(flow.keys), | ||
| sequence_by=to_plan(flow.sequence_by), | ||
| column_list=to_plans(flow.column_list), | ||
| except_column_list=to_plans(flow.except_column_list), | ||
| ignore_null_updates_column_list=to_plans(flow.ignore_null_updates_column_list), | ||
| ignore_null_updates_except_column_list=to_plans( | ||
| flow.ignore_null_updates_except_column_list | ||
| ), | ||
| ) | ||
| if flow.stored_as_scd_type is not None: | ||
| auto_cdc_details.stored_as_scd_type = pb2.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1 | ||
| if flow.apply_as_deletes is not None: | ||
| auto_cdc_details.apply_as_deletes.CopyFrom(to_plan(flow.apply_as_deletes)) | ||
| if flow.apply_as_truncates is not None: | ||
| auto_cdc_details.apply_as_truncates.CopyFrom(to_plan(flow.apply_as_truncates)) | ||
|
|
||
| inner_command = pb2.PipelineCommand.DefineFlow( | ||
| dataflow_graph_id=self._dataflow_graph_id, | ||
| target_dataset_name=flow.target, | ||
| auto_cdc_flow_details=auto_cdc_details, | ||
| sql_conf={}, | ||
| source_code_location=source_code_location_to_proto(flow.source_code_location), | ||
| ) | ||
| if flow.name is not None: | ||
| inner_command.flow_name = flow.name | ||
|
|
||
| command = pb2.Command() | ||
| command.pipeline_command.define_flow.CopyFrom(inner_command) | ||
| self._client.execute_command(command) | ||
|
|
||
| def register_sql(self, sql_text: str, file_path: Path) -> None: | ||
| inner_command = pb2.PipelineCommand.DefineSqlGraphElements( | ||
| dataflow_graph_id=self._dataflow_graph_id, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a heads up, there's a good chance we're not going to get
apply_as_truncatesfunctionality merged in for the 4.2 cut.I'll most likely drop this argument when I connect these APIs to the graph registration context on the spark connect backend, and then add it back for spark 4.3+.
No action needed on your side, just giving the heads up.