Skip to content

Commit e8af7ef

Browse files
authored
fix: check data batch nullability match with schema in write process (#122)
1 parent 17ab0c3 commit e8af7ef

8 files changed

Lines changed: 368 additions & 34 deletions

File tree

include/paimon/file_store_write.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class PAIMON_EXPORT FileStoreWrite {
4747
virtual ~FileStoreWrite() = default;
4848

4949
/// Writes an input `RecordBatch` to the internal buffer or to a file.
50+
/// @note If a field in table schema is marked as non-nullable (`nullable = false`),
51+
/// the corresponding array in `batch` must have zero null entries.
5052
virtual Status Write(std::unique_ptr<RecordBatch>&& batch) = 0;
5153

5254
/// Generate a list of commit messages with the latest generated data file meta

include/paimon/record_batch.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,17 @@ class PAIMON_EXPORT RecordBatch {
9393
/// various properties such as data, row kinds, partition information, and bucket id.
9494
class PAIMON_EXPORT RecordBatchBuilder {
9595
public:
96-
/// Constructs a `RecordBatchBuilder` with Arrow data.
97-
/// @param data Arrow array containing the record data.
96+
/// Constructs a `RecordBatchBuilder` with Arrow data.
97+
///
98+
/// @note The `data` must conform to table schema:
99+
/// - Each array in `data` corresponds to a field in table schema.
100+
/// - If a field in table schema is marked as non-nullable (`nullable = false`),
101+
/// the corresponding array in `data` must have zero null entries.
102+
///
103+
/// @note Consistency between `data` and table schema will be validated during the write
104+
/// process.
105+
///
106+
/// @param data ArrowArray struct containing the columnar data (via C Data Interface)
98107
explicit RecordBatchBuilder(::ArrowArray* data);
99108

100109
~RecordBatchBuilder();

src/paimon/common/utils/arrow/arrow_utils.h

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class ArrowUtils {
3030
~ArrowUtils() = delete;
3131

3232
static Result<std::shared_ptr<arrow::Schema>> DataTypeToSchema(
33-
const std::shared_ptr<::arrow::DataType>& data_type) {
33+
const std::shared_ptr<arrow::DataType>& data_type) {
3434
if (data_type->id() != arrow::Type::STRUCT) {
3535
return Status::Invalid(fmt::format("Expected struct data type, actual data type: {}",
3636
data_type->ToString()));
@@ -40,8 +40,7 @@ class ArrowUtils {
4040
}
4141

4242
static Result<std::vector<int32_t>> CreateProjection(
43-
const std::shared_ptr<::arrow::Schema>& file_schema,
44-
const arrow::FieldVector& read_fields) {
43+
const std::shared_ptr<arrow::Schema>& file_schema, const arrow::FieldVector& read_fields) {
4544
std::vector<int32_t> target_to_src_mapping;
4645
target_to_src_mapping.reserve(read_fields.size());
4746
for (const auto& field : read_fields) {
@@ -54,6 +53,54 @@ class ArrowUtils {
5453
}
5554
return target_to_src_mapping;
5655
}
56+
57+
/// Verifies that `data` holds no null entry for any field that `schema` declares as
/// non-nullable, descending into nested struct, list and map children.
///
/// @param schema Schema whose fields describe, position by position, the children of `data`.
/// @param data Struct array holding one child array per schema field.
/// @return `Status::OK()` when every non-nullable field is free of nulls. `Status::Invalid`
///         when `schema` or `data` is null, `data` is not a struct array, the field counts
///         differ, or a null is found where the schema forbids one.
static Status CheckNullabilityMatch(const std::shared_ptr<arrow::Schema>& schema,
                                    const std::shared_ptr<arrow::Array>& data) {
    if (PAIMON_UNLIKELY(schema == nullptr || data == nullptr)) {
        return Status::Invalid("CheckNullabilityMatch failed, schema or data is null");
    }
    // The downcast below is only verified in debug builds, so reject anything that is not a
    // struct array up front instead of reinterpreting an unrelated array type.
    if (PAIMON_UNLIKELY(data->type_id() != arrow::Type::STRUCT)) {
        return Status::Invalid(
            fmt::format("CheckNullabilityMatch failed, expected struct data, actual data type: {}",
                        data->type()->ToString()));
    }
    auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
    if (struct_array->num_fields() != schema->num_fields()) {
        return Status::Invalid(fmt::format(
            "CheckNullabilityMatch failed, data field count {} mismatch schema field count {}",
            struct_array->num_fields(), schema->num_fields()));
    }
    for (int32_t i = 0; i < schema->num_fields(); i++) {
        PAIMON_RETURN_NOT_OK(
            InnerCheckNullabilityMatch(schema->field(i), struct_array->field(i)));
    }
    return Status::OK();
}
71+
72+
private:
73+
static Status InnerCheckNullabilityMatch(const std::shared_ptr<arrow::Field>& field,
74+
const std::shared_ptr<arrow::Array>& data) {
75+
if (PAIMON_UNLIKELY(!field->nullable() && data->null_count() != 0)) {
76+
return Status::Invalid(fmt::format(
77+
"CheckNullabilityMatch failed, field {} not nullable while data have null value",
78+
field->name()));
79+
}
80+
auto type = field->type();
81+
if (type->id() == arrow::Type::STRUCT) {
82+
auto struct_type =
83+
arrow::internal::checked_pointer_cast<arrow::StructType>(field->type());
84+
auto struct_array = arrow::internal::checked_pointer_cast<arrow::StructArray>(data);
85+
for (int32_t i = 0; i < struct_type->num_fields(); ++i) {
86+
PAIMON_RETURN_NOT_OK(
87+
InnerCheckNullabilityMatch(struct_type->field(i), struct_array->field(i)));
88+
}
89+
} else if (type->id() == arrow::Type::LIST) {
90+
auto list_type = arrow::internal::checked_pointer_cast<arrow::ListType>(field->type());
91+
auto list_array = arrow::internal::checked_pointer_cast<arrow::ListArray>(data);
92+
PAIMON_RETURN_NOT_OK(
93+
InnerCheckNullabilityMatch(list_type->value_field(), list_array->values()));
94+
} else if (type->id() == arrow::Type::MAP) {
95+
auto map_type = arrow::internal::checked_pointer_cast<arrow::MapType>(field->type());
96+
auto map_array = arrow::internal::checked_pointer_cast<arrow::MapArray>(data);
97+
PAIMON_RETURN_NOT_OK(
98+
InnerCheckNullabilityMatch(map_type->key_field(), map_array->keys()));
99+
PAIMON_RETURN_NOT_OK(
100+
InnerCheckNullabilityMatch(map_type->item_field(), map_array->items()));
101+
}
102+
return Status::OK();
103+
}
57104
};
58105

59106
} // namespace paimon

src/paimon/common/utils/arrow/arrow_utils_test.cpp

Lines changed: 230 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
#include "paimon/common/utils/arrow/arrow_utils.h"
1818

1919
#include "arrow/api.h"
20+
#include "arrow/ipc/api.h"
2021
#include "gtest/gtest.h"
22+
#include "paimon/common/types/data_field.h"
2123
#include "paimon/testing/utils/testharness.h"
22-
2324
namespace paimon::test {
2425

2526
TEST(ArrowUtilsTest, TestCreateProjection) {
@@ -107,4 +108,232 @@ TEST(ArrowUtilsTest, TestCreateProjection) {
107108
}
108109
}
109110

111+
// A flat, non-nullable int32 column: nulls must be rejected, null-free data accepted.
TEST(ArrowUtilsTest, TestCheckNullableMatchSimple) {
    const auto column = arrow::field("column1", arrow::int32(), /*nullable=*/false);
    const auto schema = arrow::schema({column});
    const auto row_type = arrow::struct_({column});
    // Builds a struct array of `row_type` from its JSON representation.
    const auto parse = [&row_type](const char* json) -> std::shared_ptr<arrow::Array> {
        return arrow::ipc::internal::json::ArrayFromJSON(row_type, json).ValueOrDie();
    };

    // The second row carries a null in the non-nullable column.
    const std::shared_ptr<arrow::Array> with_null = parse(R"([
        [20],
        [null],
        [10]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, with_null),
        "CheckNullabilityMatch failed, field column1 not nullable while data have null value");

    // No nulls at all.
    const std::shared_ptr<arrow::Array> without_null = parse(R"([
        [20],
        [10]
    ])");
    ASSERT_OK(ArrowUtils::CheckNullabilityMatch(schema, without_null));
}
138+
139+
// A non-nullable struct column with one non-nullable and one nullable child.
TEST(ArrowUtilsTest, TestCheckNullableMatchWithStruct) {
    const auto strict_child = arrow::field("child1", arrow::int32(), /*nullable=*/false);
    const auto relaxed_child = arrow::field("child2", arrow::float64(), /*nullable=*/true);
    const auto parent =
        arrow::field("parent", arrow::struct_({strict_child, relaxed_child}), /*nullable=*/false);
    const auto schema = arrow::schema({parent});
    const auto row_type = arrow::struct_({parent});
    // Builds a struct array of `row_type` from its JSON representation.
    const auto parse = [&row_type](const char* json) -> std::shared_ptr<arrow::Array> {
        return arrow::ipc::internal::json::ArrayFromJSON(row_type, json).ValueOrDie();
    };

    // The struct itself is null although it is declared non-nullable.
    const std::shared_ptr<arrow::Array> null_parent = parse(R"([
        [null]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, null_parent),
        "CheckNullabilityMatch failed, field parent not nullable while data have null value");

    // The non-nullable child is null in the second row.
    const std::shared_ptr<arrow::Array> null_strict_child = parse(R"([
        [[1, null]],
        [[null, 10.0]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, null_strict_child),
        "CheckNullabilityMatch failed, field child1 not nullable while data have null value");

    // Only the nullable child is null.
    const std::shared_ptr<arrow::Array> null_relaxed_child = parse(R"([
        [[1, null]],
        [[2, 10.0]]
    ])");
    ASSERT_OK(ArrowUtils::CheckNullabilityMatch(schema, null_relaxed_child));
}
176+
177+
// A non-nullable list column whose elements are non-nullable as well.
TEST(ArrowUtilsTest, TestCheckNullableMatchWithList) {
    const auto element = arrow::field("value", arrow::int32(), /*nullable=*/false);
    const auto list_column =
        arrow::field("list_column", arrow::list(element), /*nullable=*/false);
    const auto schema = arrow::schema({list_column});
    const auto row_type = arrow::struct_({list_column});
    // Builds a struct array of `row_type` from its JSON representation.
    const auto parse = [&row_type](const char* json) -> std::shared_ptr<arrow::Array> {
        return arrow::ipc::internal::json::ArrayFromJSON(row_type, json).ValueOrDie();
    };

    // The list column itself is null in the second row; it is reported before its elements.
    const std::shared_ptr<arrow::Array> null_list = parse(R"([
        [[1, 2, null, 4, 5]],
        [null]
    ])");
    ASSERT_NOK_WITH_MSG(ArrowUtils::CheckNullabilityMatch(schema, null_list),
                        "CheckNullabilityMatch failed, field list_column not nullable while "
                        "data have null value");

    // Only an element of the list is null.
    const std::shared_ptr<arrow::Array> null_element = parse(R"([
        [[1, 2, null, 4, 5]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, null_element),
        "CheckNullabilityMatch failed, field value not nullable while data have null value");

    // Neither the list nor its elements are null.
    const std::shared_ptr<arrow::Array> no_null = parse(R"([
        [[1, 2, 3, 4, 5]]
    ])");
    ASSERT_OK(ArrowUtils::CheckNullabilityMatch(schema, no_null));
}
212+
213+
// A non-nullable map column with non-nullable keys and nullable values.
TEST(ArrowUtilsTest, TestCheckNullableMatchWithMap) {
    const auto map_key = arrow::field("key", arrow::int32(), /*nullable=*/false);
    const auto map_value = arrow::field("value", arrow::int32(), /*nullable=*/true);
    const auto map_column = arrow::field(
        "map_column", std::make_shared<arrow::MapType>(map_key, map_value), /*nullable=*/false);
    const auto schema = arrow::schema({map_column});
    const auto row_type = arrow::struct_({map_column});
    // Builds a struct array of `row_type` from its JSON representation.
    const auto parse = [&row_type](const char* json) -> std::shared_ptr<arrow::Array> {
        return arrow::ipc::internal::json::ArrayFromJSON(row_type, json).ValueOrDie();
    };

    // The map column itself is null.
    const std::shared_ptr<arrow::Array> null_map = parse(R"([
        [null]
    ])");
    ASSERT_NOK_WITH_MSG(ArrowUtils::CheckNullabilityMatch(schema, null_map),
                        "CheckNullabilityMatch failed, field map_column not nullable while "
                        "data have null value");

    // A null map value is fine because the value field is nullable.
    const std::shared_ptr<arrow::Array> null_map_value = parse(R"([
        [[[1, null]]]
    ])");
    ASSERT_OK(ArrowUtils::CheckNullabilityMatch(schema, null_map_value));
}
239+
240+
// Three non-nullable map columns whose (nullable) values are a list, a map and a struct, each
// of which contains non-nullable leaves.
TEST(ArrowUtilsTest, TestCheckNullableMatchComplex) {
    const auto leaf_key = arrow::field("key", arrow::int32(), /*nullable=*/false);
    const auto leaf_value = arrow::field("value", arrow::int32(), /*nullable=*/false);

    const auto nullable_list =
        arrow::field("inner_list", arrow::list(leaf_value), /*nullable=*/true);
    const auto nullable_map =
        arrow::field("inner_map", arrow::map(arrow::utf8(), leaf_value), /*nullable=*/true);
    const auto nullable_struct =
        arrow::field("inner_struct", arrow::struct_({leaf_key, leaf_value}), /*nullable=*/true);

    const auto list_in_map =
        arrow::field("inner1", arrow::map(arrow::utf8(), nullable_list), /*nullable=*/false);
    const auto map_in_map =
        arrow::field("inner2", arrow::map(arrow::utf8(), nullable_map), /*nullable=*/false);
    const auto struct_in_map =
        arrow::field("inner3", arrow::map(arrow::utf8(), nullable_struct), /*nullable=*/false);

    const auto schema = arrow::schema({list_in_map, map_in_map, struct_in_map});
    const auto row_type = arrow::struct_({list_in_map, map_in_map, struct_in_map});
    // Builds a struct array of `row_type` from its JSON representation.
    const auto parse = [&row_type](const char* json) -> std::shared_ptr<arrow::Array> {
        return arrow::ipc::internal::json::ArrayFromJSON(row_type, json).ValueOrDie();
    };

    // inner1: a null element inside the nested list.
    const std::shared_ptr<arrow::Array> inner1_null_leaf = parse(R"([
        [[["outer_key", [1, 2, 3, null]]], [["outer_key", [["key1", 1]]]], [["outer_key", [100, 200]]]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner1_null_leaf),
        "CheckNullabilityMatch failed, field value not nullable while data have null value");

    // inner1: the map column itself is null in the last row.
    const std::shared_ptr<arrow::Array> inner1_null_column = parse(R"([
        [[["outer_key", [1, 2, 3]]], [["outer_key", [["key1", 1]]]], [["outer_key", [100, 200]]]],
        [[["outer_key", null]], [["outer_key", [["key1", 1]]]], [["outer_key", [100, 200]]]],
        [null, [["outer_key", [["key1", 1]]]], [["outer_key", [100, 200]]]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner1_null_column),
        "CheckNullabilityMatch failed, field inner1 not nullable while data have null value");

    // inner2: a null value inside the nested map.
    const std::shared_ptr<arrow::Array> inner2_null_leaf = parse(R"([
        [[["outer_key", [1, 2, 3]]], [["outer_key", null]], [["outer_key", [100, 200]]]],
        [[["outer_key", null]], [["outer_key", [["key1", null]]]], [["outer_key", [100, 200]]]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner2_null_leaf),
        "CheckNullabilityMatch failed, field value not nullable while data have null value");

    // inner2: the map column itself is null in the last row.
    const std::shared_ptr<arrow::Array> inner2_null_column = parse(R"([
        [[["outer_key", [1, 2, 3]]], [["outer_key", null]], [["outer_key", [100, 200]]]],
        [[["outer_key", [1, 2, 3]]], null, [["outer_key", [100, 200]]]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner2_null_column),
        "CheckNullabilityMatch failed, field inner2 not nullable while data have null value");

    // inner3: a null member inside the nested struct.
    const std::shared_ptr<arrow::Array> inner3_null_leaf = parse(R"([
        [[["outer_key", [1, 2, 3]]], [["outer_key", null]], [["outer_key", null]]],
        [[["outer_key", null]], [["outer_key", [["key1", 2]]]], [["outer_key", [100, null]]]]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner3_null_leaf),
        "CheckNullabilityMatch failed, field value not nullable while data have null value");

    // inner3: the map column itself is null in the last row.
    const std::shared_ptr<arrow::Array> inner3_null_column = parse(R"([
        [[["outer_key", [1, 2, 3]]], [["outer_key", null]], [["outer_key", null]]],
        [[["outer_key", null]], [["outer_key", [["key1", 2]]]], null]
    ])");
    ASSERT_NOK_WITH_MSG(
        ArrowUtils::CheckNullabilityMatch(schema, inner3_null_column),
        "CheckNullabilityMatch failed, field inner3 not nullable while data have null value");
}
338+
110339
} // namespace paimon::test

src/paimon/core/operation/abstract_file_store_write.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ Status AbstractFileStoreWrite::Write(std::unique_ptr<RecordBatch>&& batch) {
105105
options_.GetBucket()));
106106
}
107107
}
108+
// check nullability
109+
PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
110+
std::shared_ptr<arrow::Array> data,
111+
arrow::ImportArray(batch->GetData(), arrow::struct_(write_schema_->fields())));
112+
PAIMON_RETURN_NOT_OK(ArrowUtils::CheckNullabilityMatch(write_schema_, data));
113+
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*data, batch->GetData()));
114+
108115
PAIMON_ASSIGN_OR_RAISE(BinaryRow partition,
109116
file_store_path_factory_->ToBinaryRow(batch->GetPartition()))
110117
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BatchWriter> writer,

0 commit comments

Comments
 (0)