Skip to content

Commit 0812870

Browse files
committed
attempt to fix inconsistency
1 parent 83940f2 commit 0812870

File tree

9 files changed

+253
-277
lines changed

9 files changed

+253
-277
lines changed

src/iceberg/snapshot.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@ SnapshotRefType SnapshotRef::type() const noexcept {
5252
retention);
5353
}
5454

55+
std::optional<int64_t> SnapshotRef::max_ref_age_ms() const noexcept {
56+
return std::visit(
57+
[&](const auto& retention) -> std::optional<int64_t> {
58+
using T = std::remove_cvref_t<decltype(retention)>;
59+
if constexpr (std::is_same_v<T, Branch>) {
60+
return retention.max_ref_age_ms;
61+
} else {
62+
return retention.max_ref_age_ms;
63+
}
64+
},
65+
retention);
66+
}
67+
5568
Status SnapshotRef::Validate() const {
5669
if (type() == SnapshotRefType::kBranch) {
5770
const auto& branch = std::get<Branch>(this->retention);

src/iceberg/snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ struct ICEBERG_EXPORT SnapshotRef {
113113

114114
SnapshotRefType type() const noexcept;
115115

116+
std::optional<int64_t> max_ref_age_ms() const noexcept;
117+
116118
/// \brief Create a branch reference
117119
///
118120
/// \param snapshot_id The snapshot ID for the branch

src/iceberg/table_metadata.cc

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,6 @@ class TableMetadataBuilder::Impl {
617617
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
618618
Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const std::string& branch);
619619
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
620-
621620
Status RemoveRef(const std::string& name);
622621
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
623622
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
@@ -1339,15 +1338,11 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
13391338
}
13401339

13411340
Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
1342-
// Handle main branch specially
13431341
if (name == SnapshotRef::kMainBranch) {
13441342
metadata_.current_snapshot_id = kInvalidSnapshotId;
13451343
}
13461344

1347-
// Remove the ref from the map
1348-
auto it = metadata_.refs.find(name);
1349-
if (it != metadata_.refs.end()) {
1350-
metadata_.refs.erase(it);
1345+
if (metadata_.refs.erase(name) != 0) {
13511346
changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
13521347
}
13531348

@@ -1360,47 +1355,39 @@ Status TableMetadataBuilder::Impl::RemoveSnapshots(
13601355
return {};
13611356
}
13621357

1363-
std::unordered_set<int64_t> snapshot_ids_set(snapshot_ids.begin(), snapshot_ids.end());
1364-
1365-
// Build a map of snapshot IDs for quick lookup
1366-
std::unordered_map<int64_t, std::shared_ptr<Snapshot>> snapshots_by_id;
1367-
for (const auto& snapshot : metadata_.snapshots) {
1368-
if (snapshot) {
1369-
snapshots_by_id[snapshot->snapshot_id] = snapshot;
1370-
}
1371-
}
1372-
1373-
// Filter snapshots to retain
1358+
std::unordered_set<int64_t> ids_to_remove(snapshot_ids.begin(), snapshot_ids.end());
13741359
std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
1375-
retained_snapshots.reserve(metadata_.snapshots.size());
1376-
1377-
for (const auto& snapshot : metadata_.snapshots) {
1378-
if (!snapshot) continue;
1379-
1380-
int64_t snapshot_id = snapshot->snapshot_id;
1381-
if (snapshot_ids_set.contains(snapshot_id)) {
1382-
// Remove from the map
1383-
snapshots_by_id.erase(snapshot_id);
1384-
// Record the removal
1385-
changes_.push_back(
1386-
std::make_unique<table::RemoveSnapshots>(std::vector<int64_t>{snapshot_id}));
1387-
// Note: Statistics and partition statistics removal would be handled here
1388-
// if those features were implemented
1360+
retained_snapshots.reserve(metadata_.snapshots.size() - snapshot_ids.size());
1361+
std::vector<int64_t> snapshot_ids_to_remove;
1362+
snapshot_ids_to_remove.reserve(snapshot_ids.size());
1363+
1364+
for (auto& snapshot : metadata_.snapshots) {
1365+
ICEBERG_CHECK(snapshot != nullptr, "Encountered null snapshot in metadata");
1366+
const int64_t snapshot_id = snapshot->snapshot_id;
1367+
if (ids_to_remove.contains(snapshot_id)) {
1368+
snapshots_by_id_.erase(snapshot_id);
1369+
snapshot_ids_to_remove.push_back(snapshot_id);
1370+
// FIXME: implement statistics removal and uncomment below
1371+
// ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
1372+
// ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
13891373
} else {
1390-
retained_snapshots.push_back(snapshot);
1374+
retained_snapshots.push_back(std::move(snapshot));
13911375
}
13921376
}
13931377

1378+
if (!snapshot_ids_to_remove.empty()) {
1379+
changes_.push_back(std::make_unique<table::RemoveSnapshots>(snapshot_ids_to_remove));
1380+
}
1381+
13941382
metadata_.snapshots = std::move(retained_snapshots);
13951383

13961384
// Remove any refs that are no longer valid (dangling refs)
13971385
std::vector<std::string> dangling_refs;
13981386
for (const auto& [ref_name, ref] : metadata_.refs) {
1399-
if (!snapshots_by_id.contains(ref->snapshot_id)) {
1387+
if (!snapshots_by_id_.contains(ref->snapshot_id)) {
14001388
dangling_refs.push_back(ref_name);
14011389
}
14021390
}
1403-
14041391
for (const auto& ref_name : dangling_refs) {
14051392
ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name));
14061393
}
@@ -1414,25 +1401,15 @@ Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
14141401
return {};
14151402
}
14161403

1417-
std::unordered_set<int32_t> spec_ids_set(spec_ids.begin(), spec_ids.end());
1418-
1419-
// Validate that we're not removing the default spec
1420-
ICEBERG_PRECHECK(!spec_ids_set.contains(metadata_.default_spec_id),
1404+
std::unordered_set<int32_t> spec_ids_to_remove(spec_ids.begin(), spec_ids.end());
1405+
ICEBERG_PRECHECK(!spec_ids_to_remove.contains(metadata_.default_spec_id),
14211406
"Cannot remove the default partition spec");
14221407

1423-
// Filter partition specs to retain
14241408
metadata_.partition_specs =
14251409
metadata_.partition_specs | std::views::filter([&](const auto& spec) {
1426-
return !spec_ids_set.contains(spec->spec_id());
1410+
return !spec_ids_to_remove.contains(spec->spec_id());
14271411
}) |
1428-
std::ranges::to<std::vector<std::shared_ptr<iceberg::PartitionSpec>>>();
1429-
1430-
// Update the specs_by_id_ index
1431-
for (int32_t spec_id : spec_ids) {
1432-
specs_by_id_.erase(spec_id);
1433-
}
1434-
1435-
// Record the change
1412+
std::ranges::to<std::vector<std::shared_ptr<PartitionSpec>>>();
14361413
changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
14371414

14381415
return {};

src/iceberg/test/expire_snapshots_test.cc

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -24,71 +24,43 @@
2424

2525
namespace iceberg {
2626

27-
class ExpireSnapshotsTest : public UpdateTestBase {
28-
protected:
29-
};
27+
class ExpireSnapshotsTest : public UpdateTestBase {};
3028

31-
TEST_F(ExpireSnapshotsTest, Empty) {
29+
TEST_F(ExpireSnapshotsTest, DefaultExpireByAge) {
3230
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
3331
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
34-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
35-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
36-
EXPECT_THAT(result.ref_to_remove.empty(), true);
37-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
38-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
32+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
33+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
3934
}
4035

41-
TEST_F(ExpireSnapshotsTest, Keep2) {
36+
TEST_F(ExpireSnapshotsTest, KeepAll) {
4237
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
4338
update->RetainLast(2);
4439
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
45-
EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true);
46-
EXPECT_THAT(result.ref_to_remove.empty(), true);
47-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
48-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
40+
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
41+
EXPECT_TRUE(result.refs_to_remove.empty());
4942
}
5043

5144
TEST_F(ExpireSnapshotsTest, ExpireById) {
5245
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
5346
update->ExpireSnapshotId(3051729675574597004);
54-
update->RetainLast(2);
5547
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
56-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
57-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
58-
EXPECT_THAT(result.ref_to_remove.empty(), true);
59-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
60-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
61-
}
62-
63-
TEST_F(ExpireSnapshotsTest, ExpireByIdNotExist) {
64-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
65-
update->ExpireSnapshotId(3055729675574597003);
66-
update->RetainLast(2);
67-
auto result = update->Apply();
68-
EXPECT_THAT(result.has_value(), false);
69-
EXPECT_THAT(result.error().message.contains("Snapshot:3055729675574597003 not exist"),
70-
true);
48+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
49+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
7150
}
7251

73-
TEST_F(ExpireSnapshotsTest, ExpireOlderThan1) {
74-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
75-
update->ExpireOlderThan(1515100955770 - 1);
76-
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
77-
EXPECT_THAT(result.snapshot_ids_to_remove.empty(), true);
78-
EXPECT_THAT(result.ref_to_remove.empty(), true);
79-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
80-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
81-
}
82-
83-
TEST_F(ExpireSnapshotsTest, ExpireOlderThan2) {
84-
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
85-
update->ExpireOlderThan(1515100955770 + 1);
86-
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
87-
EXPECT_THAT(result.snapshot_ids_to_remove.size(), 1);
88-
EXPECT_THAT(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
89-
EXPECT_THAT(result.ref_to_remove.empty(), true);
90-
EXPECT_THAT(result.schema_ids_to_remove.empty(), true);
91-
EXPECT_THAT(result.partition_spec_ids_to_remove.empty(), true);
52+
TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
53+
struct TestCase {
54+
int64_t expire_older_than;
55+
size_t expected_num_expired;
56+
};
57+
std::vector<TestCase> test_cases = {{1515100955770 - 1, 0}, {1515100955770 + 1, 1}};
58+
for (const auto& test_case : test_cases) {
59+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
60+
update->ExpireOlderThan(test_case.expire_older_than);
61+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
62+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), test_case.expected_num_expired);
63+
}
9264
}
9365

9466
} // namespace iceberg

0 commit comments

Comments
 (0)