Skip to content

Commit dc3b190

Browse files
ericm-dbcloud-fan
authored andcommitted
[SPARK-56972][4.2][SS] Persist sink name in V3 commit log via MicroBatchExecution
### What changes were proposed in this pull request? Backport of [SPARK-56972] ([#56020](#56020)) to `branch-4.2`. Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches: - Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`. - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`. - When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged. This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred. **Stacked PR.** `branch-4.2` does not yet have the predecessors SPARK-56970 ([#56018](#56018)) and SPARK-56971 ([#56019](#56019)), which are still under review for 4.2 in [#56548](#56548). This PR is built on top of #56548 and currently shows those two predecessor commits in its diff; that will resolve once #56548 merges. Only the final commit (`[SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution`) is the subject of this PR. The cherry-pick of `cfa759af5b6` produced the same diff as on master (+156/-3); the only conflict was an import-line collision in `MicroBatchExecution.scala`, resolved by keeping the branch's existing import and adding `CommitLog`, `CommitMetadataV3`, and `SinkMetadataInfo`. ### Why are the changes needed? SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable. ### Does this PR introduce _any_ user-facing change? Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off. ### How was this patch tested? - Cherry-picked `cfa759af5b6` on top of the SPARK-56971 4.2 backport branch (#56548); resolved the single import-line conflict in `MicroBatchExecution.scala`. - `StreamingSinkEvolutionSuite` passes on this branch (12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3). - `sql/core` main and test sources compile cleanly (`build/sbt sql/Test/compile`). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-8) This pull request and its description were written by Isaac. Closes #56707 from ericm-db/SPARK-56972-branch-4.2. Authored-by: Eric Marnadi <eric.marnadi@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 7c86d36 commit dc3b190

2 files changed

Lines changed: 156 additions & 3 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
4646
import org.apache.spark.sql.execution.datasources.LogicalRelation
4747
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
4848
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
49-
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
49+
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, CommitMetadataV3, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2, SinkMetadataInfo}
5050
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
5151
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
5252
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
@@ -129,6 +129,15 @@ class MicroBatchExecution(
129129
}
130130
}
131131

132+
// Historical sink metadata read from the commit log on restart. Insertion order is preserved so
133+
// that we can re-emit deactivated sinks in the same order they originally appeared. Mutated by
134+
// [[populateStartOffsets]] (reads) and by the commit-log write in [[runBatch]] (updates).
135+
private val sinkMetadataMap = mutable.LinkedHashMap.empty[String, SinkMetadataInfo]
136+
137+
/** True when the current query should persist V3 sink metadata in the commit log. */
138+
private def commitLogV3Enabled: Boolean =
139+
sparkSession.sessionState.conf.enableStreamingSinkEvolution
140+
132141
@volatile protected[sql] var triggerExecutor: TriggerExecutor = _
133142

134143
protected def getTrigger(): TriggerExecutor = {
@@ -765,6 +774,11 @@ class MicroBatchExecution(
765774
commitMetadata.stateUniqueIds.foreach {
766775
stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
767776
}
777+
commitMetadata match {
778+
case v3: CommitMetadataV3 =>
779+
sinkMetadataMap ++= v3.sinkMetadataMap
780+
case _ =>
781+
}
768782
if (latestBatchId == latestCommittedBatchId) {
769783
/* The last batch was successfully committed, so we can safely process a
770784
* new next batch but first:
@@ -1463,10 +1477,36 @@ class MicroBatchExecution(
14631477
} else {
14641478
None
14651479
}
1466-
if (!commitLog.add(execCtx.batchId,
1480+
val metadata = if (commitLogV3Enabled) {
1481+
val sinkApiVersion = sink match {
1482+
case _: SupportsWrite => "DSv2"
1483+
case _ => "DSv1"
1484+
}
1485+
val currentSinkInfo = SinkMetadataInfo(
1486+
sinkName = sinkName,
1487+
commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET,
1488+
providerName = sink.getClass.getName,
1489+
apiVersion = sinkApiVersion,
1490+
isActive = true)
1491+
// Mark every previously-seen sink as inactive, then overlay the current sink as active.
1492+
// The previous entry for [[sinkName]], if any, is overwritten here.
1493+
val deactivated = sinkMetadataMap.iterator
1494+
.map { case (name, info) => name -> info.copy(isActive = false) }
1495+
.toMap
1496+
val updatedSinkMap = deactivated + (sinkName -> currentSinkInfo)
1497+
sinkMetadataMap.clear()
1498+
sinkMetadataMap ++= updatedSinkMap
14671499
commitLog.createMetadata(
14681500
nextBatchWatermarkMs = watermarkTracker.currentWatermark,
1469-
stateUniqueIds = stateStoreCkptId))) {
1501+
stateUniqueIds = stateStoreCkptId,
1502+
sinkMetadataMap = updatedSinkMap,
1503+
commitLogFormatVersion = CommitLog.VERSION_3)
1504+
} else {
1505+
commitLog.createMetadata(
1506+
nextBatchWatermarkMs = watermarkTracker.currentWatermark,
1507+
stateUniqueIds = stateStoreCkptId)
1508+
}
1509+
if (!commitLog.add(execCtx.batchId, metadata)) {
14701510
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
14711511
}
14721512
}

sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Tag}
2121

2222
import org.apache.spark.SparkException
2323
import org.apache.spark.sql._
24+
import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadataV3}
2425
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
2526
import org.apache.spark.sql.internal.SQLConf
2627
import org.apache.spark.sql.streaming.StreamTest
@@ -183,6 +184,118 @@ class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach {
183184
q2.stop()
184185
}
185186

187+
// ===========================
188+
// Commit log V3 persistence
189+
// ===========================
190+
191+
testWithSinkEvolution("commit log records V3 metadata with named sink") {
192+
val checkpointDir = newMetadataDir
193+
val input = MemoryStream[Int]
194+
input.addData(1, 2, 3)
195+
val q = input.toDF().writeStream
196+
.format("noop")
197+
.name("my_sink")
198+
.option("checkpointLocation", checkpointDir)
199+
.start()
200+
q.processAllAvailable()
201+
q.stop()
202+
203+
val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
204+
val latest = commitLog.getLatest().getOrElse(fail("No commit recorded"))
205+
val v3 = latest._2 match {
206+
case v: CommitMetadataV3 => v
207+
case other => fail(s"Expected CommitMetadataV3, got $other")
208+
}
209+
val active = v3.activeSinkMetadataInfo
210+
assert(active.sinkName === "my_sink")
211+
assert(active.isActive)
212+
assert(v3.sinkMetadataMap.size === 1)
213+
}
214+
215+
testWithSinkEvolution("commit log V3 retains historical sink after rename") {
216+
val checkpointDir = newMetadataDir
217+
val input = MemoryStream[Int]
218+
219+
// First batch under sink name "old_sink".
220+
input.addData(1)
221+
val q1 = input.toDF().writeStream
222+
.format("noop")
223+
.name("old_sink")
224+
.option("checkpointLocation", checkpointDir)
225+
.start()
226+
q1.processAllAvailable()
227+
q1.stop()
228+
229+
// Restart with a new sink name "new_sink" against the same checkpoint.
230+
input.addData(2)
231+
val q2 = input.toDF().writeStream
232+
.format("noop")
233+
.name("new_sink")
234+
.option("checkpointLocation", checkpointDir)
235+
.start()
236+
q2.processAllAvailable()
237+
q2.stop()
238+
239+
val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
240+
val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3]
241+
assert(v3.sinkMetadataMap.keySet === Set("old_sink", "new_sink"))
242+
assert(v3.activeSinkMetadataInfo.sinkName === "new_sink")
243+
assert(v3.sinkMetadataMap("old_sink").isActive === false)
244+
assert(v3.sinkMetadataMap("new_sink").isActive === true)
245+
}
246+
247+
test("commit log stays V1/V2 when sink evolution is disabled") {
248+
val checkpointDir = newMetadataDir
249+
withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
250+
val input = MemoryStream[Int]
251+
input.addData(1, 2)
252+
val q = input.toDF().writeStream
253+
.format("noop")
254+
.option("checkpointLocation", checkpointDir)
255+
.start()
256+
q.processAllAvailable()
257+
q.stop()
258+
}
259+
260+
val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
261+
val latest = commitLog.getLatest().get._2
262+
assert(latest.version === CommitLog.VERSION_1 || latest.version === CommitLog.VERSION_2,
263+
s"Expected V1 or V2 commit log, got v${latest.version}")
264+
assert(!latest.isInstanceOf[CommitMetadataV3])
265+
}
266+
267+
testWithSinkEvolution("enabling sink evolution mid-checkpoint upgrades commit log to V3") {
268+
val checkpointDir = newMetadataDir
269+
val input = MemoryStream[Int]
270+
271+
// First run with sink evolution disabled writes V1/V2, no sink metadata.
272+
withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
273+
input.addData(1)
274+
val q = input.toDF().writeStream
275+
.format("noop")
276+
.option("checkpointLocation", checkpointDir)
277+
.start()
278+
q.processAllAvailable()
279+
q.stop()
280+
}
281+
282+
// Restart with sink evolution enabled, supplying a name. V3 should now be written; the
283+
// previous V1/V2 batches contribute no historical sinks.
284+
input.addData(2)
285+
val q = input.toDF().writeStream
286+
.format("noop")
287+
.name("upgraded_sink")
288+
.option("checkpointLocation", checkpointDir)
289+
.start()
290+
q.processAllAvailable()
291+
q.stop()
292+
293+
val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
294+
val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3]
295+
assert(v3.activeSinkMetadataInfo.sinkName === "upgraded_sink")
296+
assert(v3.sinkMetadataMap.size === 1)
297+
}
298+
186299
// ==============
187300
// Helper Methods
188301
// ==============

0 commit comments

Comments
 (0)