Skip to content

Commit 02f656a

Browse files
committed
add AutoCdcFlow -> AutoCdcMergeFlow resolution
1 parent d018f89 commit 02f656a

3 files changed

Lines changed: 46 additions & 4 deletions

File tree

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
176176
} else {
177177
f
178178
}
179-
convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
179+
transformUnresolvedFlowToResolvedFlow(flowToResolve, maybeNewFuncResult)
180180

181181
// If the flow failed due to an UnresolvedDatasetException, it means that one of the
182182
// flow's inputs wasn't available. After other flows are resolved, these inputs
@@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
199199
}
200200
}
201201

202-
private def convertResolvedToTypedFlow(
202+
private def transformUnresolvedFlowToResolvedFlow(
203203
flow: UnresolvedFlow,
204204
funcResult: FlowFunctionResult): ResolvedFlow = {
205+
flow match {
206+
case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
207+
case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, funcResult)
208+
}
209+
}
210+
211+
private def transformUntypedFlowToResolvedFlow(
212+
flow: UntypedFlow,
213+
funcResult: FlowFunctionResult): ResolvedFlow = {
205214
flow match {
206215
case _ if flow.once => new AppendOnceFlow(flow, funcResult)
207216
case _ if funcResult.dataFrame.get.isStreaming =>
@@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
210219
// then get their results overwritten.
211220
val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 1
212221
new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
213-
case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
222+
case _ => new CompleteFlow(flow, funcResult)
214223
}
215224
}
216225
}

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ sealed trait UnresolvedFlow extends Flow {
136136

137137
/**
138138
* An [[UnresolvedFlow]] whose execution-type has not yet been determined.
139-
*
139+
*
140140
* In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis
141141
* and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a
142142
* flow will be an AutoCDC flow immediately on construction, because it has its own special

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
2121
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
2222
import org.apache.spark.sql.catalyst.plans.logical.Union
2323
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
24+
import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType, UnqualifiedColumnName}
2425
import org.apache.spark.sql.pipelines.utils.{PipelineTest, TestGraphRegistrationContext}
2526
import org.apache.spark.sql.test.SharedSparkSession
2627
import org.apache.spark.sql.types._
@@ -509,6 +510,38 @@ class ConnectValidPipelineSuite extends PipelineTest with SharedSparkSession {
509510
assert(g.flow(TableIdentifier("sink_flow")).isInstanceOf[StreamingFlow])
510511
}
511512

513+
test("AutoCdcFlow registers and resolves to AutoCdcMergeFlow") {
514+
val session = spark
515+
import session.implicits._
516+
517+
val P = new TestGraphRegistrationContext(spark) {
518+
val mem = MemoryStream[Int]
519+
val cdcEvents = mem.toDF().select($"value" as "id", $"value" as "seq")
520+
registerTable("target")
521+
registerFlow(
522+
AutoCdcFlow(
523+
identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
524+
destinationIdentifier = fullyQualifiedIdentifier("target"),
525+
func = dfFlowFunc(cdcEvents),
526+
queryContext = QueryContext(
527+
currentCatalog = Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
528+
currentDatabase = Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
529+
),
530+
origin = QueryOrigin.empty,
531+
changeArgs = ChangeArgs(
532+
keys = Seq(UnqualifiedColumnName("id")),
533+
sequencing = $"seq",
534+
storedAsScdType = ScdType.Type1
535+
)
536+
)
537+
)
538+
}
539+
val g = P.resolveToDataflowGraph()
540+
assert(
541+
g.flow(fullyQualifiedIdentifier("auto_cdc_flow")).isInstanceOf[AutoCdcMergeFlow]
542+
)
543+
}
544+
512545
/** Verifies the [[DataflowGraph]] has the specified [[Flow]] with the specified schema. */
513546
private def verifyFlowSchema(
514547
pipeline: DataflowGraph,

0 commit comments

Comments
 (0)