Commit 5fd1438

[SPARK-56406][SS] Stream-stream join v4: skip writing secondary index if the operator will not evict from that side
### What changes were proposed in this pull request?

This PR proposes to skip writing the secondary index when the operator will never evict from that side. For simplicity, the column family is still created; only the writes to the secondary index are skipped.

The operator can determine that it will never evict from a side as follows. The obvious case is when neither side has an event time column: neither side evicts any state at all. The tricky case is when one side has an event time column and the other side may be able to deduce eviction from it:

* Equality join (event time column is in the join key): the non-watermarked side can actually evict its state.
* Time-interval join (event time column is on the value side): neither side can actually evict, since one side is unbounded and the other has to be bounded relative to it.

For the former, the logic detects that the non-watermarked side can evict and writes the secondary index for it. (See how `joinKeyOrdinalForWatermark` is constructed.) For the latter, technically both sides could skip writing the secondary index, but that is a fairly minor case, and the logic only lets the non-watermarked side skip. (The PR leaves this potential optimization as a TODO code comment; we are not going to file a JIRA ticket since we don't know whether we will ever need it.)

The main target of this optimization is a regular join where neither side has an event time column; covering a watermark on only one side is a bonus.

As a safety net, the `evict***` methods raise an exception if the operator skipped writing the secondary index, since those methods are NOT expected to be called in that case.

### Why are the changes needed?

There are several cases where the operator never leverages the secondary index on one (or both) side(s), so writing to the secondary index has no value.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude 4.6 Opus

Closes #55271 from HeartSaVioR/SPARK-56406.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
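The case analysis above can be sketched as a small decision function. This is a hypothetical illustration (the names `SecondaryIndexNeed` and `sidesNeedingIndex` are not in the PR), modeling which sides still need the secondary index written:

```scala
// Hypothetical sketch of the eviction case analysis described above; names are
// illustrative, not the PR's actual code.
object SecondaryIndexNeed {
  sealed trait Side
  case object Left extends Side
  case object Right extends Side

  // watermarkedSides: sides whose input stream defines a watermark.
  // eventTimeInJoinKey: true for an equality join on the event time column.
  def sidesNeedingIndex(
      watermarkedSides: Set[Side],
      eventTimeInJoinKey: Boolean): Set[Side] = {
    if (watermarkedSides.isEmpty) {
      // No event time anywhere: neither side ever evicts, so skip both indexes.
      Set.empty
    } else if (eventTimeInJoinKey) {
      // Equality join on event time: the join key carries the timestamp, so the
      // non-watermarked side can also evict; keep the index on both sides.
      Set(Left, Right)
    } else {
      // Time-interval join: the PR only skips the non-watermarked side
      // (skipping both is left as a TODO in the code comment).
      watermarkedSides
    }
  }
}
```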
1 parent d7df192 commit 5fd1438

3 files changed: 173 additions & 9 deletions


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 17 additions & 1 deletion
```diff
@@ -244,6 +244,16 @@ class SymmetricHashJoinStateManagerV4(
     // pass the information. The information is in SQLConf.
     allowMultipleEventTimeColumns = false)

+  // When there is no event time column in the value and no watermark ordinal in the key,
+  // the secondary index (TsWithKey) will never be used for eviction. Skip writing to it
+  // to avoid unnecessary RocksDB merge overhead.
+  // TODO(SPARK-56536): This could be further optimized by also considering whether the state
+  // watermark predicate is defined. Even when an event time column exists, the secondary index
+  // is unused if eviction is not possible (e.g., only one side defines a watermark in a time
+  // interval join). That would require propagating the predicate information here.
+  private val hasEventTime: Boolean =
+    eventTimeColIdxOpt.isDefined || joinKeyOrdinalForWatermark.isDefined
+
   private val random = new scala.util.Random(System.currentTimeMillis())
   private val bucketCountForNoEventTime = 1024
   private val extractEventTimeFn: UnsafeRow => Long = { row =>
@@ -353,7 +363,9 @@ class SymmetricHashJoinStateManagerV4(
     val eventTime = extractEventTimeFnFromKey(key).getOrElse(extractEventTimeFn(value))
     // We always do blind merge for appending new value.
     keyWithTsToValues.append(key, eventTime, value, matched)
-    tsWithKey.add(eventTime, key)
+    if (hasEventTime) {
+      tsWithKey.add(eventTime, key)
+    }
   }

   override def getJoinedRows(
@@ -508,6 +520,8 @@ class SymmetricHashJoinStateManagerV4(
   }

   override def evictByTimestamp(endTimestamp: Long): Long = {
+    require(hasEventTime,
+      "evictByTimestamp requires event time; secondary index was not populated")
     var removed = 0L
     tsWithKey.scanEvictedKeys(endTimestamp).foreach { evicted =>
       val key = evicted.key
@@ -524,6 +538,8 @@ class SymmetricHashJoinStateManagerV4(
   }

   override def evictAndReturnByTimestamp(endTimestamp: Long): Iterator[KeyToValuePair] = {
+    require(hasEventTime,
+      "evictAndReturnByTimestamp requires event time; secondary index was not populated")
     val reusableKeyToValuePair = KeyToValuePair()

     tsWithKey.scanEvictedKeys(endTimestamp).flatMap { evicted =>
```
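The guard pattern in the diff above can be illustrated with a minimal, self-contained sketch. `TsIndex` here is hypothetical (it is not Spark's `SymmetricHashJoinStateManagerV4`): writes to the timestamp-ordered index are skipped when the side can never evict, and eviction fails fast if the index was never populated, mirroring the `require()` safety net:

```scala
import scala.collection.mutable

// Hypothetical sketch of the skip-write + fail-fast pattern; not Spark's
// actual state manager classes.
class TsIndex(hasEventTime: Boolean) {
  // Timestamp-ordered secondary index: event time -> keys written at that time.
  private val index = mutable.TreeMap.empty[Long, List[String]]

  def append(eventTime: Long, key: String): Unit = {
    // Skip the write entirely when nothing will ever evict from this side.
    if (hasEventTime) {
      index(eventTime) = key :: index.getOrElse(eventTime, Nil)
    }
  }

  def evictUpTo(endTs: Long): Seq[String] = {
    // Safety net: eviction is NOT expected to be called when the index was
    // never populated.
    require(hasEventTime, "eviction requires event time; index was not populated")
    val (evicted, kept) = index.partition { case (ts, _) => ts <= endTs }
    index.clear()
    index ++= kept
    evicted.values.flatten.toSeq
  }
}
```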

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala

Lines changed: 11 additions & 8 deletions
```diff
@@ -931,13 +931,16 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite {
       AddData(input2, 1, 10),
       CheckNewAnswer((1, 2, 3)),
       Execute { query =>
-        val numInternalKeys =
+        val numInternalCfs =
           query.lastProgress
             .stateOperators(0)
             .customMetrics
-            .get("rocksdbNumInternalColFamiliesKeys")
-        // Number of internal column family keys should be nonzero for this join implementation
-        assert(numInternalKeys.longValue() > 0)
+            .get("rocksdbNumInternalColumnFamilies")
+        // The V4 virtual-column-family join uses internal column families for the
+        // secondary index, so the CF count should be nonzero for this join implementation.
+        // Note: we intentionally check the CF count (not the key count), because for joins
+        // without event time the secondary index is created but never populated.
+        assert(numInternalCfs.longValue() > 0)
       },
       StopStream,
       // Restart the query from the same checkpoint
@@ -948,13 +951,13 @@ abstract class StreamingInnerJoinBase extends StreamingJoinSuite {
       CheckNewAnswer((2, 4, 6), (2, 4, 6)),
       Execute { query =>
         // The join implementation should not have changed between runs
-        val numInternalKeys =
+        val numInternalCfs =
           query.lastProgress
             .stateOperators(0)
             .customMetrics
-            .get("rocksdbNumInternalColFamiliesKeys")
-        // Number of internal column family keys should still be nonzero for this join
-        assert(numInternalKeys.longValue() > 0)
+            .get("rocksdbNumInternalColumnFamilies")
+        // Number of internal column families should still be nonzero for this join
+        assert(numInternalCfs.longValue() > 0)
       },
       StopStream
     )
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala

Lines changed: 145 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.streaming
 import org.apache.hadoop.fs.Path
 import org.scalatest.Tag

+import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
 import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
@@ -184,6 +185,150 @@ class StreamingInnerJoinV4Suite
       )
     }
   }
+
+  private def readStateStore(checkpointLoc: String, storeName: String): Long = {
+    spark.read.format("statestore")
+      .option(StateSourceOptions.PATH, checkpointLoc)
+      .option(StateSourceOptions.STORE_NAME, storeName)
+      .load()
+      .count()
+  }
+
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-56406: secondary index is not populated for join without event time") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[Int]
+      val input2 = MemoryStream[Int]
+
+      val df1 = input1.toDF()
+        .select($"value" as "key", ($"value" * 2) as "leftValue")
+      val df2 = input2.toDF()
+        .select($"value" as "key", ($"value" * 3) as "rightValue")
+      val joined = df1.join(df2, "key")
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(input1, 1, 2, 3),
+        CheckAnswer(),
+        AddData(input2, 1, 2),
+        CheckNewAnswer((1, 2, 3), (2, 4, 6)),
+        Execute { _ =>
+          val checkpointLoc = checkpointDir.getCanonicalPath
+
+          assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
+            "left primary store should have rows")
+          assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
+            "right primary store should have rows")
+
+          assert(readStateStore(checkpointLoc, "left-tsWithKey") === 0,
+            "left secondary index should be empty without event time")
+          assert(readStateStore(checkpointLoc, "right-tsWithKey") === 0,
+            "right secondary index should be empty without event time")
+        },
+        StopStream
+      )
+    }
+  }
+
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-56406: secondary index populated on both sides when watermark is on join key") {
+    withTempDir { checkpointDir =>
+      val input1 = MemoryStream[(Int, Int)]
+      val input2 = MemoryStream[(Int, Int)]
+
+      val df1 = input1.toDF().toDF("key", "time")
+        .select($"key", timestamp_seconds($"time") as "ts", ($"key" * 2) as "leftValue")
+        .withWatermark("ts", "10 seconds")
+      val df2 = input2.toDF().toDF("key", "time")
+        .select($"key", timestamp_seconds($"time") as "ts", ($"key" * 3) as "rightValue")
+      // Only left side has watermark; ts is part of the join key, so
+      // joinKeyOrdinalForWatermark is defined -> hasEventTime = true for both sides.
+
+      val joined = df1.join(df2, Seq("key", "ts"))
+        .select($"key", $"ts".cast("long"), $"leftValue", $"rightValue")
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        // Use ts=20 for the row we expect to join against input2.
+        // withWatermark("ts", "10 seconds") causes batch 0 to advance the watermark to
+        // max(ts) - 10s = 10s. Because watermark-based cleanup is enabled,
+        // MicroBatchExecution fires a no-data batch (shouldRunAnotherBatch) after
+        // batch 0 that evicts any state rows with ts <= 10 (inclusive). Keeping
+        // ts=20 > 10 ensures the row survives that eviction so the input2 row in
+        // the following batch can match it.
+        AddData(input1, (1, 20), (2, 10)),
+        CheckAnswer(),
+        AddData(input2, (1, 20)),
+        CheckNewAnswer((1, 20, 2, 3)),
+        Execute { _ =>
+          val checkpointLoc = checkpointDir.getCanonicalPath
+
+          assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
+            "left primary store should have rows")
+          assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
+            "right primary store should have rows")
+
+          // Both secondary indexes should be populated because joinKeyOrdinalForWatermark
+          // is defined (watermark on join key applies to both sides).
+          assert(readStateStore(checkpointLoc, "left-tsWithKey") > 0,
+            "left secondary index should be populated when watermark is on join key")
+          assert(readStateStore(checkpointLoc, "right-tsWithKey") > 0,
+            "right secondary index should be populated when watermark is on join key")
+        },
+        StopStream
+      )
+    }
+  }
+
+  testWithVirtualColumnFamilyJoins(
+    "SPARK-56406: secondary index only populated on watermarked side for time interval join") {
+    withTempDir { checkpointDir =>
+      val leftInput = MemoryStream[(Int, Int)]
+      val rightInput = MemoryStream[(Int, Int)]
+
+      val df1 = leftInput.toDF().toDF("leftKey", "time")
+        .select($"leftKey", timestamp_seconds($"time") as "leftTime",
+          ($"leftKey" * 2) as "leftValue")
+        .withWatermark("leftTime", "10 seconds")
+      val df2 = rightInput.toDF().toDF("rightKey", "time")
+        .select($"rightKey", timestamp_seconds($"time") as "rightTime",
+          ($"rightKey" * 3) as "rightValue")
+      // Only left side has watermark; watermark is on a value column, not the join key.
+      // joinKeyOrdinalForWatermark is None -> only left has hasEventTime = true.
+      // Neither side can actually evict: the left state watermark is derived from the right
+      // side's watermark via the join condition, which is absent here. The left secondary
+      // index is populated but never used for eviction.
+
+      val joined = df1.join(df2,
+        expr("leftKey = rightKey AND " +
+          "leftTime BETWEEN rightTime - interval 5 seconds AND rightTime + interval 5 seconds"))
+        .select($"leftKey", $"leftTime".cast("int"), $"rightTime".cast("int"))
+
+      testStream(joined)(
+        StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+        AddData(leftInput, (1, 10), (2, 20)),
+        CheckAnswer(),
+        AddData(rightInput, (1, 12)),
+        CheckNewAnswer((1, 10, 12)),
+        Execute { _ =>
+          val checkpointLoc = checkpointDir.getCanonicalPath
+
+          assert(readStateStore(checkpointLoc, "left-keyWithTsToValues") > 0,
+            "left primary store should have rows")
+          assert(readStateStore(checkpointLoc, "right-keyWithTsToValues") > 0,
+            "right primary store should have rows")
+
+          // Left has watermark on a value column -> hasEventTime = true, secondary index populated.
+          assert(readStateStore(checkpointLoc, "left-tsWithKey") > 0,
+            "left secondary index should be populated (watermark on left value column)")
+          // Right has no watermark -> hasEventTime = false, secondary index empty.
+          assert(readStateStore(checkpointLoc, "right-tsWithKey") === 0,
+            "right secondary index should be empty (no watermark on right side)")
+        },
+        StopStream
+      )
+    }
+  }
 }

 @SlowSQLTest
```
