Skip to content

[SPARK-56369][SS] Introduce a new API "rangeScan/rangeScanWithMultiValues" in StateStore#55226

Closed
HeartSaVioR wants to merge 10 commits intoapache:masterfrom
HeartSaVioR:SPARK-56369
Closed

[SPARK-56369][SS] Introduce a new API "rangeScan/rangeScanWithMultiValues" in StateStore#55226
HeartSaVioR wants to merge 10 commits intoapache:masterfrom
HeartSaVioR:SPARK-56369

Conversation

@HeartSaVioR
Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR commented Apr 7, 2026

What changes were proposed in this pull request?

This PR proposes to introduce a new API "rangeScan/rangeScanWithMultiValues" in StateStore. This new API is mostly an optimization focused on RocksDB state store provider.

The new API receives startKey (inclusive) and endKey (exclusive), and provides valid entries in the range. The new API requires the column family to use the state key encoder which can support range scan. This PR adds a new flag on state key encoder to represent the case. At this point, the new API supports three state key encoders - range scan encoder, timestamp prefix/postfix encoder.

Worth noting that the new API supports the combination of "prefix + range (+ remaining)" along with "range + remaining", hence the callers should reason about the ordering for the keys based on the state key encoder they are using for the CF before calling the API. The API implementation may not prevent the incorrect orderliness of the given startKey and endKey (e.g. startKey being later than endKey, some unexpected keys in binary ordering of the keys between startKey and endKey), so the callers should be very careful of composing startKeys and endKeys.

Why are the changes needed?

This will enable the pattern of range scan to optimize further, effectively skipping tombstones and valid keys in both sides (before the lower bound, after the upper bound) within CF.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 Opus

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

Maybe there could be a couple improvements (or open discussions):

  1. The name of API: "iterator" can also work.
  2. The default implementation of the API: we can call iterator() and filter out keys which aren't bound to the range.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

I just changed the API name to "rangeScan" to clarify the meaning. I'll update the PR title and description. Like prefixScan, we will require key state encoder to be compatible with the API.

@HeartSaVioR HeartSaVioR changed the title [SPARK-56369][SS] Introduce a new API "scan/scanWithMultiValues" in StateStore [SPARK-56369][SS] Introduce a new API "rangeScan/rangeScanWithMultiValues" in StateStore Apr 7, 2026
* @param cfName The column family name.
* @return An iterator of ByteArrayPairs in the given range.
*/
def scan(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rangeScan

-32L, -64L, -256L, 64L, 32L, 1024L, 4096L, 0L)

testWithColumnFamiliesAndEncodingTypes("rocksdb range scan - rangeScan",
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to test with changelog on and off? same for the others below

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it isn't needed. Good point.

Some(dataToKeyRowWithRangeScan(0L, "a")), cfName)
val negResults = negIter.map(_.key.getLong(0)).toList
negIter.close()
assert(negResults === diverseTimestamps.filter(ts => ts >= -300 && ts < 0).sorted)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also test start=None, end=None?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

cc. @viirya


val kvEncoder = keyValueEncoderMap.get(colFamilyName)
require(kvEncoder._1.supportsRangeScan,
"Range scan requires an encoder that supports range scanning!")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Range scan with multiple values requires an

}
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra newline ?

Comment on lines +609 to +613
if (!isValidated && rowPair.value != null && !useColumnFamilies) {
StateStoreProvider.validateStateRowFormat(
rowPair.key, keySchema, rowPair.value, valueSchema, stateStoreId, storeConf)
isValidated = true
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why only rangeScanWithMultiValues needs to validate the state row format, but rangeScan doesn't?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice finding! Looks like we missed prefixScan to do that (while prefixScanWithMultiValues does) and missed the same for rangeScan?

Let's deal with it as FOLLOWUP or another JIRA ticket since we want to address both prefixScan and rangeScan, not only rangeScan.

Copy link
Copy Markdown
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. Just one question.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

I'll file a JIRA ticket for #55226 (comment).

Thanks! Merging to master.

@HeartSaVioR
Copy link
Copy Markdown
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants