Skip to content

Commit 5dacfe3

Browse files
NJAHNAVI2907NJAHNAVI2907cloud-fan
committed
[SPARK-56654][SQL] Reject unpaired UTF-16 surrogates in Variant JSON parsing
Jackson's permissive surrogate handling let lone surrogates from \uXXXX escapes pass through `parse_json`, `try_parse_json`, and `from_json('variant')`, where `getBytes(UTF_8)` then silently substituted U+FFFD and corrupted the Variant. Validate the decoded strings before they enter the dictionary or write buffer, gated by a new internal SQL conf (default-on) for opt-out compatibility. ### What changes were proposed in this pull request? This PR adds strict Unicode validation to the Variant JSON parser so it rejects strings containing unpaired UTF-16 surrogate code units (e.g. a lone `\uD835` high surrogate). The check runs inside `VariantBuilder.buildJson` for both JSON object keys and string values, before either is encoded to UTF-8 and committed to the Variant binary buffer. The validation is gated by a new internal SQL conf `spark.sql.variant.validateUnicodeInJsonParsing`, defaulting to `true` so the strict, RFC 8259-compliant behavior is enabled by default. Setting the conf to `false` restores the legacy permissive behavior as a transitional escape hatch for pipelines that currently depend on it. The fix applies to all three Variant-parsing entry points: - `parse_json` — throws `MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION` on lone surrogates. - `try_parse_json` — returns `NULL`. - `from_json` — returns `NULL` in `PERMISSIVE` mode (default), throws in `FAILFAST`. ### Why are the changes needed? 1. JSON containing a lone surrogate (e.g. `"\uD835"` not followed by a low surrogate) is invalid. 2. Strict parsers such as simdjson reject these inputs; Jackson's `ReaderBasedJsonParser`, which Spark uses on the JVM, accepts them and decodes the escape into a Java `char` containing the lone surrogate. 3. The Variant ends up containing `?` where the original input was supposed to be, with no error or warning a silent data-corruption bug. 4. The records containing `\uD835` were silently accepted with substituted characters when handled by the JVM, but correctly rejected by Photon. 5. This PR closes that JVM ↔ Photon divergence at its root. ### Does this PR introduce _any_ user-facing change? Yes. With the default spark.sql.variant.validateUnicodeInJsonParsing = true, JSON input containing an unpaired UTF-16 surrogate (e.g. a lone \uD835) will now produce an error instead of being silently accepted. Specifically: - parse_json throws MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION. - try_parse_json returns NULL. - from_json(col, 'variant') returns NULL in PERMISSIVE mode (default) and throws in FAILFAST. Previously, the lone surrogate was decoded into a Java char, then silently substituted with the Unicode replacement character during UTF-8 encoding, producing a Variant value containing ? with no error or warning. Setting spark.sql.variant.validateUnicodeInJsonParsing = false restores the previous permissive behavior as a transitional opt-out. ### How was this patch tested? Added new test cases in VariantExpressionEvalUtilsSuite (unit tests for both reject and legacy-mode paths, covering lone high/low surrogates as values and as object keys, plus valid surrogate pairs as a control) and VariantEndToEndSuite (end-to-end SQL test exercising parse_json / try_parse_json / from_json in both PERMISSIVE and FAILFAST modes, with the conf flipped on and off). ### Was this patch authored or co-authored using generative AI tooling? co-authored by : 'claude-opus-4.7' Closes #55661 from NJAHNAVI2907/SPARK-56654-strict-unicode-json. Lead-authored-by: Jahnavi Nelavelli <75218211+NJAHNAVI2907@users.noreply.github.com> Co-authored-by: NJAHNAVI2907 <jahnavinelavelli29@gmail.com> Co-authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 0594b12) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 7ee145d commit 5dacfe3

15 files changed

Lines changed: 180 additions & 18 deletions

File tree

common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,12 @@
4343
*/
4444
public class VariantBuilder {
4545
public VariantBuilder(boolean allowDuplicateKeys) {
46+
this(allowDuplicateKeys, true);
47+
}
48+
49+
public VariantBuilder(boolean allowDuplicateKeys, boolean validateUnicodeInJsonParsing) {
4650
this.allowDuplicateKeys = allowDuplicateKeys;
51+
this.validateUnicodeInJsonParsing = validateUnicodeInJsonParsing;
4752
}
4853

4954
/**
@@ -53,18 +58,41 @@ public VariantBuilder(boolean allowDuplicateKeys) {
5358
* @throws IOException if any JSON parsing error happens.
5459
*/
5560
public static Variant parseJson(String json, boolean allowDuplicateKeys) throws IOException {
61+
return parseJson(json, allowDuplicateKeys, true);
62+
}
63+
64+
/**
65+
* Similar to {@link #parseJson(String, boolean)}, but additionally controls whether JSON
66+
* string contents are validated to be well-formed Unicode (no unpaired UTF-16 surrogate code
67+
* units). Strict validation is the default and matches RFC 8259 section 7. The flag exists
68+
* to allow callers to opt out for backward compatibility with input that previously parsed
69+
* (with the unpaired surrogate silently replaced by the Unicode replacement character).
70+
*/
71+
public static Variant parseJson(String json, boolean allowDuplicateKeys,
72+
boolean validateUnicodeInJsonParsing) throws IOException {
5673
try (JsonParser parser = new JsonFactory().createParser(json)) {
5774
parser.nextToken();
58-
return parseJson(parser, allowDuplicateKeys);
75+
return parseJson(parser, allowDuplicateKeys, validateUnicodeInJsonParsing);
5976
}
6077
}
6178

6279
/**
63-
* Similar {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string input.
80+
* Similar to {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string
81+
* input.
6482
*/
6583
public static Variant parseJson(JsonParser parser, boolean allowDuplicateKeys)
6684
throws IOException {
67-
VariantBuilder builder = new VariantBuilder(allowDuplicateKeys);
85+
return parseJson(parser, allowDuplicateKeys, true);
86+
}
87+
88+
/**
89+
* Similar to {@link #parseJson(JsonParser, boolean)}, but additionally controls whether JSON
90+
* string contents are validated to be well-formed Unicode. See
91+
* {@link #parseJson(String, boolean, boolean)}.
92+
*/
93+
public static Variant parseJson(JsonParser parser, boolean allowDuplicateKeys,
94+
boolean validateUnicodeInJsonParsing) throws IOException {
95+
VariantBuilder builder = new VariantBuilder(allowDuplicateKeys, validateUnicodeInJsonParsing);
6896
builder.buildJson(parser);
6997
return builder.result();
7098
}
@@ -495,6 +523,9 @@ private void buildJson(JsonParser parser) throws IOException {
495523
int start = writePos;
496524
while (parser.nextToken() != JsonToken.END_OBJECT) {
497525
String key = parser.currentName();
526+
if (validateUnicodeInJsonParsing) {
527+
checkValidUnicodeString(key, parser);
528+
}
498529
parser.nextToken();
499530
int id = addKey(key);
500531
fields.add(new FieldEntry(key, id, writePos - start));
@@ -513,9 +544,14 @@ private void buildJson(JsonParser parser) throws IOException {
513544
finishWritingArray(start, offsets);
514545
break;
515546
}
516-
case VALUE_STRING:
517-
appendString(parser.getText());
547+
case VALUE_STRING: {
548+
String text = parser.getText();
549+
if (validateUnicodeInJsonParsing) {
550+
checkValidUnicodeString(text, parser);
551+
}
552+
appendString(text);
518553
break;
554+
}
519555
case VALUE_NUMBER_INT:
520556
try {
521557
appendLong(parser.getLongValue());
@@ -557,6 +593,30 @@ private void parseFloatingPoint(JsonParser parser) throws IOException {
557593
}
558594
}
559595

596+
// Reject JSON strings that contain unpaired UTF-16 surrogate code units. Java strings can
597+
// hold lone surrogates, but RFC 8259 section 7 requires JSON string contents to be well-formed
598+
// Unicode. Stricter parsers such as simdjson reject these inputs, while Jackson's
599+
// `ReaderBasedJsonParser` accepts them and silently replaces the invalid character with U+FFFD
600+
// when the result is encoded as UTF-8. That silent replacement causes data corruption, so
601+
// we surface a JSON parse error instead.
602+
private static void checkValidUnicodeString(String str, JsonParser parser)
603+
throws JsonParseException {
604+
int len = str.length();
605+
for (int i = 0; i < len; ++i) {
606+
char c = str.charAt(i);
607+
if (Character.isHighSurrogate(c)) {
608+
if (i + 1 >= len || !Character.isLowSurrogate(str.charAt(i + 1))) {
609+
throw new JsonParseException(parser, String.format(
610+
"Invalid Unicode in JSON string: lone high surrogate U+%04X", (int) c));
611+
}
612+
++i;
613+
} else if (Character.isLowSurrogate(c)) {
614+
throw new JsonParseException(parser, String.format(
615+
"Invalid Unicode in JSON string: lone low surrogate U+%04X", (int) c));
616+
}
617+
}
618+
}
619+
560620
// Try to parse a JSON number as a decimal. Return whether the parsing succeeds. The input must
561621
// only use the decimal format (an integer value with an optional '.' in it) and must not use
562622
// scientific notation. It also must fit into the precision limitation of decimal types.
@@ -583,4 +643,8 @@ private boolean tryParseDecimal(String input) {
583643
// Store all keys in `dictionary` in the order of id.
584644
private final ArrayList<byte[]> dictionaryKeys = new ArrayList<>();
585645
private final boolean allowDuplicateKeys;
646+
// When true, JSON string contents are validated to be well-formed Unicode (RFC 8259 sec 7).
647+
// Unpaired UTF-16 surrogate code units cause a `JsonParseException` to be thrown during
648+
// `buildJson`, which surfaces as a `MALFORMED_RECORD_IN_PARSING` error to SQL callers.
649+
private final boolean validateUnicodeInJsonParsing;
586650
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ object VariantExpressionEvalUtils {
3434
def parseJson(
3535
input: UTF8String,
3636
allowDuplicateKeys: Boolean = false,
37-
failOnError: Boolean = true): VariantVal = {
37+
failOnError: Boolean = true,
38+
validateUnicodeInJsonParsing: Boolean = true): VariantVal = {
3839
def parseJsonFailure(exception: Throwable): VariantVal = {
3940
if (failOnError) {
4041
throw exception
@@ -43,7 +44,8 @@ object VariantExpressionEvalUtils {
4344
}
4445
}
4546
try {
46-
val v = VariantBuilder.parseJson(input.toString, allowDuplicateKeys)
47+
val v = VariantBuilder.parseJson(
48+
input.toString, allowDuplicateKeys, validateUnicodeInJsonParsing)
4749
new VariantVal(v.getValue, v.getMetadata)
4850
} catch {
4951
case _: VariantSizeLimitException =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/variantExpressions.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ case class ParseJson(child: Expression, failOnError: Boolean = true)
6262
Seq(
6363
child,
6464
Literal(SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_DUPLICATE_KEYS), BooleanType),
65-
Literal(failOnError, BooleanType)),
66-
inputTypes :+ BooleanType :+ BooleanType,
65+
Literal(failOnError, BooleanType),
66+
Literal(SQLConf.get.getConf(SQLConf.VARIANT_VALIDATE_UNICODE_IN_JSON_PARSING), BooleanType)),
67+
inputTypes :+ BooleanType :+ BooleanType :+ BooleanType,
6768
returnNullable = !failOnError)
6869

6970
override def inputTypes: Seq[AbstractDataType] =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ class JacksonParser(
122122
}
123123

124124
private val variantAllowDuplicateKeys = SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_DUPLICATE_KEYS)
125+
private val variantValidateUnicodeInJsonParsing =
126+
SQLConf.get.getConf(SQLConf.VARIANT_VALIDATE_UNICODE_IN_JSON_PARSING)
125127

126128
protected final def parseVariant(parser: JsonParser): VariantVal = {
127129
// Skips `FIELD_NAME` at the beginning. This check is adapted from `parseJsonToken`, but we
@@ -131,7 +133,8 @@ class JacksonParser(
131133
parser.nextToken()
132134
}
133135
try {
134-
val v = VariantBuilder.parseJson(parser, variantAllowDuplicateKeys)
136+
val v = VariantBuilder.parseJson(
137+
parser, variantAllowDuplicateKeys, variantValidateUnicodeInJsonParsing)
135138
new VariantVal(v.getValue, v.getMetadata)
136139
} catch {
137140
case _: VariantSizeLimitException =>

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6135,6 +6135,19 @@ object SQLConf {
61356135
.booleanConf
61366136
.createWithDefault(false)
61376137

6138+
val VARIANT_VALIDATE_UNICODE_IN_JSON_PARSING =
6139+
buildConf("spark.sql.variant.validateUnicodeInJsonParsing")
6140+
.internal()
6141+
.doc("When true, parsing variant from JSON rejects strings that contain unpaired UTF-16 " +
6142+
"surrogate code units (such as a lone high surrogate like \\uD835), which are invalid " +
6143+
"per RFC 8259 section 7. When false, restores the legacy permissive behavior in which " +
6144+
"the unpaired surrogate is silently replaced by the Unicode replacement character " +
6145+
"during UTF-8 encoding, causing data corruption that diverges from strict JSON parsers.")
6146+
.version("4.3.0")
6147+
.withBindingPolicy(ConfigBindingPolicy.SESSION)
6148+
.booleanConf
6149+
.createWithDefault(true)
6150+
61386151
val VARIANT_ALLOW_READING_SHREDDED =
61396152
buildConf("spark.sql.variant.allowReadingShredded")
61406153
.internal()

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtilsSuite.scala

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,52 @@ class VariantExpressionEvalUtilsSuite extends SparkFunSuite {
140140
}
141141
}
142142

143+
test("SPARK-56654: reject unpaired UTF-16 surrogates in JSON strings") {
144+
val invalidJsonInputs = Seq(
145+
"\"\\uD835\"", // lone high surrogate (string value)
146+
"\"\\uDC00\"", // lone low surrogate (string value)
147+
"\"\\uD835x\\uDC00\"", // surrogates separated by non-surrogate
148+
"\"\\uD835\\uD835\"", // two high surrogates in a row
149+
"\"prefix \\uD835\"", // trailing lone high surrogate
150+
"{\"\\uD835\":1}", // lone surrogate in an object key
151+
"[\"ok\", \"\\uDC00\"]" // lone surrogate inside an array element
152+
)
153+
for (json <- invalidJsonInputs) {
154+
checkError(
155+
exception = intercept[SparkThrowable] {
156+
VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
157+
allowDuplicateKeys = false)
158+
},
159+
condition = "MALFORMED_RECORD_IN_PARSING.WITHOUT_SUGGESTION",
160+
parameters = Map("badRecord" -> json, "failFastMode" -> "FAILFAST")
161+
)
162+
val tryResult = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
163+
allowDuplicateKeys = false, failOnError = false)
164+
assert(tryResult === null)
165+
}
166+
val validJsonInputs = Seq(
167+
"\"\\uD83D\\uDE05\"", // U+1F605 GRINNING FACE WITH SWEAT
168+
"\"\\uD835\\uDC00\"", // U+1D400 MATHEMATICAL BOLD CAPITAL A
169+
"{\"\\uD83D\\uDE05\":1}", // surrogate pair in an object key
170+
"[\"\\uD835\\uDC00\"]" // surrogate pair inside an array
171+
)
172+
for (json <- validJsonInputs) {
173+
val parsed = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
174+
allowDuplicateKeys = false)
175+
assert(parsed != null, s"expected non-null variant for $json")
176+
}
177+
}
178+
179+
test("SPARK-56654: legacy mode accepts unpaired surrogates") {
180+
val json = "\"\\uD835\""
181+
val parsed = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
182+
allowDuplicateKeys = false, validateUnicodeInJsonParsing = false)
183+
assert(parsed != null)
184+
val tryParsed = VariantExpressionEvalUtils.parseJson(UTF8String.fromString(json),
185+
allowDuplicateKeys = false, failOnError = false, validateUnicodeInJsonParsing = false)
186+
assert(tryParsed != null)
187+
}
188+
143189
test("isVariantNull") {
144190
def check(json: String, expected: Boolean): Unit = {
145191
if (json != null) {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [static_invoke(VariantExpressionEvalUtils.isValidVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true)))) AS is_valid_variant(parse_json(g))#0]
1+
Project [static_invoke(VariantExpressionEvalUtils.isValidVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true, true)))) AS is_valid_variant(parse_json(g))#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [static_invoke(VariantExpressionEvalUtils.isVariantNull(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true)))) AS is_variant_null(parse_json(g))#0]
1+
Project [static_invoke(VariantExpressionEvalUtils.isVariantNull(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true, true)))) AS is_variant_null(parse_json(g))#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true)) AS parse_json(g)#0]
1+
Project [static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true, true)) AS parse_json(g)#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [static_invoke(SchemaOfVariant.schemaOfVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true)))) AS schema_of_variant(parse_json(g))#0]
1+
Project [static_invoke(SchemaOfVariant.schemaOfVariant(static_invoke(VariantExpressionEvalUtils.parseJson(g#0, false, true, true)))) AS schema_of_variant(parse_json(g))#0]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]

0 commit comments

Comments
 (0)