Skip to content

Commit a116f63

Browse files
committed
Make RangeScanBoundaryUtils safe against CharType / VariantType key schemas
Address #55265 (comment): Literal.default's recursive "smallest" property does not hold for CharType or VariantType, so unguarded use can silently produce incorrect range-scan boundaries when such types appear in the key schema.

- CharType(n): override with n zero-bytes (U+0000 repeated, UTF-8 encoded), which is the byte-wise smallest legitimate CharType(n) value. This keeps preserveCharVarcharTypeInfo=true users supported.
- VariantType: assert-reject, because the Variant binary spec is @Unstable and a hand-encoded minimum would be brittle. Existing Spark analysis already blocks VariantType in grouping / hashing positions, so this check is defensive and should never fire in practice.

Adds RangeScanBoundaryUtilsSuite covering the happy path, a byte-wise sanity check for CharType, nested CharType in a struct, the VarcharType empty default, and VariantType rejection at the top level, nested in a struct, and nested in an array.
1 parent d8e9bd4 commit a116f63

2 files changed

Lines changed: 202 additions & 15 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RangeScanBoundaryUtils.scala

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.spark.sql.execution.streaming.state
1919

2020
import org.apache.spark.sql.catalyst.InternalRow
21-
import org.apache.spark.sql.catalyst.expressions.{Literal, UnsafeProjection, UnsafeRow}
22-
import org.apache.spark.sql.types.StructType
21+
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal, UnsafeProjection, UnsafeRow}
22+
import org.apache.spark.sql.types.{CharType, DataType, StructType, UserDefinedType, VariantType}
23+
import org.apache.spark.unsafe.types.UTF8String
2324

2425
/**
2526
* Utilities for building boundary rows used as start / end keys in state store
@@ -31,18 +32,27 @@ import org.apache.spark.sql.types.StructType
3132
* The non-ordering columns must encode byte-wise no larger than any real entry at
3233
* the same ordering prefix, or `seek()` will silently skip matching entries.
3334
*
34-
* The helpers here fill non-ordering columns with `Literal.default` (recursive
35-
* zero / empty / false). In UnsafeRow, that encodes as the byte-wise smallest
36-
* representation for all standard Dataset/SQL-encoded types -- numerics (including
37-
* negatives, whose non-zero two's-complement bytes still sort greater than
38-
* all-zero), and variable-length types (size=0 beats any non-zero size).
35+
* The helpers here fill non-ordering columns with recursive defaults. For most
36+
* types, `Literal.default` (recursive zero / empty / false) is already byte-wise
37+
* smallest in UnsafeRow -- numerics (including negatives, whose non-zero two's-
38+
* complement bytes still sort greater than all-zero), and variable-length types
39+
* (size=0 beats any non-zero size).
3940
*
40-
* Caveats where the "smallest" property does NOT hold and which therefore should
41-
* not appear in caller key schemas:
42-
* - `CharType(n)`: default is space-padded (0x20), but real values may legally
43-
* contain control bytes (0x00..0x1F).
41+
* Exceptions handled explicitly:
42+
* - `CharType(n)`: `Literal.default` is space-padded (0x20) because every
43+
* stored value has exactly `n` characters, and real values may legally
44+
* contain control bytes (0x00..0x1F). We override with `n` bytes of `0x00`
45+
* (U+0000 is a legal 1-byte UTF-8 code point), giving the smallest possible
46+
* UnsafeRow encoding (minimum byte length `n`, minimum per-byte content).
47+
*
48+
* Types rejected at runtime because we cannot safely hand-encode a byte-wise
49+
* minimum:
4450
* - `VariantType`: the Variant binary layout is not guaranteed minimized by
45-
* `castToVariant(0, IntegerType)`.
51+
* `castToVariant(0, IntegerType)`, and the Variant spec is `@Unstable`, so
52+
* the minimum would be brittle against future spec changes. In practice,
53+
* Spark analysis (grouping / hashing checks in `ExprUtils`, `HashExpression`)
54+
* already rejects VariantType in state-store key positions before reaching
55+
* the range-scan helpers, so this assertion is defensive only.
4656
*
4757
* A cleaner long-term fix would extend the state store API to accept an
4858
* ordering-column-only range bound and avoid synthesizing boundary rows at all.
@@ -54,11 +64,13 @@ import org.apache.spark.sql.types.StructType
5464
private[sql] object RangeScanBoundaryUtils {
5565

5666
/**
 * Build an `InternalRow` whose fields are the recursive byte-wise-smallest
 * defaults of `schema`. See the object-level docstring for the ordering
 * guarantees and the CharType / VariantType handling.
 */
def defaultInternalRow(schema: StructType): InternalRow = {
  // Fail fast on schemas we cannot safely encode a minimum for.
  assertBoundarySchemaSupported(schema)
  val fieldValues = schema.fields.toSeq.map(field => recursiveDefaultValue(field.dataType))
  InternalRow.fromSeq(fieldValues)
}
6375

6476
/**
@@ -70,4 +82,37 @@ private[sql] object RangeScanBoundaryUtils {
7082
def defaultUnsafeRow(schema: StructType): UnsafeRow = {
  // Project the boundary InternalRow into the UnsafeRow format. copy() detaches
  // the result from the projection's buffer (UnsafeProjection presumably reuses
  // its output row across calls -- NOTE(review): confirm, the original also copied).
  val toUnsafe = UnsafeProjection.create(schema)
  toUnsafe(defaultInternalRow(schema)).copy()
}
85+
86+
/**
 * Produce the byte-wise smallest legitimate value for `dt`. Falls back to
 * `Literal.default` for types where it is already byte-wise smallest; overrides
 * `CharType` with `n` zero-bytes; recurses through `StructType` and unwraps
 * `UserDefinedType`. `ArrayType` / `MapType` defaults are empty and therefore
 * already smallest regardless of element type, so no recursion is needed.
 *
 * VariantType is rejected here as a second line of defense. The top-level check
 * in `assertBoundarySchemaSupported` scans the declared schema, but this method
 * unwraps a UDT to its `sqlType`; a VariantType nested inside a UDT's sqlType
 * could otherwise slip through to `Literal.default(VariantType)` -- exactly the
 * encoding the object-level docstring warns is not guaranteed minimal.
 * (Assumes `existsRecursively` does not descend into UDT sqlTypes -- TODO
 * confirm; the guard is harmless either way.)
 */
private def recursiveDefaultValue(dt: DataType): Any = dt match {
  case c: CharType =>
    // n copies of U+0000 (a legal 1-byte UTF-8 code point): minimum legal byte
    // length for CharType(n) with minimum per-byte content.
    UTF8String.fromBytes(new Array[Byte](c.length))
  case struct: StructType =>
    new GenericInternalRow(
      struct.fields.map(f => recursiveDefaultValue(f.dataType)))
  case udt: UserDefinedType[_] =>
    // A UDT is stored as its sqlType, so its minimum is the sqlType's minimum.
    recursiveDefaultValue(udt.sqlType)
  case _: VariantType =>
    // Defensive: normally unreachable because assertBoundarySchemaSupported
    // rejects VariantType up front. Same exception type as the top-level check
    // so callers (and the suite) observe a consistent failure mode.
    throw new AssertionError(
      "RangeScanBoundaryUtils cannot build a scan boundary value for " +
      "VariantType; see RangeScanBoundaryUtils docstring for details.")
  case _ =>
    // Literal.default is recursive zero / empty / false, which is byte-wise
    // smallest in UnsafeRow for all remaining supported types.
    Literal.default(dt).value
}
104+
105+
/**
 * Reject schemas containing types for which we cannot hand-encode a byte-wise
 * minimum (see object-level docstring). Existing Spark analysis already blocks
 * VariantType in grouping / hashing positions, so this assertion is defensive
 * and should never fire in practice.
 */
private def assertBoundarySchemaSupported(schema: StructType): Unit = {
  val containsVariant = schema.existsRecursively {
    case _: VariantType => true
    case _ => false
  }
  assert(!containsVariant,
    "RangeScanBoundaryUtils cannot build a scan boundary for a schema containing " +
    "VariantType; see RangeScanBoundaryUtils docstring for details. " +
    s"schema=${schema.catalogString}")
}
73118
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming.state
19+
20+
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
22+
import org.apache.spark.sql.types._
23+
import org.apache.spark.unsafe.types.UTF8String
24+
25+
class RangeScanBoundaryUtilsSuite extends SparkFunSuite {

  test("defaultInternalRow / defaultUnsafeRow accept schemas with common types") {
    val schema = new StructType()
      .add("ts", LongType)
      .add("s", StringType)
      .add("i", IntegerType)
      .add("nested", new StructType().add("b", BinaryType).add("d", DoubleType))
      .add("arr", ArrayType(IntegerType))
      .add("m", MapType(StringType, IntegerType))

    val internalRow = RangeScanBoundaryUtils.defaultInternalRow(schema)
    assert(internalRow.numFields === schema.length)

    val unsafeRow = RangeScanBoundaryUtils.defaultUnsafeRow(schema)
    assert(unsafeRow.numFields === schema.length)
  }

  test("CharType field uses n zero-bytes, which is byte-wise <= any legitimate value") {
    val charLen = 5
    val schema = new StructType().add("ts", LongType).add("c", CharType(charLen))
    val boundaryRow = RangeScanBoundaryUtils.defaultUnsafeRow(schema)
    val charField = boundaryRow.getUTF8String(1)

    // UTF-8 encoding of charLen U+0000 code points is exactly charLen bytes of 0x00.
    assert(charField.numBytes() === charLen)
    for (i <- 0 until charLen) {
      assert(charField.getByte(i) === 0.toByte, s"byte $i is not 0x00")
    }

    // Sanity: a legitimate CharType(5) value written through UnsafeProjection
    // (e.g. "abcde") should produce a row whose raw bytes are >= the boundary row's.
    def encode(value: UTF8String): Array[Byte] = {
      val input = new GenericInternalRow(2)
      input.setLong(0, 0L)
      input.update(1, value)
      UnsafeProjection.create(schema).apply(input).getBytes
    }

    val boundaryBytes = boundaryRow.getBytes
    assert(compareBytes(boundaryBytes, encode(UTF8String.fromString("abcde"))) <= 0,
      "boundary bytes should be <= 'abcde' bytes")

    // Also <= a value with legitimate low code points (e.g. all U+0001 padded).
    val allOnes = UTF8String.fromBytes(Array.fill[Byte](charLen)(1.toByte))
    assert(compareBytes(boundaryBytes, encode(allOnes)) <= 0,
      "boundary bytes should be <= all-0x01 bytes")
  }

  test("CharType nested inside a struct is still handled") {
    val inner = new StructType().add("c", CharType(3))
    val schema = new StructType().add("ts", LongType).add("nested", inner)
    // Should not throw, and the nested struct's char field should be 3 zero-bytes.
    val outerRow = RangeScanBoundaryUtils.defaultInternalRow(schema)
    val nestedRow = outerRow.getStruct(1, inner.length)
    val charField = nestedRow.getUTF8String(0)
    assert(charField.numBytes() === 3)
    for (i <- 0 until 3) {
      assert(charField.getByte(i) === 0.toByte)
    }
  }

  test("VarcharType default (empty string) is still used and is byte-wise smallest") {
    val schema = new StructType().add("ts", LongType).add("v", VarcharType(10))
    // Should not throw; VarcharType's Literal.default is empty string (no padding).
    val row = RangeScanBoundaryUtils.defaultInternalRow(schema)
    assert(row.getUTF8String(1).numBytes() === 0)
  }

  test("defaultInternalRow rejects schemas containing VariantType at top level") {
    val schema = new StructType().add("ts", LongType).add("v", VariantType)
    val error = intercept[AssertionError] {
      RangeScanBoundaryUtils.defaultInternalRow(schema)
    }
    assert(error.getMessage.contains("VariantType"))
  }

  test("defaultInternalRow rejects schemas containing VariantType nested in array") {
    val schema = new StructType()
      .add("ts", LongType)
      .add("arr", ArrayType(VariantType))
    val error = intercept[AssertionError] {
      RangeScanBoundaryUtils.defaultInternalRow(schema)
    }
    assert(error.getMessage.contains("VariantType"))
  }

  test("defaultInternalRow rejects schemas containing VariantType nested in struct") {
    val schema = new StructType()
      .add("ts", LongType)
      .add("nested", new StructType().add("v", VariantType))
    val error = intercept[AssertionError] {
      RangeScanBoundaryUtils.defaultInternalRow(schema)
    }
    assert(error.getMessage.contains("VariantType"))
  }

  test("defaultUnsafeRow also triggers the schema check") {
    val schema = new StructType().add("ts", LongType).add("v", VariantType)
    val error = intercept[AssertionError] {
      RangeScanBoundaryUtils.defaultUnsafeRow(schema)
    }
    assert(error.getMessage.contains("VariantType"))
  }

  // Byte-wise unsigned lexicographic comparison, matching the RocksDB key order.
  private def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val common = math.min(a.length, b.length)
    (0 until common).find(i => (a(i) & 0xFF) != (b(i) & 0xFF)) match {
      case Some(i) => (a(i) & 0xFF) - (b(i) & 0xFF)
      case None => a.length - b.length
    }
  }
}

0 commit comments

Comments
 (0)