Skip to content

Commit a1a9c75

Browse files
committed
feat: add SnapshotManager
1 parent 6f1cdfd commit a1a9c75

13 files changed

Lines changed: 985 additions & 0 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ set(ICEBERG_SOURCES
7171
schema_util.cc
7272
snapshot.cc
7373
sort_field.cc
74+
update/snapshot_manager.cc
7475
sort_order.cc
7576
statistics_file.cc
7677
table.cc

src/iceberg/table.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/table_scan.h"
3333
#include "iceberg/transaction.h"
3434
#include "iceberg/update/expire_snapshots.h"
35+
#include "iceberg/update/snapshot_manager.h"
3536
#include "iceberg/update/update_partition_spec.h"
3637
#include "iceberg/update/update_partition_statistics.h"
3738
#include "iceberg/update/update_properties.h"
@@ -222,6 +223,10 @@ Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStat
222223
return transaction->NewUpdatePartitionStatistics();
223224
}
224225

226+
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
227+
return SnapshotManager::Make(name().ToString(), shared_from_this());
228+
}
229+
225230
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
226231
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
227232
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
168168
/// \brief Create a new FastAppend to append data files and commit the changes.
169169
virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();
170170

171+
/// \brief Create a new SnapshotManager to manage snapshots and snapshot references.
172+
virtual Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
173+
171174
protected:
172175
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
173176
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ if(ICEBERG_BUILD_BUNDLE)
179179
expire_snapshots_test.cc
180180
fast_append_test.cc
181181
set_snapshot_test.cc
182+
snapshot_manager_test.cc
182183
transaction_test.cc
183184
update_location_test.cc
184185
update_partition_spec_test.cc

src/iceberg/test/snapshot_manager_test.cc

Lines changed: 439 additions & 0 deletions
Large diffs are not rendered by default.

src/iceberg/test/update_test_base.h

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include <chrono>
2223
#include <format>
2324
#include <memory>
2425
#include <string>
@@ -28,11 +29,16 @@
2829

2930
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3031
#include "iceberg/catalog/memory/in_memory_catalog.h"
32+
#include "iceberg/constants.h"
33+
#include "iceberg/partition_spec.h"
34+
#include "iceberg/schema.h"
35+
#include "iceberg/sort_order.h"
3136
#include "iceberg/table.h"
3237
#include "iceberg/table_identifier.h"
3338
#include "iceberg/table_metadata.h"
3439
#include "iceberg/test/matchers.h"
3540
#include "iceberg/test/test_resource.h"
41+
#include "iceberg/type.h"
3642
#include "iceberg/util/uuid.h"
3743

3844
namespace iceberg {
@@ -43,6 +49,7 @@ class UpdateTestBase : public ::testing::Test {
4349
void SetUp() override {
4450
InitializeFileIO();
4551
RegisterTableFromResource("TableMetadataV2Valid.json");
52+
RegisterEmptyTable();
4653
}
4754

4855
/// \brief Initialize file IO and create necessary directories.
@@ -56,6 +63,7 @@ class UpdateTestBase : public ::testing::Test {
5663
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
5764
ASSERT_TRUE(arrow_fs != nullptr);
5865
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
66+
ASSERT_TRUE(arrow_fs->CreateDir(empty_table_location_ + "/metadata").ok());
5967
}
6068

6169
/// \brief Register a table from a metadata resource file.
@@ -78,11 +86,60 @@ class UpdateTestBase : public ::testing::Test {
7886
catalog_->RegisterTable(table_ident_, metadata_location));
7987
}
8088

89+
/// \brief Register an empty table with no snapshots.
90+
///
91+
/// Creates a minimal table metadata with no snapshots (current_snapshot_id = -1).
92+
/// This is useful for testing operations on empty tables.
93+
void RegisterEmptyTable() {
94+
// Drop existing table if it exists
95+
std::ignore = catalog_->DropTable(empty_table_ident_, /*purge=*/false);
96+
97+
// Create minimal schema with a single field
98+
auto field = SchemaField::MakeRequired(1, "id", int64());
99+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{field}, 0);
100+
101+
// Create empty table metadata
102+
auto metadata = std::make_unique<TableMetadata>();
103+
metadata->format_version = 2;
104+
metadata->table_uuid = Uuid::GenerateV7().ToString();
105+
metadata->location = empty_table_location_;
106+
metadata->last_sequence_number = 0;
107+
metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)};
108+
metadata->last_column_id = 1;
109+
metadata->current_schema_id = 0;
110+
metadata->schemas.push_back(schema);
111+
metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
112+
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
113+
metadata->last_partition_id = 0;
114+
metadata->current_snapshot_id = kInvalidSnapshotId;
115+
metadata->sort_orders.push_back(SortOrder::Unsorted());
116+
metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
117+
metadata->next_row_id = TableMetadata::kInitialRowId;
118+
119+
// Write table metadata to the table location.
120+
auto metadata_location =
121+
std::format("{}/metadata/00001-{}.metadata.json", empty_table_location_,
122+
Uuid::GenerateV7().ToString());
123+
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
124+
IsOk());
125+
126+
// Register the table in the catalog.
127+
ICEBERG_UNWRAP_OR_FAIL(
128+
auto registered_table,
129+
catalog_->RegisterTable(empty_table_ident_, metadata_location));
130+
131+
// Reload the table to ensure it's in a clean state without any transaction state.
132+
ICEBERG_UNWRAP_OR_FAIL(empty_table_, catalog_->LoadTable(empty_table_ident_));
133+
}
134+
81135
const TableIdentifier table_ident_{.name = "test_table"};
82136
const std::string table_location_{"/warehouse/test_table"};
137+
const TableIdentifier empty_table_ident_{.name = "empty_table"};
138+
const std::string empty_table_location_{"/warehouse/empty_table"};
83139
std::shared_ptr<FileIO> file_io_;
84140
std::shared_ptr<InMemoryCatalog> catalog_;
85141
std::shared_ptr<Table> table_;
142+
std::shared_ptr<Table> empty_table_;
86143
};
87144

88145
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "iceberg/update/fast_append.h"
3737
#include "iceberg/update/pending_update.h"
3838
#include "iceberg/update/set_snapshot.h"
39+
#include "iceberg/update/snapshot_manager.h"
3940
#include "iceberg/update/snapshot_update.h"
4041
#include "iceberg/update/update_location.h"
4142
#include "iceberg/update/update_partition_spec.h"
@@ -428,4 +429,11 @@ Transaction::NewUpdateSnapshotReference() {
428429
return update_ref;
429430
}
430431

432+
Result<std::shared_ptr<SnapshotManager>> Transaction::NewSnapshotManager() {
433+
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<SnapshotManager> snapshot_manager,
434+
SnapshotManager::Make(shared_from_this()));
435+
ICEBERG_RETURN_UNEXPECTED(AddUpdate(snapshot_manager));
436+
return snapshot_manager;
437+
}
438+
431439
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
105105
/// and tags) and commit the changes.
106106
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
107107

108+
/// \brief Create a new SnapshotManager to manage snapshots.
109+
Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
110+
108111
private:
109112
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
110113
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class TableProperties;
184184
/// \brief Table update.
185185
class TableMetadataBuilder;
186186
class TableUpdate;
187+
class SnapshotManager;
187188
class TableRequirement;
188189
class TableUpdateContext;
189190
class Transaction;

src/iceberg/update/fast_append.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
8787
return *this;
8888
}
8989

90+
FastAppend& FastAppend::ToBranch(const std::string& branch) {
91+
ICEBERG_BUILDER_RETURN_IF_ERROR(SetTargetBranch(branch));
92+
return *this;
93+
}
94+
9095
std::string FastAppend::operation() { return DataOperation::kAppend; }
9196

9297
Result<std::vector<ManifestFile>> FastAppend::Apply(

0 commit comments

Comments
 (0)