Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/pyspark/pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#
from pyspark.pipelines.api import (
append_flow,
create_auto_cdc_flow,
create_streaming_table,
materialized_view,
table,
Expand All @@ -25,6 +26,7 @@

__all__ = [
"append_flow",
"create_auto_cdc_flow",
"create_streaming_table",
"materialized_view",
"table",
Expand Down
139 changes: 137 additions & 2 deletions python/pyspark/pipelines/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable, Dict, List, Optional, Union, overload
from typing import Callable, Dict, List, Literal, Optional, Union, overload

from pyspark.errors import PySparkTypeError
from pyspark.pipelines.graph_element_registry import get_active_graph_element_registry
from pyspark.pipelines.type_error_utils import validate_optional_list_of_str_arg
from pyspark.pipelines.flow import Flow, QueryFunction
from pyspark.pipelines.flow import AutoCdcFlow, Flow, QueryFunction
from pyspark.pipelines.source_code_location import (
get_caller_source_code_location,
)
Expand All @@ -29,6 +29,8 @@
TemporaryView,
Sink,
)
from pyspark.sql import Column
from pyspark.sql import functions as F
from pyspark.sql.types import StructType


Expand Down Expand Up @@ -525,3 +527,136 @@ def create_sink(
comment=None,
)
get_active_graph_element_registry().register_output(sink)


def create_auto_cdc_flow(
target: str,
source: str,
keys: Union[List[str], List[Column]],
sequence_by: Union[str, Column],
apply_as_deletes: Optional[Union[str, Column]] = None,
apply_as_truncates: Optional[Union[str, Column]] = None,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a heads up, there's a good chance we're not going to get apply_as_truncates functionality merged in for the 4.2 cut.

I'll most likely drop this argument when I connect these APIs to the graph registration context on the spark connect backend, and then add it back for spark 4.3+.

No action needed on your side, just giving the heads up.

column_list: Optional[Union[List[str], List[Column]]] = None,
except_column_list: Optional[Union[List[str], List[Column]]] = None,
stored_as_scd_type: Optional[Literal[1, "1"]] = None,
name: Optional[str] = None,
ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None,
ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None,
Comment on lines +543 to +544
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add these API later when ignore null execution support is actually built.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we not building that?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will eventually (hopefully soon!), but I'm generally in favor of only adding the API once the feature is built. Otherwise we'll just be throwing a not support exception anyway if the user tries specifying an ignore null column selection.

) -> None:
"""
Create an Auto CDC flow into the target table from the Change Data Capture (CDC) source.
Target table must have already been created using create_streaming_table function. Only one
of column_list and except_column_list can be specified.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc says mutual exclusion and non-empty keys, but nothing enforces it. Validate after normalization (like other SDP APIs) so users get a clear client error.


Example:
create_auto_cdc_flow(
target = "target",
source = "source",
keys = ["key"],
sequence_by = "sequence_expr",
ignore_null_updates_column_list = ["value"],
column_list = ["key", "value"],
)

Note that for keys, sequence_by, column_list, except_column_list,
ignore_null_updates_column_list, and ignore_null_updates_except_column_list the arguments
have to be column identifiers without qualifiers, e.g. they cannot be
col("sourceTable.keyId").

:param target: The name of the target table that receives the Auto CDC flow.
:param source: The name of the CDC source to stream from.
:param keys: The column or combination of columns that uniquely identify a row in the source \
data. This is used to identify which CDC events apply to specific records in the target \
table. These keys also identify records in the target table, e.g., if there exists a record \
for given keys and the CDC source has an UPSERT operation for the same keys, we will update \
the existing record. At least one key must be provided. This should be a list of column \
identifiers without qualifiers, expressed as either Python strings or Pyspark Columns.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: PysparkPySpark in this docstring (573, 575, 581–585).

:param sequence_by: An expression that we use to order the source data. This can be expressed \
as either a Python string or Pyspark Expression.
:param apply_as_deletes: Delete condition for the merged operation. This should be a string of \
expression e.g. "operation = 'DELETE'"
:param apply_as_truncates: Truncate condition for the merged operation. This should be a string \
expression e.g. "operation = 'TRUNCATE'"
:param column_list: Columns that will be included in the output table. This should be a list \
of column identifiers without qualifiers, expressed as either Python strings or Pyspark \
Column. Only one of column_list and except_column_list can be specified.
:param except_column_list: Columns that will be excluded in the output table. This should be a \
list of column identifiers without qualifiers, expressed as either Python strings or Pyspark \
Column. Only one of column_list and except_column_list can be specified. When this is \
specified, all columns in the dataframe of the target table except those in this list will \
be in the output table.
:param stored_as_scd_type: The SCD type for the target table. Only 1 (or "1") is supported. \
When not specified the server default applies.
:param name: The name of the flow for this create_auto_cdc_flow command. When unspecified this \
will build a "default flow" with name equal to the target name.
:param ignore_null_updates_column_list: Subset of columns to ignore null values in during \
updates. When a source row has a null for one of these columns, the existing value in the \
target is preserved. Only one of ignore_null_updates_column_list and \
ignore_null_updates_except_column_list can be specified.
:param ignore_null_updates_except_column_list: Columns excluded from null-update ignoring. \
All other columns will have null values ignored during updates. Only one of \
ignore_null_updates_column_list and ignore_null_updates_except_column_list can be specified.
"""
keys = _normalize_column_list(keys)

column_list = _normalize_optional_column_list(column_list)
except_column_list = _normalize_optional_column_list(except_column_list)
ignore_null_updates_column_list = _normalize_optional_column_list(
ignore_null_updates_column_list
)
ignore_null_updates_except_column_list = _normalize_optional_column_list(
ignore_null_updates_except_column_list
)

if isinstance(sequence_by, str):
sequence_by = F.expr(sequence_by)

if isinstance(apply_as_deletes, str):
apply_as_deletes = F.expr(apply_as_deletes)

if isinstance(apply_as_truncates, str):
apply_as_truncates = F.expr(apply_as_truncates)

if stored_as_scd_type is not None and str(stored_as_scd_type) != "1":
raise PySparkTypeError(
errorClass="NOT_EXPECTED_TYPE",
messageParameters={
"arg_name": "stored_as_scd_type",
"expected_type": "Literal[1, '1']",
"arg_type": type(stored_as_scd_type).__name__,
},
)

source_code_location = get_caller_source_code_location(stacklevel=1)

flow = AutoCdcFlow(
name=name,
target=target,
source=source,
keys=keys,
sequence_by=sequence_by,
apply_as_deletes=apply_as_deletes,
apply_as_truncates=apply_as_truncates,
column_list=column_list,
except_column_list=except_column_list,
stored_as_scd_type=stored_as_scd_type,
ignore_null_updates_column_list=ignore_null_updates_column_list,
ignore_null_updates_except_column_list=ignore_null_updates_except_column_list,
source_code_location=source_code_location,
)

get_active_graph_element_registry().register_auto_cdc_flow(flow)


def _normalize_optional_column_list(
column_list: Optional[Union[List[str], List[Column]]],
) -> Optional[List[Column]]:
if column_list is None:
return None
return _normalize_column_list(column_list)


def _normalize_column_list(
column_list: Union[List[str], List[Column]],
) -> List[Column]:
return [F.col(c) if isinstance(c, str) else c for c in column_list]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for string args (keys=["id"], sequence_by="ts", etc.), not only Connect col/expr.

40 changes: 39 additions & 1 deletion python/pyspark/pipelines/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# limitations under the License.
#
from dataclasses import dataclass
from typing import Callable, Dict
from typing import Callable, Dict, List, Literal, Optional

from pyspark.sql import DataFrame
from pyspark.sql import Column
from pyspark.pipelines.source_code_location import SourceCodeLocation

QueryFunction = Callable[[], DataFrame]
Expand All @@ -41,3 +42,40 @@ class Flow:
spark_conf: Dict[str, str]
source_code_location: SourceCodeLocation
func: QueryFunction


@dataclass(frozen=True)
class AutoCdcFlow:
"""Definition of an Auto CDC flow in a pipeline dataflow graph.

An Auto CDC flow applies Change Data Capture (CDC) events from a source to a target
streaming table.

:param name: Optional name of the flow. When None, defaults to the target name.
:param target: The name of the target streaming table.
:param source: The name of the CDC source to stream from.
:param keys: Column(s) that uniquely identify a row in source and target data.
:param sequence_by: Expression used to order the source data.
:param apply_as_deletes: Optional delete condition for the merged operation.
:param apply_as_truncates: Optional truncate condition for the merged operation.
:param column_list: Optional columns to include in the output table.
:param except_column_list: Optional columns to exclude from the output table.
:param stored_as_scd_type: Optional SCD type for the target table. Only 1 is supported.
:param ignore_null_updates_column_list: Subset of columns to ignore null in updates.
:param ignore_null_updates_except_column_list: Columns excluded from null-ignore in updates.
:param source_code_location: The location of the source code that created this flow.
"""

name: Optional[str]
target: str
source: str
keys: List[Column]
sequence_by: Column
apply_as_deletes: Optional[Column]
apply_as_truncates: Optional[Column]
column_list: Optional[List[Column]]
except_column_list: Optional[List[Column]]
stored_as_scd_type: Optional[Literal[1, "1"]]
ignore_null_updates_column_list: Optional[List[Column]]
ignore_null_updates_except_column_list: Optional[List[Column]]
source_code_location: SourceCodeLocation
6 changes: 5 additions & 1 deletion python/pyspark/pipelines/graph_element_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pathlib import Path

from pyspark.pipelines.output import Output
from pyspark.pipelines.flow import Flow
from pyspark.pipelines.flow import AutoCdcFlow, Flow
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Generator, Optional
Expand All @@ -42,6 +42,10 @@ def register_output(self, output: Output) -> None:
def register_flow(self, flow: Flow) -> None:
"""Add the given flow to the registry."""

@abstractmethod
def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
"""Add the given Auto CDC flow to the registry."""

@abstractmethod
def register_sql(self, sql_text: str, file_path: Path) -> None:
"""Register a string containing SQL statements the dataflow graph.
Expand Down
56 changes: 49 additions & 7 deletions python/pyspark/pipelines/spark_connect_graph_element_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@
from pathlib import Path
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: import shuffle only — consider keeping prior order to shrink diff.


from pyspark.errors import PySparkTypeError
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, Column
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
from pyspark.sql.connect.types import pyspark_types_to_proto_types
from pyspark.sql.types import StructType
from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context
from pyspark.pipelines.flow import AutoCdcFlow, Flow
from pyspark.pipelines.graph_element_registry import GraphElementRegistry
from pyspark.pipelines.output import (
Output,
MaterializedView,
Expand All @@ -27,14 +32,10 @@
StreamingTable,
TemporaryView,
)
from pyspark.pipelines.flow import Flow
from pyspark.pipelines.graph_element_registry import GraphElementRegistry
from pyspark.pipelines.source_code_location import SourceCodeLocation
from pyspark.sql.connect.types import pyspark_types_to_proto_types
from pyspark.sql.types import StructType
from typing import Any, cast
from typing import Any, List, Optional, cast

import pyspark.sql.connect.proto as pb2
from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context


class SparkConnectGraphElementRegistry(GraphElementRegistry):
Expand Down Expand Up @@ -133,6 +134,47 @@ def register_flow(self, flow: Flow) -> None:
command.pipeline_command.define_flow.CopyFrom(inner_command)
self._client.execute_command(command)

def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After server implements AUTO_CDC_FLOW_DETAILS, add a Connect registry test; note in PR that Connect still throws today.

from pyspark.sql.connect.column import Column as ConnectColumn

def to_plan(col: Column) -> Any:
return cast(ConnectColumn, col).to_plan(self._client)

def to_plans(cols: Optional[List[Column]]) -> list:
return [] if cols is None else [to_plan(c) for c in cols]

auto_cdc_details = pb2.PipelineCommand.DefineFlow.AutoCdcFlowDetails(
source=flow.source,
keys=to_plans(flow.keys),
sequence_by=to_plan(flow.sequence_by),
column_list=to_plans(flow.column_list),
except_column_list=to_plans(flow.except_column_list),
ignore_null_updates_column_list=to_plans(flow.ignore_null_updates_column_list),
ignore_null_updates_except_column_list=to_plans(
flow.ignore_null_updates_except_column_list
),
)
if flow.stored_as_scd_type is not None:
auto_cdc_details.stored_as_scd_type = pb2.PipelineCommand.DefineFlow.SCDType.SCD_TYPE_1
if flow.apply_as_deletes is not None:
auto_cdc_details.apply_as_deletes.CopyFrom(to_plan(flow.apply_as_deletes))
if flow.apply_as_truncates is not None:
auto_cdc_details.apply_as_truncates.CopyFrom(to_plan(flow.apply_as_truncates))

inner_command = pb2.PipelineCommand.DefineFlow(
dataflow_graph_id=self._dataflow_graph_id,
target_dataset_name=flow.target,
auto_cdc_flow_details=auto_cdc_details,
sql_conf={},
source_code_location=source_code_location_to_proto(flow.source_code_location),
)
if flow.name is not None:
inner_command.flow_name = flow.name

command = pb2.Command()
command.pipeline_command.define_flow.CopyFrom(inner_command)
self._client.execute_command(command)

def register_sql(self, sql_text: str, file_path: Path) -> None:
inner_command = pb2.PipelineCommand.DefineSqlGraphElements(
dataflow_graph_id=self._dataflow_graph_id,
Expand Down
10 changes: 9 additions & 1 deletion python/pyspark/pipelines/tests/local_graph_element_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import List, Sequence

from pyspark.pipelines.output import Output
from pyspark.pipelines.flow import Flow
from pyspark.pipelines.flow import AutoCdcFlow, Flow
from pyspark.pipelines.graph_element_registry import GraphElementRegistry


Expand All @@ -34,6 +34,7 @@ class LocalGraphElementRegistry(GraphElementRegistry):
def __init__(self) -> None:
self._outputs: List[Output] = []
self._flows: List[Flow] = []
self._auto_cdc_flows: List[AutoCdcFlow] = []
self._sql_files: List[SqlFile] = []

def register_output(self, output: Output) -> None:
Expand All @@ -42,6 +43,9 @@ def register_output(self, output: Output) -> None:
def register_flow(self, flow: Flow) -> None:
self._flows.append(flow)

def register_auto_cdc_flow(self, flow: AutoCdcFlow) -> None:
self._auto_cdc_flows.append(flow)

def register_sql(self, sql_text: str, file_path: Path) -> None:
self._sql_files.append(SqlFile(sql_text, file_path))

Expand All @@ -53,6 +57,10 @@ def outputs(self) -> Sequence[Output]:
def flows(self) -> Sequence[Flow]:
return self._flows

@property
def auto_cdc_flows(self) -> Sequence[AutoCdcFlow]:
return self._auto_cdc_flows

@property
def sql_files(self) -> Sequence[SqlFile]:
return self._sql_files
Loading