
Commit ea5478d

[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution
### What changes were proposed in this pull request?

Add the commit log data structures for streaming sink evolution:

- `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
- `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use.
- `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with `isActive = true`, if any.
- `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`.
- `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.

The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` (so each batch persists its sink name + offset, and so restarts read the map back) is the SPARK-56972 follow-up.

This PR is built on top of #56018 (SPARK-56970). It currently shows the SPARK-56970 commits in its diff; that will resolve once SPARK-56970 merges.

### Why are the changes needed?

SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in a separate, narrowly scoped change.

### Does this PR introduce _any_ user-facing change?

No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet.

### How was this patch tested?

Added unit tests in `CommitLogSuite`:

- V3 SerDe with a single active sink (round-trips through commit log).
- V3 retains historical sinks alongside the active one and `activeSinkMetadataInfoOpt` resolves correctly.
- `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast with `IllegalArgumentException`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Closes #56019 from ericm-db/sink-evolution-sink-metadata-info.

Authored-by: ericm-db <eric.marnadi@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
(cherry picked from commit 4d26262)
Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
1 parent 67dd8a0 commit ea5478d
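
The data model described in the commit message can be pictured with a small, dependency-free sketch. This is not the code from the commit: `SinkInfoSketch`, `CommitV3Sketch` and `switchSink` are hypothetical names, JSON serialization and `stateUniqueIds` are left out, and the `switchSink` step is only an assumption about how a later change might move from one sink to another while keeping the old entry as history.

```scala
// Simplified stand-ins for the classes added in this commit.
// All names here are hypothetical and exist only for illustration.
final case class SinkInfoSketch(
    sinkName: String,
    commitOffset: String,
    providerName: String,
    apiVersion: String,
    isActive: Boolean = true)

final case class CommitV3Sketch(
    nextBatchWatermarkMs: Long = 0L,
    sinkMetadataMap: Map[String, SinkInfoSketch]) {
  // Same two invariants the commit enforces with `require`.
  require(sinkMetadataMap.nonEmpty, "sinkMetadataMap must be non-empty")
  require(sinkMetadataMap.values.count(_.isActive) == 1,
    "exactly one active sink is required")

  // Safe because of the invariant above.
  def activeSink: SinkInfoSketch = sinkMetadataMap.values.find(_.isActive).get

  // Assumed evolution step (not part of this commit): deactivate every
  // existing entry, then record `next` as the single active sink.
  def switchSink(next: SinkInfoSketch): CommitV3Sketch = {
    val deactivated = sinkMetadataMap.map { case (name, info) =>
      name -> info.copy(isActive = false)
    }
    copy(sinkMetadataMap = deactivated + (next.sinkName -> next.copy(isActive = true)))
  }
}

object SinkEvolutionSketch {
  def main(args: Array[String]): Unit = {
    val first = SinkInfoSketch("sink-0", """{"offset":3}""", "memory", "v2")
    val batch0 = CommitV3Sketch(
      nextBatchWatermarkMs = 42L,
      sinkMetadataMap = Map(first.sinkName -> first))

    val second = SinkInfoSketch("sink-1", """{"offset":7}""", "memory", "v2")
    val batch1 = batch0.switchSink(second)

    println(batch1.activeSink.sinkName)
    println(batch1.sinkMetadataMap("sink-0").isActive)
  }
}
```

Keeping the deactivated entry in the map, rather than dropping it, is what lets a later batch notice that a sink name has been used before.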

2 files changed

Lines changed: 175 additions & 2 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala

Lines changed: 85 additions & 1 deletion
@@ -26,6 +26,7 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization

 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf

@@ -76,12 +77,18 @@ class CommitLog(
   /**
    * Factory for creating a [[CommitMetadataBase]] for the requested wire format version.
    * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]].
+   *
+   * For [[VERSION_3]], [[sinkMetadataMap]] must be non-empty and contain exactly one active
+   * sink; [[CommitMetadataV3]] enforces this invariant.
    */
   def createMetadata(
       nextBatchWatermarkMs: Long = 0,
       stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
+      sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty,
       commitLogFormatVersion: Int = defaultVersion): CommitMetadataBase = {
     commitLogFormatVersion match {
+      case VERSION_3 =>
+        CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap)
       case VERSION_2 =>
         CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds)
       case VERSION_1 =>
@@ -98,7 +105,8 @@ object CommitLog {
   private val EMPTY_JSON = "{}"
   val VERSION_1 = 1
   val VERSION_2 = 2
-  val MAX_VERSION: Int = VERSION_2
+  val VERSION_3 = 3
+  val MAX_VERSION: Int = VERSION_3

   /**
    * Reads a single commit log entry and dispatches to the matching
@@ -112,6 +120,7 @@ object CommitLog {
     val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION)
     val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
     version match {
+      case VERSION_3 => CommitMetadataV3(metadataJson)
       case VERSION_2 => CommitMetadataV2(metadataJson)
       case VERSION_1 => CommitMetadata(metadataJson)
       case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION)
@@ -205,3 +214,78 @@ object CommitMetadataV2 {

   def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json)
 }
+
+/**
+ * Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of per-sink metadata
+ * keyed by sink name. This enables streaming sink evolution: each batch records the active sink
+ * along with any historical sinks that were used in earlier batches but are no longer active.
+ *
+ * @param nextBatchWatermarkMs The watermark of the next batch.
+ * @param stateUniqueIds Per-operator state store unique ids (see [[CommitMetadataV2]]).
+ * @param sinkMetadataMap Non-empty map keyed by sink name with exactly one active entry per
+ *                        commit; deactivated sinks are retained to detect reuse of a sink name.
+ */
+case class CommitMetadataV3(
+    nextBatchWatermarkMs: Long = 0,
+    stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
+    sinkMetadataMap: Map[String, SinkMetadataInfo]) extends CommitMetadataBase {
+  require(sinkMetadataMap.nonEmpty,
+    "VERSION_3 commit log requires a non-empty sinkMetadataMap")
+  require(sinkMetadataMap.values.count(_.isActive) == 1,
+    "VERSION_3 commit log requires exactly one active sink, but found " +
+    s"${sinkMetadataMap.values.count(_.isActive)} in sinkMetadataMap")
+
+  override def version: Int = CommitLog.VERSION_3
+
+  override def withStateUniqueIds(
+      stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV3 =
+    copy(stateUniqueIds = stateUniqueIds)
+
+  /** Returns the currently active sink's metadata; exactly one always exists (see require). */
+  def activeSinkMetadataInfo: SinkMetadataInfo = sinkMetadataMap.values.find(_.isActive).get
+}
+
+object CommitMetadataV3 {
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadataV3 = Serialization.read[CommitMetadataV3](json)
+}
+
+/**
+ * Per-sink metadata recorded in a [[CommitMetadataV3]] entry.
+ *
+ * @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or
+ *                 `MicroBatchExecution.DEFAULT_SINK_NAME` when sink evolution is disabled.
+ * @param commitOffset The latest offset committed to the sink as a JSON string
+ *                     (i.e. [[OffsetV2.json()]]), or [[OffsetSeqLog.SERIALIZED_VOID_OFFSET]] if
+ *                     no offset is available.
+ * @param providerName Identifies the sink implementation (e.g. fully-qualified class name).
+ * @param apiVersion The API version for the sink - whether it is DSv1 or DSv2.
+ * @param isActive Whether this sink is the active sink for the current batch. Historical sinks
+ *                 are retained with `isActive = false`.
+ */
+case class SinkMetadataInfo(
+    sinkName: String,
+    commitOffset: String,
+    providerName: String,
+    apiVersion: String,
+    isActive: Boolean = true) {
+  def json: String = Serialization.write(this)(SinkMetadataInfo.format)
+}
+
+object SinkMetadataInfo {
+  private implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+  def apply(
+      sinkName: String,
+      commitOffset: Option[OffsetV2],
+      providerName: String,
+      apiVersion: String,
+      isActive: Boolean): SinkMetadataInfo = {
+    val offsetString = commitOffset match {
+      case Some(off) => off.json
+      case None => OffsetSeqLog.SERIALIZED_VOID_OFFSET
+    }
+    new SinkMetadataInfo(sinkName, offsetString, providerName, apiVersion, isActive)
+  }
+}
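
The read path in this file picks the metadata class from the version header on the first line of the entry, then parses the second line as JSON. A minimal sketch of that dispatch pattern, using only the standard library, might look like the following. `VersionDispatchSketch`, `parseVersion` and `describe` are hypothetical names; the header check is a simplified stand-in for `MetadataVersionUtil.validateVersion`, whose exact error types and messages are not reproduced here, and the sketch returns a description string instead of a deserialized object.

```scala
// Hypothetical sketch of header-based version dispatch for a two-line log entry:
// line 1 is a header such as "v3", line 2 is the JSON payload (may be absent).
object VersionDispatchSketch {
  val MaxVersion = 3
  private val EmptyJson = "{}"

  // Parses "v<N>" into N and rejects malformed or out-of-range versions.
  def parseVersion(header: String, maxSupported: Int): Int = {
    val trimmed = header.trim
    val digits = trimmed.stripPrefix("v")
    require(trimmed.startsWith("v") && digits.nonEmpty && digits.forall(_.isDigit),
      s"Malformed version header: $header")
    val version = digits.toInt
    require(version >= 1 && version <= maxSupported,
      s"Unsupported version $version (max supported: $maxSupported)")
    version
  }

  // Chooses a branch per version, newest first, mirroring the match in the diff.
  def describe(lines: Iterator[String]): String = {
    val version = parseVersion(lines.next(), MaxVersion)
    val json = if (lines.hasNext) lines.next() else EmptyJson
    version match {
      case 3 => s"V3 metadata: $json"
      case 2 => s"V2 metadata: $json"
      case 1 => s"V1 metadata: $json"
      case other => throw new IllegalStateException(s"Unhandled version $other")
    }
  }

  def main(args: Array[String]): Unit = {
    println(describe(Iterator("v3", """{"nextBatchWatermarkMs":42}""")))
    println(describe(Iterator("v1")))
  }
}
```

Because the version comes from the file itself, older entries stay readable after the configured default version changes, which is the behavior the SPARK-50653 comment in the test file refers to.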

sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala

Lines changed: 90 additions & 1 deletion
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, FileInputStream, FileOutputStream}
 import java.nio.file.Path

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2}
+import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2, CommitMetadataV3, OffsetSeqLog, SinkMetadataInfo}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession

@@ -115,6 +115,95 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }

+  test("Basic Commit Log V3 SerDe - single active sink") {
+    withTempDir { tempDir =>
+      val commitLog = new CommitLog(spark, tempDir.getAbsolutePath)
+      val sinkInfo = SinkMetadataInfo(
+        sinkName = "sink-0",
+        commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET,
+        providerName = "memory",
+        apiVersion = "v2",
+        isActive = true)
+      val metadata = commitLog.createMetadata(
+        nextBatchWatermarkMs = 42,
+        sinkMetadataMap = Map("sink-0" -> sinkInfo),
+        commitLogFormatVersion = CommitLog.VERSION_3)
+      assert(commitLog.add(0, metadata))
+
+      val read = commitLog.get(0).get
+      assert(read.version === CommitLog.VERSION_3)
+      assert(read.nextBatchWatermarkMs === 42)
+      val readV3 = read.asInstanceOf[CommitMetadataV3]
+      assert(readV3.sinkMetadataMap === Map("sink-0" -> sinkInfo))
+      assert(readV3.activeSinkMetadataInfo === sinkInfo)
+    }
+  }
+
+  test("Commit Log V3 - retains historical sinks alongside active") {
+    withTempDir { tempDir =>
+      val commitLog = new CommitLog(spark, tempDir.getAbsolutePath)
+      val historical = SinkMetadataInfo(
+        sinkName = "sink-0",
+        commitOffset = """{"offset":3}""",
+        providerName = "memory",
+        apiVersion = "v2",
+        isActive = false)
+      val active = SinkMetadataInfo(
+        sinkName = "sink-1",
+        commitOffset = """{"offset":7}""",
+        providerName = "memory",
+        apiVersion = "v2",
+        isActive = true)
+      val metadata = commitLog.createMetadata(
+        nextBatchWatermarkMs = 100,
+        sinkMetadataMap = Map("sink-0" -> historical, "sink-1" -> active),
+        commitLogFormatVersion = CommitLog.VERSION_3)
+      assert(commitLog.add(0, metadata))
+
+      val readV3 = commitLog.get(0).get.asInstanceOf[CommitMetadataV3]
+      assert(readV3.activeSinkMetadataInfo === active)
+      assert(readV3.sinkMetadataMap("sink-0") === historical)
+      assert(readV3.sinkMetadataMap("sink-1") === active)
+    }
+  }
+
+  test("createMetadata for V3 requires non-empty sinkMetadataMap") {
+    withTempDir { tempDir =>
+      val commitLog = new CommitLog(spark, tempDir.getAbsolutePath)
+      intercept[IllegalArgumentException] {
+        commitLog.createMetadata(
+          nextBatchWatermarkMs = 0,
+          sinkMetadataMap = Map.empty,
+          commitLogFormatVersion = CommitLog.VERSION_3)
+      }
+    }
+  }
+
+  test("CommitMetadataV3 requires exactly one active sink") {
+    val historical = SinkMetadataInfo(
+      sinkName = "sink-0",
+      commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET,
+      providerName = "memory",
+      apiVersion = "v2",
+      isActive = false)
+    val active = SinkMetadataInfo(
+      sinkName = "sink-1",
+      commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET,
+      providerName = "memory",
+      apiVersion = "v2",
+      isActive = true)
+
+    // No active sink.
+    intercept[IllegalArgumentException] {
+      CommitMetadataV3(sinkMetadataMap = Map("sink-0" -> historical))
+    }
+    // More than one active sink.
+    intercept[IllegalArgumentException] {
+      CommitMetadataV3(sinkMetadataMap =
+        Map("sink-0" -> active.copy(sinkName = "sink-0"), "sink-1" -> active))
+    }
+  }
+
   // SPARK-50653: When the configured commit log version is V2, a V1 file on disk should still
   // deserialize successfully into a V1 [[CommitMetadata]] because the wire format version is now
   // discovered from the file header rather than enforced to match the conf.
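
The last two tests rely on `require` throwing `IllegalArgumentException` at construction time. Outside Spark's test harness, the same three rejection cases (empty map, no active sink, more than one active sink) can be checked with a short standalone sketch. `SinkFlag`, `ActiveSinkGuard` and `isRejected` are hypothetical names used only for illustration, and `scala.util.Try` stands in for ScalaTest's `intercept`.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-ins; only the isActive flag matters for this check.
final case class SinkFlag(sinkName: String, isActive: Boolean)

final case class ActiveSinkGuard(sinks: Map[String, SinkFlag]) {
  require(sinks.nonEmpty, "requires a non-empty map")
  require(sinks.values.count(_.isActive) == 1, "requires exactly one active sink")
}

object InvariantCheckSketch {
  // True when construction fails with IllegalArgumentException,
  // which is the exception a failed `require` throws.
  def isRejected(sinks: Map[String, SinkFlag]): Boolean =
    Try(ActiveSinkGuard(sinks)) match {
      case Failure(_: IllegalArgumentException) => true
      case _ => false
    }

  def main(args: Array[String]): Unit = {
    val historical = SinkFlag("sink-0", isActive = false)
    val active = SinkFlag("sink-1", isActive = true)

    assert(isRejected(Map.empty))                      // empty map
    assert(isRejected(Map("sink-0" -> historical)))    // no active sink
    assert(isRejected(Map(                             // two active sinks
      "sink-0" -> active.copy(sinkName = "sink-0"),
      "sink-1" -> active)))
    assert(!isRejected(Map("sink-0" -> historical, "sink-1" -> active)))
  }
}
```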
