Skip to content

Commit e6dacfe

Browse files
committed
fix
1 parent a1fcd0c commit e6dacfe

2 files changed

Lines changed: 11 additions & 7 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -854,11 +854,16 @@ class SymmetricHashJoinStateManagerV4(
854854
def scanEvictedKeys(
855855
endTimestamp: Long,
856856
startTimestamp: Option[Long] = None): Iterator[EvictedKeysResult] = {
857+
// If startTimestamp == Long.MaxValue, everything has already been evicted;
858+
// nothing can match, so return immediately.
859+
if (startTimestamp.contains(Long.MaxValue)) {
860+
return Iterator.empty
861+
}
862+
857863
// rangeScanWithMultiValues: startKey is inclusive, endKey is exclusive.
858864
// startTimestamp is exclusive (already evicted), so we seek from st + 1.
859-
val startKeyRow = startTimestamp.flatMap { st =>
860-
if (st < Long.MaxValue) Some(createScanBoundaryRow(st + 1))
861-
else None
865+
val startKeyRow = startTimestamp.map { st =>
866+
createScanBoundaryRow(st + 1)
862867
}
863868
// endTimestamp is inclusive, so we use endTimestamp + 1 as the exclusive upper bound.
864869
// When endTimestamp == Long.MaxValue we cannot add 1, so endKeyRow is None. This is

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,10 +1178,9 @@ class SymmetricHashJoinStateManagerEventTimeInValueSuite
11781178
// --- overflow boundary: startTimestamp = Some(Long.MaxValue) ---
11791179
Seq(10, 20).foreach(append(40, _))
11801180
// startTimestamp=Long.MaxValue (exclusive) means everything <= Long.MaxValue was already
1181-
// evicted. Since startKeyRow falls back to None, endTimestamp=50 bounds the scan.
1182-
// All remaining entries (10, 20, 30) have timestamps <= 50, so they are evicted.
1183-
assert(evictByTs.evictByTimestamp(50, Some(Long.MaxValue)) === 3)
1184-
assert(get(40) === Seq.empty)
1181+
// evicted. Nothing can remain, so the scan returns an empty iterator immediately.
1182+
assert(evictByTs.evictByTimestamp(50, Some(Long.MaxValue)) === 0)
1183+
assert(get(40) === Seq(10, 20, 30))
11851184
}
11861185
}
11871186

0 commit comments

Comments
 (0)