Skip to content

Commit 710466b

Browse files
committed
Change the API name to rangeScan
1 parent 0aa4813 commit 710466b

6 files changed

Lines changed: 57 additions & 41 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import org.apache.spark.unsafe.Platform
4646
sealed trait RocksDBKeyStateEncoder {
4747
def supportPrefixKeyScan: Boolean
4848
def supportsDeleteRange: Boolean
49+
def supportsRangeScan: Boolean
4950
def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
5051
def encodeKey(row: UnsafeRow): Array[Byte]
5152
def decodeKey(keyBytes: Array[Byte]): UnsafeRow
@@ -1500,6 +1501,8 @@ class PrefixKeyScanStateEncoder(
15001501
override def supportPrefixKeyScan: Boolean = true
15011502

15021503
override def supportsDeleteRange: Boolean = false
1504+
1505+
override def supportsRangeScan: Boolean = false
15031506
}
15041507

15051508
/**
@@ -1699,6 +1702,8 @@ class RangeKeyScanStateEncoder(
16991702
override def supportPrefixKeyScan: Boolean = true
17001703

17011704
override def supportsDeleteRange: Boolean = true
1705+
1706+
override def supportsRangeScan: Boolean = true
17021707
}
17031708

17041709
/**
@@ -1731,6 +1736,8 @@ class NoPrefixKeyStateEncoder(
17311736

17321737
override def supportsDeleteRange: Boolean = false
17331738

1739+
override def supportsRangeScan: Boolean = false
1740+
17341741
override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
17351742
throw new IllegalStateException("This encoder doesn't support prefix key!")
17361743
}
@@ -1884,6 +1891,8 @@ class TimestampAsPrefixKeyStateEncoder(
18841891

18851892
// TODO: [SPARK-55491] Revisit this to support delete range if needed.
18861893
override def supportsDeleteRange: Boolean = false
1894+
1895+
override def supportsRangeScan: Boolean = true
18871896
}
18881897

18891898
/**
@@ -1932,6 +1941,8 @@ class TimestampAsPostfixKeyStateEncoder(
19321941
}
19331942

19341943
override def supportsDeleteRange: Boolean = false
1944+
1945+
override def supportsRangeScan: Boolean = true
19351946
}
19361947

19371948
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -549,14 +549,17 @@ private[sql] class RocksDBStateStoreProvider
549549
new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
550550
}
551551

552-
override def scan(
552+
override def rangeScan(
553553
startKey: Option[UnsafeRow],
554554
endKey: Option[UnsafeRow],
555555
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
556556
validateAndTransitionState(UPDATE)
557-
verifyColFamilyOperations("scan", colFamilyName)
557+
verifyColFamilyOperations("rangeScan", colFamilyName)
558558

559559
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
560+
require(kvEncoder._1.supportsRangeScan,
561+
"Range scan requires an encoder that supports range scanning!")
562+
560563
val encodedStartKey = startKey.map(kvEncoder._1.encodeKey)
561564
val encodedEndKey = endKey.map(kvEncoder._1.encodeKey)
562565

@@ -571,14 +574,16 @@ private[sql] class RocksDBStateStoreProvider
571574
new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
572575
}
573576

574-
override def scanWithMultiValues(
577+
override def rangeScanWithMultiValues(
575578
startKey: Option[UnsafeRow],
576579
endKey: Option[UnsafeRow],
577580
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
578581
validateAndTransitionState(UPDATE)
579-
verifyColFamilyOperations("scanWithMultiValues", colFamilyName)
582+
verifyColFamilyOperations("rangeScanWithMultiValues", colFamilyName)
580583

581584
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
585+
require(kvEncoder._1.supportsRangeScan,
586+
"Range scan requires an encoder that supports range scanning!")
582587
verify(
583588
kvEncoder._2.supportsMultipleValuesPerKey,
584589
"Multi-value iterator operation requires an encoder" +

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,12 @@ trait ReadStateStore {
196196
* bytes for the scan range to be meaningful (e.g., timestamp-based encoders or
197197
* RangeKeyScanStateEncoder).
198198
*/
199-
def scan(
199+
def rangeScan(
200200
startKey: Option[UnsafeRow],
201201
endKey: Option[UnsafeRow],
202202
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME)
203203
: StateStoreIterator[UnsafeRowPair] = {
204-
throw StateStoreErrors.unsupportedOperationException("scan", "")
204+
throw StateStoreErrors.unsupportedOperationException("rangeScan", "")
205205
}
206206

207207
/**
@@ -220,12 +220,12 @@ trait ReadStateStore {
220220
* It is expected to throw an exception if Spark calls this method without setting
221221
* multipleValuesPerKey to true for the column family.
222222
*/
223-
def scanWithMultiValues(
223+
def rangeScanWithMultiValues(
224224
startKey: Option[UnsafeRow],
225225
endKey: Option[UnsafeRow],
226226
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME)
227227
: StateStoreIterator[UnsafeRowPair] = {
228-
throw StateStoreErrors.unsupportedOperationException("scanWithMultiValues", "")
228+
throw StateStoreErrors.unsupportedOperationException("rangeScanWithMultiValues", "")
229229
}
230230

231231
/** Return an iterator containing all the key-value pairs in the StateStore. */
@@ -456,18 +456,18 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
456456
store.prefixScanWithMultiValues(prefixKey, colFamilyName)
457457
}
458458

459-
override def scan(
459+
override def rangeScan(
460460
startKey: Option[UnsafeRow],
461461
endKey: Option[UnsafeRow],
462462
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
463-
store.scan(startKey, endKey, colFamilyName)
463+
store.rangeScan(startKey, endKey, colFamilyName)
464464
}
465465

466-
override def scanWithMultiValues(
466+
override def rangeScanWithMultiValues(
467467
startKey: Option[UnsafeRow],
468468
endKey: Option[UnsafeRow],
469469
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
470-
store.scanWithMultiValues(startKey, endKey, colFamilyName)
470+
store.rangeScanWithMultiValues(startKey, endKey, colFamilyName)
471471
}
472472

473473
override def iteratorWithMultiValues(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,18 @@ case class CkptIdCollectingStateStoreWrapper(innerStore: StateStore) extends Sta
172172
innerStore.prefixScanWithMultiValues(prefixKey, colFamilyName)
173173
}
174174

175-
override def scan(
175+
override def rangeScan(
176176
startKey: Option[UnsafeRow],
177177
endKey: Option[UnsafeRow],
178178
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
179-
innerStore.scan(startKey, endKey, colFamilyName)
179+
innerStore.rangeScan(startKey, endKey, colFamilyName)
180180
}
181181

182-
override def scanWithMultiValues(
182+
override def rangeScanWithMultiValues(
183183
startKey: Option[UnsafeRow],
184184
endKey: Option[UnsafeRow],
185185
colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
186-
innerStore.scanWithMultiValues(startKey, endKey, colFamilyName)
186+
innerStore.rangeScanWithMultiValues(startKey, endKey, colFamilyName)
187187
}
188188

189189
override def iteratorWithMultiValues(

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1633,7 +1633,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16331633
-230L, -14569L, -92L, -7434253L, 35L, 6L, 9L, -323L, 5L,
16341634
-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
16351635

1636-
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - scan",
1636+
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - rangeScan",
16371637
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
16381638

16391639
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
@@ -1653,7 +1653,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16531653
}
16541654

16551655
// Bounded positive range [0, 100)
1656-
val boundedIter = store.scan(
1656+
val boundedIter = store.rangeScan(
16571657
Some(dataToKeyRowWithRangeScan(0L, "a")),
16581658
Some(dataToKeyRowWithRangeScan(100L, "a")), cfName)
16591659
val boundedResults = boundedIter.map { pair =>
@@ -1667,7 +1667,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16671667
// Exact bound: startKey is inclusive, endKey is exclusive.
16681668
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
16691669
// Scan [9, 90) should include 9 but exclude 90.
1670-
val exactIter = store.scan(
1670+
val exactIter = store.rangeScan(
16711671
Some(dataToKeyRowWithRangeScan(9L, "a")),
16721672
Some(dataToKeyRowWithRangeScan(90L, "a")), cfName)
16731673
val exactResults = exactIter.map(_.key.getLong(0)).toList
@@ -1677,28 +1677,28 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16771677
assert(!exactResults.contains(90L))
16781678

16791679
// None startKey scans from beginning to 0
1680-
val noneStartIter = store.scan(
1680+
val noneStartIter = store.rangeScan(
16811681
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
16821682
val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
16831683
noneStartIter.close()
16841684
assert(noneStartResults === diverseTimestamps.filter(_ < 0).sorted)
16851685

16861686
// None endKey scans from 1000 to end
1687-
val noneEndIter = store.scan(
1687+
val noneEndIter = store.rangeScan(
16881688
Some(dataToKeyRowWithRangeScan(1000L, "a")), None, cfName)
16891689
val noneEndResults = noneEndIter.map(_.key.getLong(0)).toList
16901690
noneEndIter.close()
16911691
assert(noneEndResults === diverseTimestamps.filter(_ >= 1000).sorted)
16921692

16931693
// Empty range [10, 31) - no entries between 9 and 32
1694-
val emptyIter = store.scan(
1694+
val emptyIter = store.rangeScan(
16951695
Some(dataToKeyRowWithRangeScan(10L, "a")),
16961696
Some(dataToKeyRowWithRangeScan(31L, "a")), cfName)
16971697
assert(!emptyIter.hasNext)
16981698
emptyIter.close()
16991699

17001700
// Bounded negative range [-300, 0)
1701-
val negIter = store.scan(
1701+
val negIter = store.rangeScan(
17021702
Some(dataToKeyRowWithRangeScan(-300L, "a")),
17031703
Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
17041704
val negResults = negIter.map(_.key.getLong(0)).toList
@@ -1734,7 +1734,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17341734

17351735
val startKey = dataToKeyRowWithRangeScan(100L, "a")
17361736
val endKey = dataToKeyRowWithRangeScan(201L, "a")
1737-
val iter = store.scan(Some(startKey), Some(endKey), cfName)
1737+
val iter = store.rangeScan(Some(startKey), Some(endKey), cfName)
17381738
val results = iter.map { pair =>
17391739
(pair.key.getLong(0), pair.key.getUTF8String(1).toString)
17401740
}.toList
@@ -1751,7 +1751,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17511751
}
17521752

17531753
testWithColumnFamiliesAndEncodingTypes(
1754-
"rocksdb range scan - scanWithMultiValues",
1754+
"rocksdb range scan - rangeScanWithMultiValues",
17551755
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
17561756

17571757
if (colFamiliesEnabled) {
@@ -1775,7 +1775,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17751775
}
17761776

17771777
// Bounded range [0, 1001)
1778-
val boundedIter = store.scanWithMultiValues(
1778+
val boundedIter = store.rangeScanWithMultiValues(
17791779
Some(dataToKeyRowWithRangeScan(0L, "a")),
17801780
Some(dataToKeyRowWithRangeScan(1001L, "a")), cfName)
17811781
val boundedResults = boundedIter.map { pair =>
@@ -1793,7 +1793,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17931793

17941794
// Exact bound: startKey is inclusive, endKey is exclusive.
17951795
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
1796-
val exactIter = store.scanWithMultiValues(
1796+
val exactIter = store.rangeScanWithMultiValues(
17971797
Some(dataToKeyRowWithRangeScan(9L, "a")),
17981798
Some(dataToKeyRowWithRangeScan(90L, "a")), cfName)
17991799
val exactResults = exactIter.map(_.key.getLong(0)).toList
@@ -1805,28 +1805,28 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
18051805
assert(!exactResultsDistinct.contains(90L))
18061806

18071807
// None startKey scans from beginning to 0
1808-
val noneStartIter = store.scanWithMultiValues(
1808+
val noneStartIter = store.rangeScanWithMultiValues(
18091809
None, Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
18101810
val noneStartResults = noneStartIter.map(_.key.getLong(0)).toList
18111811
noneStartIter.close()
18121812
assert(noneStartResults.distinct === diverseTimestamps.filter(_ < 0).sorted)
18131813

18141814
// None endKey scans from 1000 to end
1815-
val noneEndIter = store.scanWithMultiValues(
1815+
val noneEndIter = store.rangeScanWithMultiValues(
18161816
Some(dataToKeyRowWithRangeScan(1000L, "a")), None, cfName)
18171817
val noneEndResults = noneEndIter.map(_.key.getLong(0)).toList
18181818
noneEndIter.close()
18191819
assert(noneEndResults.distinct === diverseTimestamps.filter(_ >= 1000).sorted)
18201820

18211821
// Empty range [10, 31) - no entries between 9 and 32
1822-
val emptyIter = store.scanWithMultiValues(
1822+
val emptyIter = store.rangeScanWithMultiValues(
18231823
Some(dataToKeyRowWithRangeScan(10L, "a")),
18241824
Some(dataToKeyRowWithRangeScan(31L, "a")), cfName)
18251825
assert(!emptyIter.hasNext)
18261826
emptyIter.close()
18271827

18281828
// Bounded negative range [-300, 0)
1829-
val negIter = store.scanWithMultiValues(
1829+
val negIter = store.rangeScanWithMultiValues(
18301830
Some(dataToKeyRowWithRangeScan(-300L, "a")),
18311831
Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
18321832
val negResults = negIter.map(_.key.getLong(0)).toList

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
567567
}
568568

569569
Seq("unsaferow", "avro").foreach { encoding =>
570-
test(s"scan with postfix encoder (encoding = $encoding)") {
570+
test(s"rangeScan with postfix encoder (encoding = $encoding)") {
571571
tryWithProviderResource(
572572
newStoreProviderWithTimestampEncoder(
573573
encoderType = "postfix",
@@ -587,7 +587,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
587587
Array(valueToRow(999)))
588588

589589
// Bounded range [0, 1001)
590-
val boundedIter = store.scanWithMultiValues(
590+
val boundedIter = store.rangeScanWithMultiValues(
591591
Some(keyAndTimestampToRow("key1", 1, 0L)),
592592
Some(keyAndTimestampToRow("key1", 1, 1001L)))
593593
val boundedResults = boundedIter.map { pair =>
@@ -606,7 +606,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
606606

607607
// Exact bound: startKey is inclusive, endKey is exclusive.
608608
// 9 exists in diverseTimestamps, 90 exists in diverseTimestamps.
609-
val exactIter = store.scanWithMultiValues(
609+
val exactIter = store.rangeScanWithMultiValues(
610610
Some(keyAndTimestampToRow("key1", 1, 9L)),
611611
Some(keyAndTimestampToRow("key1", 1, 90L)))
612612
val exactResults = exactIter.map(_.key.getLong(2)).toList
@@ -623,7 +623,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
623623
// test bounded ranges with explicit keys here.
624624

625625
// Full range [MinValue, MaxValue)
626-
val fullIter = store.scanWithMultiValues(
626+
val fullIter = store.rangeScanWithMultiValues(
627627
Some(keyAndTimestampToRow("key1", 1, Long.MinValue)),
628628
Some(keyAndTimestampToRow("key1", 1, Long.MaxValue)))
629629
val fullResults = fullIter.map(_.key.getLong(2)).toList
@@ -632,7 +632,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
632632
assert(fullResults.distinct === diverseTimestamps.sorted)
633633

634634
// Bounded negative range [-300, 0)
635-
val negIter = store.scanWithMultiValues(
635+
val negIter = store.rangeScanWithMultiValues(
636636
Some(keyAndTimestampToRow("key1", 1, -300L)),
637637
Some(keyAndTimestampToRow("key1", 1, 0L)))
638638
val negResults = negIter.map(_.key.getLong(2)).toList
@@ -641,7 +641,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
641641
.filter(ts => ts >= -300 && ts < 0).sorted)
642642

643643
// Empty range [10, 31) - no diverseTimestamps entries between 9 and 32
644-
val emptyIter = store.scanWithMultiValues(
644+
val emptyIter = store.rangeScanWithMultiValues(
645645
Some(keyAndTimestampToRow("key1", 1, 10L)),
646646
Some(keyAndTimestampToRow("key1", 1, 31L)))
647647
assert(!emptyIter.hasNext)
@@ -653,9 +653,9 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
653653
}
654654

655655
// Sanity test for prefix encoder rangeScan. Full coverage is in RocksDBStateStoreSuite's
656-
// "rocksdb range scan - scan" and "rocksdb range scan - scanWithMultiValues" tests.
656+
// "rocksdb range scan - rangeScan" and "rocksdb range scan - rangeScanWithMultiValues" tests.
657657
// This test verifies the timestamp prefix encoder integration works correctly.
658-
test(s"scan with prefix encoder (encoding = $encoding)") {
658+
test(s"rangeScan with prefix encoder (encoding = $encoding)") {
659659
tryWithProviderResource(
660660
newStoreProviderWithTimestampEncoder(
661661
encoderType = "prefix",
@@ -672,7 +672,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
672672
store.merge(keyAndTimestampToRow("key2", 2, 150L), valueToRow(150))
673673

674674
// None startKey scans from beginning up to 301 (exclusive)
675-
val iter1 = store.scanWithMultiValues(None,
675+
val iter1 = store.rangeScanWithMultiValues(None,
676676
Some(keyAndTimestampToRow("key1", 1, 301L)))
677677
val results1 = iter1.map { pair =>
678678
(pair.key.getString(0), pair.key.getLong(2))
@@ -684,7 +684,7 @@ class RocksDBTimestampEncoderOperationsSuite extends SharedSparkSession
684684

685685
// Boundary safety: endKey at 201, includes everything up to 200
686686
// regardless of join key
687-
val iter2 = store.scanWithMultiValues(None,
687+
val iter2 = store.rangeScanWithMultiValues(None,
688688
Some(keyAndTimestampToRow("key1", 1, 201L)))
689689
val results2 = iter2.map { pair =>
690690
(pair.key.getString(0), pair.key.getLong(2))

0 commit comments

Comments
 (0)