Skip to content

Commit 7fe4b3b

Browse files
windtalker and gengliqi authored
Support json_object pushdown (#10744)
close #10743

support json_object pushdown

Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Co-authored-by: Liqi Geng <gengliqiii@gmail.com>
1 parent 090086e commit 7fe4b3b

7 files changed

Lines changed: 494 additions & 3 deletions

File tree

dbms/src/Flash/Coprocessor/DAGUtils.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
472472
//{tipb::ScalarFuncSig::JsonReplaceSig, "cast"},
473473
//{tipb::ScalarFuncSig::JsonRemoveSig, "cast"},
474474
//{tipb::ScalarFuncSig::JsonMergeSig, "cast"},
475-
//{tipb::ScalarFuncSig::JsonObjectSig, "cast"},
475+
{tipb::ScalarFuncSig::JsonObjectSig, "json_object"},
476476
{tipb::ScalarFuncSig::JsonArraySig, "json_array"},
477477
{tipb::ScalarFuncSig::JsonValidJsonSig, "json_valid_json"},
478478
{tipb::ScalarFuncSig::JsonValidOthersSig, "json_valid_others"},

dbms/src/Functions/FunctionsJson.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ void registerFunctionsJson(FunctionFactory & factory)
2424
factory.registerFunction<FunctionCastJsonAsString>();
2525
factory.registerFunction<FunctionJsonLength>();
2626
factory.registerFunction<FunctionJsonArray>();
27+
factory.registerFunction<FunctionJsonObject>();
2728
factory.registerFunction<FunctionCastJsonAsJson>();
2829
factory.registerFunction<FunctionCastRealAsJson>();
2930
factory.registerFunction<FunctionCastDecimalAsJson>();

dbms/src/Functions/FunctionsJson.h

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
#include <simdjson.h>
4242
#include <tipb/expression.pb.h>
4343

44+
#include <algorithm>
4445
#include <ext/range.h>
46+
#include <limits>
4547
#include <magic_enum.hpp>
4648
#include <string_view>
4749
#include <type_traits>
@@ -63,6 +65,8 @@ namespace DB
6365

6466
// Declarations of error-code constants. They are `extern`, so this header only
// names them; their definitions must be provided by another translation unit.
namespace ErrorCodes
6567
{
68+
extern const int ARGUMENT_OUT_OF_BOUND;
69+
extern const int BAD_ARGUMENTS;
6670
extern const int ILLEGAL_COLUMN;
6771
extern const int UNKNOWN_TYPE;
6872
} // namespace ErrorCodes
@@ -976,6 +980,208 @@ class FunctionJsonArray : public IFunction
976980
};
977981

978982

983+
class FunctionJsonObject : public IFunction
984+
{
985+
public:
986+
static constexpr auto name = "json_object";
987+
static FunctionPtr create(const Context &) { return std::make_shared<FunctionJsonObject>(); }
988+
989+
String getName() const override { return name; }
990+
991+
size_t getNumberOfArguments() const override { return 0; }
992+
993+
bool isVariadic() const override { return true; }
994+
995+
bool useDefaultImplementationForNulls() const override { return false; }
996+
bool useDefaultImplementationForConstants() const override { return true; }
997+
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
998+
{
999+
if (unlikely(arguments.size() % 2 != 0))
1000+
{
1001+
throw Exception(
1002+
fmt::format("Incorrect parameter count in the call to native function '{}'", getName()),
1003+
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
1004+
}
1005+
for (const auto arg_idx : ext::range(0, arguments.size()))
1006+
{
1007+
if (arg_idx % 2 == 0 && arguments[arg_idx]->onlyNull())
1008+
throw Exception("JSON documents may not contain NULL member names.", ErrorCodes::BAD_ARGUMENTS);
1009+
1010+
if (!arguments[arg_idx]->onlyNull())
1011+
{
1012+
const auto * arg = removeNullable(arguments[arg_idx]).get();
1013+
if (!arg->isStringOrFixedString())
1014+
throw Exception(
1015+
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
1016+
"Illegal type {} of argument {} of function {}",
1017+
arg->getName(),
1018+
arg_idx + 1,
1019+
getName());
1020+
}
1021+
}
1022+
return std::make_shared<DataTypeString>();
1023+
}
1024+
1025+
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override
1026+
{
1027+
if (arguments.empty())
1028+
{
1029+
// clang-format off
1030+
const UInt8 empty_object_json_value[] = {
1031+
JsonBinary::TYPE_CODE_OBJECT, // object_type
1032+
0x0, 0x0, 0x0, 0x0, // element_count
1033+
0x8, 0x0, 0x0, 0x0}; // total_size
1034+
// clang-format on
1035+
auto empty_object_json = ColumnString::create();
1036+
empty_object_json->insertData(
1037+
reinterpret_cast<const char *>(empty_object_json_value),
1038+
sizeof(empty_object_json_value) / sizeof(UInt8));
1039+
block.getByPosition(result).column = ColumnConst::create(std::move(empty_object_json), block.rows());
1040+
return;
1041+
}
1042+
1043+
auto nested_block = createBlockWithNestedColumns(block, arguments);
1044+
StringSources sources;
1045+
for (auto column_number : arguments)
1046+
{
1047+
sources.push_back(
1048+
block.getByPosition(column_number).column->onlyNull()
1049+
? nullptr
1050+
: createDynamicStringSource(*nested_block.getByPosition(column_number).column));
1051+
}
1052+
1053+
auto rows = block.rows();
1054+
auto col_to = ColumnString::create();
1055+
auto & data_to = col_to->getChars();
1056+
auto & offsets_to = col_to->getOffsets();
1057+
offsets_to.resize(rows);
1058+
1059+
std::vector<const NullMap *> nullmaps;
1060+
nullmaps.reserve(sources.size());
1061+
bool is_input_nullable = false;
1062+
for (auto column_number : arguments)
1063+
{
1064+
const auto & col = block.getByPosition(column_number).column;
1065+
if (col->isColumnNullable())
1066+
{
1067+
const auto & column_nullable = static_cast<const ColumnNullable &>(*col);
1068+
nullmaps.push_back(&(column_nullable.getNullMapData()));
1069+
is_input_nullable = true;
1070+
}
1071+
else
1072+
{
1073+
nullmaps.push_back(nullptr);
1074+
}
1075+
}
1076+
1077+
if (is_input_nullable)
1078+
doExecuteImpl<true>(sources, rows, data_to, offsets_to, nullmaps);
1079+
else
1080+
doExecuteImpl<false>(sources, rows, data_to, offsets_to, nullmaps);
1081+
1082+
block.getByPosition(result).column = std::move(col_to);
1083+
}
1084+
1085+
private:
1086+
template <bool is_input_nullable>
1087+
static void doExecuteImpl(
1088+
StringSources & sources,
1089+
size_t rows,
1090+
ColumnString::Chars_t & data_to,
1091+
ColumnString::Offsets & offsets_to,
1092+
const std::vector<const NullMap *> & nullmaps)
1093+
{
1094+
struct JsonObjectEntry
1095+
{
1096+
StringRef key;
1097+
JsonBinary value;
1098+
size_t input_order;
1099+
};
1100+
1101+
const size_t pair_count = sources.size() / 2;
1102+
size_t reserve_size = rows * (1 + pair_count * 16);
1103+
for (const auto & source : sources)
1104+
reserve_size += source ? source->getSizeForReserve() : rows;
1105+
JsonBinary::JsonBinaryWriteBuffer write_buffer(data_to, reserve_size);
1106+
1107+
std::vector<JsonObjectEntry> entries;
1108+
std::vector<StringRef> keys;
1109+
std::vector<JsonBinary> values;
1110+
entries.reserve(pair_count);
1111+
keys.reserve(pair_count);
1112+
values.reserve(pair_count);
1113+
1114+
for (size_t i = 0; i < rows; ++i)
1115+
{
1116+
entries.clear();
1117+
for (size_t col = 0; col < sources.size(); col += 2)
1118+
{
1119+
if constexpr (is_input_nullable)
1120+
{
1121+
const auto * key_nullmap = nullmaps[col];
1122+
if (!sources[col] || (key_nullmap && (*key_nullmap)[i]))
1123+
throw Exception("JSON documents may not contain NULL member names.", ErrorCodes::BAD_ARGUMENTS);
1124+
}
1125+
1126+
assert(sources[col]);
1127+
const auto & key_from = sources[col]->getWhole();
1128+
if (unlikely(key_from.size > std::numeric_limits<UInt16>::max()))
1129+
throw Exception(
1130+
"TiDB/TiFlash does not yet support JSON objects with the key length >= 65536",
1131+
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
1132+
StringRef key{key_from.data, key_from.size};
1133+
1134+
JsonBinary value(JsonBinary::TYPE_CODE_LITERAL, StringRef(&JsonBinary::LITERAL_NIL, 1));
1135+
if constexpr (is_input_nullable)
1136+
{
1137+
const auto * value_nullmap = nullmaps[col + 1];
1138+
if (sources[col + 1] && !(value_nullmap && (*value_nullmap)[i]))
1139+
{
1140+
const auto & data_from = sources[col + 1]->getWhole();
1141+
value = JsonBinary(data_from.data[0], StringRef(&data_from.data[1], data_from.size - 1));
1142+
}
1143+
}
1144+
else
1145+
{
1146+
assert(sources[col + 1]);
1147+
const auto & data_from = sources[col + 1]->getWhole();
1148+
value = JsonBinary(data_from.data[0], StringRef(&data_from.data[1], data_from.size - 1));
1149+
}
1150+
1151+
entries.push_back({key, value, col >> 1});
1152+
}
1153+
1154+
std::sort(entries.begin(), entries.end(), [](const auto & lhs, const auto & rhs) {
1155+
return lhs.key == rhs.key ? lhs.input_order < rhs.input_order : lhs.key < rhs.key;
1156+
});
1157+
1158+
keys.clear();
1159+
values.clear();
1160+
for (size_t entry_idx = 0; entry_idx < entries.size();)
1161+
{
1162+
size_t last_idx = entry_idx;
1163+
while (last_idx + 1 < entries.size() && entries[last_idx + 1].key == entries[entry_idx].key)
1164+
++last_idx;
1165+
1166+
keys.push_back(entries[last_idx].key);
1167+
values.push_back(entries[last_idx].value);
1168+
entry_idx = last_idx + 1;
1169+
}
1170+
1171+
JsonBinary::buildBinaryJsonObjectInBuffer(keys, values, write_buffer);
1172+
writeChar(0, write_buffer);
1173+
offsets_to[i] = write_buffer.count();
1174+
1175+
for (const auto & source : sources)
1176+
{
1177+
if (source)
1178+
source->next();
1179+
}
1180+
}
1181+
}
1182+
};
1183+
1184+
9791185
class FunctionCastJsonAsJson : public IFunction
9801186
{
9811187
public:
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include <TestUtils/FunctionTestUtils.h>
16+
#include <TestUtils/TiFlashTestBasic.h>
17+
#include <TiDB/Schema/TiDBTypes.h>
18+
#include <gtest/gtest.h>
19+
20+
#include <limits>
21+
22+
// Declarations of the error codes the tests below compare against. They are
// `extern`, so the definitions must come from another translation unit.
namespace DB::ErrorCodes
23+
{
24+
extern const int ARGUMENT_OUT_OF_BOUND;
25+
extern const int BAD_ARGUMENTS;
26+
} // namespace DB::ErrorCodes
27+
28+
namespace DB::tests
29+
{
30+
class TestJsonObject : public DB::tests::FunctionTest
31+
{
32+
public:
33+
ColumnWithTypeAndName castStringToJson(const ColumnWithTypeAndName & column)
34+
{
35+
assert(removeNullable(column.type)->isString());
36+
ColumnsWithTypeAndName inputs{column};
37+
return executeFunction("cast_string_as_json", inputs, nullptr, true);
38+
}
39+
40+
ColumnWithTypeAndName executeFunctionWithCast(
41+
const ColumnNumbers & argument_column_numbers,
42+
const ColumnsWithTypeAndName & columns)
43+
{
44+
auto json_column = executeFunction("json_object", argument_column_numbers, columns);
45+
tipb::FieldType field_type;
46+
field_type.set_flen(-1);
47+
field_type.set_collate(TiDB::ITiDBCollator::BINARY);
48+
field_type.set_tp(TiDB::TypeString);
49+
return executeCastJsonAsStringFunction(json_column, field_type);
50+
}
51+
};
52+
53+
template <typename Fn>
54+
void assertThrowsCode(Fn && fn, int expected_code)
55+
{
56+
try
57+
{
58+
fn();
59+
FAIL() << "Expected DB::Exception to be thrown";
60+
}
61+
catch (const Exception & e)
62+
{
63+
ASSERT_EQ(expected_code, e.code()) << e.displayText();
64+
}
65+
}
66+
67+
TEST_F(TestJsonObject, TestBasicSemantics)
68+
try
69+
{
70+
constexpr size_t rows_count = 2;
71+
72+
{
73+
ColumnsWithTypeAndName inputs{createColumn<String>({"placeholder", "placeholder"})};
74+
auto res = executeFunctionWithCast({}, inputs);
75+
ASSERT_COLUMN_EQ(createConstColumn<Nullable<String>>(rows_count, "{}"), res);
76+
}
77+
78+
{
79+
ColumnsWithTypeAndName inputs{
80+
createColumn<String>({"b", "b"}),
81+
castStringToJson(createColumn<String>({"1", "1"})),
82+
createColumn<String>({"a", "a"}),
83+
castStringToJson(createColumn<Nullable<String>>({{}, "\"x\""})),
84+
};
85+
auto res = executeFunctionWithCast({0, 1, 2, 3}, inputs);
86+
auto expect = createColumn<Nullable<String>>({R"({"a": null, "b": 1})", R"({"a": "x", "b": 1})"});
87+
ASSERT_COLUMN_EQ(expect, res);
88+
}
89+
90+
{
91+
ColumnsWithTypeAndName inputs{
92+
createConstColumn<String>(rows_count, "dup"),
93+
castStringToJson(createConstColumn<String>(rows_count, "1")),
94+
createConstColumn<String>(rows_count, "dup"),
95+
castStringToJson(createColumn<String>({"2", "3"})),
96+
};
97+
auto res = executeFunctionWithCast({0, 1, 2, 3}, inputs);
98+
auto expect = createColumn<Nullable<String>>({R"({"dup": 2})", R"({"dup": 3})"});
99+
ASSERT_COLUMN_EQ(expect, res);
100+
}
101+
}
102+
CATCH
103+
104+
TEST_F(TestJsonObject, TestErrors)
105+
try
106+
{
107+
ASSERT_THROW(executeFunction("json_object", {createColumn<String>({"a"})}), Exception);
108+
109+
auto value = castStringToJson(createColumn<String>({"1"}));
110+
assertThrowsCode(
111+
[&] {
112+
executeFunction("json_object", {createOnlyNullColumn(1), value});
113+
},
114+
ErrorCodes::BAD_ARGUMENTS);
115+
116+
assertThrowsCode(
117+
[&] {
118+
executeFunction("json_object", {createColumn<Nullable<String>>({{}}), value});
119+
},
120+
ErrorCodes::BAD_ARGUMENTS);
121+
122+
String too_long_key(std::numeric_limits<UInt16>::max() + 1, 'a');
123+
assertThrowsCode(
124+
[&] {
125+
executeFunction("json_object", {createColumn<String>({too_long_key}), value});
126+
},
127+
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
128+
}
129+
CATCH
130+
131+
} // namespace DB::tests

0 commit comments

Comments
 (0)