Skip to content

Commit 646735e

Browse files
authored
branch-4.1: [fix](variant) Avoid mutating shared variant columns (#64198)
cherry-pick #64092
1 parent 5ab0a91 commit 646735e

4 files changed

Lines changed: 178 additions & 29 deletions

File tree

be/src/core/data_type/data_type_variant.cpp

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,16 @@ bool DataTypeVariant::equals(const IDataType& rhs) const {
6262

6363
int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column,
6464
int be_exec_version) const {
65-
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
66-
if (!column_variant.is_finalized()) {
67-
const_cast<ColumnVariant&>(column_variant).finalize();
65+
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
66+
MutableColumnPtr finalized_column;
67+
if (!column_variant->is_finalized()) {
68+
// Local exchange can share the same block across downstream tasks. Serialize a private
69+
// finalized copy so serialization never mutates shared variant columns.
70+
finalized_column = column_variant->clone_finalized();
71+
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
6872
}
6973

70-
const auto& subcolumns = column_variant.get_subcolumns();
74+
const auto& subcolumns = column_variant->get_subcolumns();
7175
size_t size = 0;
7276

7377
size += sizeof(uint32_t);
@@ -94,24 +98,28 @@ int64_t DataTypeVariant::get_uncompressed_serialized_bytes(const IColumn& column
9498
// sparse column
9599
// TODO: ensure compatibility with sparse column
96100
size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
97-
*column_variant.get_sparse_column(), be_exec_version);
101+
*column_variant->get_sparse_column(), be_exec_version);
98102

99103
size += ColumnVariant::get_binary_column_type()->get_uncompressed_serialized_bytes(
100-
*column_variant.get_doc_value_column(), be_exec_version);
104+
*column_variant->get_doc_value_column(), be_exec_version);
101105
return size;
102106
}
103107

104108
char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_version) const {
105-
const auto& column_variant = assert_cast<const ColumnVariant&>(column);
106-
if (!column_variant.is_finalized()) {
107-
const_cast<ColumnVariant&>(column_variant).finalize();
109+
const auto* column_variant = assert_cast<const ColumnVariant*>(&column);
110+
MutableColumnPtr finalized_column;
111+
if (!column_variant->is_finalized()) {
112+
// Local exchange can share the same block across downstream tasks. Serialize a private
113+
// finalized copy so serialization never mutates shared variant columns.
114+
finalized_column = column_variant->clone_finalized();
115+
column_variant = assert_cast<const ColumnVariant*>(finalized_column.get());
108116
}
109117
#ifndef NDEBUG
110118
// DCHECK size
111-
column_variant.check_consistency();
119+
column_variant->check_consistency();
112120
#endif
113121

114-
const auto& subcolumns = column_variant.get_subcolumns();
122+
const auto& subcolumns = column_variant->get_subcolumns();
115123

116124
char* size_pos = buf;
117125
buf += sizeof(uint32_t);
@@ -144,15 +152,15 @@ char* DataTypeVariant::serialize(const IColumn& column, char* buf, int be_exec_v
144152
// Safe case
145153
unaligned_store<uint32_t>(size_pos, static_cast<UInt32>(num_of_columns));
146154
// serialize num of rows, only take effect when subcolumns empty
147-
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant.rows()));
155+
unaligned_store<uint32_t>(buf, static_cast<UInt32>(column_variant->rows()));
148156
buf += sizeof(uint32_t);
149157

150158
// serialize sparse column
151159
// TODO: ensure compatibility with sparse column
152-
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_sparse_column(),
153-
buf, be_exec_version);
154-
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant.get_doc_value_column(),
160+
buf = ColumnVariant::get_binary_column_type()->serialize(*column_variant->get_sparse_column(),
155161
buf, be_exec_version);
162+
buf = ColumnVariant::get_binary_column_type()->serialize(
163+
*column_variant->get_doc_value_column(), buf, be_exec_version);
156164
return buf;
157165
}
158166

be/src/exprs/function/cast/cast_to_variant.h

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,52 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
3232
const auto& col_with_type_and_name = block.get_by_position(arguments[0]);
3333
const auto& col_from = col_with_type_and_name.column;
3434
const IColumn* variant_column = col_from.get();
35-
if (const auto* nullable = check_and_get_column<ColumnNullable>(*variant_column)) {
35+
const auto* nullable = check_and_get_column<ColumnNullable>(*variant_column);
36+
if (nullable != nullptr) {
3637
variant_column = &nullable->get_nested_column();
3738
}
38-
const auto& variant = assert_cast<const ColumnVariant&>(*variant_column);
39+
const auto* variant = assert_cast<const ColumnVariant*>(variant_column);
3940
ColumnPtr col_to = data_type_to->create_column();
4041

41-
if (!variant.is_finalized()) {
42-
// ColumnVariant should be finalized before parsing, finalize maybe modify original column structure
43-
variant.assume_mutable()->finalize();
42+
ColumnPtr finalized_input_column;
43+
if (!variant->is_finalized()) {
44+
// Local exchange can share the same input block across multiple downstream tasks.
45+
// Finalize a private copy so variant casts never mutate shared input columns.
46+
auto finalized_variant = variant->clone_finalized();
47+
variant = assert_cast<const ColumnVariant*>(finalized_variant.get());
48+
if (nullable != nullptr) {
49+
auto cloned_null_map =
50+
nullable->get_null_map_column_ptr()->clone_resized(input_rows_count);
51+
finalized_input_column = ColumnNullable::create(std::move(finalized_variant),
52+
std::move(cloned_null_map));
53+
} else {
54+
finalized_input_column = std::move(finalized_variant);
55+
}
4456
}
57+
auto execute_on_finalized_input = [&](auto&& executor) -> Status {
58+
if (!finalized_input_column) {
59+
return executor(block);
60+
}
61+
Block finalized_block = block;
62+
finalized_block.replace_by_position(arguments[0], finalized_input_column);
63+
RETURN_IF_ERROR(executor(finalized_block));
64+
block.replace_by_position(result, finalized_block.get_by_position(result).column);
65+
return Status::OK();
66+
};
4567

4668
// It's important to convert as many elements as possible in this context. For instance,
4769
// if the root of this variant column is a number column, converting it to a number column
4870
// is acceptable. However, if the destination type is a string and the root is a non-scalar root, then
4971
// we should convert the entire tree to a string.
50-
bool is_root_valuable = variant.is_scalar_variant() ||
51-
(!variant.is_null_root() &&
52-
variant.get_root_type()->get_primitive_type() != INVALID_TYPE &&
72+
bool is_root_valuable = variant->is_scalar_variant() ||
73+
(!variant->is_null_root() &&
74+
variant->get_root_type()->get_primitive_type() != INVALID_TYPE &&
5375
!is_string_type(data_type_to->get_primitive_type()) &&
5476
data_type_to->get_primitive_type() != TYPE_JSONB);
5577

5678
if (is_root_valuable) {
57-
ColumnPtr nested = variant.get_root();
58-
auto nested_from_type = variant.get_root_type();
79+
ColumnPtr nested = variant->get_root();
80+
auto nested_from_type = variant->get_root_type();
5981
// DCHECK(nested_from_type->is_nullable());
6082
DCHECK(!data_type_to->is_nullable());
6183
auto new_context = context == nullptr ? nullptr : context->clone();
@@ -84,16 +106,21 @@ inline Status cast_from_variant_impl(FunctionContext* context, Block& block,
84106
{0, 1}, input_rows_count);
85107
}
86108
} else {
87-
if (variant.only_have_default_values()) {
109+
if (variant->only_have_default_values()) {
88110
col_to->assume_mutable()->insert_many_defaults(input_rows_count);
89111
col_to = make_nullable(col_to, true);
90112
} else if (is_string_type(data_type_to->get_primitive_type())) {
91113
// serialize to string
92-
return CastToStringFunction::execute_impl(context, block, arguments, result,
93-
input_rows_count);
114+
return execute_on_finalized_input([&](Block& finalized_block) {
115+
return CastToStringFunction::execute_impl(context, finalized_block, arguments,
116+
result, input_rows_count);
117+
});
94118
} else if (data_type_to->get_primitive_type() == TYPE_JSONB) {
95119
// serialize to json by parsing
96-
return cast_from_generic_to_jsonb(context, block, arguments, result, input_rows_count);
120+
return execute_on_finalized_input([&](Block& finalized_block) {
121+
return cast_from_generic_to_jsonb(context, finalized_block, arguments, result,
122+
input_rows_count);
123+
});
97124
} else if (!data_type_to->is_nullable() &&
98125
!is_string_type(data_type_to->get_primitive_type())) {
99126
// other types

be/test/core/column/column_variant_test.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
#include <algorithm>
2828
#include <cstddef>
2929
#include <cstdint>
30+
#include <memory>
3031

32+
#include "agent/be_exec_version_manager.h"
3133
#include "common/cast_set.h"
34+
#include "core/block/block.h"
3235
#include "core/column/column_variant.cpp"
3336
#include "core/column/common_column_test.h"
3437
#include "core/column/subcolumn_tree.h"
@@ -40,9 +43,11 @@
4043
#include "core/types.h"
4144
#include "core/value/jsonb_value.h"
4245
#include "exec/common/variant_util.h"
46+
#include "gen_cpp/data.pb.h"
4347
#include "storage/olap_common.h"
4448
#include "testutil/test_util.h"
4549
#include "testutil/variant_util.h"
50+
#include "util/block_compression.h"
4651

4752
using namespace doris;
4853
namespace doris {
@@ -2039,6 +2044,58 @@ TEST_F(ColumnVariantTest, clone_finalized) {
20392044
test_func(std::move(cloned_object));
20402045
}
20412046

2047+
TEST_F(ColumnVariantTest, clone_finalized_deep_copies_columns) {
2048+
auto source_column = VariantUtil::construct_advanced_varint_column();
2049+
source_column->finalize(ColumnVariant::FinalizeMode::READ_MODE);
2050+
2051+
auto cloned = source_column->clone_finalized();
2052+
auto* cloned_variant = assert_cast<ColumnVariant*>(cloned.get());
2053+
EXPECT_TRUE(cloned_variant->is_finalized());
2054+
2055+
for (const auto& source_subcolumn : source_column->get_subcolumns()) {
2056+
const auto* cloned_subcolumn =
2057+
cloned_variant->get_subcolumns().find_exact(source_subcolumn->path);
2058+
ASSERT_NE(cloned_subcolumn, nullptr);
2059+
EXPECT_NE(source_subcolumn->data.get_finalized_column_ptr().get(),
2060+
cloned_subcolumn->data.get_finalized_column_ptr().get())
2061+
<< source_subcolumn->path.get_path();
2062+
}
2063+
EXPECT_NE(source_column->get_sparse_column().get(), cloned_variant->get_sparse_column().get());
2064+
EXPECT_NE(source_column->get_doc_value_column().get(),
2065+
cloned_variant->get_doc_value_column().get());
2066+
}
2067+
2068+
TEST_F(ColumnVariantTest, serialize_does_not_finalize_source_column) {
2069+
auto source_column = VariantUtil::construct_advanced_varint_column();
2070+
ASSERT_FALSE(source_column->is_finalized());
2071+
2072+
const int be_exec_version = BeExecVersionManager::get_newest_version();
2073+
const auto size =
2074+
dt_variant->get_uncompressed_serialized_bytes(*source_column, be_exec_version);
2075+
EXPECT_FALSE(source_column->is_finalized());
2076+
2077+
auto buffer = std::make_unique<char[]>(size);
2078+
dt_variant->serialize(*source_column, buffer.get(), be_exec_version);
2079+
EXPECT_FALSE(source_column->is_finalized());
2080+
}
2081+
2082+
TEST_F(ColumnVariantTest, block_serialize_does_not_finalize_source_column) {
2083+
auto source_column = VariantUtil::construct_advanced_varint_column();
2084+
ASSERT_FALSE(source_column->is_finalized());
2085+
2086+
Block block({{source_column->get_ptr(), dt_variant, "variant_col"}});
2087+
PBlock pblock;
2088+
size_t uncompressed_bytes = 0;
2089+
size_t compressed_bytes = 0;
2090+
int64_t compress_time = 0;
2091+
auto status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
2092+
&uncompressed_bytes, &compressed_bytes, &compress_time,
2093+
segment_v2::NO_COMPRESSION);
2094+
ASSERT_TRUE(status.ok()) << status;
2095+
EXPECT_FALSE(source_column->is_finalized());
2096+
EXPECT_GT(pblock.column_values().size(), 0);
2097+
}
2098+
20422099
TEST_F(ColumnVariantTest, sanitize) {
20432100
auto test_func = [](const auto& source_column) {
20442101
auto src_size = source_column->size();

be/test/exprs/function/cast/function_variant_cast_test.cpp

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,63 @@ TEST(FunctionVariantCast, CastFromVariant) {
288288
}
289289
}
290290

291+
TEST(FunctionVariantCast, CastFromVariantDoesNotFinalizeSourceColumn) {
292+
auto variant_type = std::make_shared<DataTypeVariant>();
293+
auto int32_type = std::make_shared<DataTypeInt32>();
294+
auto string_type = std::make_shared<DataTypeString>();
295+
auto variant_col = construct_basic_varint_column();
296+
297+
ASSERT_FALSE(variant_col->is_finalized());
298+
299+
{
300+
ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"},
301+
{nullptr, int32_type, "int32_type"}};
302+
303+
auto function =
304+
SimpleFunctionFactory::instance().get_function("CAST", arguments, int32_type);
305+
ASSERT_NE(function, nullptr);
306+
307+
Block block {arguments};
308+
size_t result_column = block.columns();
309+
block.insert({nullptr, int32_type, "result"});
310+
311+
RuntimeState state;
312+
auto ctx = FunctionContext::create_context(&state, {}, {});
313+
ASSERT_TRUE(
314+
function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok());
315+
316+
EXPECT_FALSE(variant_col->is_finalized());
317+
318+
auto result_col = block.get_by_position(result_column).column;
319+
ASSERT_NE(result_col.get(), nullptr);
320+
ASSERT_EQ(result_col->size(), variant_col->size());
321+
}
322+
323+
{
324+
ColumnsWithTypeAndName arguments {{variant_col->get_ptr(), variant_type, "variant_col"},
325+
{nullptr, string_type, "string_type"}};
326+
327+
auto function =
328+
SimpleFunctionFactory::instance().get_function("CAST", arguments, string_type);
329+
ASSERT_NE(function, nullptr);
330+
331+
Block block {arguments};
332+
size_t result_column = block.columns();
333+
block.insert({nullptr, string_type, "result"});
334+
335+
RuntimeState state;
336+
auto ctx = FunctionContext::create_context(&state, {}, {});
337+
ASSERT_TRUE(
338+
function->execute(ctx.get(), block, {0}, result_column, variant_col->size()).ok());
339+
340+
EXPECT_FALSE(variant_col->is_finalized());
341+
342+
auto result_col = block.get_by_position(result_column).column;
343+
ASSERT_NE(result_col.get(), nullptr);
344+
ASSERT_EQ(result_col->size(), variant_col->size());
345+
}
346+
}
347+
291348
TEST(FunctionVariantCast, CastVariantWithNull) {
292349
auto variant_type = std::make_shared<DataTypeVariant>();
293350
auto int32_type = std::make_shared<DataTypeInt32>();

0 commit comments

Comments
 (0)