Skip to content

Commit 42aa1d0

Browse files
authored
feat: add residual evaluator (#402)
1 parent 428a171 commit 42aa1d0

File tree

11 files changed

+1097
-5
lines changed

11 files changed

+1097
-5
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ set(ICEBERG_SOURCES
2828
expression/inclusive_metrics_evaluator.cc
2929
expression/literal.cc
3030
expression/predicate.cc
31+
expression/residual_evaluator.cc
3132
expression/rewrite_not.cc
3233
expression/strict_metrics_evaluator.cc
3334
expression/term.cc

src/iceberg/expression/expression_visitor.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,8 @@ class ICEBERG_EXPORT BoundVisitor : public ExpressionVisitor<R> {
260260

261261
/// \brief Visit an unbound predicate.
262262
///
263-
/// Bound visitors do not support unbound predicates.
264-
///
265263
/// \param pred The unbound predicate
266-
Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) final {
264+
Result<R> Predicate(const std::shared_ptr<UnboundPredicate>& pred) override {
  // Default handling of an unbound predicate: report it as unsupported. The method
  // is only `override` (not `final`), so a derived visitor can replace this.
  ICEBERG_DCHECK(pred != nullptr, "UnboundPredicate cannot be null");
  const auto description = pred->ToString();
  return NotSupported("Not a bound predicate: {}", description);
}

src/iceberg/expression/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ install_headers(
2626
'inclusive_metrics_evaluator.h',
2727
'literal.h',
2828
'predicate.h',
29+
'residual_evaluator.h',
2930
'rewrite_not.h',
3031
'strict_metrics_evaluator.h',
3132
'term.h',
Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/residual_evaluator.h"
21+
22+
#include "iceberg/expression/expression.h"
23+
#include "iceberg/expression/expression_visitor.h"
24+
#include "iceberg/expression/predicate.h"
25+
#include "iceberg/partition_spec.h"
26+
#include "iceberg/row/struct_like.h"
27+
#include "iceberg/schema.h"
28+
#include "iceberg/schema_internal.h"
29+
#include "iceberg/transform.h"
30+
#include "iceberg/util/macros.h"
31+
32+
namespace iceberg {
33+
34+
namespace {
35+
36+
/// \brief Shorthand for the shared True expression, widened to `Expression`.
std::shared_ptr<Expression> always_true() {
  std::shared_ptr<Expression> constant = True::Instance();
  return constant;
}
37+
/// \brief Shorthand for the shared False expression, widened to `Expression`.
std::shared_ptr<Expression> always_false() {
  std::shared_ptr<Expression> constant = False::Instance();
  return constant;
}
38+
39+
/// \brief Expression visitor that simplifies a filter against one partition tuple.
///
/// Leaf predicates that can be decided from the partition values alone are folded to
/// the True/False constants; predicates that cannot be decided are returned unchanged
/// so that they remain part of the residual expression.
///
/// The visitor stores references to the partition spec, the schema and the partition
/// tuple it was created with, so all three must outlive it.
class ResidualVisitor : public BoundVisitor<std::shared_ptr<Expression>> {
 public:
  /// \brief Create a visitor for a single partition tuple.
  ///
  /// \param spec Partition spec used to look up partition fields by source column
  /// \param schema Schema that unbound predicates are bound against
  /// \param partition_data Partition tuple that projected predicates are evaluated on
  /// \param case_sensitive Passed through to every `Bind` call
  /// \return The visitor, or the error raised while deriving the partition type
  static Result<ResidualVisitor> Make(const PartitionSpec& spec, const Schema& schema,
                                      const StructLike& partition_data,
                                      bool case_sensitive) {
    ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec.PartitionType(schema));
    auto partition_schema = FromStructType(std::move(*partition_type), std::nullopt);
    return ResidualVisitor(spec, schema, std::move(partition_schema), partition_data,
                           case_sensitive);
  }

  Result<std::shared_ptr<Expression>> AlwaysTrue() override { return always_true(); }

  Result<std::shared_ptr<Expression>> AlwaysFalse() override { return always_false(); }

  /// \brief Combine the already-simplified child with constant folding.
  Result<std::shared_ptr<Expression>> Not(
      const std::shared_ptr<Expression>& child_result) override {
    return Not::MakeFolded(child_result);
  }

  /// \brief Combine the already-simplified children with constant folding.
  Result<std::shared_ptr<Expression>> And(
      const std::shared_ptr<Expression>& left_result,
      const std::shared_ptr<Expression>& right_result) override {
    return And::MakeFolded(left_result, right_result);
  }

  /// \brief Combine the already-simplified children with constant folding.
  Result<std::shared_ptr<Expression>> Or(
      const std::shared_ptr<Expression>& left_result,
      const std::shared_ptr<Expression>& right_result) override {
    return Or::MakeFolded(left_result, right_result);
  }

  Result<std::shared_ptr<Expression>> IsNull(
      const std::shared_ptr<Bound>& expr) override {
    return FoldToConstant(expr, [](const auto& value) { return value.IsNull(); });
  }

  Result<std::shared_ptr<Expression>> NotNull(
      const std::shared_ptr<Bound>& expr) override {
    return FoldToConstant(expr, [](const auto& value) { return !value.IsNull(); });
  }

  Result<std::shared_ptr<Expression>> IsNaN(const std::shared_ptr<Bound>& expr) override {
    return FoldToConstant(expr, [](const auto& value) { return value.IsNaN(); });
  }

  Result<std::shared_ptr<Expression>> NotNaN(
      const std::shared_ptr<Bound>& expr) override {
    return FoldToConstant(expr, [](const auto& value) { return !value.IsNaN(); });
  }

  Result<std::shared_ptr<Expression>> Lt(const std::shared_ptr<Bound>& expr,
                                         const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value < lit; });
  }

  Result<std::shared_ptr<Expression>> LtEq(const std::shared_ptr<Bound>& expr,
                                           const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value <= lit; });
  }

  Result<std::shared_ptr<Expression>> Gt(const std::shared_ptr<Bound>& expr,
                                         const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value > lit; });
  }

  Result<std::shared_ptr<Expression>> GtEq(const std::shared_ptr<Bound>& expr,
                                           const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value >= lit; });
  }

  Result<std::shared_ptr<Expression>> Eq(const std::shared_ptr<Bound>& expr,
                                         const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value == lit; });
  }

  Result<std::shared_ptr<Expression>> NotEq(const std::shared_ptr<Bound>& expr,
                                            const Literal& lit) override {
    return FoldToConstant(expr, [&lit](const auto& value) { return value != lit; });
  }

  /// \brief Fold a starts-with predicate; a null partition value never matches.
  Result<std::shared_ptr<Expression>> StartsWith(const std::shared_ptr<Bound>& expr,
                                                 const Literal& lit) override {
    ICEBERG_ASSIGN_OR_RAISE(auto matched, MatchesPrefix(expr, lit));
    return matched ? always_true() : always_false();
  }

  /// \brief Fold a not-starts-with predicate; a null partition value always matches.
  Result<std::shared_ptr<Expression>> NotStartsWith(const std::shared_ptr<Bound>& expr,
                                                    const Literal& lit) override {
    ICEBERG_ASSIGN_OR_RAISE(auto matched, MatchesPrefix(expr, lit));
    return matched ? always_false() : always_true();
  }

  Result<std::shared_ptr<Expression>> In(
      const std::shared_ptr<Bound>& expr,
      const BoundSetPredicate::LiteralSet& literal_set) override {
    return FoldToConstant(
        expr, [&literal_set](const auto& value) { return literal_set.contains(value); });
  }

  Result<std::shared_ptr<Expression>> NotIn(
      const std::shared_ptr<Bound>& expr,
      const BoundSetPredicate::LiteralSet& literal_set) override {
    return FoldToConstant(
        expr, [&literal_set](const auto& value) { return !literal_set.contains(value); });
  }

  /// \brief Compute the residual of a bound predicate (defined out of line).
  Result<std::shared_ptr<Expression>> Predicate(
      const std::shared_ptr<BoundPredicate>& pred) override;

  /// \brief Compute the residual of an unbound predicate.
  ///
  /// The predicate is bound to the schema and simplified in bound form. When the
  /// simplified result is still a bound predicate, the original unbound predicate is
  /// returned instead, so the residual stays unbound.
  ///
  /// \param pred The unbound predicate
  /// \return A constant, the original predicate, or a binding/evaluation error
  Result<std::shared_ptr<Expression>> Predicate(
      const std::shared_ptr<UnboundPredicate>& pred) override {
    ICEBERG_ASSIGN_OR_RAISE(auto bound, pred->Bind(schema_, case_sensitive_));
    if (!bound->is_bound_predicate()) {
      // Binding already collapsed the predicate into a non-predicate expression.
      return bound;
    }

    ICEBERG_ASSIGN_OR_RAISE(
        auto residual, Predicate(std::dynamic_pointer_cast<BoundPredicate>(bound)));
    if (residual->is_bound_predicate()) {
      // Undecided: hand back the caller's unbound predicate, not the bound copy.
      return pred;
    }
    return residual;
  }

 private:
  ResidualVisitor(const PartitionSpec& spec, const Schema& schema,
                  std::unique_ptr<Schema> partition_schema,
                  const StructLike& partition_data, bool case_sensitive)
      : spec_(spec),
        schema_(schema),
        partition_schema_(std::move(partition_schema)),
        partition_data_(partition_data),
        case_sensitive_(case_sensitive) {}

  /// \brief Evaluate `expr` on the partition tuple and fold `test(value)` to a constant.
  ///
  /// \param expr Bound term to evaluate
  /// \param test Callable taking the evaluated value and returning a bool
  /// \return True/False constant, or the evaluation error
  template <typename Test>
  Result<std::shared_ptr<Expression>> FoldToConstant(const std::shared_ptr<Bound>& expr,
                                                     Test&& test) {
    return expr->Evaluate(partition_data_).transform([&test](const auto& value) {
      return test(value) ? always_true() : always_false();
    });
  }

  /// \brief Evaluate `expr` and report whether the value starts with the literal.
  ///
  /// A null value yields false instead of an error, so that a null partition value
  /// does not abort residual computation. Any other non-string operand is still
  /// rejected.
  Result<bool> MatchesPrefix(const std::shared_ptr<Bound>& expr, const Literal& lit) {
    ICEBERG_ASSIGN_OR_RAISE(auto value, expr->Evaluate(partition_data_));
    if (value.IsNull()) {
      return false;
    }

    if (!std::holds_alternative<std::string>(value.value()) ||
        !std::holds_alternative<std::string>(lit.value())) {
      return InvalidExpression("Both value and literal should be strings");
    }

    const auto& str_value = std::get<std::string>(value.value());
    const auto& str_prefix = std::get<std::string>(lit.value());
    return str_value.starts_with(str_prefix);
  }

  const PartitionSpec& spec_;
  const Schema& schema_;
  // Schema built from the spec's partition type; projected predicates bind to it.
  std::unique_ptr<Schema> partition_schema_;
  const StructLike& partition_data_;
  bool case_sensitive_;
};
219+
220+
/// \brief Compute the residual of a bound predicate for the current partition tuple.
///
/// For every partition field derived from the predicate's source column, the
/// predicate is projected onto the partition (strictly, then inclusively) and the
/// projection is evaluated on the partition data:
///  - a strict projection that evaluates to true means the predicate holds for the
///    whole partition, so the residual is True;
///  - an inclusive projection that evaluates to false means the predicate cannot hold
///    in the partition, so the residual is False.
/// If no projection is conclusive the predicate itself is the residual.
///
/// \param pred The bound predicate
/// \return True, False, or `pred`; or the first projection/binding/evaluation error
Result<std::shared_ptr<Expression>> ResidualVisitor::Predicate(
    const std::shared_ptr<BoundPredicate>& pred) {
  ICEBERG_ASSIGN_OR_RAISE(
      auto partition_fields,
      spec_.GetFieldsBySourceId(pred->reference()->field().field_id()));
  if (partition_fields.empty()) {
    // The column does not feed any partition field, so nothing can be decided.
    return pred;
  }

  // Binds a projected predicate to the partition schema and evaluates it on the
  // partition data. Yields nullptr when there is no projection; a binding result that
  // is not a predicate (a constant such as alwaysTrue/alwaysFalse) is passed through.
  const auto evaluate_projection =
      [this](const auto& projection) -> Result<std::shared_ptr<Expression>> {
    if (projection == nullptr) {
      return std::shared_ptr<Expression>{};
    }
    ICEBERG_ASSIGN_OR_RAISE(auto bound,
                            projection->Bind(*partition_schema_, case_sensitive_));
    if (!bound->is_bound_predicate()) {
      return bound;
    }
    return this->BoundVisitor::Predicate(
        std::dynamic_pointer_cast<BoundPredicate>(bound));
  };

  for (const auto& field_ref : partition_fields) {
    const auto& field = field_ref.get();

    // Strict projection first: true here settles the predicate as True.
    ICEBERG_ASSIGN_OR_RAISE(auto strict_projection,
                            field.transform()->ProjectStrict(field.name(), pred));
    ICEBERG_ASSIGN_OR_RAISE(auto strict_result, evaluate_projection(strict_projection));
    if (strict_result != nullptr && strict_result->op() == Expression::Operation::kTrue) {
      return always_true();
    }

    // Inclusive projection next: false here settles the predicate as False.
    ICEBERG_ASSIGN_OR_RAISE(auto inclusive_projection,
                            field.transform()->Project(field.name(), pred));
    ICEBERG_ASSIGN_OR_RAISE(auto inclusive_result,
                            evaluate_projection(inclusive_projection));
    if (inclusive_result != nullptr &&
        inclusive_result->op() == Expression::Operation::kFalse) {
      return always_false();
    }
  }

  // No partition field gave a conclusive answer; keep the original predicate.
  return pred;
}
297+
298+
// Unpartitioned residual evaluator that always returns the original expression
299+
class UnpartitionedResidualEvaluator : public ResidualEvaluator {
300+
public:
301+
explicit UnpartitionedResidualEvaluator(std::shared_ptr<Expression> expr)
302+
: ResidualEvaluator(std::move(expr), *PartitionSpec::Unpartitioned(),
303+
*kEmptySchema_, true) {}
304+
305+
Result<std::shared_ptr<Expression>> ResidualFor(
306+
const StructLike& /*partition_data*/) const override {
307+
return expr_;
308+
}
309+
310+
private:
311+
// Store an empty schema to avoid dangling reference when passing to base class
312+
inline static const std::shared_ptr<Schema> kEmptySchema_ =
313+
std::make_shared<Schema>(std::vector<SchemaField>{}, std::nullopt);
314+
};
315+
316+
} // namespace
317+
318+
ResidualEvaluator::ResidualEvaluator(std::shared_ptr<Expression> expr,
319+
const PartitionSpec& spec, const Schema& schema,
320+
bool case_sensitive)
321+
: expr_(std::move(expr)),
322+
spec_(spec),
323+
schema_(schema),
324+
case_sensitive_(case_sensitive) {}
325+
326+
// Destructor is defaulted here, out of line, rather than at its declaration.
ResidualEvaluator::~ResidualEvaluator() = default;
327+
328+
/// \brief Create an evaluator whose residual is always the original expression.
///
/// \param expr Filter expression to return from `ResidualFor`
/// \return An owning pointer to the evaluator
Result<std::unique_ptr<ResidualEvaluator>> ResidualEvaluator::Unpartitioned(
    std::shared_ptr<Expression> expr) {
  // The derived constructor is public, so make_unique replaces the raw `new`; the
  // explicit conversion widens the pointer to the base type before wrapping it.
  return std::unique_ptr<ResidualEvaluator>(
      std::make_unique<UnpartitionedResidualEvaluator>(std::move(expr)));
}
333+
334+
/// \brief Create a residual evaluator for the given partition spec.
///
/// A spec without partition fields gets the unpartitioned evaluator; otherwise a
/// regular evaluator that references `spec` and `schema` is created.
///
/// \param expr Filter expression to compute residuals for
/// \param spec Partition spec of the data being filtered
/// \param schema Schema used to bind the expression
/// \param case_sensitive Whether binding is case sensitive
Result<std::unique_ptr<ResidualEvaluator>> ResidualEvaluator::Make(
    std::shared_ptr<Expression> expr, const PartitionSpec& spec, const Schema& schema,
    bool case_sensitive) {
  const bool is_partitioned = !spec.fields().empty();
  if (is_partitioned) {
    // NOTE(review): raw `new` kept as in the original; presumably the constructor is
    // not public, which would rule out std::make_unique. Confirm against the header.
    return std::unique_ptr<ResidualEvaluator>(
        new ResidualEvaluator(std::move(expr), spec, schema, case_sensitive));
  }
  return Unpartitioned(std::move(expr));
}
343+
344+
/// \brief Compute the residual of the stored expression for one partition tuple.
///
/// A fresh visitor is built for every call and the stored expression is walked
/// with it.
///
/// \param partition_data Partition tuple to evaluate against
/// \return The residual expression, or the error raised while building the visitor
///         or visiting the expression
Result<std::shared_ptr<Expression>> ResidualEvaluator::ResidualFor(
    const StructLike& partition_data) const {
  using ResidualPtr = std::shared_ptr<Expression>;

  ICEBERG_ASSIGN_OR_RAISE(
      auto residual_visitor,
      ResidualVisitor::Make(spec_, schema_, partition_data, case_sensitive_));
  return Visit<ResidualPtr, ResidualVisitor>(expr_, residual_visitor);
}
351+
352+
} // namespace iceberg

0 commit comments

Comments
 (0)