Skip to content

Commit 07e241d

Browse files
committed
fix
1 parent 137c25b commit 07e241d

1 file changed

Lines changed: 3 additions & 2 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ class SymmetricHashJoinStateManagerV4(
660660
private val iter = if (minTs == Long.MinValue && maxTs == Long.MaxValue) {
661661
stateStore.prefixScanWithMultiValues(key, colFamilyName)
662662
} else {
663-
val startKeyRow = createKeyRow(key, minTs)
663+
val startKeyRow = createKeyRow(key, minTs).copy()
664664
val endKeyRow = createKeyRow(key, maxTs + 1)
665665
stateStore.scanWithMultiValues(Some(startKeyRow), endKeyRow, colFamilyName)
666666
}
@@ -774,8 +774,9 @@ class SymmetricHashJoinStateManagerV4(
774774
}
775775

776776
private lazy val dummyKeyRow: UnsafeRow = {
777+
val defaultValues = keySchema.fields.map(f => Literal.default(f.dataType).eval())
777778
val projection = UnsafeProjection.create(keySchema)
778-
projection(new GenericInternalRow(keySchema.length)).copy()
779+
projection(new GenericInternalRow(defaultValues)).copy()
779780
}
780781

781782
case class EvictedKeysResult(key: UnsafeRow, timestamp: Long, numValues: Int)

0 commit comments

Comments
 (0)