Skip to content

Commit a29355d

Browse files
authored
feat: add snapshot update (#408)
1 parent 2bd493c commit a29355d

37 files changed

+1362
-97
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
8282
transform_function.cc
8383
type.cc
8484
update/pending_update.cc
85+
update/snapshot_update.cc
8586
update/update_partition_spec.cc
8687
update/update_properties.cc
8788
update/update_schema.cc

src/iceberg/avro/avro_schema_util.cc

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
* under the License.
1818
*/
1919

20-
#include <charconv>
2120
#include <format>
2221
#include <mutex>
2322
#include <sstream>
@@ -40,6 +39,7 @@
4039
#include "iceberg/schema_util_internal.h"
4140
#include "iceberg/util/formatter.h"
4241
#include "iceberg/util/macros.h"
42+
#include "iceberg/util/string_util.h"
4343
#include "iceberg/util/visit_type.h"
4444

4545
namespace iceberg::avro {
@@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const std::string& attr_name,
471471
return InvalidSchema("Missing avro attribute: {}", attr_name);
472472
}
473473

474-
int32_t id;
475-
const auto& id_value = id_str.value();
476-
auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + id_value.size(), id);
477-
if (ec != std::errc()) {
478-
return InvalidSchema("Invalid {}: {}", attr_name, id_value);
479-
}
480-
return id;
474+
return StringUtils::ParseInt<int32_t>(id_str.value());
481475
}
482476

483477
Result<int32_t> GetElementId(const ::avro::NodePtr& node) {

src/iceberg/json_internal.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
399399
json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
400400
json[kManifestList] = snapshot.manifest_list;
401401
// If there is an operation, write the summary map
402-
if (snapshot.operation().has_value()) {
402+
if (snapshot.Operation().has_value()) {
403403
json[kSummary] = snapshot.summary;
404404
}
405405
SetOptionalField(json, kSchemaId, snapshot.schema_id);
@@ -1553,9 +1553,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
15531553
GetJsonValueOptional<int64_t>(json, kMaxSnapshotAgeMs));
15541554
ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
15551555
GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
1556-
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), snapshot_id, type,
1557-
min_snapshots, max_snapshot_age,
1558-
max_ref_age);
1556+
if (type == SnapshotRefType::kTag) {
1557+
ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, max_ref_age));
1558+
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *tag);
1559+
} else {
1560+
ICEBERG_CHECK(type == SnapshotRefType::kBranch,
1561+
"Expected branch type for snapshot ref");
1562+
ICEBERG_ASSIGN_OR_RAISE(auto branch,
1563+
SnapshotRef::MakeBranch(snapshot_id, min_snapshots,
1564+
max_snapshot_age, max_ref_age));
1565+
return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), *branch);
1566+
}
15591567
}
15601568
if (action == kActionSetProperties) {
15611569
using StringMap = std::unordered_map<std::string, std::string>;

src/iceberg/manifest/manifest_writer.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -369,23 +369,18 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeWriter(
369369
int8_t format_version, std::optional<int64_t> snapshot_id,
370370
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
371371
std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> current_schema,
372-
std::optional<ManifestContent> content, std::optional<int64_t> first_row_id) {
372+
ManifestContent content, std::optional<int64_t> first_row_id) {
373373
switch (format_version) {
374374
case 1:
375375
return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
376376
std::move(partition_spec), std::move(current_schema));
377377
case 2:
378-
ICEBERG_PRECHECK(content.has_value(),
379-
"ManifestContent is required for format version 2");
380378
return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
381-
std::move(partition_spec), std::move(current_schema),
382-
content.value());
379+
std::move(partition_spec), std::move(current_schema), content);
383380
case 3:
384-
ICEBERG_PRECHECK(content.has_value(),
385-
"ManifestContent is required for format version 3");
386381
return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
387382
std::move(file_io), std::move(partition_spec),
388-
std::move(current_schema), content.value());
383+
std::move(current_schema), content);
389384
default:
390385
return NotSupported("Format version {} is not supported", format_version);
391386
}

src/iceberg/manifest/manifest_writer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "iceberg/file_writer.h"
3030
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/manifest/manifest_list.h"
3132
#include "iceberg/metrics.h"
3233
#include "iceberg/result.h"
3334
#include "iceberg/type_fwd.h"
@@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter {
175176
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
176177
std::shared_ptr<PartitionSpec> partition_spec,
177178
std::shared_ptr<Schema> current_schema,
178-
std::optional<ManifestContent> content = std::nullopt,
179+
ManifestContent content = ManifestContent::kData,
179180
std::optional<int64_t> first_row_id = std::nullopt);
180181

181182
private:

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ iceberg_sources = files(
103103
'transform_function.cc',
104104
'type.cc',
105105
'update/pending_update.cc',
106+
'update/snapshot_update.cc',
106107
'update/update_partition_spec.cc',
107108
'update/update_properties.cc',
108109
'update/update_schema.cc',

src/iceberg/snapshot.cc

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
#include "iceberg/snapshot.h"
2121

22+
#include <memory>
23+
2224
#include "iceberg/file_io.h"
2325
#include "iceberg/manifest/manifest_list.h"
2426
#include "iceberg/manifest/manifest_reader.h"
2527
#include "iceberg/util/macros.h"
28+
#include "iceberg/util/string_util.h"
2629

2730
namespace iceberg {
2831

@@ -49,6 +52,55 @@ SnapshotRefType SnapshotRef::type() const noexcept {
4952
retention);
5053
}
5154

55+
/// \brief Check that every retention setting present on this reference is positive.
///
/// A branch reference has its min-snapshots-to-keep, max-snapshot-age and
/// max-ref-age checked, in that order. Any other reference is read as a tag and only
/// its max-ref-age is checked. Settings that are not set always pass.
///
/// \return An empty Status when all checks pass; otherwise the error produced by
/// ICEBERG_CHECK for the first condition that fails.
Status SnapshotRef::Validate() const {
  // A retention setting is acceptable when it is absent or strictly positive.
  const auto unset_or_positive = [](const auto& setting) {
    return !setting.has_value() || setting.value() > 0;
  };

  // Tags carry a single retention setting, so handle them first and return early.
  if (type() != SnapshotRefType::kBranch) {
    const auto& tag_retention = std::get<Tag>(this->retention);
    ICEBERG_CHECK(unset_or_positive(tag_retention.max_ref_age_ms),
                  "Max reference age must be greater than 0");
    return {};
  }

  const auto& branch_retention = std::get<Branch>(this->retention);
  ICEBERG_CHECK(unset_or_positive(branch_retention.min_snapshots_to_keep),
                "Min snapshots to keep must be greater than 0");
  ICEBERG_CHECK(unset_or_positive(branch_retention.max_snapshot_age_ms),
                "Max snapshot age must be greater than 0 ms");
  ICEBERG_CHECK(unset_or_positive(branch_retention.max_ref_age_ms),
                "Max reference age must be greater than 0");
  return {};
}
73+
74+
/// \brief Build a branch reference and validate it before returning it.
///
/// \param snapshot_id The snapshot ID the branch points at
/// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
/// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return The new reference, or the error reported by Validate()
Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeBranch(
    int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep,
    std::optional<int64_t> max_snapshot_age_ms, std::optional<int64_t> max_ref_age_ms) {
  // Assemble the reference as a plain value first; it is only moved to the heap once
  // Validate() has accepted it.
  SnapshotRef candidate{.snapshot_id = snapshot_id,
                        .retention = Branch{
                            .min_snapshots_to_keep = min_snapshots_to_keep,
                            .max_snapshot_age_ms = max_snapshot_age_ms,
                            .max_ref_age_ms = max_ref_age_ms,
                        }};
  ICEBERG_RETURN_UNEXPECTED(candidate.Validate());
  return std::make_unique<SnapshotRef>(std::move(candidate));
}
87+
88+
/// \brief Build a tag reference and validate it before returning it.
///
/// \param snapshot_id The snapshot ID the tag points at
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
/// \return The new reference, or the error reported by Validate()
Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeTag(
    int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms) {
  // Assemble the reference as a plain value first; it is only moved to the heap once
  // Validate() has accepted it.
  SnapshotRef candidate{.snapshot_id = snapshot_id,
                        .retention = Tag{.max_ref_age_ms = max_ref_age_ms}};
  ICEBERG_RETURN_UNEXPECTED(candidate.Validate());
  return std::make_unique<SnapshotRef>(std::move(candidate));
}
95+
96+
std::unique_ptr<SnapshotRef> SnapshotRef::Clone(
97+
std::optional<int64_t> new_snapshot_id) const {
98+
auto ref = std::make_unique<SnapshotRef>();
99+
ref->snapshot_id = new_snapshot_id.value_or(snapshot_id);
100+
ref->retention = retention;
101+
return ref;
102+
}
103+
52104
bool SnapshotRef::Equals(const SnapshotRef& other) const {
53105
if (this == &other) {
54106
return true;
@@ -67,14 +119,32 @@ bool SnapshotRef::Equals(const SnapshotRef& other) const {
67119
}
68120
}
69121

70-
std::optional<std::string_view> Snapshot::operation() const {
122+
std::optional<std::string_view> Snapshot::Operation() const {
71123
auto it = summary.find(SnapshotSummaryFields::kOperation);
72124
if (it != summary.end()) {
73125
return it->second;
74126
}
75127
return std::nullopt;
76128
}
77129

130+
/// \brief Read the first-row-id entry from the snapshot summary.
///
/// \return nullopt when the summary has no first-row-id entry; otherwise the result
/// of parsing the stored string as an int64_t via StringUtils::ParseInt
Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
  if (const auto entry = summary.find(SnapshotSummaryFields::kFirstRowId);
      entry != summary.end()) {
    return StringUtils::ParseInt<int64_t>(entry->second);
  }
  return std::nullopt;
}
138+
139+
/// \brief Read the added-rows entry from the snapshot summary.
///
/// \return nullopt when the summary has no added-rows entry; otherwise the result of
/// parsing the stored string as an int64_t via StringUtils::ParseInt
Result<std::optional<int64_t>> Snapshot::AddedRows() const {
  if (const auto entry = summary.find(SnapshotSummaryFields::kAddedRows);
      entry != summary.end()) {
    return StringUtils::ParseInt<int64_t>(entry->second);
  }
  return std::nullopt;
}
147+
78148
bool Snapshot::Equals(const Snapshot& other) const {
79149
if (this == &other) {
80150
return true;
@@ -85,6 +155,37 @@ bool Snapshot::Equals(const Snapshot& other) const {
85155
schema_id == other.schema_id;
86156
}
87157

158+
/// \brief Create a Snapshot after validating the inputs.
///
/// The operation, and the first-row-id / added-rows values when they are provided,
/// are written into the summary map, overwriting any entries already stored under
/// those keys.
///
/// Preconditions, checked in this order:
///  - operation is not empty
///  - first_row_id, when set, is not negative
///  - added_rows, when set, is not negative
///  - added_rows is set whenever first_row_id is set
///
/// \return The new Snapshot, or the error produced by ICEBERG_PRECHECK for the first
/// precondition that fails
Result<std::unique_ptr<Snapshot>> Snapshot::Make(
    int64_t sequence_number, int64_t snapshot_id,
    std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
    std::string operation, std::unordered_map<std::string, std::string> summary,
    std::optional<int32_t> schema_id, std::string manifest_list,
    std::optional<int64_t> first_row_id, std::optional<int64_t> added_rows) {
  const bool has_operation = !operation.empty();
  ICEBERG_PRECHECK(has_operation, "Operation cannot be empty");

  const bool first_row_id_ok = !first_row_id.has_value() || first_row_id.value() >= 0;
  ICEBERG_PRECHECK(first_row_id_ok, "Invalid first-row-id (cannot be negative): {}",
                   first_row_id.value());

  const bool added_rows_ok = !added_rows.has_value() || added_rows.value() >= 0;
  ICEBERG_PRECHECK(added_rows_ok, "Invalid added-rows (cannot be negative): {}",
                   added_rows.value());

  const bool row_fields_consistent = !first_row_id.has_value() || added_rows.has_value();
  ICEBERG_PRECHECK(row_fields_consistent, "Missing added-rows when first-row-id is set");

  // The values passed explicitly take precedence over whatever the caller's summary
  // map already holds under the same keys.
  summary.insert_or_assign(SnapshotSummaryFields::kOperation, std::move(operation));
  if (first_row_id.has_value()) {
    summary.insert_or_assign(SnapshotSummaryFields::kFirstRowId,
                             std::to_string(first_row_id.value()));
  }
  if (added_rows.has_value()) {
    summary.insert_or_assign(SnapshotSummaryFields::kAddedRows,
                             std::to_string(added_rows.value()));
  }

  Snapshot snapshot{
      .snapshot_id = snapshot_id,
      .parent_snapshot_id = parent_snapshot_id,
      .sequence_number = sequence_number,
      .timestamp_ms = timestamp_ms,
      .manifest_list = std::move(manifest_list),
      .summary = std::move(summary),
      .schema_id = schema_id,
  };
  return std::make_unique<Snapshot>(std::move(snapshot));
}
188+
88189
Result<SnapshotCache::ManifestsCache> SnapshotCache::InitManifestsCache(
89190
const Snapshot* snapshot, std::shared_ptr<FileIO> file_io) {
90191
if (file_io == nullptr) {

src/iceberg/snapshot.h

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <string>
2626
#include <string_view>
2727
#include <unordered_map>
28-
#include <utility>
2928
#include <variant>
3029

3130
#include "iceberg/iceberg_export.h"
@@ -114,6 +113,39 @@ struct ICEBERG_EXPORT SnapshotRef {
114113

115114
SnapshotRefType type() const noexcept;
116115

116+
/// \brief Create a branch reference
117+
///
118+
/// \param snapshot_id The snapshot ID for the branch
119+
/// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
120+
/// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
121+
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
122+
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
123+
/// validation failed
124+
static Result<std::unique_ptr<SnapshotRef>> MakeBranch(
125+
int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep = std::nullopt,
126+
std::optional<int64_t> max_snapshot_age_ms = std::nullopt,
127+
std::optional<int64_t> max_ref_age_ms = std::nullopt);
128+
129+
/// \brief Create a tag reference
130+
///
131+
/// \param snapshot_id The snapshot ID for the tag
132+
/// \param max_ref_age_ms Optional maximum reference age in milliseconds
133+
/// \return A Result containing a unique_ptr to the SnapshotRef, or an error if
134+
/// validation failed
135+
static Result<std::unique_ptr<SnapshotRef>> MakeTag(
136+
int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms = std::nullopt);
137+
138+
/// \brief Clone this SnapshotRef with an optional new snapshot ID
139+
///
140+
/// \param new_snapshot_id Optional new snapshot ID. If not provided, uses the current
141+
/// snapshot_id
142+
/// \return A unique_ptr to the cloned SnapshotRef
143+
std::unique_ptr<SnapshotRef> Clone(
144+
std::optional<int64_t> new_snapshot_id = std::nullopt) const;
145+
146+
/// \brief Validate the SnapshotRef
147+
Status Validate() const;
148+
117149
/// \brief Compare two snapshot refs for equality
118150
friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
119151
return lhs.Equals(rhs);
@@ -125,9 +157,13 @@ struct ICEBERG_EXPORT SnapshotRef {
125157
};
126158

127159
/// \brief Optional Snapshot Summary Fields
128-
struct SnapshotSummaryFields {
160+
struct ICEBERG_EXPORT SnapshotSummaryFields {
129161
/// \brief The operation field key
130162
inline static const std::string kOperation = "operation";
163+
/// \brief The first row id field key
164+
inline static const std::string kFirstRowId = "first-row-id";
165+
/// \brief The added rows field key
166+
inline static const std::string kAddedRows = "added-rows";
131167

132168
/// Metrics, see https://iceberg.apache.org/spec/#metrics
133169

@@ -246,12 +282,44 @@ struct ICEBERG_EXPORT Snapshot {
246282
/// ID of the table's current schema when the snapshot was created.
247283
std::optional<int32_t> schema_id;
248284

285+
/// \brief Create a new Snapshot instance with validation on the inputs.
286+
static Result<std::unique_ptr<Snapshot>> Make(
287+
int64_t sequence_number, int64_t snapshot_id,
288+
std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
289+
std::string operation, std::unordered_map<std::string, std::string> summary,
290+
std::optional<int32_t> schema_id, std::string manifest_list,
291+
std::optional<int64_t> first_row_id = std::nullopt,
292+
std::optional<int64_t> added_rows = std::nullopt);
293+
249294
/// \brief Return the name of the DataOperations data operation that produced this
250295
/// snapshot.
251296
///
252297
/// \return the operation that produced this snapshot, or nullopt if the operation is
253298
/// unknown.
254-
std::optional<std::string_view> operation() const;
299+
std::optional<std::string_view> Operation() const;
300+
301+
/// \brief The row-id of the first newly added row in this snapshot.
302+
///
303+
/// All rows added in this snapshot will have a row-id assigned to them greater than or equal to
304+
/// this value. All rows with a row-id less than this value were created in a snapshot
305+
/// that was added to the table (but not necessarily committed to this branch) in the
306+
/// past.
307+
///
308+
/// \return the first row-id to be used in this snapshot or nullopt when row lineage
309+
/// is not supported
310+
Result<std::optional<int64_t>> FirstRowId() const;
311+
312+
/// \brief The upper bound of number of rows with assigned row IDs in this snapshot.
313+
///
314+
/// It can be used safely to increment the table's `next-row-id` during a commit. It
315+
/// can be more than the number of rows added in this snapshot and include some
316+
/// existing rows.
317+
///
318+
/// This field is optional but is required when the table version supports row lineage.
319+
///
320+
/// \return the upper bound of number of rows with assigned row IDs in this snapshot
321+
/// or nullopt if the value was not stored.
322+
Result<std::optional<int64_t>> AddedRows() const;
255323

256324
/// \brief Compare two snapshots for equality.
257325
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {

0 commit comments

Comments
 (0)