Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,27 @@
],
"sqlState" : "22023"
},
"AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
"message" : [
"Using <caseSensitivity> column name comparison, the AutoCDC key column `<keyColumnName>` is not present in the flow's selected source schema. AutoCDC requires every key column to be present in the source change-data feed and retained by any configured column selection."
],
"sqlState" : "22023"
},
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier <columnName> (parts: <nameParts>)."
],
"sqlState" : "42703"
},
"AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
"AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
"message" : [
"Invalid AutoCDC destination <tableName> with multiple flows: <flows>. An AutoCDC target table must have exactly one flow writing to it."
],
"sqlState" : "42000"
},
"AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
"Using <caseSensitivity> column name comparison, the column `<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC column name `<reservedColumnName>`. Rename or remove the column."
"The column `<columnName>` in the <schemaName> schema collides with the reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using <caseSensitivity> column name comparison). Rename or remove the column."
],
"sqlState" : "42710"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UnresolvedFlow}
import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType

Expand Down Expand Up @@ -371,7 +371,7 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS =>
val relationFlowDetails = flow.getRelationFlowDetails
graphElementRegistry.registerFlow(
UnresolvedFlow(
UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = destinationIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object ColumnSelection {
}

/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
private[autocdc] object CaseSensitivityLabels {
private[pipelines] object CaseSensitivityLabels {
val CaseSensitive: String = "case-sensitive"
val CaseInsensitive: String = "case-insensitive"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.autocdc

import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
Expand Down Expand Up @@ -89,9 +89,6 @@ case class Scd1BatchProcessor(
* column.
*/
def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): DataFrame = {
// Proactively validate the reserved CDC metadata column does not exist in the microbatch.
validateCdcMetadataColumnNotPresent(validatedMicrobatch)

val rowDeleteSequence: Column = changeArgs.deleteCondition match {
case Some(deleteCondition) =>
F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
Expand Down Expand Up @@ -165,39 +162,26 @@ case class Scd1BatchProcessor(
finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
)
}

private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver

microbatch.schema.fieldNames
.find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
.foreach { conflictingColumnName =>
throw new AnalysisException(
errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
messageParameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
"columnName" -> conflictingColumnName,
"schemaName" -> "microbatch",
"reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
)
)
}
}
}

object Scd1BatchProcessor {
// Columns prefixed with `__spark_autocdc_` are reserved for internal SDP AutoCDC processing.
private[autocdc] val winningRowColName: String = "__spark_autocdc_winning_row"
private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
/**
* Reserved column-name prefix for internal SDP AutoCDC processing. Source change-data-feed
* dataframes must not contain any columns starting with this prefix; the invariant is
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] construction.
*/
private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"

private[autocdc] val winningRowColName: String = s"${reservedColumnNamePrefix}winning_row"
private[pipelines] val cdcMetadataColName: String = s"${reservedColumnNamePrefix}metadata"

private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"

/**
* Schema of the CDC metadata struct column for SCD1.
*/
private def cdcMetadataColSchema(sequencingType: DataType): StructType =
private[pipelines] def cdcMetadataColSchema(sequencingType: DataType): StructType =
StructType(
Seq(
// The sequencing of the event if it represents a delete, null otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
} else {
f
}
convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
transformUnresolvedFlowToResolvedFlow(flowToResolve, maybeNewFuncResult)

// If the flow failed due to an UnresolvedDatasetException, it means that one of the
// flow's inputs wasn't available. After other flows are resolved, these inputs
Expand All @@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
}
}

private def convertResolvedToTypedFlow(
private def transformUnresolvedFlowToResolvedFlow(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: resolveFlow is a lot shorter

flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UntypedFlow resolution uses funcResult.dataFrame.get.isStreaming to choose StreamingFlow vs CompleteFlow, but AutoCdcFlow always becomes AutoCdcMergeFlow regardless of whether the source is streaming.

That means an AutoCdcFlow with a batch source can still resolve successfully when the destination is a non-streaming table (e.g. materialized view), because validateFlowStreamingness only rejects streaming sources for MVs—not batch sources for AutoCDC specifically.

Since this PR documents that AutoCDC is streaming-only (once = false, class-level comments), consider enforcing df.isStreaming here (or in validateFlowStreamingness with an AutoCDC-specific check), e.g.:

case acf: AutoCdcFlow =>
  if (!funcResult.dataFrame.get.isStreaming) {
    throw new AnalysisException(
      errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
      messageParameters = Map("flowIdentifier" -> acf.identifier.quotedString)
    )
  }
  new AutoCdcMergeFlow(acf, funcResult)

Happy to defer to the execution PR if you prefer, but worth tracking so we don't rely on destination-type checks alone.

case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult)
}
}

private def transformUntypedFlowToResolvedFlow(
flow: UntypedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case _ if flow.once => new AppendOnceFlow(flow, funcResult)
case _ if funcResult.dataFrame.get.isStreaming =>
Expand All @@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
// then get their results overwritten.
val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1
new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
case _ => new CompleteFlow(flow, funcResult)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph
import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
import org.apache.spark.sql.pipelines.autocdc.{
CaseSensitivityLabels,
ChangeArgs,
ColumnSelection,
Scd1BatchProcessor,
ScdType
}
import org.apache.spark.sql.pipelines.util.InputReadOptions
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}

/**
* Contains the catalog and database context information for query execution.
Expand Down Expand Up @@ -121,15 +129,56 @@ case class FlowFunctionResult(
}

/** A [[Flow]] whose output schema and dependencies aren't known. */
case class UnresolvedFlow(
sealed trait UnresolvedFlow extends Flow {
/** Returns a copy of this flow with the given SQL confs overriding the existing ones. */
def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
}

/**
* An [[UnresolvedFlow]] whose execution-type has not yet been determined.
*
* In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis
* and resolution. For example, an [[AutoCdcFlow]] is a special unresolved-but-typed flow; we
* know a flow will be an AutoCDC flow immediately on construction, because it has its own
* special registration API. Such flows are considered "typed flows", but there isn't any
* semantic reason yet to explicitly introduce a `TypedFlow` trait/class.
*/
case class UntypedFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
sqlConf: Map[String, String],
override val once: Boolean,
override val origin: QueryOrigin
) extends Flow
) extends UnresolvedFlow {
override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow =
copy(sqlConf = newSqlConf)
}

/**
* An unresolved but typed flow that applies a CDC event stream to a target table via MERGE.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: missing word in the scaladoc — should be something like:

An unresolved but typed flow that applies a CDC event stream to a target table via MERGE.

*
* [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, and not as a once
* flow. Therefore by definition it is a streaming-type flow.
*
* In the future once-support for [[AutoCdcFlow]] may be added.
*/
case class AutoCdcFlow(
identifier: TableIdentifier,
// Identifier of the dataset this flow writes to.
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
// Flow-level SQL confs; empty unless supplied. `withSqlConf` replaces this map wholesale.
sqlConf: Map[String, String] = Map.empty,
// Optional comment; absent by default.
// NOTE(review): presumably a user-facing description of the flow -- confirm against callers.
comment: Option[String] = None,
override val origin: QueryOrigin,
// CDC configuration for this flow (keys, sequencing, column selection, SCD type, ...).
changeArgs: ChangeArgs
) extends UnresolvedFlow {
// Fixed to `false` rather than taken as a constructor argument: an AutoCDC flow is never
// registered as a once flow.
override val once: Boolean = false

// Returns a copy of this flow whose `sqlConf` is `newSqlConf`; every other field is unchanged.
override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow =
copy(sqlConf = newSqlConf)
}

/**
* A [[Flow]] whose flow function has been invoked, meaning either:
Expand Down Expand Up @@ -194,3 +243,108 @@ class AppendOnceFlow(

override val once = true
}

/**
* A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance
* with the configured [[flow.changeArgs]].
*/
class AutoCdcMergeFlow(
val flow: AutoCdcFlow,
val funcResult: FlowFunctionResult
) extends ResolvedFlow {
requireReservedPrefixAbsentInSourceColumns()

def changeArgs: ChangeArgs = flow.changeArgs

/**
* Returns the augmented output schema of this flow, which can differ from the schema of the
* source change-data-feed dataframe.
*
* The source dataframe's schema describes the incoming CDC events; the augmented schema here
* applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata
* columns that the AutoCDC MERGE engine projects onto the target table. Downstream
* dependencies in the pipeline see this augmented schema.
*/
override val schema: StructType = {
val userSelectedSchema = ColumnSelection.applyToSchema(
schemaName = "changeDataFeed",
schema = df.schema,
columnSelection = changeArgs.columnSelection,
caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
)

// AutoCDC flows require all key columns to be present in the target table, to adhere to SCD
// semantics.
requireKeysPresentInSelectedSchema(userSelectedSchema)

changeArgs.storedAsScdType match {
case ScdType.Type1 =>
// SCD1 produces a target table with all the user-selected output columns and a projected
// CDC operational metadata column at the end.
StructType(
userSelectedSchema.fields :+ StructField(
Scd1BatchProcessor.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(
sequencingType = df.select(changeArgs.sequencing).schema.head.dataType
),
nullable = false
)
)
case ScdType.Type2 =>
throw new UnsupportedOperationException(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other AutoCDC validation in this PR uses AnalysisException with structured error classes (AUTOCDC_*). SCD2 throws a bare UnsupportedOperationException here.

For consistency with SDP/Connect error handling (and so clients get a stable errorClass), consider something like:

case ScdType.Type2 =>
  throw new AnalysisException(
    errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
    messageParameters = Map.empty
  )

(with a corresponding entry in error-conditions.json). The eager failure at AutoCdcMergeFlow construction is good; only the exception type differs.

"AutoCDC flows do not currently support SCD Type 2 transformations."
)
}
}

/**
 * Rejects a source change-data-feed dataframe that has any column whose name begins with the
 * reserved AutoCDC prefix ([[Scd1BatchProcessor.reservedColumnNamePrefix]]).
 *
 * Names are compared with the session's resolver, so whether the check is case-sensitive
 * follows the session configuration. The first offending column found, in schema order, is
 * the one reported.
 *
 * @throws AnalysisException with error class `AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT`
 *                           if such a column exists.
 */
private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
  val sqlConf = spark.sessionState.conf
  val namesMatch = sqlConf.resolver
  val prefix = Scd1BatchProcessor.reservedColumnNamePrefix

  // A column collides when its leading `prefix.length` characters match the reserved prefix
  // under the resolver; names shorter than the prefix can never collide.
  def startsWithReservedPrefix(columnName: String): Boolean =
    columnName.length >= prefix.length && namesMatch(columnName.take(prefix.length), prefix)

  df.schema.fieldNames.find(startsWithReservedPrefix) match {
    case Some(offendingColumn) =>
      throw new AnalysisException(
        errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
        messageParameters = Map(
          "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis),
          "columnName" -> offendingColumn,
          "schemaName" -> "changeDataFeed",
          "reservedColumnNamePrefix" -> prefix
        )
      )
    case None => ()
  }
}

/**
 * Checks that every key in `changeArgs.keys` names a column of the user-selected schema.
 *
 * Names are compared with the session's resolver. The first key (in `changeArgs.keys` order)
 * with no matching column is the one reported.
 *
 * @param selectedSchema the source schema after the configured column selection was applied.
 * @throws AnalysisException with error class `AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA` if a key
 *                           has no matching column in `selectedSchema`.
 */
private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): Unit = {
  val sqlConf = spark.sessionState.conf
  val namesMatch = sqlConf.resolver
  val selectedNames = selectedSchema.fieldNames

  def isSelected(keyName: String): Boolean =
    selectedNames.exists(columnName => namesMatch(columnName, keyName))

  changeArgs.keys.find(key => !isSelected(key.name)) match {
    case Some(absentKey) =>
      throw new AnalysisException(
        errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
        messageParameters = Map(
          "caseSensitivity" -> CaseSensitivityLabels.of(sqlConf.caseSensitiveAnalysis),
          "keyColumnName" -> absentKey.name
        )
      )
    case None => ()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class GraphRegistrationContext(
}

def registerFlow(flowDef: UnresolvedFlow): Unit = {
flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
flows += flowDef.withSqlConf(defaultSqlConf ++ flowDef.sqlConf)
}

private def isEmpty: Boolean = {
Expand Down
Loading