Skip to content

Commit b937acb

Browse files
dongxiao1198wgtmac
andauthored
feat: support expire snapshots (#490)
This PR is part of effort to implement expire snapshots described in the issue #364. TODO: File recycling will be added in a followup PR. --------- Co-authored-by: Gang Wu <ustcwg@gmail.com>
1 parent 93bf6ac commit b937acb

19 files changed

+835
-10
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ set(ICEBERG_SOURCES
8181
transform.cc
8282
transform_function.cc
8383
type.cc
84+
update/expire_snapshots.cc
8485
update/pending_update.cc
8586
update/snapshot_update.cc
8687
update/update_partition_spec.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ iceberg_sources = files(
102102
'transform.cc',
103103
'transform_function.cc',
104104
'type.cc',
105+
'update/expire_snapshots.cc',
105106
'update/pending_update.cc',
106107
'update/snapshot_update.cc',
107108
'update/update_partition_spec.cc',

src/iceberg/snapshot.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@ SnapshotRefType SnapshotRef::type() const noexcept {
5252
retention);
5353
}
5454

55+
std::optional<int64_t> SnapshotRef::max_ref_age_ms() const noexcept {
56+
return std::visit(
57+
[&](const auto& retention) -> std::optional<int64_t> {
58+
using T = std::remove_cvref_t<decltype(retention)>;
59+
if constexpr (std::is_same_v<T, Branch>) {
60+
return retention.max_ref_age_ms;
61+
} else {
62+
return retention.max_ref_age_ms;
63+
}
64+
},
65+
retention);
66+
}
67+
5568
Status SnapshotRef::Validate() const {
5669
if (type() == SnapshotRefType::kBranch) {
5770
const auto& branch = std::get<Branch>(this->retention);

src/iceberg/snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ struct ICEBERG_EXPORT SnapshotRef {
113113

114114
SnapshotRefType type() const noexcept;
115115

116+
std::optional<int64_t> max_ref_age_ms() const noexcept;
117+
116118
/// \brief Create a branch reference
117119
///
118120
/// \param snapshot_id The snapshot ID for the branch

src/iceberg/table.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "iceberg/table_properties.h"
3232
#include "iceberg/table_scan.h"
3333
#include "iceberg/transaction.h"
34+
#include "iceberg/update/expire_snapshots.h"
3435
#include "iceberg/update/update_partition_spec.h"
3536
#include "iceberg/update/update_properties.h"
3637
#include "iceberg/update/update_schema.h"
@@ -184,6 +185,13 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
184185
return transaction->NewUpdateSchema();
185186
}
186187

188+
Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
189+
ICEBERG_ASSIGN_OR_RAISE(
190+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
191+
/*auto_commit=*/true));
192+
return transaction->NewExpireSnapshots();
193+
}
194+
187195
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
188196
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
189197
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
147147
/// changes.
148148
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
149149

150+
/// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
151+
/// changes.
152+
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
153+
150154
protected:
151155
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
152156
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table_metadata.cc

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,9 @@ class TableMetadataBuilder::Impl {
617617
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
618618
Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const std::string& branch);
619619
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
620+
Status RemoveRef(const std::string& name);
621+
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
622+
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
620623

621624
Result<std::unique_ptr<TableMetadata>> Build();
622625

@@ -1334,6 +1337,84 @@ int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
13341337
return new_schema_id;
13351338
}
13361339

1340+
Status TableMetadataBuilder::Impl::RemoveRef(const std::string& name) {
1341+
if (name == SnapshotRef::kMainBranch) {
1342+
metadata_.current_snapshot_id = kInvalidSnapshotId;
1343+
}
1344+
1345+
if (metadata_.refs.erase(name) != 0) {
1346+
changes_.push_back(std::make_unique<table::RemoveSnapshotRef>(name));
1347+
}
1348+
1349+
return {};
1350+
}
1351+
1352+
Status TableMetadataBuilder::Impl::RemoveSnapshots(
1353+
const std::vector<int64_t>& snapshot_ids) {
1354+
if (snapshot_ids.empty()) {
1355+
return {};
1356+
}
1357+
1358+
std::unordered_set<int64_t> ids_to_remove(snapshot_ids.begin(), snapshot_ids.end());
1359+
std::vector<std::shared_ptr<Snapshot>> retained_snapshots;
1360+
retained_snapshots.reserve(metadata_.snapshots.size() - snapshot_ids.size());
1361+
std::vector<int64_t> snapshot_ids_to_remove;
1362+
snapshot_ids_to_remove.reserve(snapshot_ids.size());
1363+
1364+
for (auto& snapshot : metadata_.snapshots) {
1365+
ICEBERG_CHECK(snapshot != nullptr, "Encountered null snapshot in metadata");
1366+
const int64_t snapshot_id = snapshot->snapshot_id;
1367+
if (ids_to_remove.contains(snapshot_id)) {
1368+
snapshots_by_id_.erase(snapshot_id);
1369+
snapshot_ids_to_remove.push_back(snapshot_id);
1370+
// FIXME: implement statistics removal and uncomment below
1371+
// ICEBERG_RETURN_UNEXPECTED(RemoveStatistics(snapshot_id));
1372+
// ICEBERG_RETURN_UNEXPECTED(RemovePartitionStatistics(snapshot_id));
1373+
} else {
1374+
retained_snapshots.push_back(std::move(snapshot));
1375+
}
1376+
}
1377+
1378+
if (!snapshot_ids_to_remove.empty()) {
1379+
changes_.push_back(std::make_unique<table::RemoveSnapshots>(snapshot_ids_to_remove));
1380+
}
1381+
1382+
metadata_.snapshots = std::move(retained_snapshots);
1383+
1384+
// Remove any refs that are no longer valid (dangling refs)
1385+
std::vector<std::string> dangling_refs;
1386+
for (const auto& [ref_name, ref] : metadata_.refs) {
1387+
if (!snapshots_by_id_.contains(ref->snapshot_id)) {
1388+
dangling_refs.push_back(ref_name);
1389+
}
1390+
}
1391+
for (const auto& ref_name : dangling_refs) {
1392+
ICEBERG_RETURN_UNEXPECTED(RemoveRef(ref_name));
1393+
}
1394+
1395+
return {};
1396+
}
1397+
1398+
Status TableMetadataBuilder::Impl::RemovePartitionSpecs(
1399+
const std::vector<int32_t>& spec_ids) {
1400+
if (spec_ids.empty()) {
1401+
return {};
1402+
}
1403+
1404+
std::unordered_set<int32_t> spec_ids_to_remove(spec_ids.begin(), spec_ids.end());
1405+
ICEBERG_PRECHECK(!spec_ids_to_remove.contains(metadata_.default_spec_id),
1406+
"Cannot remove the default partition spec");
1407+
1408+
metadata_.partition_specs =
1409+
metadata_.partition_specs | std::views::filter([&](const auto& spec) {
1410+
return !spec_ids_to_remove.contains(spec->spec_id());
1411+
}) |
1412+
std::ranges::to<std::vector<std::shared_ptr<PartitionSpec>>>();
1413+
changes_.push_back(std::make_unique<table::RemovePartitionSpecs>(spec_ids));
1414+
1415+
return {};
1416+
}
1417+
13371418
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
13381419
: impl_(std::make_unique<Impl>(format_version)) {}
13391420

@@ -1436,7 +1517,8 @@ TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
14361517

14371518
TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
14381519
const std::vector<int32_t>& spec_ids) {
1439-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1520+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemovePartitionSpecs(spec_ids));
1521+
return *this;
14401522
}
14411523

14421524
TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
@@ -1464,7 +1546,7 @@ TableMetadataBuilder& TableMetadataBuilder::AddSortOrder(
14641546

14651547
TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
14661548
std::shared_ptr<Snapshot> snapshot) {
1467-
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot));
1549+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(std::move(snapshot)));
14681550
return *this;
14691551
}
14701552

@@ -1487,7 +1569,8 @@ TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
14871569
}
14881570

14891571
TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) {
1490-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1572+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveRef(name));
1573+
return *this;
14911574
}
14921575

14931576
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
@@ -1497,7 +1580,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
14971580

14981581
TableMetadataBuilder& TableMetadataBuilder::RemoveSnapshots(
14991582
const std::vector<int64_t>& snapshot_ids) {
1500-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
1583+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSnapshots(snapshot_ids));
1584+
return *this;
15011585
}
15021586

15031587
TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {

src/iceberg/table_update.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ std::unique_ptr<TableUpdate> SetDefaultPartitionSpec::Clone() const {
178178
// RemovePartitionSpecs
179179

180180
void RemovePartitionSpecs::ApplyTo(TableMetadataBuilder& builder) const {
181-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
181+
builder.RemovePartitionSpecs(spec_ids_);
182182
}
183183

184184
void RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) const {
@@ -301,7 +301,9 @@ std::unique_ptr<TableUpdate> AddSnapshot::Clone() const {
301301

302302
// RemoveSnapshots
303303

304-
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {}
304+
void RemoveSnapshots::ApplyTo(TableMetadataBuilder& builder) const {
305+
builder.RemoveSnapshots(snapshot_ids_);
306+
}
305307

306308
void RemoveSnapshots::GenerateRequirements(TableUpdateContext& context) const {
307309
// RemoveSnapshots doesn't generate any requirements
@@ -322,7 +324,7 @@ std::unique_ptr<TableUpdate> RemoveSnapshots::Clone() const {
322324
// RemoveSnapshotRef
323325

324326
void RemoveSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
325-
throw IcebergError(std::format("{} not implemented", __FUNCTION__));
327+
builder.RemoveRef(ref_name_);
326328
}
327329

328330
void RemoveSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ if(ICEBERG_BUILD_BUNDLE)
170170
add_iceberg_test(table_update_test
171171
USE_BUNDLE
172172
SOURCES
173+
expire_snapshots_test.cc
173174
transaction_test.cc
174175
update_partition_spec_test.cc
175176
update_properties_test.cc
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/expire_snapshots.h"
21+
22+
#include "iceberg/test/matchers.h"
23+
#include "iceberg/test/update_test_base.h"
24+
25+
namespace iceberg {
26+
27+
class ExpireSnapshotsTest : public UpdateTestBase {};
28+
29+
TEST_F(ExpireSnapshotsTest, DefaultExpireByAge) {
30+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
31+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
32+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
33+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
34+
}
35+
36+
TEST_F(ExpireSnapshotsTest, KeepAll) {
37+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
38+
update->RetainLast(2);
39+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
40+
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
41+
EXPECT_TRUE(result.refs_to_remove.empty());
42+
}
43+
44+
TEST_F(ExpireSnapshotsTest, ExpireById) {
45+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
46+
update->ExpireSnapshotId(3051729675574597004);
47+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
48+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
49+
EXPECT_EQ(result.snapshot_ids_to_remove.at(0), 3051729675574597004);
50+
}
51+
52+
TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
53+
struct TestCase {
54+
int64_t expire_older_than;
55+
size_t expected_num_expired;
56+
};
57+
const std::vector<TestCase> test_cases = {
58+
{.expire_older_than = 1515100955770 - 1, .expected_num_expired = 0},
59+
{.expire_older_than = 1515100955770 + 1, .expected_num_expired = 1}};
60+
for (const auto& test_case : test_cases) {
61+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
62+
update->ExpireOlderThan(test_case.expire_older_than);
63+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
64+
EXPECT_EQ(result.snapshot_ids_to_remove.size(), test_case.expected_num_expired);
65+
}
66+
}
67+
68+
} // namespace iceberg

0 commit comments

Comments
 (0)