Skip to content

Commit 2135faf

Browse files
committed
Apply range scan to transformWithState operators (TTL and timers)

Use bounded scan ranges in transformWithState TTL eviction and timer expiry to narrow the iteration scope:

- TTLState.ttlEvictionIterator: use store.rangeScan with a startKey derived from prevBatchTimestampMs + 1 and an endKey derived from batchTimestampMs + 1, skipping entries already cleaned up in the previous batch.
- TimerStateImpl.getExpiredTimers: use store.rangeScan with a startKey from prevExpiryTimestampMs + 1 and an endKey from expiryTimestampMs + 1. Processing-time timers use prevBatchTimestampMs as the lower bound; event-time timers use eventTimeWatermarkForLateEvents.

Thread prevBatchTimestampMs from IncrementalExecution (via prevOffsetSeqMetadata) through TransformWithStateExec -> StatefulProcessorHandleImpl -> TTLState / TimerStateImpl.

Copy the UnsafeRow results of encodeTTLRow/UnsafeProjection to avoid the mutable-row-reuse bug where startKey and endKey would otherwise alias the same internal buffer.
1 parent 5fd1438 commit 2135faf

10 files changed

Lines changed: 119 additions & 23 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
787787
outputAttr,
788788
stateInfo = None,
789789
batchTimestampMs = None,
790+
prevBatchTimestampMs = None,
790791
eventTimeWatermarkForLateEvents = None,
791792
eventTimeWatermarkForEviction = None,
792793
planLater(child),
@@ -815,6 +816,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
815816
func, t.leftAttributes, outputAttrs, outputMode, timeMode,
816817
stateInfo = None,
817818
batchTimestampMs = None,
819+
prevBatchTimestampMs = None,
818820
eventTimeWatermarkForLateEvents = None,
819821
eventTimeWatermarkForEviction = None,
820822
userFacingDataType,

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ case class TransformWithStateInPySparkExec(
7474
timeMode: TimeMode,
7575
stateInfo: Option[StatefulOperatorStateInfo],
7676
batchTimestampMs: Option[Long],
77+
prevBatchTimestampMs: Option[Long] = None,
7778
eventTimeWatermarkForLateEvents: Option[Long],
7879
eventTimeWatermarkForEviction: Option[Long],
7980
userFacingDataType: TransformWithStateInPySpark.UserFacingDataType.Value,
@@ -314,7 +315,8 @@ case class TransformWithStateInPySparkExec(
314315
val data = groupAndProject(filteredIter, groupingAttributes, child.output, dedupAttributes)
315316

316317
val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
317-
groupingKeyExprEncoder, timeMode, isStreaming, batchTimestampMs, metrics)
318+
groupingKeyExprEncoder, timeMode, isStreaming, batchTimestampMs,
319+
prevBatchTimestampMs, metrics)
318320

319321
val evalType = {
320322
if (userFacingDataType == TransformWithStateInPySpark.UserFacingDataType.PANDAS) {
@@ -442,6 +444,7 @@ object TransformWithStateInPySparkExec {
442444
Some(System.currentTimeMillis),
443445
None,
444446
None,
447+
None,
445448
userFacingDataType,
446449
child,
447450
isStreaming = false,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/TransformWithStateExec.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ case class TransformWithStateExec(
6767
outputObjAttr: Attribute,
6868
stateInfo: Option[StatefulOperatorStateInfo],
6969
batchTimestampMs: Option[Long],
70+
prevBatchTimestampMs: Option[Long] = None,
7071
eventTimeWatermarkForLateEvents: Option[Long],
7172
eventTimeWatermarkForEviction: Option[Long],
7273
child: SparkPlan,
@@ -251,7 +252,7 @@ case class TransformWithStateExec(
251252
case ProcessingTime =>
252253
assert(batchTimestampMs.isDefined)
253254
val batchTimestamp = batchTimestampMs.get
254-
processorHandle.getExpiredTimers(batchTimestamp)
255+
processorHandle.getExpiredTimers(batchTimestamp, prevBatchTimestampMs)
255256
.flatMap { case (keyObj, expiryTimestampMs) =>
256257
numExpiredTimers += 1
257258
handleTimerRows(keyObj, expiryTimestampMs, processorHandle)
@@ -260,7 +261,13 @@ case class TransformWithStateExec(
260261
case EventTime =>
261262
assert(eventTimeWatermarkForEviction.isDefined)
262263
val watermark = eventTimeWatermarkForEviction.get
263-
processorHandle.getExpiredTimers(watermark)
264+
// Only use the late-events watermark as the scan lower bound when a previous batch
265+
// actually existed (prevBatchTimestampMs is set). In the very first batch the
266+
// watermark propagation yields Some(0) for late events even though no timers have
267+
// been processed yet, which would incorrectly skip timers registered at timestamp 0.
268+
val prevWatermark =
269+
if (prevBatchTimestampMs.isDefined) eventTimeWatermarkForLateEvents else None
270+
processorHandle.getExpiredTimers(watermark, prevWatermark)
264271
.flatMap { case (keyObj, expiryTimestampMs) =>
265272
numExpiredTimers += 1
266273
handleTimerRows(keyObj, expiryTimestampMs, processorHandle)
@@ -493,7 +500,7 @@ case class TransformWithStateExec(
493500
CompletionIterator[InternalRow, Iterator[InternalRow]] = {
494501
val processorHandle = new StatefulProcessorHandleImpl(
495502
store, getStateInfo.queryRunId, keyEncoder, timeMode,
496-
isStreaming, batchTimestampMs, metrics)
503+
isStreaming, batchTimestampMs, prevBatchTimestampMs, metrics)
497504
assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
498505
statefulProcessor.setHandle(processorHandle)
499506
withStatefulProcessorErrorHandling("init") {
@@ -509,7 +516,7 @@ case class TransformWithStateExec(
509516
initStateIterator: Iterator[InternalRow]):
510517
CompletionIterator[InternalRow, Iterator[InternalRow]] = {
511518
val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
512-
keyEncoder, timeMode, isStreaming, batchTimestampMs, metrics)
519+
keyEncoder, timeMode, isStreaming, batchTimestampMs, prevBatchTimestampMs, metrics)
513520
assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
514521
statefulProcessor.setHandle(processorHandle)
515522
withStatefulProcessorErrorHandling("init") {
@@ -581,6 +588,7 @@ object TransformWithStateExec {
581588
Some(System.currentTimeMillis),
582589
None,
583590
None,
591+
None,
584592
child,
585593
isStreaming = false,
586594
hasInitialState,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/statefulprocessor/StatefulProcessorHandleImpl.scala

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class StatefulProcessorHandleImpl(
114114
timeMode: TimeMode,
115115
isStreaming: Boolean = true,
116116
batchTimestampMs: Option[Long] = None,
117+
prevBatchTimestampMs: Option[Long] = None,
117118
metrics: Map[String, SQLMetric] = Map.empty)
118119
extends StatefulProcessorHandleImplBase(timeMode, keyEncoder) with Logging {
119120
import StatefulProcessorHandleState._
@@ -171,13 +172,19 @@ class StatefulProcessorHandleImpl(
171172

172173
/**
173174
* Function to retrieve all expired registered timers for all grouping keys
174-
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function
175-
* will return all timers that have timestamp less than passed threshold
175+
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds (inclusive),
176+
* this function will return all timers that have timestamp
177+
* less than or equal to the passed threshold.
178+
* @param prevExpiryTimestampMs If provided, the lower bound (exclusive) of the scan range.
179+
* Timers at or below this timestamp are assumed to have been
180+
* already processed in the previous batch and will be skipped.
176181
* @return - iterator of registered timers for all grouping keys
177182
*/
178-
def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
183+
def getExpiredTimers(
184+
expiryTimestampMs: Long,
185+
prevExpiryTimestampMs: Option[Long] = None): Iterator[(Any, Long)] = {
179186
verifyTimerOperations("get_expired_timers")
180-
timerState.getExpiredTimers(expiryTimestampMs)
187+
timerState.getExpiredTimers(expiryTimestampMs, prevExpiryTimestampMs)
181188
}
182189

183190
/**
@@ -237,7 +244,8 @@ class StatefulProcessorHandleImpl(
237244
validateTTLConfig(ttlConfig, stateName)
238245
assert(batchTimestampMs.isDefined)
239246
val valueStateWithTTL = new ValueStateImplWithTTL[T](store, stateName,
240-
keyEncoder, stateEncoder, ttlConfig, batchTimestampMs.get, metrics)
247+
keyEncoder, stateEncoder, ttlConfig, batchTimestampMs.get,
248+
prevBatchTimestampMs, metrics)
241249
ttlStates.add(valueStateWithTTL)
242250
TWSMetricsUtils.incrementMetric(metrics, "numValueStateWithTTLVars")
243251
valueStateWithTTL
@@ -286,7 +294,8 @@ class StatefulProcessorHandleImpl(
286294
validateTTLConfig(ttlConfig, stateName)
287295
assert(batchTimestampMs.isDefined)
288296
val listStateWithTTL = new ListStateImplWithTTL[T](store, stateName,
289-
keyEncoder, stateEncoder, ttlConfig, batchTimestampMs.get, metrics)
297+
keyEncoder, stateEncoder, ttlConfig, batchTimestampMs.get,
298+
prevBatchTimestampMs, metrics)
290299
TWSMetricsUtils.incrementMetric(metrics, "numListStateWithTTLVars")
291300
ttlStates.add(listStateWithTTL)
292301
listStateWithTTL
@@ -324,7 +333,8 @@ class StatefulProcessorHandleImpl(
324333
validateTTLConfig(ttlConfig, stateName)
325334
assert(batchTimestampMs.isDefined)
326335
val mapStateWithTTL = new MapStateImplWithTTL[K, V](store, stateName, keyEncoder, userKeyEnc,
327-
valEncoder, ttlConfig, batchTimestampMs.get, metrics)
336+
valEncoder, ttlConfig, batchTimestampMs.get,
337+
prevBatchTimestampMs, metrics)
328338
TWSMetricsUtils.incrementMetric(metrics, "numMapStateWithTTLVars")
329339
ttlStates.add(mapStateWithTTL)
330340
mapStateWithTTL

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/timers/TimerStateImpl.scala

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,27 @@ class TimerStateImpl(
112112
schemaForValueRow, RangeKeyScanStateEncoderSpec(keySchemaForSecIndex, Seq(0)),
113113
useMultipleValuesPerKey = false, isInternal = true)
114114

115+
private val secIndexProjection = UnsafeProjection.create(keySchemaForSecIndex)
116+
117+
/**
118+
* Encodes a timestamp into an UnsafeRow key for the secondary index.
119+
* The timestamp is incremented by 1 so that the encoded key serves as an exclusive
120+
* lower / upper bound in range scans. Returns None if tsMs is Long.MaxValue
121+
* (overflow guard).
122+
*
123+
* The returned UnsafeRow is always a fresh copy, safe to hold alongside other
124+
* rows produced by the same projection.
125+
*/
126+
private def encodeTimestampAsKey(tsMs: Long): Option[UnsafeRow] = {
127+
if (tsMs < Long.MaxValue) {
128+
val row = new GenericInternalRow(keySchemaForSecIndex.length)
129+
row.setLong(0, tsMs + 1)
130+
Some(secIndexProjection.apply(row).copy())
131+
} else {
132+
None
133+
}
134+
}
135+
115136
private def getGroupingKey(cfName: String): Any = {
116137
val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
117138
if (keyOption.isEmpty) {
@@ -189,15 +210,22 @@ class TimerStateImpl(
189210

190211
/**
191212
* Function to get all the expired registered timers for all grouping keys.
192-
* Perform a range scan on timestamp and will stop iterating once the key row timestamp equals or
213+
* Perform a range scan on timestamp and will stop iterating once the key row timestamp
193214
* exceeds the limit (as timestamp key is increasingly sorted).
194-
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds, this function
195-
* will return all timers that have timestamp less than passed threshold.
215+
* @param expiryTimestampMs Threshold for expired timestamp in milliseconds (inclusive),
216+
* this function will return all timers that have timestamp
217+
* less than or equal to the passed threshold.
218+
* @param prevExpiryTimestampMs If provided, the lower bound (exclusive) of the scan range.
219+
* Timers at or below this timestamp are assumed to have been
220+
* already processed in the previous batch and will be skipped.
196221
* @return - iterator of all the registered timers for all grouping keys
197222
*/
198-
def getExpiredTimers(expiryTimestampMs: Long): Iterator[(Any, Long)] = {
199-
// this iter is increasingly sorted on timestamp
200-
val iter = store.iterator(tsToKeyCFName)
223+
def getExpiredTimers(
224+
expiryTimestampMs: Long,
225+
prevExpiryTimestampMs: Option[Long] = None): Iterator[(Any, Long)] = {
226+
val startKey = prevExpiryTimestampMs.flatMap(encodeTimestampAsKey)
227+
val endKey = encodeTimestampAsKey(expiryTimestampMs)
228+
val iter = store.rangeScan(startKey, endKey, tsToKeyCFName)
201229

202230
new NextIterator[(Any, Long)] {
203231
override protected def getNext(): (Any, Long) = {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ListStateImplWithTTL.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ import org.apache.spark.util.NextIterator
3535
* @param valEncoder - Spark SQL encoder for value
3636
* @param ttlConfig - TTL configuration for values stored in this state
3737
* @param batchTimestampMs - current batch processing timestamp.
38+
* @param prevBatchTimestampMs - batch timestamp from the previous micro-batch (exclusive).
39+
* Entries with expiration at or below this timestamp are assumed
40+
* to have been already cleaned up and will be skipped during
41+
* TTL eviction scans.
3842
* @param metrics - metrics to be updated as part of stateful processing
3943
* @tparam S - data type of object that will be stored
4044
*/
@@ -45,9 +49,11 @@ class ListStateImplWithTTL[S](
4549
valEncoder: ExpressionEncoder[Any],
4650
ttlConfig: TTLConfig,
4751
batchTimestampMs: Long,
52+
prevBatchTimestampMs: Option[Long] = None,
4853
metrics: Map[String, SQLMetric])
4954
extends OneToManyTTLState(
50-
stateName, store, keyExprEnc.schema, ttlConfig, batchTimestampMs, metrics) with ListState[S] {
55+
stateName, store, keyExprEnc.schema, ttlConfig, batchTimestampMs,
56+
prevBatchTimestampMs, metrics) with ListState[S] {
5157

5258
private lazy val stateTypesEncoder = StateTypesEncoder(keyExprEnc, valEncoder,
5359
stateName, hasTtl = true)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/MapStateImplWithTTL.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ import org.apache.spark.util.NextIterator
3636
* @param valEncoder - SQL encoder for state variable
3737
* @param ttlConfig - the ttl configuration (time to live duration etc.)
3838
* @param batchTimestampMs - current batch processing timestamp.
39+
* @param prevBatchTimestampMs - batch timestamp from the previous micro-batch (exclusive).
40+
* Entries with expiration at or below this timestamp are assumed
41+
* to have been already cleaned up and will be skipped during
42+
* TTL eviction scans.
3943
* @param metrics - metrics to be updated as part of stateful processing
4044
* @tparam K - type of key for map state variable
4145
* @tparam V - type of value for map state variable
@@ -49,10 +53,11 @@ class MapStateImplWithTTL[K, V](
4953
valEncoder: ExpressionEncoder[Any],
5054
ttlConfig: TTLConfig,
5155
batchTimestampMs: Long,
52-
metrics: Map[String, SQLMetric])
56+
prevBatchTimestampMs: Option[Long] = None,
57+
metrics: Map[String, SQLMetric])
5358
extends OneToOneTTLState(
5459
stateName, store, getCompositeKeySchema(keyExprEnc.schema, userKeyEnc.schema), ttlConfig,
55-
batchTimestampMs, metrics) with MapState[K, V] with Logging {
60+
batchTimestampMs, prevBatchTimestampMs, metrics) with MapState[K, V] with Logging {
5661

5762
private val stateTypesEncoder = new CompositeKeyStateEncoder(
5863
keyExprEnc, userKeyEnc, valEncoder, stateName, hasTtl = true)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/TTLState.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ trait TTLState {
8888
// an expiration at or before this timestamp must be cleaned up.
8989
private[sql] def batchTimestampMs: Long
9090

91+
// The batch timestamp from the previous micro-batch, used to derive the startKey
92+
// for scan-based TTL eviction. Entries at or below prevBatchTimestampMs were already
93+
// cleaned up in the previous batch.
94+
private[sql] def prevBatchTimestampMs: Option[Long]
95+
9196
// The configuration for this run of the streaming query. It may change between runs
9297
// (e.g. user sets ttlConfig1, stops their query, updates to ttlConfig2, and then
9398
// resumes their query).
@@ -105,6 +110,8 @@ trait TTLState {
105110

106111
private final val TTL_ENCODER = new TTLEncoder(elementKeySchema)
107112

113+
private final val ELEMENT_KEY_PROJECTION = UnsafeProjection.create(elementKeySchema)
114+
108115
// Empty row used for values
109116
private final val TTL_EMPTY_VALUE_ROW =
110117
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
@@ -161,10 +168,25 @@ trait TTLState {
161168
//
162169
// The schema of the UnsafeRow returned by this iterator is (expirationMs, elementKey).
163170
private[sql] def ttlEvictionIterator(): Iterator[UnsafeRow] = {
164-
val ttlIterator = store.iterator(TTL_INDEX)
171+
val dummyElementKey = ELEMENT_KEY_PROJECTION
172+
.apply(new GenericInternalRow(elementKeySchema.length))
173+
val startKey = prevBatchTimestampMs.flatMap { prevTs =>
174+
if (prevTs < Long.MaxValue) {
175+
Some(TTL_ENCODER.encodeTTLRow(prevTs + 1, dummyElementKey).copy())
176+
} else {
177+
None
178+
}
179+
}
180+
val endKey = if (batchTimestampMs < Long.MaxValue) {
181+
Some(TTL_ENCODER.encodeTTLRow(batchTimestampMs + 1, dummyElementKey).copy())
182+
} else {
183+
None
184+
}
185+
val ttlIterator = store.rangeScan(startKey, endKey, TTL_INDEX)
165186

166187
// Recall that the format is (expirationMs, elementKey) -> TTL_EMPTY_VALUE_ROW, so
167188
// kv.value doesn't ever need to be used.
189+
// Safety filter: keep only truly expired entries
168190
ttlIterator.takeWhile { kv =>
169191
val expirationMs = kv.key.getLong(0)
170192
StateTTL.isExpired(expirationMs, batchTimestampMs)
@@ -223,12 +245,14 @@ abstract class OneToOneTTLState(
223245
elementKeySchemaArg: StructType,
224246
ttlConfigArg: TTLConfig,
225247
batchTimestampMsArg: Long,
248+
prevBatchTimestampMsArg: Option[Long],
226249
metricsArg: Map[String, SQLMetric]) extends TTLState {
227250
override private[sql] def stateName: String = stateNameArg
228251
override private[sql] def store: StateStore = storeArg
229252
override private[sql] def elementKeySchema: StructType = elementKeySchemaArg
230253
override private[sql] def ttlConfig: TTLConfig = ttlConfigArg
231254
override private[sql] def batchTimestampMs: Long = batchTimestampMsArg
255+
override private[sql] def prevBatchTimestampMs: Option[Long] = prevBatchTimestampMsArg
232256
override private[sql] def metrics: Map[String, SQLMetric] = metricsArg
233257

234258
/**
@@ -340,12 +364,14 @@ abstract class OneToManyTTLState(
340364
elementKeySchemaArg: StructType,
341365
ttlConfigArg: TTLConfig,
342366
batchTimestampMsArg: Long,
367+
prevBatchTimestampMsArg: Option[Long],
343368
metricsArg: Map[String, SQLMetric]) extends TTLState {
344369
override private[sql] def stateName: String = stateNameArg
345370
override private[sql] def store: StateStore = storeArg
346371
override private[sql] def elementKeySchema: StructType = elementKeySchemaArg
347372
override private[sql] def ttlConfig: TTLConfig = ttlConfigArg
348373
override private[sql] def batchTimestampMs: Long = batchTimestampMsArg
374+
override private[sql] def prevBatchTimestampMs: Option[Long] = prevBatchTimestampMsArg
349375
override private[sql] def metrics: Map[String, SQLMetric] = metricsArg
350376

351377
// Schema of the min-expiry index: elementKey -> minExpirationMs

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/ValueStateImplWithTTL.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ import org.apache.spark.sql.streaming.{TTLConfig, ValueState}
3333
* @param valEncoder - Spark SQL encoder for value
3434
* @param ttlConfig - TTL configuration for values stored in this state
3535
* @param batchTimestampMs - current batch processing timestamp.
36+
* @param prevBatchTimestampMs - batch timestamp from the previous micro-batch (exclusive).
37+
* Entries with expiration at or below this timestamp are assumed
38+
* to have been already cleaned up and will be skipped during
39+
* TTL eviction scans.
3640
* @param metrics - metrics to be updated as part of stateful processing
3741
* @tparam S - data type of object that will be stored
3842
*/
@@ -43,9 +47,11 @@ class ValueStateImplWithTTL[S](
4347
valEncoder: ExpressionEncoder[Any],
4448
ttlConfig: TTLConfig,
4549
batchTimestampMs: Long,
50+
prevBatchTimestampMs: Option[Long] = None,
4651
metrics: Map[String, SQLMetric] = Map.empty)
4752
extends OneToOneTTLState(
48-
stateName, store, keyExprEnc.schema, ttlConfig, batchTimestampMs, metrics) with ValueState[S] {
53+
stateName, store, keyExprEnc.schema, ttlConfig, batchTimestampMs,
54+
prevBatchTimestampMs, metrics) with ValueState[S] {
4955

5056
private val stateTypesEncoder =
5157
StateTypesEncoder(keyExprEnc, valEncoder, stateName, hasTtl = true)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ class IncrementalExecution(
384384
t.copy(
385385
stateInfo = Some(nextStatefulOperationStateInfo()),
386386
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
387+
prevBatchTimestampMs = prevOffsetSeqMetadata.map(_.batchTimestampMs),
387388
eventTimeWatermarkForLateEvents = None,
388389
eventTimeWatermarkForEviction = None,
389390
hasInitialState = hasInitialState
@@ -394,6 +395,7 @@ class IncrementalExecution(
394395
t.copy(
395396
stateInfo = Some(nextStatefulOperationStateInfo()),
396397
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
398+
prevBatchTimestampMs = prevOffsetSeqMetadata.map(_.batchTimestampMs),
397399
eventTimeWatermarkForLateEvents = None,
398400
eventTimeWatermarkForEviction = None,
399401
hasInitialState = hasInitialState

0 commit comments

Comments (0)