Skip to content

Commit 0aa4813

Browse files
committed
Second round of self review comments
1 parent a494b31 commit 0aa4813

2 files changed

Lines changed: 94 additions & 51 deletions

File tree

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,7 +1633,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16331633
-230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
16341634
-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
16351635

1636-
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan",
1636+
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - scan",
16371637
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
16381638

16391639
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
@@ -1664,6 +1664,18 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16641664
assert(boundedResults.map(_._1) === expectedBoundedTs)
16651665
assert(boundedResults.map(_._2) === expectedBoundedTs.map(_.toInt))
16661666

1667+
// Exact bound: startKey is inclusive, endKey is exclusive.
1668+
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
1669+
// Scan [9, 90) should include 9 but exclude 90.
1670+
val exactIter = store.scan(
1671+
Some(dataToKeyRowWithRangeScan(9L, "a")),
1672+
Some(dataToKeyRowWithRangeScan(90L, "a")), cfName)
1673+
val exactResults = exactIter.map(_.key.getLong(0)).toList
1674+
exactIter.close()
1675+
assert(exactResults === diverseTimestamps.filter(ts => ts >= 9 && ts < 90).sorted)
1676+
assert(exactResults.contains(9L))
1677+
assert(!exactResults.contains(90L))
1678+
16671679
// None startKey scans from beginning to 0
16681680
val noneStartIter = store.scan(
16691681
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
@@ -1728,8 +1740,10 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17281740
}.toList
17291741
iter.close()
17301742

1731-
assert(results.map(_._1).distinct.sorted === Seq(100L, 200L))
1732-
assert(results.length === 6) // 3 key2 values x 2 key1 values
1743+
val expectedResults = Seq(
1744+
(100L, "a"), (100L, "b"), (100L, "c"),
1745+
(200L, "a"), (200L, "b"), (200L, "c"))
1746+
assert(results === expectedResults)
17331747
} finally {
17341748
if (!store.hasCommitted) store.abort()
17351749
}
@@ -1777,13 +1791,48 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17771791
.flatMap { case (_, idx) => Seq(idx * 10, idx * 10 + 1) }
17781792
assert(boundedResults.map(_._2) === expectedValues)
17791793

1794+
// Exact bound: startKey is inclusive, endKey is exclusive.
1795+
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
1796+
val exactIter = store.scanWithMultiValues(
1797+
Some(dataToKeyRowWithRangeScan(9L, "a")),
1798+
Some(dataToKeyRowWithRangeScan(90L, "a")), cfName)
1799+
val exactResults = exactIter.map(_.key.getLong(0)).toList
1800+
exactIter.close()
1801+
val exactResultsDistinct = exactResults.distinct
1802+
assert(exactResultsDistinct === diverseTimestamps
1803+
.filter(ts => ts >= 9 && ts < 90).sorted)
1804+
assert(exactResultsDistinct.contains(9L))
1805+
assert(!exactResultsDistinct.contains(90L))
1806+
17801807
// None startKey scans from beginning to 0
17811808
val noneStartIter = store.scanWithMultiValues(
17821809
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
17831810
val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
17841811
noneStartIter.close()
1785-
17861812
assert(noneStartResults.distinct === diverseTimestamps.filter(_ < 0).sorted)
1813+
1814+
// None endKey scans from 1000 to end
1815+
val noneEndIter = store.scanWithMultiValues(
1816+
Some(dataToKeyRowWithRangeScan(1000L, "a")), None, cfName)
1817+
val noneEndResults = noneEndIter.map(_.key.getLong(0)).toList
1818+
noneEndIter.close()
1819+
assert(noneEndResults.distinct === diverseTimestamps.filter(_ >= 1000).sorted)
1820+
1821+
// Empty range [10, 31) - no entries between 9 and 32
1822+
val emptyIter = store.scanWithMultiValues(
1823+
Some(dataToKeyRowWithRangeScan(10L, "a")),
1824+
Some(dataToKeyRowWithRangeScan(31L, "a")), cfName)
1825+
assert(!emptyIter.hasNext)
1826+
emptyIter.close()
1827+
1828+
// Bounded negative range [-300, 0)
1829+
val negIter = store.scanWithMultiValues(
1830+
Some(dataToKeyRowWithRangeScan(-300L, "a")),
1831+
Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
1832+
val negResults = negIter.map(_.key.getLong(0)).toList
1833+
negIter.close()
1834+
assert(negResults.distinct === diverseTimestamps
1835+
.filter(ts => ts >= -300 && ts < 0).sorted)
17871836
} finally {
17881837
if (!store.hasCommitted) store.abort()
17891838
}

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -591,17 +591,36 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
591591
Some(keyAndTimestampToRow("key1", 1, 0L)),
592592
Some(keyAndTimestampToRow("key1", 1, 1001L)))
593593
val boundedResults = boundedIter.map { pair =>
594-
(pair.key.getLong(2), pair.value.getInt(0))
594+
(pair.key.getString(0), pair.key.getLong(2), pair.value.getInt(0))
595595
}.toList
596596
boundedIter.close()
597597

598598
val expectedTimestamps = diverseTimestamps.filter(ts => ts >= 0 && ts <= 1000).sorted
599-
assert(boundedResults.map(_._1).distinct === expectedTimestamps)
599+
assert(boundedResults.map(_._2).distinct === expectedTimestamps)
600600
val expectedValues = diverseTimestamps.zipWithIndex
601601
.filter { case (ts, _) => ts >= 0 && ts <= 1000 }
602602
.sortBy(_._1)
603603
.flatMap { case (_, idx) => Seq(idx * 10, idx * 10 + 1) }
604-
assert(boundedResults.map(_._2) === expectedValues)
604+
assert(boundedResults.map(_._3) === expectedValues)
605+
assert(boundedResults.forall(_._1 == "key1"))
606+
607+
// Exact bound: startKey is inclusive, endKey is exclusive.
608+
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
609+
val exactIter = store.scanWithMultiValues(
610+
Some(keyAndTimestampToRow("key1", 1, 9L)),
611+
Some(keyAndTimestampToRow("key1", 1, 90L)))
612+
val exactResults = exactIter.map(_.key.getLong(2)).toList
613+
exactIter.close()
614+
val exactResultsDistinct = exactResults.distinct
615+
assert(exactResultsDistinct === diverseTimestamps
616+
.filter(ts => ts >= 9 && ts < 90).sorted)
617+
assert(exactResultsDistinct.contains(9L))
618+
assert(!exactResultsDistinct.contains(90L))
619+
620+
// Postfix timestamp encoder places the timestamp after the key prefix.
621+
// With different key prefixes, None in startKey or endKey would scan across
622+
// key boundaries, which is not meaningful for postfix encoding. Hence we only
623+
// test bounded ranges with explicit keys here.
605624

606625
// Full range [MinValue, MaxValue)
607626
val fullIter = store.scanWithMultiValues(
@@ -612,6 +631,15 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
612631

613632
assert(fullResults.distinct === diverseTimestamps.sorted)
614633

634+
// Bounded negative range [-300, 0)
635+
val negIter = store.scanWithMultiValues(
636+
Some(keyAndTimestampToRow("key1", 1, -300L)),
637+
Some(keyAndTimestampToRow("key1", 1, 0L)))
638+
val negResults = negIter.map(_.key.getLong(2)).toList
639+
negIter.close()
640+
assert(negResults.distinct === diverseTimestamps
641+
.filter(ts => ts >= -300 && ts < 0).sorted)
642+
615643
// Empty range [10, 31) - no diverseTimestamps entries between 9 and 32
616644
val emptyIter = store.scanWithMultiValues(
617645
Some(keyAndTimestampToRow("key1", 1, 10L)),
@@ -624,6 +652,9 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
624652
}
625653
}
626654

655+
// Sanity test for prefix encoder scan. Full scan coverage is in RocksDBStateStoreSuite's
656+
// "rocksdb range scan - scan" and "rocksdb range scan - scanWithMultiValues" tests.
657+
// This test verifies the timestamp prefix encoder integration works correctly.
627658
test(s"scan with prefix encoder (encoding = $encoding)") {
628659
tryWithProviderResource(
629660
newStoreProviderWithTimestampEncoder(
@@ -643,10 +674,13 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
643674
// None startKey scans from beginning up to 301 (exclusive)
644675
val iter1 = store.scanWithMultiValues(None,
645676
Some(keyAndTimestampToRow("key1", 1, 301L)))
646-
val results1 = iter1.map(_.key.getLong(2)).toList
677+
val results1 = iter1.map { pair =>
678+
(pair.key.getString(0), pair.key.getLong(2))
679+
}.toList
647680
iter1.close()
648681

649-
assert(results1 === Seq(100L, 150L, 200L, 300L))
682+
assert(results1 === Seq(
683+
("key1", 100L), ("key2", 150L), ("key1", 200L), ("key1", 300L)))
650684

651685
// Boundary safety: endKey at 201, includes everything up to 200
652686
// regardless of join key
@@ -657,54 +691,14 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
657691
}.toList
658692
iter2.close()
659693

660-
assert(results2.map(_._2) === Seq(100L, 150L, 200L))
694+
assert(results2 === Seq(
695+
("key1", 100L), ("key2", 150L), ("key1", 200L)))
661696
} finally {
662697
store.abort()
663698
}
664699
}
665700
}
666701

667-
test(s"scan single-value variant (encoding = $encoding)") {
668-
tryWithProviderResource(
669-
newStoreProviderWithTimestampEncoder(
670-
encoderType = "postfix",
671-
useMultipleValuesPerKey = false,
672-
dataEncoding = encoding)
673-
) { provider =>
674-
val store = provider.getStore(0)
675-
676-
try {
677-
diverseTimestamps.foreach { ts =>
678-
store.put(keyAndTimestampToRow("key1", 1, ts), valueToRow(ts.toInt))
679-
}
680-
681-
// Bounded positive range [0, 100)
682-
val posIter = store.scan(
683-
Some(keyAndTimestampToRow("key1", 1, 0L)),
684-
Some(keyAndTimestampToRow("key1", 1, 100L)))
685-
val posResults = posIter.map { pair =>
686-
(pair.key.getLong(2), pair.value.getInt(0))
687-
}.toList
688-
posIter.close()
689-
690-
val expectedPosTs = diverseTimestamps.filter(ts => ts >= 0 && ts < 100).sorted
691-
assert(posResults.map(_._1) === expectedPosTs)
692-
assert(posResults.map(_._2) === expectedPosTs.map(_.toInt))
693-
694-
// Bounded negative range [-300, 0)
695-
val negIter = store.scan(
696-
Some(keyAndTimestampToRow("key1", 1, -300L)),
697-
Some(keyAndTimestampToRow("key1", 1, 0L)))
698-
val negResults = negIter.map(_.key.getLong(2)).toList
699-
negIter.close()
700-
701-
val expectedNegTs = diverseTimestamps.filter(ts => ts >= -300 && ts < 0).sorted
702-
assert(negResults === expectedNegTs)
703-
} finally {
704-
store.abort()
705-
}
706-
}
707-
}
708702
}
709703

710704
// Helper methods to create test data

0 commit comments

Comments
 (0)