@@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph
2020import scala .util .Try
2121
2222import org .apache .spark .internal .Logging
23+ import org .apache .spark .sql .AnalysisException
2324import org .apache .spark .sql .catalyst .{AliasIdentifier , TableIdentifier }
2425import org .apache .spark .sql .classic .DataFrame
2526import org .apache .spark .sql .pipelines .AnalysisWarning
27+ import org .apache .spark .sql .pipelines .autocdc .{
28+ CaseSensitivityLabels ,
29+ ChangeArgs ,
30+ ColumnSelection ,
31+ Scd1BatchProcessor ,
32+ ScdType
33+ }
2634import org .apache .spark .sql .pipelines .util .InputReadOptions
27- import org .apache .spark .sql .types .StructType
35+ import org .apache .spark .sql .types .{ StructField , StructType }
2836
2937/**
3038 * Contains the catalog and database context information for query execution.
@@ -121,15 +129,56 @@ case class FlowFunctionResult(
121129}
122130
123131/** A [[Flow ]] whose output schema and dependencies aren't known. */
124- case class UnresolvedFlow (
sealed trait UnresolvedFlow extends Flow {

  /**
   * Produces a copy of this flow that carries `newSqlConf` as its SQL confs.
   *
   * The implementations in this file replace the flow's `sqlConf` map wholesale with the supplied
   * map (they do not merge it into the existing entries), so callers that want to keep existing
   * entries must include them in `newSqlConf`.
   *
   * @param newSqlConf the SQL confs the returned flow should carry
   * @return a new flow of the same concrete type; this instance is left untouched
   */
  def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
}
136+
137+ /**
138+ * An [[UnresolvedFlow ]] whose execution-type has not yet been determined.
139+ *
140+ * In some cases, we know the execution-type for an [[UnresolvedFlow ]] even before flow analysis
141+ * and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a
142+ * flow will be an AutoCDC flow immediately on construction, because it has its own special
143+ * registration API. Such flows are considered "typed flows", but there isn't any semantic reason
144+ * yet to explicitly introduce a `TypedFlow` trait/class.
145+ */
case class UntypedFlow(
    identifier: TableIdentifier,
    destinationIdentifier: TableIdentifier,
    func: FlowFunction,
    queryContext: QueryContext,
    sqlConf: Map[String, String],
    override val once: Boolean,
    override val origin: QueryOrigin
) extends UnresolvedFlow {

  /**
   * Copies this flow, swapping its `sqlConf` for `newSqlConf`; every other field is carried over
   * unchanged.
   *
   * @param newSqlConf the SQL confs the returned flow should carry
   * @return a new [[UntypedFlow]] whose `sqlConf` is exactly `newSqlConf`
   */
  override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow = {
    this.copy(sqlConf = newSqlConf)
  }
}
158+
159+ /**
 * An unresolved but typed flow that applies a CDC event stream to a target table via MERGE.
161+ *
162+ * [[AutoCdcFlow ]] is a typed flow because it is only supported for streaming, and not as a once
163+ * flow. Therefore by definition it is a streaming-type flow.
164+ *
165+ * In the future once-support for [[AutoCdcFlow ]] may be added.
166+ */
case class AutoCdcFlow(
    identifier: TableIdentifier,
    destinationIdentifier: TableIdentifier,
    func: FlowFunction,
    queryContext: QueryContext,
    sqlConf: Map[String, String] = Map.empty,
    comment: Option[String] = None,
    override val origin: QueryOrigin,
    changeArgs: ChangeArgs
) extends UnresolvedFlow {

  // Fixed rather than configurable: this flow type is never a once-flow.
  override val once: Boolean = false

  /**
   * Copies this flow, swapping its `sqlConf` for `newSqlConf`; every other field (including
   * `changeArgs`) is carried over unchanged.
   *
   * @param newSqlConf the SQL confs the returned flow should carry
   * @return a new [[AutoCdcFlow]] whose `sqlConf` is exactly `newSqlConf`
   */
  override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow = {
    this.copy(sqlConf = newSqlConf)
  }
}
133182
134183/**
135184 * A [[Flow ]] whose flow function has been invoked, meaning either:
@@ -194,3 +243,108 @@ class AppendOnceFlow(
194243
195244 override val once = true
196245}
246+
247+ /**
 * A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance
 * with the configured [[flow.changeArgs]].
250+ */
class AutoCdcMergeFlow(
    val flow: AutoCdcFlow,
    val funcResult: FlowFunctionResult
) extends ResolvedFlow {
  // First statement of the class body, so it runs before `schema` below is computed: a source
  // column that collides with the reserved prefix is reported before any schema is derived.
  requireReservedPrefixAbsentInSourceColumns()

  /** The CDC configuration carried by the underlying [[AutoCdcFlow]]. */
  def changeArgs: ChangeArgs = flow.changeArgs

  /**
   * The output schema of this flow. It is derived from, but not necessarily equal to, the schema
   * of the source change-data-feed dataframe (`df`):
   *
   *  1. the user's [[ColumnSelection]] is applied to `df.schema`;
   *  2. every key in `changeArgs.keys` is required to survive that selection;
   *  3. for SCD Type 1, a non-nullable CDC metadata column is appended as the last field.
   *
   * Computed eagerly at construction time, so constructing this flow fails with an
   * [[AnalysisException]] when a key is missing from the selected columns, and with an
   * [[UnsupportedOperationException]] when SCD Type 2 is requested.
   */
  override val schema: StructType = {
    val selectedColumns = ColumnSelection.applyToSchema(
      schemaName = "changeDataFeed",
      schema = df.schema,
      columnSelection = changeArgs.columnSelection,
      caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
    )

    // Keys must be part of the target table; a selection that drops one is rejected here.
    requireKeysPresentInSelectedSchema(selectedColumns)

    changeArgs.storedAsScdType match {
      case ScdType.Type1 =>
        // The metadata column's schema is parameterised by the data type of the sequencing
        // expression, obtained by projecting that expression over the source dataframe.
        val sequencingDataType = df.select(changeArgs.sequencing).schema.head.dataType
        val cdcMetadataField = StructField(
          Scd1BatchProcessor.cdcMetadataColName,
          Scd1BatchProcessor.cdcMetadataColSchema(sequencingType = sequencingDataType),
          nullable = false
        )
        // User-selected columns first, CDC metadata column last.
        StructType(selectedColumns.fields :+ cdcMetadataField)

      case ScdType.Type2 =>
        throw new UnsupportedOperationException(
          "AutoCDC flows do not currently support SCD Type 2 transformations."
        )
    }
  }

  /**
   * Rejects a source dataframe in which any column name starts with the reserved AutoCDC column
   * name prefix. The comparison uses the session's resolver, so it follows the session's
   * case-sensitivity setting.
   *
   * @throws AnalysisException with error class AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT,
   *                           naming the first offending column in schema order
   */
  private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
    val sessionConf = spark.sessionState.conf
    val namesMatch = sessionConf.resolver
    val prefix = Scd1BatchProcessor.reservedColumnNamePrefix
    val prefixLength = prefix.length

    // A name shorter than the prefix cannot start with it; the length guard also keeps
    // `substring` within bounds.
    val startsWithReservedPrefix: String => Boolean = { columnName =>
      columnName.length >= prefixLength &&
        namesMatch(columnName.substring(0, prefixLength), prefix)
    }

    df.schema.fieldNames.find(startsWithReservedPrefix) match {
      case Some(offendingColumn) =>
        throw new AnalysisException(
          errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
          messageParameters = Map(
            "caseSensitivity" -> CaseSensitivityLabels.of(sessionConf.caseSensitiveAnalysis),
            "columnName" -> offendingColumn,
            "schemaName" -> "changeDataFeed",
            "reservedColumnNamePrefix" -> prefix
          )
        )
      case None => ()
    }
  }

  /**
   * Requires every key in `changeArgs.keys` to name a column of `selectedSchema`, comparing
   * names with the session's resolver.
   *
   * @param selectedSchema the source schema after the user's column selection has been applied
   * @throws AnalysisException with error class AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA, naming the
   *                           first key (in `changeArgs.keys` order) that has no matching column
   */
  private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): Unit = {
    val sessionConf = spark.sessionState.conf
    val namesMatch = sessionConf.resolver
    val selectedNames = selectedSchema.fieldNames

    def isSelected(keyName: String): Boolean =
      selectedNames.exists(columnName => namesMatch(columnName, keyName))

    changeArgs.keys.find(key => !isSelected(key.name)) match {
      case Some(absentKey) =>
        throw new AnalysisException(
          errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
          messageParameters = Map(
            "caseSensitivity" -> CaseSensitivityLabels.of(sessionConf.caseSensitiveAnalysis),
            "keyColumnName" -> absentKey.name
          )
        )
      case None => ()
    }
  }
}
0 commit comments