-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-56369][SS] Introduce a new API "rangeScan/rangeScanWithMultiValues" in StateStore #55226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
b019191
9a97b91
a972d58
c7b6d72
a562039
77c12db
1f734c4
1a418de
d96129a
8f4a37d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -556,6 +556,68 @@ private[sql] class RocksDBStateStoreProvider | |
| new StateStoreIterator(iter, rocksDbIter.closeIfNeeded) | ||
| } | ||
|
|
||
| override def rangeScan( | ||
| startKey: Option[UnsafeRow], | ||
| endKey: Option[UnsafeRow], | ||
| colFamilyName: String): StateStoreIterator[UnsafeRowPair] = { | ||
| validateAndTransitionState(UPDATE) | ||
| verifyColFamilyOperations("rangeScan", colFamilyName) | ||
|
|
||
| val kvEncoder = keyValueEncoderMap.get(colFamilyName) | ||
| require(kvEncoder._1.supportsRangeScan, | ||
| "Range scan requires an encoder that supports range scanning!") | ||
|
|
||
| val encodedStartKey = startKey.map(kvEncoder._1.encodeKey) | ||
| val encodedEndKey = endKey.map(kvEncoder._1.encodeKey) | ||
|
|
||
| val rowPair = new UnsafeRowPair() | ||
| val rocksDbIter = rocksDB.rangeScan(encodedStartKey, encodedEndKey, colFamilyName) | ||
| val iter = rocksDbIter.map { kv => | ||
| rowPair.withRows(kvEncoder._1.decodeKey(kv.key), | ||
| kvEncoder._2.decodeValue(kv.value)) | ||
| rowPair | ||
| } | ||
|
|
||
| new StateStoreIterator(iter, rocksDbIter.closeIfNeeded) | ||
| } | ||
|
|
||
| override def rangeScanWithMultiValues( | ||
| startKey: Option[UnsafeRow], | ||
| endKey: Option[UnsafeRow], | ||
| colFamilyName: String): StateStoreIterator[UnsafeRowPair] = { | ||
| validateAndTransitionState(UPDATE) | ||
| verifyColFamilyOperations("rangeScanWithMultiValues", colFamilyName) | ||
|
|
||
| val kvEncoder = keyValueEncoderMap.get(colFamilyName) | ||
| require(kvEncoder._1.supportsRangeScan, | ||
| "Range scan requires an encoder that supports range scanning!") | ||
| verify( | ||
| kvEncoder._2.supportsMultipleValuesPerKey, | ||
| "Multi-value iterator operation requires an encoder" + | ||
| " which supports multiple values for a single key") | ||
|
|
||
| val encodedStartKey = startKey.map(kvEncoder._1.encodeKey) | ||
| val encodedEndKey = endKey.map(kvEncoder._1.encodeKey) | ||
| val rocksDbIter = rocksDB.rangeScan(encodedStartKey, encodedEndKey, colFamilyName) | ||
|
|
||
| val rowPair = new UnsafeRowPair() | ||
| val iter = rocksDbIter.flatMap { kv => | ||
| val keyRow = kvEncoder._1.decodeKey(kv.key) | ||
| val valueRows = kvEncoder._2.decodeValues(kv.value) | ||
| valueRows.iterator.map { valueRow => | ||
| rowPair.withRows(keyRow, valueRow) | ||
| if (!isValidated && rowPair.value != null && !useColumnFamilies) { | ||
| StateStoreProvider.validateStateRowFormat( | ||
| rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId, storeConf) | ||
| isValidated = true | ||
| } | ||
|
Comment on lines
+609
to
+613
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why only rangeScanWithMultiValues needs to validate the state row format, but rangeScan doesn't?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice finding! Looks like we missed prefixScan to do that (while prefixScanWithMultiValues does) and missed the same for rangeScan? Let's deal with it as FOLLOWUP or another JIRA ticket since we want to address both prefixScan and rangeScan, not only rangeScan. |
||
| rowPair | ||
| } | ||
| } | ||
|
|
||
| new StateStoreIterator(iter, rocksDbIter.closeIfNeeded) | ||
| } | ||
|
|
||
| var checkpointInfo: Option[StateStoreCheckpointInfo] = None | ||
| private var storedMetrics: Option[RocksDBMetrics] = None | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
Range scan with multiple values requires an