Skip to content

Commit d5d0e20

Browse files
committed
feat: add snapshot update
1 parent 7a0eafb commit d5d0e20

21 files changed

+1136
-16
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform_function.cc
8282
type.cc
8383
update/pending_update.cc
84+
update/snapshot_update.cc
8485
update/update_partition_spec.cc
8586
update/update_properties.cc
8687
update/update_schema.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ iceberg_sources = files(
102102
'transform_function.cc',
103103
'type.cc',
104104
'update/pending_update.cc',
105+
'update/snapshot_update.cc',
105106
'update/update_partition_spec.cc',
106107
'update/update_properties.cc',
107108
'update/update_schema.cc',

src/iceberg/snapshot.cc

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
#include "iceberg/snapshot.h"
2121

22+
#include <charconv>
23+
2224
#include "iceberg/file_io.h"
2325
#include "iceberg/manifest/manifest_list.h"
2426
#include "iceberg/manifest/manifest_reader.h"
2527
#include "iceberg/util/macros.h"
28+
#include "iceberg/util/string_util.h"
2629

2730
namespace iceberg {
2831

@@ -75,6 +78,24 @@ std::optional<std::string_view> Snapshot::operation() const {
7578
return std::nullopt;
7679
}
7780

81+
std::optional<int64_t> Snapshot::FirstRowId() const {
82+
auto it = summary.find("first-row-id");
83+
if (it == summary.end()) {
84+
return std::nullopt;
85+
}
86+
87+
return StringUtils::ParseInt<int64_t>(it->second);
88+
}
89+
90+
std::optional<int64_t> Snapshot::AddedRows() const {
91+
auto it = summary.find("added-rows");
92+
if (it == summary.end()) {
93+
return std::nullopt;
94+
}
95+
96+
return StringUtils::ParseInt<int64_t>(it->second);
97+
}
98+
7899
bool Snapshot::Equals(const Snapshot& other) const {
79100
if (this == &other) {
80101
return true;
@@ -141,4 +162,103 @@ Result<std::span<ManifestFile>> SnapshotCache::DeleteManifests(
141162
return std::span<ManifestFile>(cache.first.data() + delete_start, delete_count);
142163
}
143164

165+
// SnapshotRef::Builder implementation
166+
167+
SnapshotRef::Builder::Builder(SnapshotRefType type, int64_t snapshot_id)
168+
: type_(type), snapshot_id_(snapshot_id) {}
169+
170+
// Factory for a builder whose reference type is fixed to kTag.
SnapshotRef::Builder SnapshotRef::Builder::TagBuilder(int64_t snapshot_id) {
  return Builder{SnapshotRefType::kTag, snapshot_id};
}
173+
174+
// Factory for a builder whose reference type is fixed to kBranch.
SnapshotRef::Builder SnapshotRef::Builder::BranchBuilder(int64_t snapshot_id) {
  return Builder{SnapshotRefType::kBranch, snapshot_id};
}
177+
178+
// Factory for a builder of the caller-chosen reference type. Note the
// parameter order (snapshot id first) is the reverse of the private
// constructor's (type first).
SnapshotRef::Builder SnapshotRef::Builder::BuilderFor(int64_t snapshot_id,
                                                      SnapshotRefType type) {
  return Builder{type, snapshot_id};
}
182+
183+
// Creates a builder that mirrors `ref`: same type, same snapshot id and the
// same retention settings. The copying logic lives in the two-argument
// overload; this one simply keeps the snapshot id of `ref`.
SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref) {
  return BuilderFrom(ref, ref.snapshot_id);
}
196+
197+
// Creates a builder with the type and retention settings of `ref`, but
// pointing at `snapshot_id` instead of the snapshot `ref` refers to.
//
// The retention values are assigned to the members directly, so they bypass
// the checks performed by the setter methods.
SnapshotRef::Builder SnapshotRef::Builder::BuilderFrom(const SnapshotRef& ref,
                                                       int64_t snapshot_id) {
  Builder result(ref.type(), snapshot_id);

  // Anything that is not a branch is read as a tag, which only carries a
  // maximum reference age.
  if (ref.type() != SnapshotRefType::kBranch) {
    const auto& tag_retention = std::get<SnapshotRef::Tag>(ref.retention);
    result.max_ref_age_ms_ = tag_retention.max_ref_age_ms;
    return result;
  }

  const auto& branch_retention = std::get<SnapshotRef::Branch>(ref.retention);
  result.min_snapshots_to_keep_ = branch_retention.min_snapshots_to_keep;
  result.max_snapshot_age_ms_ = branch_retention.max_snapshot_age_ms;
  result.max_ref_age_ms_ = branch_retention.max_ref_age_ms;
  return result;
}
211+
212+
// Sets the minimum number of snapshots to keep.
//
// An empty optional is always accepted and clears the setting. A concrete
// value is rejected (error recorded via AddError, stored value unchanged)
// when the builder is for a tag or when the value is not positive.
SnapshotRef::Builder& SnapshotRef::Builder::MinSnapshotsToKeep(
    std::optional<int32_t> value) {
  if (value.has_value()) {
    // The tag check comes first so a tag always reports the "unsupported"
    // message, whatever the value is.
    if (type_ == SnapshotRefType::kTag) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Tags do not support setting minSnapshotsToKeep");
    }
    if (*value <= 0) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Min snapshots to keep must be greater than 0");
    }
  }

  min_snapshots_to_keep_ = value;
  return *this;
}
225+
226+
// Sets the maximum snapshot age in milliseconds.
//
// An empty optional is always accepted and clears the setting. A concrete
// value is rejected (error recorded via AddError, stored value unchanged)
// when the builder is for a tag or when the value is not positive.
SnapshotRef::Builder& SnapshotRef::Builder::MaxSnapshotAgeMs(
    std::optional<int64_t> value) {
  if (value.has_value()) {
    // The tag check comes first so a tag always reports the "unsupported"
    // message, whatever the value is.
    if (type_ == SnapshotRefType::kTag) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Tags do not support setting maxSnapshotAgeMs");
    }
    if (*value <= 0) {
      return AddError(ErrorKind::kInvalidArgument,
                      "Max snapshot age must be greater than 0 ms");
    }
  }

  max_snapshot_age_ms_ = value;
  return *this;
}
239+
240+
// Sets the maximum reference age in milliseconds. There is no type check
// here, so the setting is accepted for both branches and tags.
//
// An empty optional clears the setting; a non-positive value is rejected
// (error recorded via AddError, stored value unchanged).
SnapshotRef::Builder& SnapshotRef::Builder::MaxRefAgeMs(std::optional<int64_t> value) {
  const bool non_positive = value.has_value() && *value <= 0;
  if (non_positive) {
    return AddError(ErrorKind::kInvalidArgument,
                    "Max reference age must be greater than 0");
  }

  max_ref_age_ms_ = value;
  return *this;
}
248+
249+
// Produces the SnapshotRef described by this builder.
//
// CheckErrors() is consulted first (through ICEBERG_RETURN_UNEXPECTED) so a
// builder with recorded validation errors does not produce a reference.
// Branches carry all three retention settings; any other type is built as a
// tag, which only carries the maximum reference age.
Result<SnapshotRef> SnapshotRef::Builder::Build() const {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());

  if (type_ != SnapshotRefType::kBranch) {
    return SnapshotRef{.snapshot_id = snapshot_id_,
                       .retention = SnapshotRef::Tag{.max_ref_age_ms = max_ref_age_ms_}};
  }

  return SnapshotRef{
      .snapshot_id = snapshot_id_,
      .retention = SnapshotRef::Branch{.min_snapshots_to_keep = min_snapshots_to_keep_,
                                       .max_snapshot_age_ms = max_snapshot_age_ms_,
                                       .max_ref_age_ms = max_ref_age_ms_}};
}
263+
144264
} // namespace iceberg

src/iceberg/snapshot.h

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/manifest/manifest_list.h"
3333
#include "iceberg/result.h"
3434
#include "iceberg/type_fwd.h"
35+
#include "iceberg/util/error_collector.h"
3536
#include "iceberg/util/lazy.h"
3637
#include "iceberg/util/timepoint.h"
3738

@@ -119,6 +120,67 @@ struct ICEBERG_EXPORT SnapshotRef {
119120
return lhs.Equals(rhs);
120121
}
121122

123+
/// \brief Builder class for constructing SnapshotRef objects
124+
class ICEBERG_EXPORT Builder : public ErrorCollector {
125+
public:
126+
/// \brief Create a builder for a tag reference
127+
/// \param snapshot_id The snapshot ID for the tag
128+
/// \return A new Builder instance for a tag
129+
static Builder TagBuilder(int64_t snapshot_id);
130+
131+
/// \brief Create a builder for a branch reference
132+
/// \param snapshot_id The snapshot ID for the branch
133+
/// \return A new Builder instance for a branch
134+
static Builder BranchBuilder(int64_t snapshot_id);
135+
136+
/// \brief Create a builder from an existing SnapshotRef
137+
/// \param ref The existing reference to copy properties from
138+
/// \return A new Builder instance with properties from the existing ref
139+
static Builder BuilderFrom(const SnapshotRef& ref);
140+
141+
/// \brief Create a builder from an existing SnapshotRef with a new snapshot ID
142+
/// \param ref The existing reference to copy properties from
143+
/// \param snapshot_id The new snapshot ID to use
144+
/// \return A new Builder instance with properties from the existing ref but new
145+
/// snapshot ID
146+
static Builder BuilderFrom(const SnapshotRef& ref, int64_t snapshot_id);
147+
148+
/// \brief Create a builder for a specific type
149+
/// \param snapshot_id The snapshot ID
150+
/// \param type The type of reference (branch or tag)
151+
/// \return A new Builder instance
152+
static Builder BuilderFor(int64_t snapshot_id, SnapshotRefType type);
153+
154+
/// \brief Set the minimum number of snapshots to keep (branch only)
155+
/// \param value The minimum number of snapshots to keep, or nullopt for default
156+
/// \return Reference to this builder for method chaining
157+
Builder& MinSnapshotsToKeep(std::optional<int32_t> value);
158+
159+
/// \brief Set the maximum snapshot age in milliseconds (branch only)
160+
/// \param value The maximum snapshot age in milliseconds, or nullopt for default
161+
/// \return Reference to this builder for method chaining
162+
Builder& MaxSnapshotAgeMs(std::optional<int64_t> value);
163+
164+
/// \brief Set the maximum reference age in milliseconds
165+
/// \param value The maximum reference age in milliseconds, or nullopt for default
166+
/// \return Reference to this builder for method chaining
167+
Builder& MaxRefAgeMs(std::optional<int64_t> value);
168+
169+
/// \brief Build the SnapshotRef
170+
/// \return A Result containing the SnapshotRef instance, or an error if validation
171+
/// failed
172+
Result<SnapshotRef> Build() const;
173+
174+
private:
175+
explicit Builder(SnapshotRefType type, int64_t snapshot_id);
176+
177+
SnapshotRefType type_;
178+
int64_t snapshot_id_;
179+
std::optional<int32_t> min_snapshots_to_keep_;
180+
std::optional<int64_t> max_snapshot_age_ms_;
181+
std::optional<int64_t> max_ref_age_ms_;
182+
};
183+
122184
private:
123185
/// \brief Compare two snapshot refs for equality.
124186
bool Equals(const SnapshotRef& other) const;
@@ -253,6 +315,29 @@ struct ICEBERG_EXPORT Snapshot {
253315
/// unknown.
254316
std::optional<std::string_view> operation() const;
255317

318+
/// \brief The row-id of the first newly added row in this snapshot.
319+
///
320+
/// All rows added in this snapshot will have a row-id greater than or equal to
321+
/// this value. All rows with a row-id less than this value were created in a snapshot
322+
/// that was added to the table (but not necessarily committed to this branch) in the
323+
/// past.
324+
///
325+
/// \return the first row-id to be used in this snapshot or nullopt when row lineage
326+
/// is not supported
327+
std::optional<int64_t> FirstRowId() const;
328+
329+
/// \brief The upper bound of number of rows with assigned row IDs in this snapshot.
330+
///
331+
/// It can be used safely to increment the table's `next-row-id` during a commit. It
332+
/// can be more than the number of rows added in this snapshot and include some
333+
/// existing rows.
334+
///
335+
/// This field is optional but is required when the table version supports row lineage.
336+
///
337+
/// \return the upper bound of number of rows with assigned row IDs in this snapshot
338+
/// or nullopt if the value was not stored.
339+
std::optional<int64_t> AddedRows() const;
340+
256341
/// \brief Compare two snapshots for equality.
257342
friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
258343
return lhs.Equals(rhs);

src/iceberg/table.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
5050

5151
virtual ~Table();
5252

53-
/// \brief Return the identifier of this table
53+
/// \brief Returns the identifier of this table
5454
const TableIdentifier& name() const { return identifier_; }
5555

5656
/// \brief Returns the UUID of the table
@@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
5959
/// \brief Returns the schema for this table, or NotFoundError if not found
6060
Result<std::shared_ptr<Schema>> schema() const;
6161

62-
/// \brief Return a map of schema for this table
62+
/// \brief Returns a map of schema for this table
6363
Result<
6464
std::reference_wrapper<const std::unordered_map<int32_t, std::shared_ptr<Schema>>>>
6565
schemas() const;
6666

67-
/// \brief Return the partition spec for this table, return NotFoundError if not found
67+
/// \brief Returns the partition spec for this table, or NotFoundError if not found
6868
Result<std::shared_ptr<PartitionSpec>> spec() const;
6969

70-
/// \brief Return a map of partition specs for this table
70+
/// \brief Returns a map of partition specs for this table
7171
Result<std::reference_wrapper<
7272
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
7373
specs() const;
7474

75-
/// \brief Return the sort order for this table, return NotFoundError if not found
75+
/// \brief Returns the sort order for this table, or NotFoundError if not found
7676
Result<std::shared_ptr<SortOrder>> sort_order() const;
7777

78-
/// \brief Return a map of sort order IDs to sort orders for this table
78+
/// \brief Returns a map of sort order IDs to sort orders for this table
7979
Result<std::reference_wrapper<
8080
const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
8181
sort_orders() const;
8282

83-
/// \brief Return a map of string properties for this table
83+
/// \brief Returns the properties of this table
8484
const TableProperties& properties() const;
8585

86-
/// \brief Return the table's metadata file location
86+
/// \brief Returns the table's metadata file location
8787
std::string_view metadata_file_location() const;
8888

89-
/// \brief Return the table's base location
89+
/// \brief Returns the table's base location
9090
std::string_view location() const;
9191

9292
/// \brief Returns the time when this table was last updated
9393
TimePointMs last_updated_ms() const;
9494

95-
/// \brief Return the table's current snapshot, return NotFoundError if not found
95+
/// \brief Returns the table's current snapshot, or NotFoundError if not found
9696
Result<std::shared_ptr<Snapshot>> current_snapshot() const;
9797

9898
/// \brief Get the snapshot of this table with the given id

0 commit comments

Comments
 (0)