[SPARK-56402][SS] Apply rangeScan API in stream-stream join format version 4 #55267
HeartSaVioR wants to merge 9 commits into apache:master
Conversation
Only the last commit is related to this PR. Once #55226 is merged, I'll rebase.
    InternalRow.fromSeq(schema.map(f => defaultValueForType(f.dataType)))
  }

  private def defaultValueForType(dt: DataType): Any = dt match {
How can we make sure that this list is comprehensive? I am surprised that Spark does not have a native utility for this. For example, I'm not sure whether missing DecimalType, ArrayType, or MapType is fine here.

Will returning null lead to a crash in the encoder? I wonder if we should just throw an UnsupportedOperationException for the wildcard case.

I think I simply missed the utility method since it was indirect. The Literal object has a default method, and a Literal instance can give the value via .value. I'll update the code here.
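For context, a standalone sketch of the pattern under discussion — hypothetical stand-in types, not Spark's actual DataType hierarchy. In the PR the whole method goes away in favor of Catalyst's Literal.default(dt).value, which covers every DataType:

```scala
// Standalone sketch (hypothetical types, not Spark's DataType hierarchy) of the
// hand-rolled default-value match that Literal.default(dt).value replaces.
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object DoubleType extends DataType
case object StringType extends DataType

def defaultValueForType(dt: DataType): Any = dt match {
  case IntegerType => 0
  case LongType    => 0L
  case DoubleType  => 0.0
  case StringType  => ""
  // Throwing for unhandled types (as suggested in review) beats silently
  // returning null, which may crash later in the encoder.
  case other => throw new UnsupportedOperationException(s"No default for $other")
}
```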
    // startTimestamp is exclusive (already evicted), so we seek from st + 1.
    val startKeyRow = startTimestamp.flatMap { st =>
      if (st < Long.MaxValue) Some(createScanBoundaryRow(st + 1))
      else None
Can we just return an empty iterator here? That avoids a silent full scan of the state.

This just means we do not know clearly where to start from (e.g. the first batch), not that we can skip scanning. Regarding the concern about a full scan, we will still be guarding the range with the column family.

You mean startTimestamp == Long.MaxValue means we are at the first batch? I thought it should be None.

Oh yes, you raise a good point. start being Long.MaxValue should not match anything.
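A hedged sketch of the boundary logic the thread settles on — illustrative names, not the actual Spark code. startTimestamp is an exclusive lower bound, so we seek from st + 1; if it is already Long.MaxValue, st + 1 would overflow and nothing can match, so the caller should produce an empty iterator rather than fall back to an unbounded scan:

```scala
// Three distinct outcomes for the lower bound of the eviction scan.
sealed trait ScanStart
case object EmptyScan extends ScanStart              // st == Long.MaxValue: nothing can match
case class FromTimestamp(ts: Long) extends ScanStart // exclusive bound -> seek from st + 1
case object NoLowerBound extends ScanStart           // first batch: range guarded by column family only

def scanLowerBound(startTimestamp: Option[Long]): ScanStart = startTimestamp match {
  case Some(Long.MaxValue) => EmptyScan
  case Some(st)            => FromTimestamp(st + 1)
  case None                => NoLowerBound
}
```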
    // Filter out entries outside [minTs, maxTs]. This is essential when using
    // prefixScan (which returns all timestamps for the key) and serves as a
    // safety guard for rangeScan as well.
    if (ts < minTs || ts > maxTs) {
If it is a safety guard, maybe we should throw an error? IIUC, ts should never be greater than maxTs in either case (rangeScan or prefixScan) based on the current code. ts < minTs may be valid for prefixScan but not for rangeScan.

It's valid logic for prefixScan and a safety guard for rangeScan. It's more about generalizing the code, but we could branch on prefixScan vs rangeScan and only apply the assertion for rangeScan - do you think that'd be preferable?
Btw, I realize we should not change this part of the codebase if we want to generalize the code. The pastUpperBound flag is still useful for prefixScan. Maybe I'll need to revert the code except for applying rangeScan. Still, if you prefer an assertion for rangeScan, we can add it.

I agree we should generalize the common logic, but I think this check is not part of it. It is better to push this range check into the separate methods for prefixScan and rangeScan, since the two come with different guarantees. We can assert in rangeScan and do an early exit in prefixScan this way.

Let's just assert the case for range scan rather than having duplicated code.
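A hedged sketch of the agreed resolution (illustrative standalone code, not the Spark implementation): rangeScan already guarantees results in [minTs, maxTs], so an out-of-range timestamp there is a bug and should assert, while prefixScan returns every timestamp for the key, so out-of-range entries are legitimately filtered out:

```scala
// Decide whether an entry's timestamp should be kept, per scan mode.
def keepTimestamp(ts: Long, minTs: Long, maxTs: Long, useRangeScan: Boolean): Boolean = {
  val inRange = ts >= minTs && ts <= maxTs
  if (useRangeScan) {
    // rangeScan bounds the store-side scan; a violation indicates a bug.
    assert(inRange, s"rangeScan returned ts=$ts outside [$minTs, $maxTs]")
    true
  } else {
    inRange // prefixScan: skipping here is the expected filtering path
  }
}
```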
   * timestamp matters for ordering in the prefix encoder.
   */
  private def createScanBoundaryRow(timestamp: Long): UnsafeRow = {
    val defaultKey = UnsafeProjection.create(keySchema)
nit: we are creating a new UnsafeRow on each call; consider having a reusable row?

Yeah, probably defaultKey can simply be reused. Nice find!
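The nit amounts to hoisting the per-call allocation into a field. A hypothetical sketch of that pattern in plain Scala — the array here stands in for the projection/row pair; names are illustrative:

```scala
// Hypothetical sketch of the reuse pattern suggested above: allocate the
// projection/row once as a field instead of on every createScanBoundaryRow call.
class BoundaryRowFactory {
  // Created once; stands in for UnsafeProjection.create(keySchema).
  private val reusableRow = new Array[Long](1)

  def createScanBoundaryRow(timestamp: Long): Array[Long] = {
    reusableRow(0) = timestamp // mutate the shared buffer instead of allocating
    reusableRow
  }
}
```

Callers must then not retain the returned row across calls — the same caveat that reusing a projection-produced UnsafeRow carries.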
@@ -1105,6 +1105,67 @@ class SymmetricHashJoinStateManagerEventTimeInValueSuite
  }
}
nit: Can we test the overflow boundaries for eviction as well?

We have a test providing a range on Long.MinValue and Long.MaxValue. Would you mind being more specific?

Ah, looks like we don't have it for eviction. Working on it.
I reviewed the last two commits of this PR and they look good. Thanks for making the change. This could greatly improve stream-stream join performance. Please make sure to address this comment - it may be hard to find:
#55267 (comment)
Force-pushed f3a3213 to e976887.
cc. @viirya
  }
  /** Predicate for watermark on state keys */
  case class JoinStateKeyWatermarkPredicate(expr: Expression, stateWatermark: Long) // before
  case class JoinStateKeyWatermarkPredicate(                                        // after (this PR)
Can we add a high-level comment to explain why prevStateWatermark is passed here?

Good suggestion! Done.
    val ts = TimestampKeyStateEncoder.extractTimestamp(unsafeRowPair.key)

    if (useRangeScan) {
      assert(ts >= minTs && ts <= maxTs,
Could we add an error class for this?
   * @param cfName The column family name.
   * @return An iterator of ByteArrayPairs in the given range.
   */
  def scan(
nit: should we call it scanRange?
    val kvEncoder = keyValueEncoderMap.get(colFamilyName)
    require(kvEncoder._1.supportsRangeScan,
      "Range scan requires an encoder that supports range scanning!")
nit: should be "Range scan with multiple values requires ..."
Force-pushed e976887 to ef05991.
    private val iter = if (useRangeScan) {
      val startKey = createKeyRow(key, minTs).copy()
      // rangeScanWithMultiValues endKey is exclusive, so use maxTs + 1
      val endKey = Some(createKeyRow(key, maxTs + 1))
Do we need to copy it like startKey?

We don't need to copy endKey: startKey and endKey must co-exist at the same time (which is why startKey is copied), but once we call rangeScanWithMultiValues, neither startKey nor endKey is used again.
I'm leaving a code comment instead.
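A hedged standalone sketch of why startKey needs .copy() but endKey does not: when the key-row factory reuses one mutable buffer (as UnsafeProjection-backed helpers do), producing endKey would overwrite an uncopied startKey, while endKey itself is consumed immediately by the scan call. Names here are illustrative, not Spark's:

```scala
// A factory that reuses a single buffer across calls, like UnsafeProjection.
class KeyRowFactory {
  private val buf = Array(0L, 0L) // shared mutable buffer
  def createKeyRow(key: Long, ts: Long): Array[Long] = {
    buf(0) = key; buf(1) = ts; buf
  }
}

val factory = new KeyRowFactory
val startKey = factory.createKeyRow(1L, 10L).clone() // copy: must survive the next call
val endKey = factory.createKeyRow(1L, 21L)           // no copy: consumed right away
```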
The last commit (before the empty commit) is the same as #55265.
Force-pushed 6ec5336 to 05eeb66.
Use bounded scan ranges in stream-stream join V4 operators to narrow the iteration scope during eviction and value lookup:
- scanEvictedKeys (TsWithKeyTypeStore): use scanWithMultiValues with startKey derived from the previous batch's state watermark and endKey from the current eviction threshold. Thread prevBatchStateWatermark through JoinStateWatermarkPredicate -> SupportsEvictByTimestamp.
- getValuesInRange (KeyWithTsToValuesStore): use scanWithMultiValues for bounded timestamp ranges, falling back to prefixScan for the full range. Create default-valued boundary rows to avoid NullPointerException when the join key schema contains non-nullable fields (e.g. window structs).
Force-pushed 05eeb66 to b3e0380.
https://github.com/HeartSaVioR/spark/runs/72040646689 - CI only failed in SparkR, which is unrelated.
Thanks! Merging to master.
What changes were proposed in this pull request?
This PR proposes to apply the rangeScan API in stream-stream join format version 4, which improves the scan for matching rows in time-interval joins and during eviction.
The main idea for eviction is to scan the secondary index over [the end timestamp of the previous scan + 1, new end timestamp], where it was previously [None, new end timestamp]. Previously the scan had to go through tombstones left by prior batches' evictions (until compaction happens); with this change we can skip those tombstones.
The idea for time-interval join is straightforward: we know the timestamp range of matching rows and use it to scope the scan. Previously we scanned all timestamps for the key from RocksDB and applied a filter. We now push the filtering down to RocksDB, gaining the same effect as above (skipping tombstones).
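The eviction-range change can be sketched as follows — illustrative names, not the actual Spark code. The scan over the secondary index moves from [no lower bound, newWm] to [prevWm + 1, newWm], so tombstones written by prior batches' evictions are skipped instead of iterated until compaction removes them:

```scala
// Compute the bounded eviction scan range from the previous and new watermarks.
def evictionScanRange(
    prevBatchStateWatermark: Option[Long],
    newStateWatermark: Long): (Option[Long], Long) = {
  // Entries at or below the previous watermark were already evicted,
  // so start one past it; Long.MaxValue + 1 would overflow, so guard it.
  val lower = prevBatchStateWatermark.collect {
    case prev if prev < Long.MaxValue => prev + 1
  }
  (lower, newStateWatermark)
}
```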
Why are the changes needed?
This change gives RocksDB a hint about the exact range to scan, greatly reducing the chance of reading tombstones.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UTs, and existing UTs.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude 4.6 Opus