@@ -1342,9 +1342,51 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
13421342
13431343Status TableMetadataBuilder::Impl::SetRef (const std::string& name,
13441344 std::shared_ptr<SnapshotRef> ref) {
1345- ICEBERG_PRECHECK (!metadata_.refs .contains (name),
1346- " Cannot set ref: {}, which is already exist." , name);
1345+ // Check if the ref already exists and is equal to the new ref
1346+ auto existing_ref_it = metadata_.refs .find (name);
1347+ if (existing_ref_it != metadata_.refs .end () && *existing_ref_it->second == *ref) {
1348+ // No change needed
1349+ return {};
1350+ }
1351+
1352+ // Validate that the snapshot exists
1353+ int64_t snapshot_id = ref->snapshot_id ;
1354+ auto snapshot_it =
1355+ std::ranges::find_if (metadata_.snapshots , [snapshot_id](const auto & snapshot) {
1356+ return snapshot != nullptr && snapshot->snapshot_id == snapshot_id;
1357+ });
1358+ ICEBERG_PRECHECK (snapshot_it != metadata_.snapshots .end (),
1359+ " Cannot set {} to unknown snapshot: {}" , name, snapshot_id);
1360+
1361+ // Check if this is an added snapshot (in the current set of changes)
1362+ bool is_added_snapshot =
1363+ std::ranges::any_of (changes_, [snapshot_id](const auto & change) {
1364+ return change->kind () == TableUpdate::Kind::kAddSnapshot &&
1365+ internal::checked_cast<const table::AddSnapshot&>(*change)
1366+ .snapshot ()
1367+ ->snapshot_id == snapshot_id;
1368+ });
1369+
1370+ if (is_added_snapshot) {
1371+ metadata_.last_updated_ms = (*snapshot_it)->timestamp_ms ;
1372+ }
1373+
1374+ // Handle main branch specially
1375+ if (name == SnapshotRef::kMainBranch ) {
1376+ metadata_.current_snapshot_id = ref->snapshot_id ;
1377+ if (metadata_.last_updated_ms == kInvalidLastUpdatedMs ) {
1378+ metadata_.last_updated_ms =
1379+ TimePointMs{std::chrono::duration_cast<std::chrono::milliseconds>(
1380+ std::chrono::system_clock::now ().time_since_epoch ())};
1381+ }
1382+
1383+ metadata_.snapshot_log .emplace_back (metadata_.last_updated_ms , ref->snapshot_id );
1384+ }
1385+
1386+ // Update the refs map
13471387 metadata_.refs [name] = ref;
1388+
1389+ // Record the change
13481390 if (ref->type () == SnapshotRefType::kBranch ) {
13491391 auto retention = std::get<SnapshotRef::Branch>(ref->retention );
13501392 changes_.push_back (std::make_unique<table::SetSnapshotRef>(
@@ -1356,60 +1398,148 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
13561398 name, ref->snapshot_id , ref->type (), std::nullopt , std::nullopt ,
13571399 retention.max_ref_age_ms ));
13581400 }
1401+
13591402 return {};
13601403}
13611404
13621405Status TableMetadataBuilder::Impl::RemoveRef (const std::string& name) {
1363- ICEBERG_PRECHECK (metadata_.refs .contains (name),
1364- " Cannot remove ref: {}, which is not exist." , name);
1406+ // Handle main branch specially
1407+ if (name == SnapshotRef::kMainBranch ) {
1408+ metadata_.current_snapshot_id = kInvalidSnapshotId ;
1409+ }
13651410
1366- metadata_.refs .erase (name);
1367- changes_.push_back (std::make_unique<table::RemoveSnapshotRef>(name));
1411+ // Remove the ref from the map
1412+ auto it = metadata_.refs .find (name);
1413+ if (it != metadata_.refs .end ()) {
1414+ metadata_.refs .erase (it);
1415+ changes_.push_back (std::make_unique<table::RemoveSnapshotRef>(name));
1416+ }
13681417
13691418 return {};
13701419}
13711420
13721421Status TableMetadataBuilder::Impl::AddSnapshot (std::shared_ptr<Snapshot> snapshot) {
1373- // TODO(xiao.dong) this is only for test, not official complete implementation
1374- metadata_.snapshots .emplace_back (std::move (snapshot));
1422+ if (snapshot == nullptr ) {
1423+ // No-op
1424+ return {};
1425+ }
1426+
1427+ // Validate preconditions
1428+ ICEBERG_PRECHECK (!metadata_.schemas .empty (),
1429+ " Attempting to add a snapshot before a schema is added" );
1430+ ICEBERG_PRECHECK (!metadata_.partition_specs .empty (),
1431+ " Attempting to add a snapshot before a partition spec is added" );
1432+ ICEBERG_PRECHECK (!metadata_.sort_orders .empty (),
1433+ " Attempting to add a snapshot before a sort order is added" );
1434+
1435+ // Check if snapshot already exists
1436+ int64_t snapshot_id = snapshot->snapshot_id ;
1437+ auto existing_snapshot =
1438+ std::ranges::find_if (metadata_.snapshots , [snapshot_id](const auto & s) {
1439+ return s != nullptr && s->snapshot_id == snapshot_id;
1440+ });
1441+ ICEBERG_PRECHECK (existing_snapshot == metadata_.snapshots .end (),
1442+ " Snapshot already exists for id: {}" , snapshot_id);
1443+
1444+ // Validate sequence number
1445+ ICEBERG_PRECHECK (
1446+ metadata_.format_version == 1 ||
1447+ snapshot->sequence_number > metadata_.last_sequence_number ||
1448+ !snapshot->parent_snapshot_id .has_value (),
1449+ " Cannot add snapshot with sequence number {} older than last sequence number {}" ,
1450+ snapshot->sequence_number , metadata_.last_sequence_number );
1451+
1452+ // Update metadata
1453+ metadata_.last_updated_ms = snapshot->timestamp_ms ;
1454+ metadata_.last_sequence_number = snapshot->sequence_number ;
1455+ metadata_.snapshots .push_back (snapshot);
1456+ changes_.push_back (std::make_unique<table::AddSnapshot>(snapshot));
1457+
1458+ // TODO(xiao.dong) Handle row lineage for format version >= 3
13751459 return {};
13761460}
13771461
13781462Status TableMetadataBuilder::Impl::RemoveSnapshots (
13791463 const std::vector<int64_t >& snapshot_ids) {
1380- auto current_snapshot_id = metadata_.current_snapshot_id ;
1464+ if (snapshot_ids.empty ()) {
1465+ return {};
1466+ }
1467+
13811468 std::unordered_set<int64_t > snapshot_ids_set (snapshot_ids.begin (), snapshot_ids.end ());
1382- ICEBERG_PRECHECK (!snapshot_ids_set.contains (current_snapshot_id),
1383- " Cannot remove current snapshot: {}" , current_snapshot_id);
13841469
1385- if (!snapshot_ids.empty ()) {
1386- metadata_.snapshots =
1387- metadata_.snapshots | std::views::filter ([&](const auto & snapshot) {
1388- return !snapshot_ids_set.contains (snapshot->snapshot_id );
1389- }) |
1390- std::ranges::to<std::vector<std::shared_ptr<iceberg::Snapshot>>>();
1391- changes_.push_back (std::make_unique<table::RemoveSnapshots>(snapshot_ids));
1470+ // Build a map of snapshot IDs for quick lookup
1471+ std::unordered_map<int64_t , std::shared_ptr<Snapshot>> snapshots_by_id;
1472+ for (const auto & snapshot : metadata_.snapshots ) {
1473+ if (snapshot) {
1474+ snapshots_by_id[snapshot->snapshot_id ] = snapshot;
1475+ }
1476+ }
1477+
1478+ // Filter snapshots to retain
1479+ std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
1480+ retained_snapshots.reserve (metadata_.snapshots .size ());
1481+
1482+ for (const auto & snapshot : metadata_.snapshots ) {
1483+ if (!snapshot) continue ;
1484+
1485+ int64_t snapshot_id = snapshot->snapshot_id ;
1486+ if (snapshot_ids_set.contains (snapshot_id)) {
1487+ // Remove from the map
1488+ snapshots_by_id.erase (snapshot_id);
1489+ // Record the removal
1490+ changes_.push_back (
1491+ std::make_unique<table::RemoveSnapshots>(std::vector<int64_t >{snapshot_id}));
1492+ // Note: Statistics and partition statistics removal would be handled here
1493+ // if those features were implemented
1494+ } else {
1495+ retained_snapshots.push_back (snapshot);
1496+ }
1497+ }
1498+
1499+ metadata_.snapshots = std::move (retained_snapshots);
1500+
1501+ // Remove any refs that are no longer valid (dangling refs)
1502+ std::vector<std::string> dangling_refs;
1503+ for (const auto & [ref_name, ref] : metadata_.refs ) {
1504+ if (!snapshots_by_id.contains (ref->snapshot_id )) {
1505+ dangling_refs.push_back (ref_name);
1506+ }
1507+ }
1508+
1509+ for (const auto & ref_name : dangling_refs) {
1510+ ICEBERG_RETURN_UNEXPECTED (RemoveRef (ref_name));
13921511 }
13931512
13941513 return {};
13951514}
13961515
13971516Status TableMetadataBuilder::Impl::RemovePartitionSpecs (
13981517 const std::vector<int32_t >& spec_ids) {
1399- auto default_spec_id = metadata_.default_spec_id ;
1518+ if (spec_ids.empty ()) {
1519+ return {};
1520+ }
1521+
14001522 std::unordered_set<int32_t > spec_ids_set (spec_ids.begin (), spec_ids.end ());
1401- ICEBERG_PRECHECK (!spec_ids_set.contains (default_spec_id),
1402- " Cannot remove default spec: {}" , default_spec_id);
14031523
1404- if (!spec_ids.empty ()) {
1405- metadata_.partition_specs =
1406- metadata_.partition_specs | std::views::filter ([&](const auto & spec) {
1407- return !spec_ids_set.contains (spec->spec_id ());
1408- }) |
1409- std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1410- changes_.push_back (std::make_unique<table::RemovePartitionSpecs>(spec_ids));
1524+ // Validate that we're not removing the default spec
1525+ ICEBERG_PRECHECK (!spec_ids_set.contains (metadata_.default_spec_id ),
1526+ " Cannot remove the default partition spec" );
1527+
1528+ // Filter partition specs to retain
1529+ metadata_.partition_specs =
1530+ metadata_.partition_specs | std::views::filter ([&](const auto & spec) {
1531+ return !spec_ids_set.contains (spec->spec_id ());
1532+ }) |
1533+ std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1534+
1535+ // Update the specs_by_id_ index
1536+ for (int32_t spec_id : spec_ids) {
1537+ specs_by_id_.erase (spec_id);
14111538 }
14121539
1540+ // Record the change
1541+ changes_.push_back (std::make_unique<table::RemovePartitionSpecs>(spec_ids));
1542+
14131543 return {};
14141544}
14151545
0 commit comments