Skip to content

Commit ce7e1d0

Browse files
author
Rafał Hibner
committed
Merge branch 'filter_not_null' into combined3
2 parents f6e2af3 + 462ba2b commit ce7e1d0

2 files changed

Lines changed: 32 additions & 3 deletions

File tree

cpp/src/arrow/acero/filter_node.cc

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ namespace arrow {
3434

3535
using internal::checked_cast;
3636

37+
using compute::and_;
38+
using compute::field_ref;
3739
using compute::FilterOptions;
40+
using compute::is_null;
41+
using compute::not_;
3842

3943
namespace acero {
4044
namespace {
@@ -52,14 +56,34 @@ class FilterNode : public MapNode {
5256
auto schema = inputs[0]->output_schema();
5357

5458
const auto& filter_options = checked_cast<const FilterNodeOptions&>(options);
59+
Expression filter_expression;
60+
if (!filter_options.filter_not_null.empty()) {
61+
std::vector<Expression> operands = {filter_options.filter_expression};
62+
for (const auto& c : filter_options.filter_not_null) {
63+
auto field_index = schema->GetFieldIndex(c);
64+
if (field_index == -1) {
65+
return Status::KeyError("Column not found: ", c);
66+
}
67+
auto& field = schema->field(field_index);
68+
if (field->nullable()) {
69+
ARROW_ASSIGN_OR_RAISE(
70+
schema, schema->SetField(field_index, field->WithNullable(false)));
71+
}
72+
operands.push_back(not_(is_null(field_ref(c))));
73+
}
74+
filter_expression = and_(operands);
75+
} else {
76+
filter_expression = filter_options.filter_expression;
77+
}
5578

56-
auto filter_expression = filter_options.filter_expression;
5779
if (!filter_expression.IsBound()) {
5880
ARROW_ASSIGN_OR_RAISE(
5981
filter_expression,
6082
filter_expression.Bind(*schema, plan->query_context()->exec_context()));
6183
}
6284

85+
std::vector<std::string> filter_not_null;
86+
6387
if (filter_expression.type()->id() != Type::BOOL) {
6488
return Status::TypeError("Filter expression must evaluate to bool, but ",
6589
filter_expression.ToString(), " evaluates to ",

cpp/src/arrow/acero/options.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,18 @@ class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
256256
class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
257257
public:
258258
/// \brief create an instance from values
259-
explicit FilterNodeOptions(Expression filter_expression)
260-
: filter_expression(std::move(filter_expression)) {}
259+
explicit FilterNodeOptions(Expression filter_expression,
260+
std::vector<std::string> filter_not_null = {})
261+
: filter_expression(std::move(filter_expression)),
262+
filter_not_null(std::move(filter_not_null)) {}
261263

262264
/// \brief the expression to filter batches
263265
///
264266
/// The return type of this expression must be boolean
265267
Expression filter_expression;
268+
269+
/// \brief names of columns whose null rows are filtered out; these columns are marked non-nullable in the schema
270+
std::vector<std::string> filter_not_null;
266271
};
267272

268273
/// \brief a node which selects a specified subset from the input

0 commit comments

Comments
 (0)