Skip to content

Commit e2aaee4

Browse files
csun5285claude
andauthored
[fix](serde) match STRUCT sub-fields by name when loading JSON (#64011)
Stream Load into a STRUCT column reads each value as a string and converts it with DataTypeStructSerDe::from_string (CAST varchar -> struct). That path matched sub-fields by position, so JSON keys whose order differed from the DDL turned the whole struct column into NULL, and a row missing a field failed to load. doc: apache/doris-website#3907 ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [x] Regression test - [x] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into --> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 3ab5d8b commit e2aaee4

8 files changed

Lines changed: 263 additions & 32 deletions

File tree

be/src/core/data_type_serde/data_type_struct_serde.cpp

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "core/data_type_serde/data_type_struct_serde.h"
1919

20+
#include <algorithm>
21+
2022
#include "arrow/array/builder_nested.h"
2123
#include "common/status.h"
2224
#include "core/column/column.h"
@@ -516,21 +518,17 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,
516518

517519
const auto elem_size = elem_serdes_ptrs.size();
518520

519-
std::vector<StringRef> field_value;
520-
// check syntax error
521-
if (split_result.size() == elem_size) {
522-
// no field name
523-
for (int i = 0; i < split_result.size(); i++) {
524-
if (i != split_result.size() - 1 &&
525-
split_result[i].delimiter != options.collection_delim) {
526-
return Status::InvalidArgument(
527-
"Struct field value {} is not separated by collection_delim.", i);
528-
}
529-
field_value.push_back(split_result[i].element);
530-
}
531-
} else if (split_result.size() == 2 * elem_size) {
532-
// field name : field value
533-
int field_pos = 0;
521+
std::vector<StringRef> field_value(elem_size);
522+
std::vector<bool> got(elem_size, false);
523+
524+
// Named mode is detected by the delimiter structure (the first token is followed by a
525+
// map_key_delim, e.g. {f1:1,f2:2}), so it also covers the case where only some fields are
526+
// provided. In named mode the input field order may differ from the schema order and
527+
// missing nullable fields are filled with NULL. Otherwise the tokens are matched positionally.
528+
bool named_mode = !split_result.empty() && (split_result.size() % 2 == 0) &&
529+
split_result[0].delimiter == options.map_key_delim;
530+
531+
if (named_mode) {
534532
for (int i = 0; i < split_result.size(); i += 2) {
535533
if (split_result[i].delimiter != options.map_key_delim) {
536534
return Status::InvalidArgument(
@@ -540,18 +538,37 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,
540538
return Status::InvalidArgument(
541539
"Struct name-value pair does not have collection delimiter");
542540
}
543-
if (field_pos >= elem_size) {
544-
return Status::InvalidArgument(
545-
"Struct field number is more than schema field number");
546-
}
547541
auto field_name = split_result[i].element.trim_quote();
548-
549-
if (!field_name.eq(StringRef(elem_names[field_pos]))) {
550-
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
551-
split_result[i].element.to_string());
542+
// struct field names are stored lower-cased, so lower-case the input key for a
543+
// case-insensitive match (consistent with the simdjson JSON reader).
544+
std::string lower_name(field_name.data, field_name.size);
545+
std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
546+
auto name_it = std::find(elem_names.begin(), elem_names.end(), lower_name);
547+
if (name_it == elem_names.end()) {
548+
// the input key is not a field of this struct
549+
if constexpr (is_strict_mode) {
550+
// strict CAST treats an unmatched key as bad input and fails
551+
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
552+
split_result[i].element.to_string());
553+
}
554+
// non-strict load tolerates it: drop the extra key, matching the simdjson
555+
// JSON reader that feeds STRUCT columns on JSON stream load
556+
continue;
552557
}
553-
field_value.push_back(split_result[i + 1].element);
554-
field_pos++;
558+
size_t target = name_it - elem_names.begin();
559+
field_value[target] = split_result[i + 1].element;
560+
got[target] = true;
561+
}
562+
} else if (split_result.size() == elem_size) {
563+
// no field name, matched positionally
564+
for (int i = 0; i < split_result.size(); i++) {
565+
if (i != split_result.size() - 1 &&
566+
split_result[i].delimiter != options.collection_delim) {
567+
return Status::InvalidArgument(
568+
"Struct field value {} is not separated by collection_delim.", i);
569+
}
570+
field_value[i] = split_result[i].element;
571+
got[i] = true;
555572
}
556573
} else {
557574
return Status::InvalidArgument(
@@ -560,6 +577,12 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,
560577
}
561578

562579
for (int field_pos = 0; field_pos < elem_size; ++field_pos) {
580+
if (!got[field_pos]) {
581+
// a missing field is filled with NULL (struct sub-columns are always nullable,
582+
// same as the empty-struct '{}' handling above)
583+
struct_column.get_column(field_pos).insert_default();
584+
continue;
585+
}
563586
// Previously, there was rollback logic here in case of errors, similar to the logic in deserialize_one_cell_from_json.
564587
// But it's not necessary here.
565588
// If it is non-strict mode, the internal type is Nullable, and Nullable will handle errors itself.

be/test/core/data_type_serde/data_type_serde_struct_test.cpp

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@
4242
#include "core/data_type/data_type_string.h"
4343
#include "core/data_type/data_type_struct.h"
4444
#include "core/data_type/define_primitive_type.h"
45+
#include "core/data_type/primitive_type.h"
4546
#include "core/field.h"
47+
#include "core/string_buffer.hpp"
4648
#include "core/types.h"
4749
#include "storage/olap_common.h"
4850
#include "testutil/test_util.h"
@@ -157,4 +159,105 @@ TEST_F(DataTypeStructSerDeTest, ArrowMemNotAligned) {
157159
EXPECT_TRUE(st.ok());
158160
}
159161

162+
// Regression test for OPENSOURCE-374: from_string (used by CAST string->struct, which is the
163+
// stream-load JSON path) must match struct sub-fields by name, tolerate missing fields, and
164+
// keep working for positional input. This covers every branch of DataTypeStructSerDe::_from_string.
165+
TEST_F(DataTypeStructSerDeTest, FromStringByFieldName) {
166+
DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
167+
DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat32>());
168+
DataTypePtr f3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
169+
DataTypePtr st =
170+
std::make_shared<DataTypeStruct>(DataTypes {f1, f2, f3}, Strings {"f1", "f2", "f3"});
171+
auto serde = st->get_serde(1);
172+
DataTypeSerDe::FormatOptions opt;
173+
174+
auto from_string = [&](const std::string& s, MutableColumnPtr& col) -> Status {
175+
col = st->create_column();
176+
std::string buf = s;
177+
StringRef ref(buf.data(), buf.size());
178+
return serde->from_string(ref, *col, opt);
179+
};
180+
// to_string is the inverse of from_string and always writes fields in schema order, so the
181+
// input field order cancels out and we can compare results structurally.
182+
auto to_string = [&](const MutableColumnPtr& col) -> std::string {
183+
auto out = ColumnString::create();
184+
VectorBufferWriter bw(*out);
185+
serde->to_string(*col, 0, bw, opt);
186+
bw.commit();
187+
auto s = out->get_data_at(0);
188+
return std::string(s.data, s.size);
189+
};
190+
auto has = [](const Status& st, const std::string& sub) {
191+
return st.to_string().find(sub) != std::string::npos;
192+
};
193+
194+
MutableColumnPtr ordered;
195+
MutableColumnPtr c;
196+
ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":"a"})", ordered).ok());
197+
198+
// 1) out-of-order keys are matched by name (the core fix)
199+
ASSERT_TRUE(from_string(R"({"f2":2.5,"f1":1,"f3":"a"})", c).ok());
200+
EXPECT_EQ(to_string(ordered), to_string(c));
201+
202+
// 2) field names are matched case-insensitively (input is lower-cased before lookup)
203+
ASSERT_TRUE(from_string(R"({"F2":2.5,"F1":1,"F3":"a"})", c).ok());
204+
EXPECT_EQ(to_string(ordered), to_string(c));
205+
206+
// 3) a missing field is filled with NULL (named mode tolerates fewer fields)
207+
MutableColumnPtr with_null;
208+
ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":null})", with_null).ok());
209+
ASSERT_TRUE(from_string(R"({"f2":2.5,"f1":1})", c).ok());
210+
EXPECT_EQ(to_string(with_null), to_string(c));
211+
212+
// 4) positional input (no field names) still works
213+
ASSERT_TRUE(from_string(R"({1,2.5,"a"})", c).ok());
214+
EXPECT_EQ(to_string(ordered), to_string(c));
215+
216+
// 5) empty struct '{}' yields all-NULL fields
217+
MutableColumnPtr empty;
218+
ASSERT_TRUE(from_string("{}", empty).ok());
219+
ASSERT_TRUE(from_string(R"({"f1":null,"f2":null,"f3":null})", c).ok());
220+
EXPECT_EQ(to_string(c), to_string(empty));
221+
222+
// 6) an unknown field is ignored, missing schema fields become NULL (consistent with the
223+
// simdjson reader and PostgreSQL/Spark/Trino)
224+
MutableColumnPtr f2_null;
225+
ASSERT_TRUE(from_string(R"({"f1":1,"f2":null,"f3":"a"})", f2_null).ok());
226+
ASSERT_TRUE(from_string(R"({"f1":1,"fx":2.5,"f3":"a"})", c).ok());
227+
EXPECT_EQ(to_string(f2_null), to_string(c));
228+
229+
// 7) extra named fields beyond the schema are ignored (4 fields into a 3-field struct)
230+
ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":"a","f4":9})", c).ok());
231+
EXPECT_EQ(to_string(ordered), to_string(c));
232+
233+
// --- error paths (each exercises a distinct branch / message) ---
234+
// 8) name-value pair missing the collection delimiter, e.g. {"f1":1:"f2":2}
235+
Status e = from_string(R"({"f1":1:"f2":2})", c);
236+
EXPECT_FALSE(e.ok());
237+
EXPECT_TRUE(has(e, "does not have collection delimiter"));
238+
239+
// 9) positional input whose count does not match the schema is rejected (too few or too
240+
// many) -- without field names the arity must be exact, same as PG/Spark/Trino
241+
e = from_string(R"({1,2.5})", c); // 2 values into a 3-field struct
242+
EXPECT_FALSE(e.ok());
243+
EXPECT_TRUE(has(e, "not equal to schema field number"));
244+
e = from_string(R"({1,2.5,a,9})", c); // 4 values into a 3-field struct
245+
EXPECT_FALSE(e.ok());
246+
EXPECT_TRUE(has(e, "not equal to schema field number"));
247+
248+
// 10) positional value not separated by the collection delimiter, e.g. {1,2.5:3}
249+
e = from_string(R"({1,2.5:x})", c);
250+
EXPECT_FALSE(e.ok());
251+
EXPECT_TRUE(has(e, "not separated by collection_delim"));
252+
253+
// 11) bad framing (missing braces)
254+
EXPECT_FALSE(from_string(R"("f1":1)", c).ok());
255+
256+
// 12) strict mode propagates a sub-field parse error
257+
auto sc = st->create_column();
258+
std::string sbuf = R"({"f1":"notanint","f2":2.5,"f3":"a"})";
259+
StringRef sref(sbuf.data(), sbuf.size());
260+
EXPECT_FALSE(serde->from_string_strict_mode(sref, *sc, opt).ok());
261+
}
262+
160263
} // namespace doris
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[
2+
{"id": 1, "c_struct": {"f1": 10, "f2": 3.14, "f3": "Emily"}},
3+
{"id": 2, "c_struct": {"f1": 4, "f2": 1.5, "f3": null}},
4+
{"id": 3, "c_struct": {"f1": 7, "f2": null, "f3": "Benjamin"}},
5+
{"id": 4, "c_struct": {}},
6+
{"id": 5, "c_struct": null}
7+
]
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[
2+
{"id": 20, "c_struct": {"f2": 1.5, "f1": 4, "f3": null}},
3+
{"id": 21, "c_struct": {"f3": "Tom", "f2": 2.5, "f1": 9}},
4+
{"id": 22, "c_struct": {"f2": 8.5, "f1": 1}},
5+
{"id": 23, "c_struct": {"f1": 7, "f2": 3.5, "f3": "Z", "f4": 999}},
6+
{"id": 24, "c_struct": {"F2": 6.5, "F1": 5, "F3": "U"}}
7+
]

regression-test/data/load_p0/stream_load/test_stream_load.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@
102102
3 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
103103
4 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
104104
5 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
105-
6 \N
106-
7 \N
107-
8 \N
105+
6 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
106+
7 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
107+
8 {"f1":1, "f2":null, "f3":null, "f4":null, "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
108108
9 {"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null}
109109
10 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
110110
11 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}

regression-test/data/load_p0/stream_load/test_stream_load_move_memtable.out

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,9 @@
8787
3 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
8888
4 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
8989
5 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
90-
6 \N
91-
7 \N
92-
8 \N
90+
6 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
91+
7 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
92+
8 {"f1":1, "f2":null, "f3":null, "f4":null, "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
9393
9 {"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null}
9494
10 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
9595
11 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}

regression-test/data/query_p0/sql_functions/cast_function/test_cast_struct.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,5 @@
4242
{"f1":1, "f2":"2022-10-10 00:00:00"}
4343

4444
-- !sql15 --
45-
\N
45+
{"a":1, "b":1}
4646

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Regression test for OPENSOURCE-374: when loading JSON into a STRUCT column, sub-fields must
19+
// be matched by field name. Out-of-order JSON keys in Stream Load used to turn the whole struct
20+
// column into NULL; a missing field is now filled with NULL and an unknown field is ignored
21+
// (consistent with PostgreSQL / Spark / Trino). Struct-to-struct CAST stays positional, which
22+
// also matches those engines.
23+
suite("test_struct_field_align") {
24+
def tableName = "test_struct_field_align"
25+
26+
sql "DROP TABLE IF EXISTS ${tableName}"
27+
sql """
28+
CREATE TABLE ${tableName} (
29+
id INT NOT NULL,
30+
c_struct STRUCT<f1:INT,f2:FLOAT,f3:STRING> NULL
31+
)
32+
UNIQUE KEY(id)
33+
DISTRIBUTED BY HASH(id) BUCKETS 1
34+
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
35+
"""
36+
37+
def doStreamLoad = { fileName ->
38+
streamLoad {
39+
table tableName
40+
set 'format', 'json'
41+
set 'strip_outer_array', 'true'
42+
set 'columns', 'id, c_struct'
43+
file fileName
44+
time 10000
45+
check { result, exception, startTime, endTime ->
46+
if (exception != null) {
47+
throw exception
48+
}
49+
def json = parseJson(result)
50+
assertEquals("success", json.Status.toLowerCase())
51+
assertEquals(0, json.NumberFilteredRows)
52+
assertTrue(json.NumberLoadedRows > 0)
53+
}
54+
}
55+
}
56+
57+
// 1) ordered keys (baseline)
58+
doStreamLoad "test_struct_field_align_ordered.json"
59+
// 2) keys whose order differs from the DDL (the core bug) and a row that omits f3
60+
doStreamLoad "test_struct_field_align_swapped.json"
61+
62+
sql "sync"
63+
64+
def rows = sql "SELECT id, c_struct FROM ${tableName} ORDER BY id"
65+
def actual = [:]
66+
for (row in rows) {
67+
actual[row[0] as int] = (row[1] == null ? "NULL" : row[1].toString())
68+
}
69+
70+
def expected = [
71+
1 : '{"f1":10, "f2":3.14, "f3":"Emily"}',
72+
2 : '{"f1":4, "f2":1.5, "f3":null}',
73+
3 : '{"f1":7, "f2":null, "f3":"Benjamin"}',
74+
4 : '{"f1":null, "f2":null, "f3":null}',
75+
5 : 'NULL',
76+
// swapped JSON keys must align by name instead of producing NULL
77+
20 : '{"f1":4, "f2":1.5, "f3":null}',
78+
21 : '{"f1":9, "f2":2.5, "f3":"Tom"}',
79+
// f3 omitted -> filled with NULL
80+
22 : '{"f1":1, "f2":8.5, "f3":null}',
81+
// an unknown field (f4) is ignored, the matched fields still align by name
82+
23 : '{"f1":7, "f2":3.5, "f3":"Z"}',
83+
// upper-case JSON keys are matched case-insensitively to the lower-cased schema names
84+
24 : '{"f1":5, "f2":6.5, "f3":"U"}',
85+
]
86+
87+
assertEquals(expected.size(), actual.size())
88+
for (e in expected) {
89+
assertEquals(e.value, actual[e.key], "row id=${e.key} mismatch".toString())
90+
}
91+
}

0 commit comments

Comments
 (0)