Skip to content

Commit 6593a3f

Browse files
committed
review comments from Micheal
1 parent 710466b commit 6593a3f

3 files changed

Lines changed: 18 additions & 6 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1672,7 +1672,7 @@ class RocksDB(
16721672
* @param cfName The column family name.
16731673
* @return An iterator of ByteArrayPairs in the given range.
16741674
*/
1675-
def scan(
1675+
def rangeScan(
16761676
startKey: Option[Array[Byte]],
16771677
endKey: Option[Array[Byte]],
16781678
cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): NextIterator[ByteArrayPair] = {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ private[sql] class RocksDBStateStoreProvider
564564
val encodedEndKey = endKey.map(kvEncoder._1.encodeKey)
565565

566566
val rowPair = new UnsafeRowPair()
567-
val rocksDbIter = rocksDB.scan(encodedStartKey, encodedEndKey, colFamilyName)
567+
val rocksDbIter = rocksDB.rangeScan(encodedStartKey, encodedEndKey, colFamilyName)
568568
val iter = rocksDbIter.map { kv =>
569569
rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
570570
kvEncoder._2.decodeValue(kv.value))
@@ -591,7 +591,7 @@ private[sql] class RocksDBStateStoreProvider
591591

592592
val encodedStartKey = startKey.map(kvEncoder._1.encodeKey)
593593
val encodedEndKey = endKey.map(kvEncoder._1.encodeKey)
594-
val rocksDbIter = rocksDB.scan(encodedStartKey, encodedEndKey, colFamilyName)
594+
val rocksDbIter = rocksDB.rangeScan(encodedStartKey, encodedEndKey, colFamilyName)
595595

596596
val rowPair = new UnsafeRowPair()
597597
val iter = rocksDbIter.flatMap { kv =>

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1634,7 +1634,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
16341634
-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)
16351635

16361636
testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - rangeScan",
1637-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1637+
TestWithChangelogCheckpointingDisabled) { colFamiliesEnabled =>
16381638

16391639
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
16401640
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
@@ -1704,6 +1704,12 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17041704
val negResults = negIter.map(_.key.getLong(0)).toList
17051705
negIter.close()
17061706
assert(negResults === diverseTimestamps.filter(ts => ts >= -300 && ts < 0).sorted)
1707+
1708+
// Both None: scan entire column family
1709+
val allIter = store.rangeScan(None, None, cfName)
1710+
val allResults = allIter.map(_.key.getLong(0)).toList
1711+
allIter.close()
1712+
assert(allResults === diverseTimestamps.sorted)
17071713
} finally {
17081714
if (!store.hasCommitted) store.abort()
17091715
}
@@ -1712,7 +1718,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17121718

17131719
testWithColumnFamiliesAndEncodingTypes(
17141720
"rocksdb range scan - scan with multiple key2 values within same key1 range",
1715-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1721+
TestWithChangelogCheckpointingDisabled) { colFamiliesEnabled =>
17161722

17171723
tryWithProviderResource(newStoreProvider(keySchemaWithRangeScan,
17181724
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
@@ -1752,7 +1758,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
17521758

17531759
testWithColumnFamiliesAndEncodingTypes(
17541760
"rocksdb range scan - rangeScanWithMultiValues",
1755-
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
1761+
TestWithChangelogCheckpointingDisabled) { colFamiliesEnabled =>
17561762

17571763
if (colFamiliesEnabled) {
17581764
tryWithProviderResource(newStoreProvider(
@@ -1833,6 +1839,12 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
18331839
negIter.close()
18341840
assert(negResults.distinct === diverseTimestamps
18351841
.filter(ts => ts >= -300 && ts < 0).sorted)
1842+
1843+
// Both None: scan entire column family
1844+
val allIter = store.rangeScanWithMultiValues(None, None, cfName)
1845+
val allResults = allIter.map(_.key.getLong(0)).toList
1846+
allIter.close()
1847+
assert(allResults.distinct === diverseTimestamps.sorted)
18361848
} finally {
18371849
if (!store.hasCommitted) store.abort()
18381850
}

0 commit comments

Comments
 (0)