Skip to content

Commit 8633829

Browse files
authored
Merge branch 'main' into table_location_provider
2 parents fab1da2 + f869003 commit 8633829

43 files changed

Lines changed: 1673 additions & 128 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
8282
transform_function.cc
8383
type.cc
8484
update/pending_update.cc
85+
update/snapshot_update.cc
8586
update/update_partition_spec.cc
8687
update/update_properties.cc
8788
update/update_schema.cc

src/iceberg/avro/avro_schema_util.cc

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
* under the License.
1818
*/
1919

20-
#include <charconv>
2120
#include <format>
2221
#include <mutex>
2322
#include <sstream>
@@ -40,6 +39,7 @@
4039
#include "iceberg/schema_util_internal.h"
4140
#include "iceberg/util/formatter.h"
4241
#include "iceberg/util/macros.h"
42+
#include "iceberg/util/string_util.h"
4343
#include "iceberg/util/visit_type.h"
4444

4545
namespace iceberg::avro {
@@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
471471
return InvalidSchema("Missing avro attribute: {}", attr_name);
472472
}
473473

474-
int32_t id;
475-
const auto& id_value = id_str.value();
476-
auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id);
477-
if (ec != std::errc()) {
478-
return InvalidSchema("Invalid {}: {}", attr_name, id_value);
479-
}
480-
return id;
474+
return StringUtils::ParseInt<int32_t>(id_str.value());
481475
}
482476

483477
Result<int32_t> GetElementId(const ::avro::NodePtr& node) {

src/iceberg/expression/binder.cc

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
#include "iceberg/expression/binder.h"
2121

22+
#include "iceberg/result.h"
23+
#include "iceberg/util/macros.h"
24+
2225
namespace iceberg {
2326

2427
Binder::Binder(const Schema& schema, bool case_sensitive)
@@ -54,30 +57,30 @@ Result<std::shared_ptr<Expression>> Binder::Or(
5457

5558
Result<std::shared_ptr<Expression>> Binder::Predicate(
5659
const std::shared_ptr<UnboundPredicate>& pred) {
57-
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
60+
ICEBERG_PRECHECK(pred != nullptr, "Predicate cannot be null");
5861
return pred->Bind(schema_, case_sensitive_);
5962
}
6063

6164
Result<std::shared_ptr<Expression>> Binder::Predicate(
6265
const std::shared_ptr<BoundPredicate>& pred) {
63-
ICEBERG_DCHECK(pred != nullptr, "Predicate cannot be null");
66+
ICEBERG_PRECHECK(pred != nullptr, "Predicate cannot be null");
6467
return InvalidExpression("Found already bound predicate: {}", pred->ToString());
6568
}
6669

6770
Result<std::shared_ptr<Expression>> Binder::Aggregate(
6871
const std::shared_ptr<BoundAggregate>& aggregate) {
69-
ICEBERG_DCHECK(aggregate != nullptr, "Aggregate cannot be null");
72+
ICEBERG_PRECHECK(aggregate != nullptr, "Aggregate cannot be null");
7073
return InvalidExpression("Found already bound aggregate: {}", aggregate->ToString());
7174
}
7275

7376
Result<std::shared_ptr<Expression>> Binder::Aggregate(
7477
const std::shared_ptr<UnboundAggregate>& aggregate) {
75-
ICEBERG_DCHECK(aggregate != nullptr, "Aggregate cannot be null");
78+
ICEBERG_PRECHECK(aggregate != nullptr, "Aggregate cannot be null");
7679
return aggregate->Bind(schema_, case_sensitive_);
7780
}
7881

7982
Result<bool> IsBoundVisitor::IsBound(const std::shared_ptr<Expression>& expr) {
80-
ICEBERG_DCHECK(expr != nullptr, "Expression cannot be null");
83+
ICEBERG_PRECHECK(expr != nullptr, "Expression cannot be null");
8184
IsBoundVisitor visitor;
8285
return Visit<bool, IsBoundVisitor>(expr, visitor);
8386
}
@@ -113,4 +116,54 @@ Result<bool> IsBoundVisitor::Aggregate(
113116
return false;
114117
}
115118

119+
Result<std::unordered_set<int32_t>> ReferenceVisitor::GetReferencedFieldIds(
120+
const std::shared_ptr<Expression>& expr) {
121+
ICEBERG_PRECHECK(expr != nullptr, "Expression cannot be null");
122+
ReferenceVisitor visitor;
123+
return Visit<FieldIdsSetRef, ReferenceVisitor>(expr, visitor);
124+
}
125+
126+
Result<FieldIdsSetRef> ReferenceVisitor::AlwaysTrue() { return referenced_field_ids_; }
127+
128+
Result<FieldIdsSetRef> ReferenceVisitor::AlwaysFalse() { return referenced_field_ids_; }
129+
130+
Result<FieldIdsSetRef> ReferenceVisitor::Not(
131+
[[maybe_unused]] const FieldIdsSetRef& child_result) {
132+
return referenced_field_ids_;
133+
}
134+
135+
Result<FieldIdsSetRef> ReferenceVisitor::And(
136+
[[maybe_unused]] const FieldIdsSetRef& left_result,
137+
[[maybe_unused]] const FieldIdsSetRef& right_result) {
138+
return referenced_field_ids_;
139+
}
140+
141+
Result<FieldIdsSetRef> ReferenceVisitor::Or(
142+
[[maybe_unused]] const FieldIdsSetRef& left_result,
143+
[[maybe_unused]] const FieldIdsSetRef& right_result) {
144+
return referenced_field_ids_;
145+
}
146+
147+
Result<FieldIdsSetRef> ReferenceVisitor::Predicate(
148+
const std::shared_ptr<BoundPredicate>& pred) {
149+
referenced_field_ids_.insert(pred->reference()->field_id());
150+
return referenced_field_ids_;
151+
}
152+
153+
Result<FieldIdsSetRef> ReferenceVisitor::Predicate(
154+
[[maybe_unused]] const std::shared_ptr<UnboundPredicate>& pred) {
155+
return InvalidExpression("Cannot get referenced field IDs from unbound predicate");
156+
}
157+
158+
Result<FieldIdsSetRef> ReferenceVisitor::Aggregate(
159+
const std::shared_ptr<BoundAggregate>& aggregate) {
160+
referenced_field_ids_.insert(aggregate->reference()->field_id());
161+
return referenced_field_ids_;
162+
}
163+
164+
Result<FieldIdsSetRef> ReferenceVisitor::Aggregate(
165+
[[maybe_unused]] const std::shared_ptr<UnboundAggregate>& aggregate) {
166+
return InvalidExpression("Cannot get referenced field IDs from unbound aggregate");
167+
}
168+
116169
} // namespace iceberg

src/iceberg/expression/binder.h

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
/// \file iceberg/expression/binder.h
2323
/// Bind an expression to a schema.
2424

25+
#include <functional>
26+
#include <unordered_set>
27+
2528
#include "iceberg/expression/expression_visitor.h"
2629

2730
namespace iceberg {
@@ -73,6 +76,31 @@ class ICEBERG_EXPORT IsBoundVisitor : public ExpressionVisitor<bool> {
7376
Result<bool> Aggregate(const std::shared_ptr<UnboundAggregate>& aggregate) override;
7477
};
7578

76-
// TODO(gangwu): add the Java parity `ReferenceVisitor`
79+
using FieldIdsSetRef = std::reference_wrapper<std::unordered_set<int32_t>>;
80+
81+
/// \brief Visitor to collect referenced field IDs from an expression.
82+
class ICEBERG_EXPORT ReferenceVisitor : public ExpressionVisitor<FieldIdsSetRef> {
83+
public:
84+
static Result<std::unordered_set<int32_t>> GetReferencedFieldIds(
85+
const std::shared_ptr<Expression>& expr);
86+
87+
Result<FieldIdsSetRef> AlwaysTrue() override;
88+
Result<FieldIdsSetRef> AlwaysFalse() override;
89+
Result<FieldIdsSetRef> Not(const FieldIdsSetRef& child_result) override;
90+
Result<FieldIdsSetRef> And(const FieldIdsSetRef& left_result,
91+
const FieldIdsSetRef& right_result) override;
92+
Result<FieldIdsSetRef> Or(const FieldIdsSetRef& left_result,
93+
const FieldIdsSetRef& right_result) override;
94+
Result<FieldIdsSetRef> Predicate(const std::shared_ptr<BoundPredicate>& pred) override;
95+
Result<FieldIdsSetRef> Predicate(
96+
const std::shared_ptr<UnboundPredicate>& pred) override;
97+
Result<FieldIdsSetRef> Aggregate(
98+
const std::shared_ptr<BoundAggregate>& aggregate) override;
99+
Result<FieldIdsSetRef> Aggregate(
100+
const std::shared_ptr<UnboundAggregate>& aggregate) override;
101+
102+
private:
103+
std::unordered_set<int32_t> referenced_field_ids_;
104+
};
77105

78106
} // namespace iceberg

src/iceberg/json_internal.cc

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
399399
json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
400400
json[kManifestList] = snapshot.manifest_list;
401401
// If there is an operation, write the summary map
402-
if (snapshot.operation().has_value()) {
402+
if (snapshot.Operation().has_value()) {
403403
json[kSummary] = snapshot.summary;
404404
}
405405
SetOptionalField(json, kSchemaId, snapshot.schema_id);
@@ -645,9 +645,8 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json) {
645645
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
646646
ICEBERG_ASSIGN_OR_RAISE(auto sequence_number,
647647
GetJsonValueOptional<int64_t>(json, kSequenceNumber));
648-
ICEBERG_ASSIGN_OR_RAISE(
649-
auto timestamp_ms,
650-
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
648+
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
649+
auto timestamp_ms = TimePointMsFromUnixMs(unix_ms);
651650
ICEBERG_ASSIGN_OR_RAISE(auto manifest_list,
652651
GetJsonValue<std::string>(json, kManifestList));
653652

@@ -781,9 +780,8 @@ nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry) {
781780

782781
Result<SnapshotLogEntry> SnapshotLogEntryFromJson(const nlohmann::json& json) {
783782
SnapshotLogEntry snapshot_log_entry;
784-
ICEBERG_ASSIGN_OR_RAISE(
785-
snapshot_log_entry.timestamp_ms,
786-
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
783+
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
784+
snapshot_log_entry.timestamp_ms = TimePointMsFromUnixMs(unix_ms);
787785
ICEBERG_ASSIGN_OR_RAISE(snapshot_log_entry.snapshot_id,
788786
GetJsonValue<int64_t>(json, kSnapshotId));
789787
return snapshot_log_entry;
@@ -798,9 +796,8 @@ nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry) {
798796

799797
Result<MetadataLogEntry> MetadataLogEntryFromJson(const nlohmann::json& json) {
800798
MetadataLogEntry metadata_log_entry;
801-
ICEBERG_ASSIGN_OR_RAISE(
802-
metadata_log_entry.timestamp_ms,
803-
GetJsonValue<int64_t>(json, kTimestampMs).and_then(TimePointMsFromUnixMs));
799+
ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
800+
metadata_log_entry.timestamp_ms = TimePointMsFromUnixMs(unix_ms);
804801
ICEBERG_ASSIGN_OR_RAISE(metadata_log_entry.metadata_file,
805802
GetJsonValue<std::string>(json, kMetadataFile));
806803
return metadata_log_entry;
@@ -1553,9 +1550,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
15531550
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
15541551
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
15551552
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
1556-
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), snapshot_id, type,
1557-
min_snapshots, max_snapshot_age,
1558-
max_ref_age);
1553+
if (type == SnapshotRefType::kTag) {
1554+
ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age));
1555+
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *tag);
1556+
} else {
1557+
ICEBERG_CHECK(type == SnapshotRefType::kBranch,
1558+
"Expected branch type for snapshot ref");
1559+
ICEBERG_ASSIGN_OR_RAISE(auto branch,
1560+
SnapshotRef::MakeBranch(snapshot_id, min_snapshots,
1561+
max_snapshot_age, max_ref_age));
1562+
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *branch);
1563+
}
15591564
}
15601565
if (action == kActionSetProperties) {
15611566
using StringMap = std::unordered_map<std::string, std::string>;

src/iceberg/manifest/manifest_writer.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -369,23 +369,18 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
369369
int8_t format_version, std::optional<int64_t> snapshot_id,
370370
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
371371
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
372-
std::optional<ManifestContent> content, std::optional<int64_t> first_row_id) {
372+
ManifestContent content, std::optional<int64_t> first_row_id) {
373373
switch (format_version) {
374374
case 1:
375375
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
376376
std::move(partition_spec), std::move(current_schema));
377377
case 2:
378-
ICEBERG_PRECHECK(content.has_value(),
379-
"ManifestContent is required for format version 2");
380378
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
381-
std::move(partition_spec), std::move(current_schema),
382-
content.value());
379+
std::move(partition_spec), std::move(current_schema), content);
383380
case 3:
384-
ICEBERG_PRECHECK(content.has_value(),
385-
"ManifestContent is required for format version 3");
386381
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
387382
std::move(file_io), std::move(partition_spec),
388-
std::move(current_schema), content.value());
383+
std::move(current_schema), content);
389384
default:
390385
return NotSupported("Format version {} is not supported", format_version);
391386
}

src/iceberg/manifest/manifest_writer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "iceberg/file_writer.h"
3030
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/manifest/manifest_list.h"
3132
#include "iceberg/metrics.h"
3233
#include "iceberg/result.h"
3334
#include "iceberg/type_fwd.h"
@@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter {
175176
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
176177
std::shared_ptr<PartitionSpec> partition_spec,
177178
std::shared_ptr<Schema> current_schema,
178-
std::optional<ManifestContent> content = std::nullopt,
179+
ManifestContent content = ManifestContent::kData,
179180
std::optional<int64_t> first_row_id = std::nullopt);
180181

181182
private:

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ iceberg_sources = files(
103103
'transform_function.cc',
104104
'type.cc',
105105
'update/pending_update.cc',
106+
'update/snapshot_update.cc',
106107
'update/update_partition_spec.cc',
107108
'update/update_properties.cc',
108109
'update/update_schema.cc',

0 commit comments

Comments
 (0)