Skip to content

Commit 7da3f96

Browse files
committed
Reflect review comment
1 parent 2135faf commit 7da3f96

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

  • sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/transformwithstate/ttl/TTLState.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,12 @@ trait TTLState {
162162
store.iterator(TTL_INDEX).map(kv => toTTLRow(kv.key))
163163
}
164164

165-
// Returns an Iterator over all the keys in the TTL index that have expired. This method
166-
// does not delete the keys from the TTL index; it is the responsibility of the caller
167-
// to do so.
165+
// Returns an Iterator over the keys in the TTL index that have expired. Uses a bounded
166+
// range scan over [prevBatchTimestampMs+1, batchTimestampMs+1) to skip entries that
167+
// were already evicted in previous batches.
168+
//
169+
// This method does not delete the keys from the TTL index; it is the responsibility of
170+
// the caller to do so.
168171
//
169172
// The schema of the UnsafeRow returned by this iterator is (expirationMs, elementKey).
170173
private[sql] def ttlEvictionIterator(): Iterator[UnsafeRow] = {

0 commit comments

Comments
 (0)