-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses #56042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5a2b7bc
7bef34a
153d5be
5415e00
bc0c1d8
61165a4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) { | |
| } else { | ||
| f | ||
| } | ||
| convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult) | ||
| transformUnresolvedFlowToResolvedFlow(flowToResolve, maybeNewFuncResult) | ||
|
|
||
| // If the flow failed due to an UnresolvedDatasetException, it means that one of the | ||
| // flow's inputs wasn't available. After other flows are resolved, these inputs | ||
|
|
@@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) { | |
| } | ||
| } | ||
|
|
||
| private def convertResolvedToTypedFlow( | ||
| private def transformUnresolvedFlowToResolvedFlow( | ||
| flow: UnresolvedFlow, | ||
| funcResult: FlowFunctionResult): ResolvedFlow = { | ||
| flow match { | ||
| case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That means an `AutoCdcFlow` whose query resolves to a batch DataFrame is still wrapped in an `AutoCdcMergeFlow` here. Since this PR documents that AutoCDC is streaming-only (see the `AutoCdcFlow` scaladoc), consider rejecting batch inputs at resolution time:

```scala
case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)
```

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone. |
||
| case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult) | ||
| } | ||
| } | ||
|
|
||
| private def transformUntypedFlowToResolvedFlow( | ||
| flow: UntypedFlow, | ||
| funcResult: FlowFunctionResult): ResolvedFlow = { | ||
| flow match { | ||
| case _ if flow.once => new AppendOnceFlow(flow, funcResult) | ||
| case _ if funcResult.dataFrame.get.isStreaming => | ||
|
|
@@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) { | |
| // then get their results overwritten. | ||
| val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1 | ||
| new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend) | ||
| case _: UnresolvedFlow => new CompleteFlow(flow, funcResult) | ||
| case _ => new CompleteFlow(flow, funcResult) | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph | |
| import scala.util.Try | ||
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} | ||
| import org.apache.spark.sql.classic.DataFrame | ||
| import org.apache.spark.sql.pipelines.AnalysisWarning | ||
| import org.apache.spark.sql.pipelines.autocdc.{ | ||
| CaseSensitivityLabels, | ||
| ChangeArgs, | ||
| ColumnSelection, | ||
| Scd1BatchProcessor, | ||
| ScdType | ||
| } | ||
| import org.apache.spark.sql.pipelines.util.InputReadOptions | ||
| import org.apache.spark.sql.types.StructType | ||
| import org.apache.spark.sql.types.{StructField, StructType} | ||
|
|
||
| /** | ||
| * Contains the catalog and database context information for query execution. | ||
|
|
@@ -121,15 +129,56 @@ case class FlowFunctionResult( | |
| } | ||
|
|
||
| /** A [[Flow]] whose output schema and dependencies aren't known. */ | ||
| case class UnresolvedFlow( | ||
| sealed trait UnresolvedFlow extends Flow { | ||
| /** Returns a copy of this flow with the given SQL confs overriding the existing ones. */ | ||
| def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow | ||
| } | ||
|
|
||
| /** | ||
| * An [[UnresolvedFlow]] whose execution-type has not yet been determined. | ||
| * | ||
| * In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis | ||
| * and resolution. For example an AutoCdcFlow is a special unresolved-but-typed flow; we know a | ||
| * flow will be an AutoCDC flow immediately on construction, because it has its own special | ||
| * registration API. Such flows are considered "typed flows", but there isn't any semantic reason | ||
| * yet to explicitly introduce a `TypedFlow` trait/class. | ||
| */ | ||
| case class UntypedFlow( | ||
| identifier: TableIdentifier, | ||
| destinationIdentifier: TableIdentifier, | ||
| func: FlowFunction, | ||
| queryContext: QueryContext, | ||
| sqlConf: Map[String, String], | ||
| override val once: Boolean, | ||
| override val origin: QueryOrigin | ||
| ) extends Flow | ||
| ) extends UnresolvedFlow { | ||
| override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow = | ||
| copy(sqlConf = newSqlConf) | ||
| } | ||
|
|
||
| /** | ||
| * An unresolved but typed flow that applies a CDC event stream to a target table via MERGE. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: missing word in the scaladoc — should be something like: `An unresolved but typed flow that applies a CDC event stream to a target table via MERGE.`
|
||
| * | ||
| * [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, and not as a once | ||
| * flow. Therefore by definition it is a streaming-type flow. | ||
| * | ||
| * In the future once-support for [[AutoCdcFlow]] may be added. | ||
| */ | ||
| case class AutoCdcFlow( | ||
| identifier: TableIdentifier, | ||
| destinationIdentifier: TableIdentifier, | ||
| func: FlowFunction, | ||
| queryContext: QueryContext, | ||
| sqlConf: Map[String, String] = Map.empty, | ||
| comment: Option[String] = None, | ||
| override val origin: QueryOrigin, | ||
| changeArgs: ChangeArgs | ||
| ) extends UnresolvedFlow { | ||
| override val once: Boolean = false | ||
|
|
||
| override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow = | ||
| copy(sqlConf = newSqlConf) | ||
| } | ||
|
|
||
| /** | ||
| * A [[Flow]] whose flow function has been invoked, meaning either: | ||
|
|
@@ -194,3 +243,108 @@ class AppendOnceFlow( | |
|
|
||
| override val once = true | ||
| } | ||
|
|
||
| /** | ||
| * A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance with | ||
| * the configured [[flow.changeArgs]]. | ||
| */ | ||
| class AutoCdcMergeFlow( | ||
| val flow: AutoCdcFlow, | ||
| val funcResult: FlowFunctionResult | ||
| ) extends ResolvedFlow { | ||
| requireReservedPrefixAbsentInSourceColumns() | ||
|
|
||
| def changeArgs: ChangeArgs = flow.changeArgs | ||
|
|
||
| /** | ||
| * Returns the augmented output schema of this flow, which can differ from the schema of the | ||
| * source change-data-feed dataframe. | ||
| * | ||
| * The source dataframe's schema describes the incoming CDC events; the augmented schema here | ||
| * applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata | ||
| * columns that the AutoCDC MERGE engine projects onto the target table. Downstream | ||
| * dependencies in the pipeline see this augmented schema. | ||
| */ | ||
| override val schema: StructType = { | ||
| val userSelectedSchema = ColumnSelection.applyToSchema( | ||
| schemaName = "changeDataFeed", | ||
| schema = df.schema, | ||
| columnSelection = changeArgs.columnSelection, | ||
| caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis | ||
| ) | ||
|
|
||
| // AutoCDC flows require all key columns to be present in the target table, to adhere to SCD | ||
| // semantics. | ||
| requireKeysPresentInSelectedSchema(userSelectedSchema) | ||
|
|
||
| changeArgs.storedAsScdType match { | ||
| case ScdType.Type1 => | ||
| // SCD1 produces a target table with all the user-selected output columns and a projected | ||
| // CDC operational metadata column at the end. | ||
| StructType( | ||
| userSelectedSchema.fields :+ StructField( | ||
| Scd1BatchProcessor.cdcMetadataColName, | ||
| Scd1BatchProcessor.cdcMetadataColSchema( | ||
| sequencingType = df.select(changeArgs.sequencing).schema.head.dataType | ||
| ), | ||
| nullable = false | ||
| ) | ||
| ) | ||
| case ScdType.Type2 => | ||
| throw new UnsupportedOperationException( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Other AutoCDC validation in this PR uses `AnalysisException` with an error class, but this branch throws a plain `UnsupportedOperationException`. For consistency with SDP/Connect error handling (and so clients get a stable error class), consider:

```scala
case ScdType.Type2 =>
  throw new AnalysisException(
    errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
    messageParameters = Map.empty
  )
```

(with a corresponding entry in `error-conditions.json`) |
||
| "AutoCDC flows do not currently support SCD Type 2 transformations." | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validate that the resolved source dataframe for the AutoCDC flow does not contain any column | ||
| * names that use the reserved Spark AutoCDC prefix. | ||
| */ | ||
| private def requireReservedPrefixAbsentInSourceColumns(): Unit = { | ||
| val resolver = spark.sessionState.conf.resolver | ||
| val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix | ||
|
|
||
| def nameContainsReservedPrefix(name: String): Boolean = { | ||
| name.length >= reservedPrefix.length && resolver( | ||
| name.substring(0, reservedPrefix.length), | ||
| reservedPrefix | ||
| ) | ||
| } | ||
|
|
||
| df.schema.fieldNames.find(nameContainsReservedPrefix).foreach { conflictingColumnName => | ||
| throw new AnalysisException( | ||
| errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT", | ||
| messageParameters = Map( | ||
| "caseSensitivity" -> CaseSensitivityLabels.of( | ||
| spark.sessionState.conf.caseSensitiveAnalysis | ||
| ), | ||
| "columnName" -> conflictingColumnName, | ||
| "schemaName" -> "changeDataFeed", | ||
| "reservedColumnNamePrefix" -> reservedPrefix | ||
| ) | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Validate all keys specified in changeArgs are actually present in the user-selected schema. | ||
| */ | ||
| private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): Unit = { | ||
| val resolver = spark.sessionState.conf.resolver | ||
|
|
||
| changeArgs.keys | ||
| .find(key => !selectedSchema.fieldNames.exists(name => resolver(name, key.name))) | ||
| .foreach { missingKey => | ||
| throw new AnalysisException( | ||
| errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA", | ||
| messageParameters = Map( | ||
| "caseSensitivity" -> CaseSensitivityLabels.of( | ||
| spark.sessionState.conf.caseSensitiveAnalysis | ||
| ), | ||
| "keyColumnName" -> missingKey.name | ||
| ) | ||
| ) | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: resolveFlow is a lot shorter