@@ -1085,9 +1085,51 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
10851085
10861086Status TableMetadataBuilder::Impl::SetRef (const std::string& name,
10871087 std::shared_ptr<SnapshotRef> ref) {
1088- ICEBERG_PRECHECK (!metadata_.refs .contains (name),
1089- " Cannot set ref: {}, which is already exist." , name);
1088+ // Check if the ref already exists and is equal to the new ref
1089+ auto existing_ref_it = metadata_.refs .find (name);
1090+ if (existing_ref_it != metadata_.refs .end () && *existing_ref_it->second == *ref) {
1091+ // No change needed
1092+ return {};
1093+ }
1094+
1095+ // Validate that the snapshot exists
1096+ int64_t snapshot_id = ref->snapshot_id ;
1097+ auto snapshot_it =
1098+ std::ranges::find_if (metadata_.snapshots , [snapshot_id](const auto & snapshot) {
1099+ return snapshot != nullptr && snapshot->snapshot_id == snapshot_id;
1100+ });
1101+ ICEBERG_PRECHECK (snapshot_it != metadata_.snapshots .end (),
1102+ " Cannot set {} to unknown snapshot: {}" , name, snapshot_id);
1103+
1104+ // Check if this is an added snapshot (in the current set of changes)
1105+ bool is_added_snapshot =
1106+ std::ranges::any_of (changes_, [snapshot_id](const auto & change) {
1107+ return change->kind () == TableUpdate::Kind::kAddSnapshot &&
1108+ internal::checked_cast<const table::AddSnapshot&>(*change)
1109+ .snapshot ()
1110+ ->snapshot_id == snapshot_id;
1111+ });
1112+
1113+ if (is_added_snapshot) {
1114+ metadata_.last_updated_ms = (*snapshot_it)->timestamp_ms ;
1115+ }
1116+
1117+ // Handle main branch specially
1118+ if (name == SnapshotRef::kMainBranch ) {
1119+ metadata_.current_snapshot_id = ref->snapshot_id ;
1120+ if (metadata_.last_updated_ms == kInvalidLastUpdatedMs ) {
1121+ metadata_.last_updated_ms =
1122+ TimePointMs{std::chrono::duration_cast<std::chrono::milliseconds>(
1123+ std::chrono::system_clock::now ().time_since_epoch ())};
1124+ }
1125+
1126+ metadata_.snapshot_log .emplace_back (metadata_.last_updated_ms , ref->snapshot_id );
1127+ }
1128+
1129+ // Update the refs map
10901130 metadata_.refs [name] = ref;
1131+
1132+ // Record the change
10911133 if (ref->type () == SnapshotRefType::kBranch ) {
10921134 auto retention = std::get<SnapshotRef::Branch>(ref->retention );
10931135 changes_.push_back (std::make_unique<table::SetSnapshotRef>(
@@ -1099,60 +1141,148 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
10991141 name, ref->snapshot_id , ref->type (), std::nullopt , std::nullopt ,
11001142 retention.max_ref_age_ms ));
11011143 }
1144+
11021145 return {};
11031146}
11041147
11051148Status TableMetadataBuilder::Impl::RemoveRef (const std::string& name) {
1106- ICEBERG_PRECHECK (metadata_.refs .contains (name),
1107- " Cannot remove ref: {}, which is not exist." , name);
1149+ // Handle main branch specially
1150+ if (name == SnapshotRef::kMainBranch ) {
1151+ metadata_.current_snapshot_id = kInvalidSnapshotId ;
1152+ }
11081153
1109- metadata_.refs .erase (name);
1110- changes_.push_back (std::make_unique<table::RemoveSnapshotRef>(name));
1154+ // Remove the ref from the map
1155+ auto it = metadata_.refs .find (name);
1156+ if (it != metadata_.refs .end ()) {
1157+ metadata_.refs .erase (it);
1158+ changes_.push_back (std::make_unique<table::RemoveSnapshotRef>(name));
1159+ }
11111160
11121161 return {};
11131162}
11141163
11151164Status TableMetadataBuilder::Impl::AddSnapshot (std::shared_ptr<Snapshot> snapshot) {
1116- // TODO(xiao.dong) this is only for test, not official complete implementation
1117- metadata_.snapshots .emplace_back (std::move (snapshot));
1165+ if (snapshot == nullptr ) {
1166+ // No-op
1167+ return {};
1168+ }
1169+
1170+ // Validate preconditions
1171+ ICEBERG_PRECHECK (!metadata_.schemas .empty (),
1172+ " Attempting to add a snapshot before a schema is added" );
1173+ ICEBERG_PRECHECK (!metadata_.partition_specs .empty (),
1174+ " Attempting to add a snapshot before a partition spec is added" );
1175+ ICEBERG_PRECHECK (!metadata_.sort_orders .empty (),
1176+ " Attempting to add a snapshot before a sort order is added" );
1177+
1178+ // Check if snapshot already exists
1179+ int64_t snapshot_id = snapshot->snapshot_id ;
1180+ auto existing_snapshot =
1181+ std::ranges::find_if (metadata_.snapshots , [snapshot_id](const auto & s) {
1182+ return s != nullptr && s->snapshot_id == snapshot_id;
1183+ });
1184+ ICEBERG_PRECHECK (existing_snapshot == metadata_.snapshots .end (),
1185+ " Snapshot already exists for id: {}" , snapshot_id);
1186+
1187+ // Validate sequence number
1188+ ICEBERG_PRECHECK (
1189+ metadata_.format_version == 1 ||
1190+ snapshot->sequence_number > metadata_.last_sequence_number ||
1191+ !snapshot->parent_snapshot_id .has_value (),
1192+ " Cannot add snapshot with sequence number {} older than last sequence number {}" ,
1193+ snapshot->sequence_number , metadata_.last_sequence_number );
1194+
1195+ // Update metadata
1196+ metadata_.last_updated_ms = snapshot->timestamp_ms ;
1197+ metadata_.last_sequence_number = snapshot->sequence_number ;
1198+ metadata_.snapshots .push_back (snapshot);
1199+ changes_.push_back (std::make_unique<table::AddSnapshot>(snapshot));
1200+
1201+ // TODO(xiao.dong) Handle row lineage for format version >= 3
11181202 return {};
11191203}
11201204
11211205Status TableMetadataBuilder::Impl::RemoveSnapshots (
11221206 const std::vector<int64_t >& snapshot_ids) {
1123- auto current_snapshot_id = metadata_.current_snapshot_id ;
1207+ if (snapshot_ids.empty ()) {
1208+ return {};
1209+ }
1210+
11241211 std::unordered_set<int64_t > snapshot_ids_set (snapshot_ids.begin (), snapshot_ids.end ());
1125- ICEBERG_PRECHECK (!snapshot_ids_set.contains (current_snapshot_id),
1126- " Cannot remove current snapshot: {}" , current_snapshot_id);
11271212
1128- if (!snapshot_ids.empty ()) {
1129- metadata_.snapshots =
1130- metadata_.snapshots | std::views::filter ([&](const auto & snapshot) {
1131- return !snapshot_ids_set.contains (snapshot->snapshot_id );
1132- }) |
1133- std::ranges::to<std::vector<std::shared_ptr<iceberg::Snapshot>>>();
1134- changes_.push_back (std::make_unique<table::RemoveSnapshots>(snapshot_ids));
1213+ // Build a map of snapshot IDs for quick lookup
1214+ std::unordered_map<int64_t , std::shared_ptr<Snapshot>> snapshots_by_id;
1215+ for (const auto & snapshot : metadata_.snapshots ) {
1216+ if (snapshot) {
1217+ snapshots_by_id[snapshot->snapshot_id ] = snapshot;
1218+ }
1219+ }
1220+
1221+ // Filter snapshots to retain
1222+ std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
1223+ retained_snapshots.reserve (metadata_.snapshots .size ());
1224+
1225+ for (const auto & snapshot : metadata_.snapshots ) {
1226+ if (!snapshot) continue ;
1227+
1228+ int64_t snapshot_id = snapshot->snapshot_id ;
1229+ if (snapshot_ids_set.contains (snapshot_id)) {
1230+ // Remove from the map
1231+ snapshots_by_id.erase (snapshot_id);
1232+ // Record the removal
1233+ changes_.push_back (
1234+ std::make_unique<table::RemoveSnapshots>(std::vector<int64_t >{snapshot_id}));
1235+ // Note: Statistics and partition statistics removal would be handled here
1236+ // if those features were implemented
1237+ } else {
1238+ retained_snapshots.push_back (snapshot);
1239+ }
1240+ }
1241+
1242+ metadata_.snapshots = std::move (retained_snapshots);
1243+
1244+ // Remove any refs that are no longer valid (dangling refs)
1245+ std::vector<std::string> dangling_refs;
1246+ for (const auto & [ref_name, ref] : metadata_.refs ) {
1247+ if (!snapshots_by_id.contains (ref->snapshot_id )) {
1248+ dangling_refs.push_back (ref_name);
1249+ }
1250+ }
1251+
1252+ for (const auto & ref_name : dangling_refs) {
1253+ ICEBERG_RETURN_UNEXPECTED (RemoveRef (ref_name));
11351254 }
11361255
11371256 return {};
11381257}
11391258
11401259Status TableMetadataBuilder::Impl::RemovePartitionSpecs (
11411260 const std::vector<int32_t >& spec_ids) {
1142- auto default_spec_id = metadata_.default_spec_id ;
1261+ if (spec_ids.empty ()) {
1262+ return {};
1263+ }
1264+
11431265 std::unordered_set<int32_t > spec_ids_set (spec_ids.begin (), spec_ids.end ());
1144- ICEBERG_PRECHECK (!spec_ids_set.contains (default_spec_id),
1145- " Cannot remove default spec: {}" , default_spec_id);
11461266
1147- if (!spec_ids.empty ()) {
1148- metadata_.partition_specs =
1149- metadata_.partition_specs | std::views::filter ([&](const auto & spec) {
1150- return !spec_ids_set.contains (spec->spec_id ());
1151- }) |
1152- std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1153- changes_.push_back (std::make_unique<table::RemovePartitionSpecs>(spec_ids));
1267+ // Validate that we're not removing the default spec
1268+ ICEBERG_PRECHECK (!spec_ids_set.contains (metadata_.default_spec_id ),
1269+ " Cannot remove the default partition spec" );
1270+
1271+ // Filter partition specs to retain
1272+ metadata_.partition_specs =
1273+ metadata_.partition_specs | std::views::filter ([&](const auto & spec) {
1274+ return !spec_ids_set.contains (spec->spec_id ());
1275+ }) |
1276+ std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1277+
1278+ // Update the specs_by_id_ index
1279+ for (int32_t spec_id : spec_ids) {
1280+ specs_by_id_.erase (spec_id);
11541281 }
11551282
1283+ // Record the change
1284+ changes_.push_back (std::make_unique<table::RemovePartitionSpecs>(spec_ids));
1285+
11561286 return {};
11571287}
11581288
0 commit comments