Skip to content

Commit d920675

Browse files
committed
ver1
1 parent 3c5b00a commit d920675

File tree

9 files changed

+657
-3
lines changed

9 files changed

+657
-3
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform_function.cc
8282
type.cc
8383
update/pending_update.cc
84+
update/set_snapshot.cc
8485
update/update_partition_spec.cc
8586
update/update_properties.cc
8687
update/update_schema.cc

src/iceberg/table_metadata.cc

Lines changed: 150 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,8 @@ class TableMetadataBuilder::Impl {
591591
Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
592592
Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
593593
void SetLocation(std::string_view location);
594+
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
595+
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
594596

595597
Result<std::unique_ptr<TableMetadata>> Build();
596598

@@ -613,6 +615,23 @@ class TableMetadataBuilder::Impl {
613615
/// \return The ID to use for this schema (reused if exists, new otherwise
614616
int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const;
615617

618+
/// \brief Internal method to set a branch snapshot
619+
/// \param snapshot The snapshot to set
620+
/// \param branch The branch name
621+
Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch);
622+
623+
/// \brief Helper to create a SnapshotRef from an existing ref with a new snapshot ID
624+
/// \param ref The existing SnapshotRef
625+
/// \param snapshot_id The new snapshot ID
626+
/// \return A new SnapshotRef with the same properties but different snapshot ID
627+
static std::shared_ptr<SnapshotRef> BuildRefFrom(const SnapshotRef& ref,
628+
int64_t snapshot_id);
629+
630+
/// \brief Helper to create a new branch SnapshotRef
631+
/// \param snapshot_id The snapshot ID for the branch
632+
/// \return A new branch SnapshotRef
633+
static std::shared_ptr<SnapshotRef> BuildBranchRef(int64_t snapshot_id);
634+
616635
private:
617636
// Base metadata (nullptr for new tables)
618637
const TableMetadata* base_;
@@ -1077,6 +1096,133 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
10771096
return new_schema_id;
10781097
}
10791098

1099+
std::shared_ptr<SnapshotRef> TableMetadataBuilder::Impl::BuildRefFrom(
1100+
const SnapshotRef& ref, int64_t snapshot_id) {
1101+
auto new_ref = std::make_shared<SnapshotRef>();
1102+
new_ref->snapshot_id = snapshot_id;
1103+
1104+
if (std::holds_alternative<SnapshotRef::Branch>(ref.retention)) {
1105+
const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
1106+
new_ref->retention = SnapshotRef::Branch{
1107+
.min_snapshots_to_keep = branch.min_snapshots_to_keep,
1108+
.max_snapshot_age_ms = branch.max_snapshot_age_ms,
1109+
.max_ref_age_ms = branch.max_ref_age_ms,
1110+
};
1111+
} else {
1112+
const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
1113+
new_ref->retention = SnapshotRef::Tag{
1114+
.max_ref_age_ms = tag.max_ref_age_ms,
1115+
};
1116+
}
1117+
1118+
return new_ref;
1119+
}
1120+
1121+
std::shared_ptr<SnapshotRef> TableMetadataBuilder::Impl::BuildBranchRef(
1122+
int64_t snapshot_id) {
1123+
auto new_ref = std::make_shared<SnapshotRef>();
1124+
new_ref->snapshot_id = snapshot_id;
1125+
new_ref->retention = SnapshotRef::Branch{};
1126+
return new_ref;
1127+
}
1128+
1129+
Status TableMetadataBuilder::Impl::SetBranchSnapshotInternal(const Snapshot& snapshot,
1130+
const std::string& branch) {
1131+
int64_t replacement_snapshot_id = snapshot.snapshot_id;
1132+
1133+
// Check if the ref already exists
1134+
auto ref_it = metadata_.refs.find(branch);
1135+
if (ref_it != metadata_.refs.end()) {
1136+
const auto& ref = ref_it->second;
1137+
1138+
// Verify that it's a branch, not a tag
1139+
if (ref->type() != SnapshotRefType::kBranch) {
1140+
return ValidationFailed("Cannot update branch: {} is a tag", branch);
1141+
}
1142+
1143+
// If the ref already points to the same snapshot, this is a no-op
1144+
if (ref->snapshot_id == replacement_snapshot_id) {
1145+
return {};
1146+
}
1147+
}
1148+
1149+
// For format version 2+, validate sequence number
1150+
if (metadata_.format_version > 1) {
1151+
if (snapshot.sequence_number > metadata_.last_sequence_number) {
1152+
return ValidationFailed(
1153+
"Last sequence number {} is less than existing snapshot sequence number {}",
1154+
metadata_.last_sequence_number, snapshot.sequence_number);
1155+
}
1156+
}
1157+
1158+
// Create the new ref
1159+
std::shared_ptr<SnapshotRef> new_ref;
1160+
if (ref_it != metadata_.refs.end()) {
1161+
// Build from existing ref with new snapshot ID
1162+
new_ref = BuildRefFrom(*ref_it->second, replacement_snapshot_id);
1163+
} else {
1164+
// Create a new branch ref
1165+
new_ref = BuildBranchRef(replacement_snapshot_id);
1166+
}
1167+
1168+
// Set the ref
1169+
return SetRef(branch, new_ref);
1170+
}
1171+
1172+
Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
1173+
const std::string& branch) {
1174+
// Check if this is a no-op
1175+
auto ref_it = metadata_.refs.find(branch);
1176+
if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id == snapshot_id) {
1177+
return {};
1178+
}
1179+
1180+
// Validate that the snapshot exists
1181+
auto snapshot_result = metadata_.SnapshotById(snapshot_id);
1182+
if (!snapshot_result.has_value()) {
1183+
return ValidationFailed("Cannot set {} to unknown snapshot: {}", branch, snapshot_id);
1184+
}
1185+
1186+
return SetBranchSnapshotInternal(*snapshot_result.value(), branch);
1187+
}
1188+
1189+
Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
1190+
std::shared_ptr<SnapshotRef> ref) {
1191+
if (ref == nullptr) {
1192+
return InvalidArgument("Cannot set ref {} to null", name);
1193+
}
1194+
1195+
// Validate that the snapshot exists
1196+
auto snapshot_result = metadata_.SnapshotById(ref->snapshot_id);
1197+
if (!snapshot_result.has_value()) {
1198+
return ValidationFailed("Cannot set ref {} to unknown snapshot: {}", name,
1199+
ref->snapshot_id);
1200+
}
1201+
1202+
// Update the refs map
1203+
metadata_.refs[name] = ref;
1204+
1205+
// If setting the main branch, also update current_snapshot_id
1206+
if (name == std::string(SnapshotRef::kMainBranch)) {
1207+
metadata_.current_snapshot_id = ref->snapshot_id;
1208+
}
1209+
1210+
// Record the change
1211+
changes_.push_back(std::make_unique<table::SetSnapshotRef>(
1212+
name, ref->snapshot_id, ref->type(),
1213+
std::holds_alternative<SnapshotRef::Branch>(ref->retention)
1214+
? std::get<SnapshotRef::Branch>(ref->retention).min_snapshots_to_keep
1215+
: std::nullopt,
1216+
std::holds_alternative<SnapshotRef::Branch>(ref->retention)
1217+
? std::get<SnapshotRef::Branch>(ref->retention).max_snapshot_age_ms
1218+
: std::nullopt,
1219+
ref->type() == SnapshotRefType::kBranch
1220+
? std::get<SnapshotRef::Branch>(ref->retention).max_ref_age_ms
1221+
: std::get<SnapshotRef::Tag>(ref->retention).max_ref_age_ms));
1222+
1223+
return {};
1224+
}
1225+
10801226
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
10811227
: impl_(std::make_unique<Impl>(format_version)) {}
10821228

@@ -1212,12 +1358,14 @@ TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
12121358

12131359
TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t snapshot_id,
12141360
const std::string& branch) {
1215-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1361+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, branch));
1362+
return *this;
12161363
}
12171364

12181365
TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
12191366
std::shared_ptr<SnapshotRef> ref) {
1220-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1367+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, ref));
1368+
return *this;
12211369
}
12221370

12231371
TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) {

src/iceberg/table_update.cc

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,23 @@ std::unique_ptr<TableUpdate> RemoveSnapshotRef::Clone() const {
344344
// SetSnapshotRef
345345

346346
void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
347-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
347+
// Create a SnapshotRef based on the type
348+
auto ref = std::make_shared<SnapshotRef>();
349+
ref->snapshot_id = snapshot_id_;
350+
351+
if (type_ == SnapshotRefType::kBranch) {
352+
ref->retention = SnapshotRef::Branch{
353+
.min_snapshots_to_keep = min_snapshots_to_keep_,
354+
.max_snapshot_age_ms = max_snapshot_age_ms_,
355+
.max_ref_age_ms = max_ref_age_ms_,
356+
};
357+
} else {
358+
ref->retention = SnapshotRef::Tag{
359+
.max_ref_age_ms = max_ref_age_ms_,
360+
};
361+
}
362+
363+
builder.SetRef(ref_name_, ref);
348364
}
349365

350366
void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ if(ICEBERG_BUILD_BUNDLE)
167167
add_iceberg_test(table_update_test
168168
USE_BUNDLE
169169
SOURCES
170+
set_snapshot_test.cc
170171
transaction_test.cc
171172
update_partition_spec_test.cc
172173
update_properties_test.cc

0 commit comments

Comments
 (0)