Skip to content

Commit e2f65a2

Browse files
committed
Add tests: SPIP matrix coverage, combined post-processing, composite rowId
- ChangelogEndToEndSuite: 3 new e2e tests covering missing SPIP matrix cells ((true, false) DELETE, and (true, true) without computeUpdates), plus the combined netChanges + carry-over-removal post-processing path.
- ResolveChangelogTableStreamingPostProcessingSuite: assert the grouping attribute in the netChanges-alone test; add a composite-rowId plan-shape test.
- Remove the stale "deduplicationMode=netChanges is rejected on streaming" test (netChanges is now supported on streaming) and the now-unused AnalysisException import.

Co-authored-by: Isaac
1 parent fd82d92 commit e2f65a2

2 files changed

Lines changed: 178 additions & 17 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/connector/ChangelogEndToEndSuite.scala

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -874,4 +874,149 @@ class ChangelogEndToEndSuite extends SharedSparkSession {
874874
}
875875
}
876876
}
877+
878+
test("streaming netChanges emits DELETE for pre-existing row deleted in range") {
  // SPIP matrix cell `(true, false)`: the row existed before the range (its first
  // event is a preimage or delete) and does not exist after it (its last event is
  // a delete). The netChanges collapse must surface exactly one DELETE for it.
  val tableId = recreateWithRowVersion()
  catalog.setChangelogProperties(tableId, ChangelogProperties(
    containsIntermediateChanges = true,
    rowIdNames = Seq("id"),
    rowVersionName = Some("row_commit_version")))

  val changeRows = Seq(
    // v1: pre-existing Alice is rewritten to Bob.
    ppRow(1L, "Alice", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 1L, 1000000L),
    ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_POSTIMAGE, 1L, 1000000L),
    // v2: Bob is deleted; across the whole range, row identity 1 therefore starts
    // with the v1 preimage and ends with this delete.
    ppRow(1L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
    // v3: an unrelated insert advances the watermark so row identity 1's timer
    // fires before end-of-input.
    ppRow(2L, "Carol", 3L, CHANGE_TYPE_INSERT, 3L, 3000000L))
  catalog.addChangeRows(tableId, changeRows)

  withSQLConf(rocksDbProviderConf) {
    val query = spark.readStream
      .option("startingVersion", "1")
      .option("deduplicationMode", "netChanges")
      .changes(fullTableName)
      .select("id", "data", "_change_type")
      .writeStream
      .format("memory")
      .queryName("cdc_stream_netchanges_delete")
      .outputMode("append")
      .start()
    try {
      query.processAllAvailable()
      // Per the batch contract, the `(true, false)` cell emits a single DELETE
      // carrying the *first* event's data (the preimage), hence "Alice" here.
      checkAnswer(
        spark.sql("SELECT * FROM cdc_stream_netchanges_delete"),
        Seq(
          Row(1L, "Alice", CHANGE_TYPE_DELETE),
          Row(2L, "Carol", CHANGE_TYPE_INSERT)))
    } finally {
      query.stop()
    }
  }
}
923+
924+
test("streaming netChanges without computeUpdates keeps persisting rows as DELETE+INSERT") {
  // SPIP matrix cell `(true, true)` with computeUpdates = false: a row that both
  // existed before the range and exists after it comes out as a DELETE + INSERT
  // pair, not as update_preimage + update_postimage.
  val tableId = recreateWithRowVersion()
  catalog.setChangelogProperties(tableId, ChangelogProperties(
    containsIntermediateChanges = true,
    rowIdNames = Seq("id"),
    rowVersionName = Some("row_commit_version")))

  val changeRows = Seq(
    // v1: pre-existing Alice is updated to Bob.
    ppRow(1L, "Alice", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 1L, 1000000L),
    ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_POSTIMAGE, 1L, 1000000L),
    // v2: Bob is updated to Robert, so row identity 1 spans
    // (preimage Alice) .. (postimage Robert) across the range.
    ppRow(1L, "Bob", 1L, CHANGE_TYPE_UPDATE_PREIMAGE, 2L, 2000000L),
    ppRow(1L, "Robert", 2L, CHANGE_TYPE_UPDATE_POSTIMAGE, 2L, 2000000L))
  catalog.addChangeRows(tableId, changeRows)

  withSQLConf(rocksDbProviderConf) {
    // Note: computeUpdates is deliberately left at its default (false).
    val query = spark.readStream
      .option("startingVersion", "1")
      .option("deduplicationMode", "netChanges")
      .changes(fullTableName)
      .select("id", "data", "_change_type")
      .writeStream
      .format("memory")
      .queryName("cdc_stream_netchanges_no_compute_updates")
      .outputMode("append")
      .start()
    try {
      query.processAllAvailable()
      checkAnswer(
        spark.sql("SELECT * FROM cdc_stream_netchanges_no_compute_updates"),
        Seq(
          Row(1L, "Alice", CHANGE_TYPE_DELETE),
          Row(1L, "Robert", CHANGE_TYPE_INSERT)))
    } finally {
      query.stop()
    }
  }
}
967+
968+
test("streaming netChanges + carry-over removal: combined post-processing") {
  // Design point under test: the row-level rewrite and the netChanges rewrite
  // share one EventTimeWatermark on `_commit_timestamp`. Carry-over CoW pairs are
  // dropped before the netChanges collapse, so only real content changes survive.
  val tableId = recreateWithRowVersion()
  catalog.setChangelogProperties(tableId, ChangelogProperties(
    containsCarryoverRows = true,
    containsIntermediateChanges = true,
    rowIdNames = Seq("id"),
    rowVersionName = Some("row_commit_version")))

  val changeRows = Seq(
    // v1: insert Alice and Bob (rcv=1).
    ppRow(1L, "Alice", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
    ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 1L, 1000000L),
    // v2: a real delete of Alice, plus a carry-over pair for Bob (rcv unchanged
    // means a CoW file rewrite with no content change).
    ppRow(1L, "Alice", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
    ppRow(2L, "Bob", 1L, CHANGE_TYPE_DELETE, 2L, 2000000L),
    ppRow(2L, "Bob", 1L, CHANGE_TYPE_INSERT, 2L, 2000000L),
    // v3: insert Carol to push the watermark past v2 so the timers for row
    // identities 1 and 2 fire and the netChanges output is emitted.
    ppRow(3L, "Carol", 3L, CHANGE_TYPE_INSERT, 3L, 3000000L))
  catalog.addChangeRows(tableId, changeRows)

  withSQLConf(rocksDbProviderConf) {
    val query = spark.readStream
      .option("startingVersion", "1")
      .option("deduplicationMode", "netChanges")
      .changes(fullTableName)
      .select("id", "data", "_change_type")
      .writeStream
      .format("memory")
      .queryName("cdc_stream_netchanges_with_carryover")
      .outputMode("append")
      .start()
    try {
      query.processAllAvailable()
      // After carry-over removal: Alice keeps v1 INSERT + v2 DELETE; Bob keeps
      // only v1 INSERT (his v2 CoW pair is dropped); Carol keeps v3 INSERT.
      // After the netChanges collapse:
      //   Alice -- (false, false) -> nothing emitted
      //   Bob   -- (false, true)  -> INSERT
      //   Carol -- (false, true)  -> INSERT
      val expected = Seq(
        Row(2L, "Bob", CHANGE_TYPE_INSERT),
        Row(3L, "Carol", CHANGE_TYPE_INSERT))
      checkAnswer(
        spark.sql("SELECT * FROM cdc_stream_netchanges_with_carryover"),
        expected)
    } finally {
      query.stop()
    }
  }
}
8771022
}

sql/core/src/test/scala/org/apache/spark/sql/connector/ResolveChangelogTableStreamingPostProcessingSuite.scala

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.util.Collections
2121

2222
import org.scalatest.BeforeAndAfterEach
2323

24-
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
24+
import org.apache.spark.sql.{DataFrame, QueryTest}
2525
import org.apache.spark.sql.catalyst.expressions.Inline
2626
import org.apache.spark.sql.catalyst.plans.logical.{
2727
Aggregate, EventTimeWatermark, Filter, Generate, LogicalPlan, Project, TransformWithState}
@@ -236,6 +236,38 @@ class ResolveChangelogTableStreamingPostProcessingSuite
236236
assertHelperColumnsRemoved(analyzed)
237237
}
238238

239+
test("netChanges with composite rowId groups by all helper columns") {
  // A two-column rowId exercises the rowIdColumn(idx) helper for idx > 0. The
  // single-rowId test only covers the size-1 case, so this guards against a
  // regression that hard-codes a single helper attribute.
  catalog.dropTable(identifier)
  val columns = Array(
    Column.create("ns", LongType, false),
    Column.create("id", LongType, false),
    Column.create("row_commit_version", LongType, false))
  catalog.createTable(
    identifier,
    columns,
    Array.empty[Transform],
    Collections.emptyMap[String, String]())
  catalog.setChangelogProperties(identifier, ChangelogProperties(
    containsIntermediateChanges = true,
    rowIdNames = Seq("ns", "id"),
    rowVersionName = Some("row_commit_version")))

  val analyzed =
    streamingDf("deduplicationMode" -> "netChanges").queryExecution.analyzed
  assertWatermarkOnCommitTimestamp(analyzed)
  val tws = analyzed.collect { case t: TransformWithState => t }
  assert(tws.size == 1, s"Expected one TransformWithState. Plan:\n$analyzed")
  // Both helper columns must participate in the grouping key, in rowId order.
  val groupingNames = tws.head.groupingAttributes.map(_.name)
  assert(groupingNames == Seq("__spark_cdc_rowid_0", "__spark_cdc_rowid_1"),
    s"Expected grouping by [__spark_cdc_rowid_0, __spark_cdc_rowid_1]; got $groupingNames. Plan:\n$analyzed")
  assertHelperColumnsRemoved(analyzed)
}
270+
239271
test("netChanges + carry-over removal share a single watermark") {
240272
catalog.setChangelogProperties(identifier, ChangelogProperties(
241273
containsCarryoverRows = true,
@@ -276,20 +308,4 @@ class ResolveChangelogTableStreamingPostProcessingSuite
276308
assertNoStreamingPostProcessing(analyzed)
277309
}
278310

279-
// ===========================================================================
280-
// Net change computation rejection
281-
// ===========================================================================
282-
283-
test("deduplicationMode=netChanges is rejected on streaming") {
284-
catalog.setChangelogProperties(identifier, ChangelogProperties(
285-
containsIntermediateChanges = true,
286-
rowIdNames = Seq("id"),
287-
rowVersionName = Some("row_commit_version")))
288-
289-
val e = intercept[AnalysisException] {
290-
streamingDf("deduplicationMode" -> "netChanges").queryExecution.analyzed
291-
}
292-
assert(e.getCondition == "INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED",
293-
s"Unexpected error: ${e.getMessage}")
294-
}
295311
}

0 commit comments

Comments
 (0)