Skip to content

Commit 27c6a3c

Browse files
committed
feat: add utils to project expressions on rows to expressions on partitions
add a schema field to PartitionSpec since ProjectionVisitor's Predicate on UnboundPredicate need to Bind to schema.
1 parent 09f26b6 commit 27c6a3c

13 files changed

Lines changed: 1460 additions & 28 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ set(ICEBERG_SOURCES
2828
expression/inclusive_metrics_evaluator.cc
2929
expression/literal.cc
3030
expression/predicate.cc
31+
expression/projections.cc
3132
expression/rewrite_not.cc
3233
expression/strict_metrics_evaluator.cc
3334
expression/term.cc

src/iceberg/expression/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ install_headers(
2626
'inclusive_metrics_evaluator.h',
2727
'literal.h',
2828
'predicate.h',
29+
'projections.h',
2930
'rewrite_not.h',
3031
'strict_metrics_evaluator.h',
3132
'term.h',
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/expression/projections.h"
21+
22+
#include <memory>
23+
#include <vector>
24+
25+
#include "iceberg/expression/expression.h"
26+
#include "iceberg/expression/expression_visitor.h"
27+
#include "iceberg/expression/expressions.h"
28+
#include "iceberg/expression/predicate.h"
29+
#include "iceberg/expression/rewrite_not.h"
30+
#include "iceberg/expression/term.h"
31+
#include "iceberg/partition_field.h"
32+
#include "iceberg/partition_spec.h"
33+
#include "iceberg/result.h"
34+
#include "iceberg/transform.h"
35+
#include "iceberg/util/checked_cast.h"
36+
#include "iceberg/util/macros.h"
37+
38+
namespace iceberg {
39+
40+
// Implementation detail - not exported
41+
class ProjectionVisitor : public ExpressionVisitor<std::shared_ptr<Expression>> {
42+
public:
43+
~ProjectionVisitor() override = default;
44+
45+
ProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
46+
: spec_(spec), case_sensitive_(case_sensitive) {}
47+
48+
Result<std::shared_ptr<Expression>> AlwaysTrue() override { return True::Instance(); }
49+
50+
Result<std::shared_ptr<Expression>> AlwaysFalse() override { return False::Instance(); }
51+
52+
Result<std::shared_ptr<Expression>> Not(
53+
const std::shared_ptr<Expression>& child_result) override {
54+
return InvalidExpression("Project called on expression with a not");
55+
}
56+
57+
Result<std::shared_ptr<Expression>> And(
58+
const std::shared_ptr<Expression>& left_result,
59+
const std::shared_ptr<Expression>& right_result) override {
60+
return Expressions::And(left_result, right_result);
61+
}
62+
63+
Result<std::shared_ptr<Expression>> Or(
64+
const std::shared_ptr<Expression>& left_result,
65+
const std::shared_ptr<Expression>& right_result) override {
66+
return Expressions::Or(left_result, right_result);
67+
}
68+
69+
Result<std::shared_ptr<Expression>> Predicate(
70+
const std::shared_ptr<UnboundPredicate>& pred) override {
71+
ICEBERG_ASSIGN_OR_RAISE(auto bound_pred,
72+
pred->Bind(*spec_.schema(), case_sensitive_));
73+
if (bound_pred->is_bound_predicate()) {
74+
return Predicate(internal::checked_pointer_cast<BoundPredicate>(bound_pred));
75+
}
76+
return bound_pred;
77+
}
78+
79+
Result<std::shared_ptr<Expression>> Predicate(
80+
const std::shared_ptr<BoundPredicate>& pred) override {
81+
return InvalidExpression("Bound predicates are not supported in projections");
82+
}
83+
84+
protected:
85+
const PartitionSpec& spec_;
86+
bool case_sensitive_;
87+
88+
/// \brief Get partition fields that match the predicate's term.
89+
std::vector<const PartitionField*> GetFieldsByPredicate(
90+
const std::shared_ptr<BoundPredicate>& pred) const {
91+
int32_t source_id;
92+
switch (pred->term()->kind()) {
93+
case Term::Kind::kReference: {
94+
const auto& ref =
95+
internal::checked_pointer_cast<BoundReference>(pred->term()->reference());
96+
source_id = ref->field().field_id();
97+
break;
98+
}
99+
case Term::Kind::kTransform: {
100+
const auto& transform =
101+
internal::checked_pointer_cast<BoundTransform>(pred->term());
102+
source_id = transform->reference()->field().field_id();
103+
break;
104+
}
105+
default:
106+
std::unreachable();
107+
}
108+
109+
std::vector<const PartitionField*> result;
110+
for (const auto& field : spec_.fields()) {
111+
if (field.source_id() == source_id) {
112+
result.push_back(&field);
113+
}
114+
}
115+
return result;
116+
}
117+
};
118+
119+
ProjectionEvaluator::ProjectionEvaluator(std::unique_ptr<ProjectionVisitor> visitor)
120+
: visitor_(std::move(visitor)) {}
121+
122+
ProjectionEvaluator::~ProjectionEvaluator() = default;
123+
124+
/// \brief Inclusive projection visitor.
125+
///
126+
/// Uses AND to combine projections from multiple partition fields.
127+
class InclusiveProjectionVisitor : public ProjectionVisitor {
128+
public:
129+
~InclusiveProjectionVisitor() override = default;
130+
131+
InclusiveProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
132+
: ProjectionVisitor(spec, case_sensitive) {}
133+
134+
Result<std::shared_ptr<Expression>> Predicate(
135+
const std::shared_ptr<BoundPredicate>& pred) override {
136+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
137+
// Find partition fields that match the predicate's term
138+
auto partition_fields = GetFieldsByPredicate(pred);
139+
if (partition_fields.empty()) {
140+
// The predicate has no partition column
141+
return AlwaysTrue();
142+
}
143+
144+
// Project the predicate for each partition field and combine with AND
145+
//
146+
// consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
147+
// projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
148+
// any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
149+
//
150+
// similarly, if partitioning by day(ts) and hour(ts), the more restrictive
151+
// projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
152+
// hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
153+
std::shared_ptr<Expression> result = True::Instance();
154+
for (const auto* part_field : partition_fields) {
155+
ICEBERG_ASSIGN_OR_RAISE(auto projected,
156+
part_field->transform()->Project(part_field->name(), pred));
157+
if (projected != nullptr) {
158+
result =
159+
Expressions::And(result, std::shared_ptr<Expression>(projected.release()));
160+
}
161+
}
162+
163+
return result;
164+
}
165+
166+
protected:
167+
};
168+
169+
/// \brief Strict projection evaluator.
170+
///
171+
/// Uses OR to combine projections from multiple partition fields.
172+
class StrictProjectionVisitor : public ProjectionVisitor {
173+
public:
174+
~StrictProjectionVisitor() override = default;
175+
176+
StrictProjectionVisitor(const PartitionSpec& spec, bool case_sensitive)
177+
: ProjectionVisitor(spec, case_sensitive) {}
178+
179+
Result<std::shared_ptr<Expression>> Predicate(
180+
const std::shared_ptr<BoundPredicate>& pred) override {
181+
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
182+
// Find partition fields that match the predicate's term
183+
auto partition_fields = GetFieldsByPredicate(pred);
184+
if (partition_fields.empty()) {
185+
// The predicate has no matching partition columns
186+
return AlwaysFalse();
187+
}
188+
189+
// Project the predicate for each partition field and combine with OR
190+
//
191+
// consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
192+
// projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
193+
// any timestamp where either projection predicate is true must match the original
194+
// predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but
195+
// not the day, but does match the original predicate.
196+
std::shared_ptr<Expression> result = False::Instance();
197+
for (const auto* part_field : partition_fields) {
198+
ICEBERG_ASSIGN_OR_RAISE(auto projected, part_field->transform()->ProjectStrict(
199+
part_field->name(), pred));
200+
if (projected != nullptr) {
201+
result =
202+
Expressions::Or(result, std::shared_ptr<Expression>(projected.release()));
203+
}
204+
}
205+
206+
return result;
207+
}
208+
};
209+
210+
Result<std::shared_ptr<Expression>> ProjectionEvaluator::Project(
211+
const std::shared_ptr<Expression>& expr) {
212+
// Projections assume that there are no NOT nodes in the expression tree. To ensure that
213+
// this is the case, the expression is rewritten to push all NOT nodes down to the
214+
// expression leaf nodes.
215+
//
216+
// This is necessary to ensure that the default expression returned when a predicate
217+
// can't be projected is correct.
218+
ICEBERG_ASSIGN_OR_RAISE(auto rewritten, RewriteNot::Visit(expr));
219+
return Visit<std::shared_ptr<Expression>, ProjectionVisitor>(rewritten, *visitor_);
220+
}
221+
222+
std::unique_ptr<ProjectionEvaluator> Projections::Inclusive(const PartitionSpec& spec,
223+
bool case_sensitive) {
224+
auto visitor = std::make_unique<InclusiveProjectionVisitor>(spec, case_sensitive);
225+
return std::unique_ptr<ProjectionEvaluator>(
226+
new ProjectionEvaluator(std::move(visitor)));
227+
}
228+
229+
std::unique_ptr<ProjectionEvaluator> Projections::Strict(const PartitionSpec& spec,
230+
bool case_sensitive) {
231+
auto visitor = std::make_unique<StrictProjectionVisitor>(spec, case_sensitive);
232+
return std::unique_ptr<ProjectionEvaluator>(
233+
new ProjectionEvaluator(std::move(visitor)));
234+
}
235+
236+
} // namespace iceberg
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/expression/projections.h
23+
/// Utils to project expressions on rows to expressions on partitions.
24+
25+
#include <memory>
26+
27+
#include "iceberg/expression/expression.h"
28+
#include "iceberg/iceberg_export.h"
29+
#include "iceberg/partition_spec.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/type_fwd.h"
32+
33+
namespace iceberg {
34+
35+
/// \brief A class that projects expressions for a table's data rows into expressions on
36+
/// the table's partition values, for a table's partition spec.
37+
class ICEBERG_EXPORT ProjectionEvaluator {
38+
public:
39+
~ProjectionEvaluator();
40+
41+
/// \brief Project the given row expression to a partition expression.
42+
///
43+
/// \param expr an expression on data rows
44+
/// \return an expression on partition data (depends on the projection)
45+
Result<std::shared_ptr<Expression>> Project(const std::shared_ptr<Expression>& expr);
46+
47+
private:
48+
friend class Projections;
49+
50+
/// \brief Create a ProjectionEvaluator.
51+
///
52+
/// \param visitor The projection visitor to use
53+
explicit ProjectionEvaluator(std::unique_ptr<class ProjectionVisitor> visitor);
54+
55+
std::unique_ptr<class ProjectionVisitor> visitor_;
56+
};
57+
58+
/// \brief Utils to project expressions on rows to expressions on partitions.
59+
///
60+
/// There are two types of projections: inclusive and strict.
61+
///
62+
/// An inclusive projection guarantees that if an expression matches a row, the projected
63+
/// expression will match the row's partition.
64+
///
65+
/// A strict projection guarantees that if a partition matches a projected expression,
66+
/// then all rows in that partition will match the original expression.
67+
class ICEBERG_EXPORT Projections {
68+
public:
69+
/// \brief Creates an inclusive ProjectionEvaluator for the partition spec.
70+
///
71+
/// An evaluator is used to project expressions for a table's data rows into expressions
72+
/// on the table's partition values. The evaluator returned by this function is
73+
/// inclusive and will build expressions with the following guarantee: if the original
74+
/// expression matches a row, then the projected expression will match that row's
75+
/// partition.
76+
///
77+
/// Each predicate in the expression is projected using Transform::Project.
78+
///
79+
/// \param spec a partition spec
80+
/// \param case_sensitive whether the Projection should consider case sensitivity on
81+
/// column names or not. Defaults to true (case sensitive).
82+
/// \return an inclusive projection evaluator for the partition spec
83+
static std::unique_ptr<ProjectionEvaluator> Inclusive(const PartitionSpec& spec,
84+
bool case_sensitive = true);
85+
86+
/// \brief Creates a strict ProjectionEvaluator for the partition spec.
87+
///
88+
/// An evaluator is used to project expressions for a table's data rows into expressions
89+
/// on the table's partition values. The evaluator returned by this function is strict
90+
/// and will build expressions with the following guarantee: if the projected expression
91+
/// matches a partition, then the original expression will match all rows in that
92+
/// partition.
93+
///
94+
/// Each predicate in the expression is projected using Transform::ProjectStrict.
95+
///
96+
/// \param spec a partition spec
97+
/// \param case_sensitive whether the Projection should consider case sensitivity on
98+
/// column names or not. Defaults to true (case sensitive).
99+
/// \return a strict projection evaluator for the partition spec
100+
static std::unique_ptr<ProjectionEvaluator> Strict(const PartitionSpec& spec,
101+
bool case_sensitive = true);
102+
103+
private:
104+
Projections() = default;
105+
};
106+
107+
} // namespace iceberg

src/iceberg/json_internal.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -547,11 +547,11 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
547547
std::unique_ptr<PartitionSpec> spec;
548548
if (default_spec_id == spec_id) {
549549
ICEBERG_ASSIGN_OR_RAISE(
550-
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
550+
spec, PartitionSpec::Make(schema, spec_id, std::move(partition_fields),
551551
/*allow_missing_fields=*/false));
552552
} else {
553553
ICEBERG_ASSIGN_OR_RAISE(
554-
spec, PartitionSpec::Make(*schema, spec_id, std::move(partition_fields),
554+
spec, PartitionSpec::Make(schema, spec_id, std::move(partition_fields),
555555
/*allow_missing_fields=*/true));
556556
}
557557
return spec;
@@ -930,7 +930,7 @@ Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
930930
// Create partition spec with schema validation
931931
ICEBERG_ASSIGN_OR_RAISE(
932932
auto spec,
933-
PartitionSpec::Make(*current_schema, PartitionSpec::kInitialSpecId,
933+
PartitionSpec::Make(current_schema, PartitionSpec::kInitialSpecId,
934934
std::move(fields), /*allow_missing_fields=*/false));
935935
default_spec_id = spec->spec_id();
936936
partition_specs.push_back(std::move(spec));

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ iceberg_sources = files(
5050
'expression/inclusive_metrics_evaluator.cc',
5151
'expression/literal.cc',
5252
'expression/predicate.cc',
53+
'expression/projections.cc',
5354
'expression/rewrite_not.cc',
5455
'expression/strict_metrics_evaluator.cc',
5556
'expression/term.cc',

0 commit comments

Comments
 (0)