
[SPARK-56402][SS] Apply rangeScan API in stream-stream join format version 4#55267

Closed
HeartSaVioR wants to merge 9 commits into apache:master from HeartSaVioR:SPARK-56402-on-top-of-SPARK-56369

Conversation

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR proposes to apply the rangeScan API in stream-stream join format version 4, which improves scanning of matching rows for time-interval join and eviction.

The main idea for eviction is to scan the secondary index over [end timestamp of previous scan + 1, new end timestamp] instead of [None, new end timestamp]. Previously the scan had to step over tombstones created by prior evictions (until compaction happens); with this change we can skip those tombstones.

The idea for time-interval join is straightforward: we know the timestamp range of matching rows, so we use it to scope the scan. Previously we scanned all timestamps for the key from RocksDB and applied a filter afterwards. We now push the filtering down to RocksDB, gaining the same benefit as above (skipping tombstones).
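A minimal, self-contained sketch of the bounded-scan idea described above. The TreeMap stands in for the RocksDB-backed secondary index, and all names here are illustrative, not the actual Spark API:

```scala
import scala.collection.immutable.TreeMap

// Toy secondary index keyed by event-time timestamp (stand-in for RocksDB).
val index = TreeMap(5L -> "a", 12L -> "b", 20L -> "c", 31L -> "d")

// Eviction: scan [previous scan's end timestamp + 1, new end timestamp]
// instead of starting from the beginning, so entries (and, in the real
// store, tombstones) below the previous watermark are never visited.
def evictRange(prevEnd: Option[Long], newEnd: Long): TreeMap[Long, String] = {
  val scoped = prevEnd match {
    case Some(p) => index.rangeFrom(p + 1) // p itself was already evicted
    case None    => index                  // first batch: no previous scan to resume from
  }
  scoped.rangeTo(newEnd)                   // rangeTo is inclusive of newEnd
}

// Previous eviction ended at ts = 12; only ts = 20 and ts = 31 are visited.
val evicted = evictRange(Some(12L), 31L)
```

The same scoping serves the time-interval join path: instead of iterating every timestamp for a key and filtering afterwards, the [minTs, maxTs] bound is handed to the scan itself.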

Why are the changes needed?

This change gives RocksDB a hint about the exact range to scan, greatly reducing the chance of reading tombstones.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UTs, and existing UTs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 Opus

@HeartSaVioR
Contributor Author

Only the last commit is related to this PR. Once #55226 is merged, I'll rebase.

InternalRow.fromSeq(schema.map(f => defaultValueForType(f.dataType)))
}

private def defaultValueForType(dt: DataType): Any = dt match {
Contributor


How can we make sure that this list is comprehensive? I am surprised that Spark does not have a native utility for this. For example, not sure if missing DecimalType, ArrayType, or MapType is fine here.

Contributor


Will returning null lead to a crash in the encoder? I wonder if we should just throw an UnsupportedException for the wildcard.

Contributor Author


I think I simply missed the utility method since it was indirect.

The Literal object has a default method, and a Literal instance can give the value via .value. I'll update the code here.
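For illustration only: the toy helper below mirrors the kind of hand-rolled default-value match being discussed, and shows why failing loudly on an unhandled type beats returning null (which could crash later in the encoder). The types here are stand-ins, not Catalyst's; as I understand it, Catalyst's own Literal.default is the utility the author switched to:

```scala
// Toy stand-ins for Catalyst data types (illustrative only).
sealed trait DataType
case object LongType extends DataType
case object StringType extends DataType
case object DecimalType extends DataType // deliberately not handled below

def defaultValueForType(dt: DataType): Any = dt match {
  case LongType   => 0L
  case StringType => ""
  case other =>
    // Fail loudly for unhandled types instead of silently returning null.
    throw new UnsupportedOperationException(s"no default value for $other")
}
```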

// startTimestamp is exclusive (already evicted), so we seek from st + 1.
val startKeyRow = startTimestamp.flatMap { st =>
if (st < Long.MaxValue) Some(createScanBoundaryRow(st + 1))
else None
Contributor


Can we just return an empty Iterator here? This avoids a silent full scan of the state.

Contributor Author


This just means we don't clearly know where to start from (e.g. the first batch), not that we can skip scanning. As for the full-scan concern, the range is still guarded by the column family.

Contributor


You mean startTimestamp == Long.MaxValue means we are at the first batch? I thought it should be None.

Contributor Author


Oh yes, you raise a good point. A start of Long.MaxValue should not match anything.
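A hedged sketch of the guard this thread converged on (all names are illustrative): when the previous scan already ended at Long.MaxValue, st + 1 would overflow, and no timestamp can be strictly greater, so the eviction scan should yield nothing rather than silently fall back to an unbounded scan:

```scala
// Toy rangeScan: returns timestamps in [start.getOrElse(Long.MinValue), end].
def rangeScan(data: Seq[Long])(start: Option[Long], end: Long): Iterator[Long] =
  data.iterator.filter(ts => ts >= start.getOrElse(Long.MinValue) && ts <= end)

def evictionScan(data: Seq[Long],
                 startTimestamp: Option[Long],
                 endTimestamp: Long): Iterator[Long] =
  startTimestamp match {
    case Some(Long.MaxValue) =>
      // st + 1 would overflow, and nothing is > Long.MaxValue: match nothing.
      Iterator.empty
    case Some(st) =>
      // st is exclusive (already evicted), so seek from st + 1.
      rangeScan(data)(Some(st + 1), endTimestamp)
    case None =>
      // First batch: no lower bound known.
      rangeScan(data)(None, endTimestamp)
  }
```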

// Filter out entries outside [minTs, maxTs]. This is essential when using
// prefixScan (which returns all timestamps for the key) and serves as a
// safety guard for rangeScan as well.
if (ts < minTs || ts > maxTs) {
Contributor


If it is a safety guard, maybe we should throw an error? IIUC, ts should never be greater than maxTs in either case (rangeScan or prefixScan) based on the current code. ts < minTs may be valid for prefixScan but not for rangeScan.

Contributor Author


It's a valid filter for prefixScan and a safety guard for rangeScan. It's more about generalizing the code, but we could distinguish prefixScan vs rangeScan and only apply the assertion for rangeScan; do you think that'd be preferable?

Btw, I realize we shouldn't change this part of the codebase if we want to generalize the code: the pastUpperBound flag is still useful for prefixScan. Maybe I'll need to revert everything except applying rangeScan. Still, if you'd prefer an assertion for rangeScan, we can add it on top.

Contributor

@eason-yuchen-liu Apr 14, 2026


I agree we should generalize the common logic, but I don't think this check is part of it. It would be better to push this range check into the separate prefixScan and rangeScan methods, since the two come with different guarantees. That way we can assert in rangeScan and exit early in prefixScan.

Contributor Author


Let's just assert the case for range scan rather than having duplicated code.
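A sketch of the split agreed on above (a hypothetical helper, not the actual patch): prefixScan returns every timestamp for the key, so filtering there is expected behavior, while rangeScan has already scoped the iteration, so an out-of-range entry indicates a bug and triggers an assertion:

```scala
// Returns true if the entry should be emitted, false if it should be skipped.
def withinRange(ts: Long, minTs: Long, maxTs: Long, useRangeScan: Boolean): Boolean = {
  val inRange = ts >= minTs && ts <= maxTs
  if (useRangeScan) {
    // rangeScan already bounded the iteration; out-of-range means a bug.
    assert(inRange, s"rangeScan returned ts=$ts outside [$minTs, $maxTs]")
    true
  } else {
    // prefixScan sees all timestamps for the key; filtering here is expected.
    inRange
  }
}
```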

* timestamp matters for ordering in the prefix encoder.
*/
private def createScanBoundaryRow(timestamp: Long): UnsafeRow = {
val defaultKey = UnsafeProjection.create(keySchema)
Contributor


nit: we are creating a new UnsafeRow in each call; consider having a reusable row?

Contributor Author


Yeah, defaultKey can probably simply be reused. Nice catch!

@@ -1105,6 +1105,67 @@ class SymmetricHashJoinStateManagerEventTimeInValueSuite
}
}

Contributor


nit: Can we test for overflow boundaries for eviction as well?

Contributor Author


We have a test providing a range of Long.MinValue and Long.MaxValue. Would you mind being more specific?

Contributor Author


Ah looks like we don't have it for eviction. Working on it.

Contributor

@eason-yuchen-liu left a comment


I reviewed the last two commits of this PR and they look good. Thanks for making the change. This could greatly improve Stream-Stream Join performance. Please make sure to address this comment. It may be hard to find.
#55267 (comment)

@HeartSaVioR HeartSaVioR force-pushed the SPARK-56402-on-top-of-SPARK-56369 branch from f3a3213 to e976887 Compare April 15, 2026 03:35
@HeartSaVioR
Contributor Author

cc. @viirya

}
/** Predicate for watermark on state keys */
case class JoinStateKeyWatermarkPredicate(expr: Expression, stateWatermark: Long)
case class JoinStateKeyWatermarkPredicate(
Contributor


Can we add a high-level comment to explain why prevStateWatermark is passed here?

Contributor Author


Good suggestion! Done.

val ts = TimestampKeyStateEncoder.extractTimestamp(unsafeRowPair.key)

if (useRangeScan) {
assert(ts >= minTs && ts <= maxTs,
Contributor


Could we add an error class for this?

* @param cfName The column family name.
* @return An iterator of ByteArrayPairs in the given range.
*/
def scan(
Contributor


nit: should we call it scanRange ?


val kvEncoder = keyValueEncoderMap.get(colFamilyName)
require(kvEncoder._1.supportsRangeScan,
"Range scan requires an encoder that supports range scanning!")
Contributor


nit: with multiple values requires

@HeartSaVioR HeartSaVioR force-pushed the SPARK-56402-on-top-of-SPARK-56369 branch from e976887 to ef05991 Compare April 18, 2026 13:07
private val iter = if (useRangeScan) {
val startKey = createKeyRow(key, minTs).copy()
// rangeScanWithMultiValues endKey is exclusive, so use maxTs + 1
val endKey = Some(createKeyRow(key, maxTs + 1))
Member


Do we need to copy it like startKey?

Contributor Author


We don't need to copy endKey. startKey and endKey have to co-exist (which is why startKey is copied), but once we call rangeScanWithMultiValues, neither startKey nor endKey is used afterwards.

I'm leaving a code comment instead.
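The buffer-reuse reasoning behind that copy can be illustrated with a toy stand-in for a projection that reuses one backing row across calls (as Spark's UnsafeProjection reuses a single UnsafeRow); everything here is illustrative:

```scala
// Toy projection that reuses one backing array across calls.
class ReusingProjection {
  private val row = new Array[Long](1)
  def apply(ts: Long): Array[Long] = { row(0) = ts; row }
}

val proj = new ReusingProjection
// startKey must survive the second apply() call, so it is copied.
val startKey = proj(10L).clone()
// endKey is consumed immediately by the scan call; no copy needed afterwards.
val endKey = proj(21L) // end is exclusive in the real API, hence maxTs + 1

// Without the copy, the second call clobbers the shared buffer:
val proj2 = new ReusingProjection
val uncopied = proj2(10L)
val end2 = proj2(21L)
// uncopied now reads 21, not 10.
```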

@HeartSaVioR
Contributor Author

The last commit (before the empty commit) is the same as #55265 .

@HeartSaVioR HeartSaVioR force-pushed the SPARK-56402-on-top-of-SPARK-56369 branch from 6ec5336 to 05eeb66 Compare April 19, 2026 21:17
Use bounded scan ranges in stream-stream join V4 operators to narrow
the iteration scope during eviction and value lookup:

- scanEvictedKeys (TsWithKeyTypeStore): use scanWithMultiValues with
  startKey derived from the previous batch's state watermark and endKey
  from the current eviction threshold. Thread prevBatchStateWatermark
  through JoinStateWatermarkPredicate -> SupportsEvictByTimestamp.
- getValuesInRange (KeyWithTsToValuesStore): use scanWithMultiValues
  for bounded timestamp ranges, falling back to prefixScan for full
  range.

Create default-valued boundary rows to avoid NullPointerException when
the join key schema contains non-nullable fields (e.g. window structs).
@HeartSaVioR HeartSaVioR force-pushed the SPARK-56402-on-top-of-SPARK-56369 branch from 05eeb66 to b3e0380 Compare April 19, 2026 21:21
@HeartSaVioR
Contributor Author

https://github.com/HeartSaVioR/spark/runs/72040646689

CI only failed in sparkr, which is unrelated.

@HeartSaVioR
Contributor Author

Thanks! Merging to master.
