Skip to content

Commit bfcc62b

Browse files
ericm-dbanishshri-db
authored andcommitted
[SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution
### What changes were proposed in this pull request? Backport of [SPARK-56971] ([#56019](#56019)) to `branch-4.2`. Add the commit log data structures for streaming sink evolution: - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`). - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, `apiVersion`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use. - `CommitMetadataV3.activeSinkMetadataInfo` returns the entry with `isActive = true`; `CommitMetadataV3` requires exactly one active sink. - `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`. - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class. The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` is the SPARK-56972 follow-up. **Prerequisite commit.** SPARK-56971 was built on top of [SPARK-56970] ([#56018](#56018)), which splits `CommitMetadata` into a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1) and `CommitMetadataV2` case classes. `branch-4.2` does not yet have SPARK-56970, so this PR includes it as the first commit and adds SPARK-56971 on top. Both commits are cherry-picked from the `branch-4.x` backports (`5322ec30c02` and `706ce2f3743`). The only conflicts were import-line collisions in `CommitLogSuite.scala` (the suite extends `SparkFunSuite with SharedSparkSession` on `branch-4.2`); the resolved `CommitLog.scala` is identical to `branch-4.x`. ### Why are the changes needed? SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in the 4.2 release line. ### Does this PR introduce _any_ user-facing change? No. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet. As part of the SPARK-56970 refactor, V1 commit log files no longer serialize `stateUniqueIds: null`; old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field. ### How was this patch tested? - Cherry-picked the two `branch-4.x` commits; resolved import conflicts in `CommitLogSuite.scala`. - Existing and new `CommitLogSuite` cases (V1/V2/V3 SerDe, historical-sink retention, `createMetadata` V3 empty-map failure, exactly-one-active-sink invariant). - `sql/core` main and test sources compile cleanly on `branch-4.2` (`build/sbt sql/Test/compile`). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-8) Closes #56548 from ericm-db/SPARK-56971-branch-4.2. Lead-authored-by: Eric Marnadi <eric.marnadi@databricks.com> Co-authored-by: ericm-db <eric.marnadi@databricks.com> Signed-off-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
1 parent c655abe commit bfcc62b

9 files changed

Lines changed: 393 additions & 93 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
4848
* the async write of the batch is completed. Future may also be completed exceptionally
4949
* to indicate some write error.
5050
*/
51-
def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = {
51+
def addAsync(batchId: Long, metadata: CommitMetadataBase): CompletableFuture[Long] = {
5252
require(metadata != null, "'null' metadata cannot be written to a metadata log")
5353
val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output =>
5454
serialize(metadata, output)
@@ -72,7 +72,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService:
7272
* @param metadata metadata of batch to write
7373
* @return true if operation is successful otherwise false.
7474
*/
75-
def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = {
75+
def addInMemory(batchId: Long, metadata: CommitMetadataBase): Boolean = {
7676
if (batchCache.containsKey(batchId)) {
7777
false
7878
} else {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala

Lines changed: 190 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import org.json4s.{Formats, NoTypeHints}
2626
import org.json4s.jackson.Serialization
2727

2828
import org.apache.spark.sql.SparkSession
29+
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
30+
import org.apache.spark.sql.errors.QueryExecutionErrors
2931
import org.apache.spark.sql.internal.SQLConf
3032

3133
/**
@@ -50,39 +52,127 @@ class CommitLog(
5052
sparkSession: SparkSession,
5153
path: String,
5254
readOnly: Boolean = false)
53-
extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) {
55+
extends HDFSMetadataLog[CommitMetadataBase](sparkSession, path, readOnly) {
5456

5557
import CommitLog._
5658

57-
private val VERSION: Int = sparkSession.conf.get(
59+
// The configured commit log format version. Used as the default version when callers
60+
// construct metadata through [[createMetadata]].
61+
private[sql] val defaultVersion: Int = sparkSession.conf.get(
5862
SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt
5963

60-
override protected[sql] def deserialize(in: InputStream): CommitMetadata = {
61-
// called inside a try-finally where the underlying stream is closed in the caller
62-
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
63-
if (!lines.hasNext) {
64-
throw new IllegalStateException("Incomplete log file in the offset commit log")
65-
}
66-
// TODO [SPARK-49462] This validation should be relaxed for a stateless query.
67-
// TODO [SPARK-50653] This validation should be relaxed to support reading
68-
// a V1 log file when VERSION is V2
69-
validateVersionExactMatch(lines.next().trim, VERSION)
70-
val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
71-
CommitMetadata(metadataJson)
64+
override protected[sql] def deserialize(in: InputStream): CommitMetadataBase = {
65+
CommitLog.readCommitMetadata(in)
7266
}
7367

74-
override protected[sql] def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
68+
override protected[sql] def serialize(metadata: CommitMetadataBase, out: OutputStream): Unit = {
7569
// called inside a try-finally where the underlying stream is closed in the caller
76-
out.write(s"v${VERSION}".getBytes(UTF_8))
70+
out.write(s"v${metadata.version}".getBytes(UTF_8))
7771
out.write('\n')
7872

7973
// write metadata
8074
out.write(metadata.json.getBytes(UTF_8))
8175
}
76+
77+
/**
78+
* Factory for creating a [[CommitMetadataBase]] for the requested wire format version.
79+
* Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]].
80+
*
81+
* For [[VERSION_3]], [[sinkMetadataMap]] must be non-empty and contain exactly one active
82+
* sink; [[CommitMetadataV3]] enforces this invariant.
83+
*/
84+
def createMetadata(
85+
nextBatchWatermarkMs: Long = 0,
86+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
87+
sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty,
88+
commitLogFormatVersion: Int = defaultVersion): CommitMetadataBase = {
89+
commitLogFormatVersion match {
90+
case VERSION_3 =>
91+
CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap)
92+
case VERSION_2 =>
93+
CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds)
94+
case VERSION_1 =>
95+
// VERSION_1 cannot persist stateUniqueIds; withStateUniqueIds enforces this invariant
96+
// (it throws if stateUniqueIds is non-empty).
97+
CommitMetadata(nextBatchWatermarkMs).withStateUniqueIds(stateUniqueIds)
98+
case v =>
99+
throw QueryExecutionErrors.logVersionGreaterThanSupported(v, CommitLog.MAX_VERSION)
100+
}
101+
}
82102
}
83103

84104
object CommitLog {
85105
private val EMPTY_JSON = "{}"
106+
val VERSION_1 = 1
107+
val VERSION_2 = 2
108+
val VERSION_3 = 3
109+
val MAX_VERSION: Int = VERSION_3
110+
111+
/**
112+
* Reads a single commit log entry and dispatches to the matching
113+
* [[CommitMetadataBase]] subclass based on the wire format version recorded in the file.
114+
*/
115+
private[spark] def readCommitMetadata(in: InputStream): CommitMetadataBase = {
116+
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
117+
if (!lines.hasNext) {
118+
throw new IllegalStateException("Incomplete log file in the offset commit log")
119+
}
120+
val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION)
121+
val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON
122+
version match {
123+
case VERSION_3 => CommitMetadataV3(metadataJson)
124+
case VERSION_2 => CommitMetadataV2(metadataJson)
125+
case VERSION_1 => CommitMetadata(metadataJson)
126+
case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION)
127+
}
128+
}
129+
}
130+
131+
/**
132+
* Base trait for commit log metadata. Concrete subclasses correspond to wire format versions
133+
* and override [[version]] accordingly.
134+
*/
135+
trait CommitMetadataBase extends Serializable {
136+
def version: Int
137+
def nextBatchWatermarkMs: Long
138+
def stateUniqueIds: Option[Map[Long, Array[Array[String]]]]
139+
140+
/**
141+
* Returns a copy of this metadata with the given state store unique ids, preserving the
142+
* concrete subclass and all of its other fields. Deriving a new commit from an existing one
143+
* should go through this method (rather than reconstructing via [[CommitLog.createMetadata]])
144+
* so that version-specific fields are not silently dropped when new metadata versions are
145+
* introduced.
146+
*/
147+
def withStateUniqueIds(
148+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataBase
149+
150+
def json: String = Serialization.write(this)(CommitMetadata.format)
151+
}
152+
153+
/**
154+
* Commit log metadata for [[CommitLog.VERSION_1]]. Records the watermark for the next batch only.
155+
*
156+
* @param nextBatchWatermarkMs The watermark of the next batch.
157+
*/
158+
case class CommitMetadata(
159+
nextBatchWatermarkMs: Long = 0) extends CommitMetadataBase {
160+
override def version: Int = CommitLog.VERSION_1
161+
override def stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None
162+
163+
override def withStateUniqueIds(
164+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadata = {
165+
require(stateUniqueIds.forall(_.isEmpty),
166+
s"stateUniqueIds cannot be set for commit log format version ${CommitLog.VERSION_1}; " +
167+
s"use version ${CommitLog.VERSION_2} to persist state store checkpoint ids.")
168+
this
169+
}
170+
}
171+
172+
object CommitMetadata {
173+
implicit val format: Formats = Serialization.formats(NoTypeHints)
174+
175+
def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
86176
}
87177

88178
/**
@@ -104,19 +194,98 @@ object CommitLog {
104194
* +--- ......
105195
* In the commit log, in addition to nextBatchWatermarkMs, we also store the unique ids of the
106196
* state store files.
197+
*
107198
* @param nextBatchWatermarkMs The watermark of the next batch.
108199
* @param stateUniqueIds Map[Long, Array[Array[String]]] of map
109200
* OperatorId -> (partitionID -> array of uniqueID)
110201
*/
202+
case class CommitMetadataV2(
203+
nextBatchWatermarkMs: Long = 0,
204+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) extends CommitMetadataBase {
205+
override def version: Int = CommitLog.VERSION_2
111206

112-
case class CommitMetadata(
207+
override def withStateUniqueIds(
208+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV2 =
209+
copy(stateUniqueIds = stateUniqueIds)
210+
}
211+
212+
object CommitMetadataV2 {
213+
import CommitMetadata.format
214+
215+
def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json)
216+
}
217+
218+
/**
219+
* Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of per-sink metadata
220+
* keyed by sink name. This enables streaming sink evolution: each batch records the active sink
221+
* along with any historical sinks that were used in earlier batches but are no longer active.
222+
*
223+
* @param nextBatchWatermarkMs The watermark of the next batch.
224+
* @param stateUniqueIds Per-operator state store unique ids (see [[CommitMetadataV2]]).
225+
* @param sinkMetadataMap Non-empty map keyed by sink name with exactly one active entry per
226+
* commit; deactivated sinks are retained to detect reuse of a sink name.
227+
*/
228+
case class CommitMetadataV3(
113229
nextBatchWatermarkMs: Long = 0,
114-
stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) {
115-
def json: String = Serialization.write(this)(CommitMetadata.format)
230+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None,
231+
sinkMetadataMap: Map[String, SinkMetadataInfo]) extends CommitMetadataBase {
232+
require(sinkMetadataMap.nonEmpty,
233+
"VERSION_3 commit log requires a non-empty sinkMetadataMap")
234+
require(sinkMetadataMap.values.count(_.isActive) == 1,
235+
"VERSION_3 commit log requires exactly one active sink, but found " +
236+
s"${sinkMetadataMap.values.count(_.isActive)} in sinkMetadataMap")
237+
238+
override def version: Int = CommitLog.VERSION_3
239+
240+
override def withStateUniqueIds(
241+
stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV3 =
242+
copy(stateUniqueIds = stateUniqueIds)
243+
244+
/** Returns the currently active sink's metadata; exactly one always exists (see require). */
245+
def activeSinkMetadataInfo: SinkMetadataInfo = sinkMetadataMap.values.find(_.isActive).get
116246
}
117247

118-
object CommitMetadata {
248+
object CommitMetadataV3 {
119249
implicit val format: Formats = Serialization.formats(NoTypeHints)
120250

121-
def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
251+
def apply(json: String): CommitMetadataV3 = Serialization.read[CommitMetadataV3](json)
252+
}
253+
254+
/**
255+
* Per-sink metadata recorded in a [[CommitMetadataV3]] entry.
256+
*
257+
* @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or
258+
* `MicroBatchExecution.DEFAULT_SINK_NAME` when sink evolution is disabled.
259+
* @param commitOffset The latest offset committed to the sink as a JSON string
260+
* (i.e. [[OffsetV2.json()]]), or [[OffsetSeqLog.SERIALIZED_VOID_OFFSET]] if
261+
* no offset is available.
262+
* @param providerName Identifies the sink implementation (e.g. fully-qualified class name).
263+
* @param apiVersion The API version for the sink - whether it is DSv1 or DSv2.
264+
* @param isActive Whether this sink is the active sink for the current batch. Historical sinks
265+
* are retained with `isActive = false`.
266+
*/
267+
case class SinkMetadataInfo(
268+
sinkName: String,
269+
commitOffset: String,
270+
providerName: String,
271+
apiVersion: String,
272+
isActive: Boolean = true) {
273+
def json: String = Serialization.write(this)(SinkMetadataInfo.format)
274+
}
275+
276+
object SinkMetadataInfo {
277+
private implicit val format: Formats = Serialization.formats(NoTypeHints)
278+
279+
def apply(
280+
sinkName: String,
281+
commitOffset: Option[OffsetV2],
282+
providerName: String,
283+
apiVersion: String,
284+
isActive: Boolean): SinkMetadataInfo = {
285+
val offsetString = commitOffset match {
286+
case Some(off) => off.json
287+
case None => OffsetSeqLog.SERIALIZED_VOID_OFFSET
288+
}
289+
new SinkMetadataInfo(sinkName, offsetString, providerName, apiVersion, isActive)
290+
}
122291
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
4646
import org.apache.spark.sql.execution.datasources.LogicalRelation
4747
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
4848
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
49-
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
49+
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
5050
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
5151
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
5252
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
@@ -1464,7 +1464,9 @@ class MicroBatchExecution(
14641464
None
14651465
}
14661466
if (!commitLog.add(execCtx.batchId,
1467-
CommitMetadata(watermarkTracker.currentWatermark, stateStoreCkptId))) {
1467+
commitLog.createMetadata(
1468+
nextBatchWatermarkMs = watermarkTracker.currentWatermark,
1469+
stateUniqueIds = stateStoreCkptId))) {
14681470
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
14691471
}
14701472
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,9 @@ class OfflineStateRepartitionRunner(
294294
lastCommittedBatchId: Long,
295295
opIdToStateStoreCkptInfo: Option[Map[Long, Array[Array[String]]]]): Unit = {
296296
val latestCommit = checkpointMetadata.commitLog.get(lastCommittedBatchId).get
297-
val commitMetadata = latestCommit.copy(stateUniqueIds = opIdToStateStoreCkptInfo)
297+
// Derive the new commit from the latest one so version-specific fields are preserved and the
298+
// wire format version stays consistent with the source checkpoint.
299+
val commitMetadata = latestCommit.withStateUniqueIds(opIdToStateStoreCkptInfo)
298300

299301
if (!checkpointMetadata.commitLog.add(newBatchId, commitMetadata)) {
300302
throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.UUID
2222
import org.apache.hadoop.conf.Configuration
2323
import org.apache.hadoop.fs.Path
2424

25-
import org.apache.spark.{SparkIllegalStateException, SparkThrowable, TaskContext}
25+
import org.apache.spark.{SparkIllegalStateException, TaskContext}
2626
import org.apache.spark.broadcast.Broadcast
2727
import org.apache.spark.internal.Logging
2828
import org.apache.spark.internal.LogKeys._
@@ -376,27 +376,19 @@ class StateRewriter(
376376
}
377377

378378
private def verifyCheckpointFormatVersion(): Unit = {
379-
// Verify checkpoint version in sqlConf based on commitLog for readCheckpoint
380-
// in case user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION.
381-
// Using read batch commit since the latest commit could be a skipped batch.
382-
// If SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION is wrong, readCheckpoint.commitLog
383-
// will throw an exception, and we will propagate this exception upstream.
384-
// This prevents the StateRewriter from failing to write the correct state files
385-
try {
386-
readCheckpoint.commitLog.get(readBatchId)
387-
} catch {
388-
case e: IllegalStateException if e.getCause != null &&
389-
e.getCause.isInstanceOf[SparkThrowable] =>
390-
val sparkThrowable = e.getCause.asInstanceOf[SparkThrowable]
391-
if (sparkThrowable.getCondition == "INVALID_LOG_VERSION.EXACT_MATCH_VERSION") {
392-
val params = sparkThrowable.getMessageParameters
393-
val expectedVersion = params.get("version")
394-
val actualVersion = params.get("matchVersion")
395-
throw StateRewriterErrors.stateCheckpointFormatVersionMismatchError(
396-
checkpointLocationForRead, expectedVersion, actualVersion)
397-
}
398-
throw e
379+
// Verify checkpoint version in sqlConf matches the version recorded in the read commit log,
380+
// in case the user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION. This prevents the
381+
// StateRewriter from writing state files in a format that disagrees with the source
382+
// checkpoint. Using the read batch commit since the latest commit could be a skipped batch.
383+
readCheckpoint.commitLog.get(readBatchId).foreach { metadata =>
384+
val configuredVersion = readCheckpoint.commitLog.defaultVersion
385+
if (metadata.version != configuredVersion) {
386+
throw StateRewriterErrors.stateCheckpointFormatVersionMismatchError(
387+
checkpointLocationForRead,
388+
expectedVersion = metadata.version.toString,
389+
actualVersion = configuredVersion.toString)
399390
}
391+
}
400392
}
401393
}
402394

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
2525
import org.scalatest.Assertions
2626

2727
import org.apache.spark.sql.Row
28-
import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata}
28+
import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataV2}
2929
import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamExecution}
3030
import org.apache.spark.sql.execution.streaming.state._
3131
import org.apache.spark.sql.functions.{col, window}
@@ -237,11 +237,11 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB
237237
new File(tempDir.getAbsolutePath, "commits").getAbsolutePath)
238238

239239
// Start version: treated as v1 (no operator unique ids)
240-
val startMetadata = CommitMetadata(0, None)
240+
val startMetadata = CommitMetadata(0)
241241
assert(commitLog.add(0, startMetadata))
242242

243243
// End version: treated as v2 (operator 0 has unique ids)
244-
val endMetadata = CommitMetadata(0,
244+
val endMetadata = CommitMetadataV2(0,
245245
Some(Map[Long, Array[Array[String]]](0L -> Array(Array("uid")))))
246246
assert(commitLog.add(1, endMetadata))
247247

0 commit comments

Comments
 (0)