Skip to content

Commit 8c90128

Browse files
committed
use a simpler approach to support internal txn in the snapshot manager
1 parent 013ca27 commit 8c90128

File tree

9 files changed

+367
-477
lines changed

9 files changed

+367
-477
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,7 @@ Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStat
224224
}
225225

226226
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
227-
ICEBERG_ASSIGN_OR_RAISE(
228-
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
229-
/*auto_commit=*/true));
230-
return SnapshotManager::Make(std::move(transaction));
227+
return SnapshotManager::Make(shared_from_this());
231228
}
232229

233230
Result<std::shared_ptr<StagedTable>> StagedTable::Make(

src/iceberg/test/fast_append_test.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,12 @@ class FastAppendTest : public UpdateTestBase {
3939
protected:
4040
static void SetUpTestSuite() { avro::RegisterAll(); }
4141

42+
std::string MetadataResource() const override {
43+
return "TableMetadataV2ValidMinimal.json";
44+
}
45+
4246
void SetUp() override {
43-
InitializeFileIO();
44-
// Use minimal metadata for FastAppend tests
45-
RegisterTableFromResource("TableMetadataV2ValidMinimal.json");
47+
UpdateTestBase::SetUp();
4648

4749
// Get partition spec and schema from the base table
4850
ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());

src/iceberg/test/snapshot_manager_test.cc

Lines changed: 204 additions & 309 deletions
Large diffs are not rendered by default.

src/iceberg/test/update_test_base.h

Lines changed: 64 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
3030
#include "iceberg/catalog/memory/in_memory_catalog.h"
31+
#include "iceberg/result.h"
32+
#include "iceberg/snapshot.h"
3133
#include "iceberg/table.h"
3234
#include "iceberg/table_identifier.h"
3335
#include "iceberg/table_metadata.h"
@@ -37,12 +39,18 @@
3739

3840
namespace iceberg {
3941

40-
// Base test fixture for table update operations
42+
/// \brief Base test fixture for table update operations.
4143
class UpdateTestBase : public ::testing::Test {
4244
protected:
45+
virtual std::string MetadataResource() const { return "TableMetadataV2Valid.json"; }
46+
virtual std::string TableName() const { return "test_table"; }
47+
4348
void SetUp() override {
49+
table_ident_ = TableIdentifier{.name = TableName()};
50+
table_location_ = "/warehouse/" + TableName();
51+
4452
InitializeFileIO();
45-
RegisterTableFromResource("TableMetadataV2Valid.json");
53+
RegisterTableFromResource(MetadataResource());
4654
}
4755

4856
/// \brief Initialize file IO and create necessary directories.
@@ -56,82 +64,91 @@ class UpdateTestBase : public ::testing::Test {
5664
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
5765
ASSERT_TRUE(arrow_fs != nullptr);
5866
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
59-
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
6067
}
6168

6269
/// \brief Register a table from a metadata resource file.
6370
///
6471
/// \param resource_name The name of the metadata resource file
6572
void RegisterTableFromResource(const std::string& resource_name) {
66-
// Drop existing table if it exists
6773
std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
6874

69-
// Write table metadata to the table location.
7075
auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
7176
table_location_, Uuid::GenerateV7().ToString());
7277
ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name));
7378
metadata->location = table_location_;
7479
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
7580
IsOk());
7681

77-
// Register the table in the catalog.
7882
ICEBERG_UNWRAP_OR_FAIL(table_,
7983
catalog_->RegisterTable(table_ident_, metadata_location));
8084
}
8185

82-
const TableIdentifier table_ident_{.name = "test_table"};
83-
const std::string table_location_{"/warehouse/test_table"};
84-
std::shared_ptr<FileIO> file_io_;
85-
std::shared_ptr<InMemoryCatalog> catalog_;
86-
std::shared_ptr<Table> table_;
87-
};
86+
/// \brief Reload the table from catalog and return its metadata.
87+
std::shared_ptr<TableMetadata> ReloadMetadata() {
88+
auto result = catalog_->LoadTable(table_ident_);
89+
EXPECT_TRUE(result.has_value()) << "Failed to reload table";
90+
return result.value()->metadata();
91+
}
8892

89-
// Base test fixture for table update operations on minimal table metadata.
90-
class MinimalUpdateTestBase : public ::testing::Test {
91-
protected:
92-
void SetUp() override {
93-
InitializeFileIO();
94-
RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json");
93+
/// \brief Assert that a ref exists with the given type and snapshot id.
94+
void ExpectRef(const std::string& name, SnapshotRefType type, int64_t snapshot_id) {
95+
auto metadata = ReloadMetadata();
96+
auto it = metadata->refs.find(name);
97+
ASSERT_NE(it, metadata->refs.end()) << "Ref not found: " << name;
98+
EXPECT_EQ(it->second->type(), type);
99+
EXPECT_EQ(it->second->snapshot_id, snapshot_id);
95100
}
96101

97-
/// \brief Initialize file IO and create necessary directories.
98-
void InitializeFileIO() {
99-
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
100-
catalog_ =
101-
InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{});
102+
void ExpectBranch(const std::string& name, int64_t snapshot_id) {
103+
ExpectRef(name, SnapshotRefType::kBranch, snapshot_id);
104+
}
102105

103-
// Arrow MockFS cannot automatically create directories.
104-
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
105-
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
106-
ASSERT_TRUE(arrow_fs != nullptr);
107-
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
106+
void ExpectTag(const std::string& name, int64_t snapshot_id) {
107+
ExpectRef(name, SnapshotRefType::kTag, snapshot_id);
108108
}
109109

110-
/// \brief Register a minimal table from a metadata resource file.
111-
///
112-
/// \param resource_name The name of the metadata resource file
113-
void RegisterMinimalTableFromResource(const std::string& resource_name) {
114-
// Drop existing table if it exists
115-
std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
110+
/// \brief Assert that a ref does not exist.
111+
void ExpectNoRef(const std::string& name) {
112+
auto metadata = ReloadMetadata();
113+
EXPECT_FALSE(metadata->refs.contains(name)) << "Ref should not exist: " << name;
114+
}
116115

117-
// Write table metadata to the table location.
118-
auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
119-
table_location_, Uuid::GenerateV7().ToString());
120-
ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name));
121-
metadata->location = table_location_;
122-
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
123-
IsOk());
116+
/// \brief Assert the current snapshot id after reloading.
117+
void ExpectCurrentSnapshot(int64_t snapshot_id) {
118+
auto result = catalog_->LoadTable(table_ident_);
119+
ASSERT_TRUE(result.has_value());
120+
auto snap_result = result.value()->current_snapshot();
121+
ASSERT_TRUE(snap_result.has_value());
122+
EXPECT_EQ(snap_result.value()->snapshot_id, snapshot_id);
123+
}
124124

125-
// Register the table in the catalog.
126-
ICEBERG_UNWRAP_OR_FAIL(minimal_table_,
127-
catalog_->RegisterTable(table_ident_, metadata_location));
125+
/// \brief Assert that a commit succeeded.
126+
template <typename T>
127+
void ExpectCommitOk(const T& result) {
128+
EXPECT_THAT(result, IsOk());
129+
}
130+
131+
/// \brief Assert that a commit failed with the given error kind and message substring.
132+
template <typename T>
133+
void ExpectCommitError(const T& result, ErrorKind kind, const std::string& message) {
134+
EXPECT_THAT(result, IsError(kind));
135+
EXPECT_THAT(result, HasErrorMessage(message));
128136
}
129137

130-
const TableIdentifier table_ident_{.name = "minimal_table"};
131-
const std::string table_location_{"/warehouse/minimal_table"};
138+
TableIdentifier table_ident_;
139+
std::string table_location_;
132140
std::shared_ptr<FileIO> file_io_;
133141
std::shared_ptr<InMemoryCatalog> catalog_;
134-
std::shared_ptr<Table> minimal_table_;
142+
std::shared_ptr<Table> table_;
143+
};
144+
145+
/// \brief Test fixture for table update operations on minimal table metadata.
146+
class MinimalUpdateTestBase : public UpdateTestBase {
147+
protected:
148+
std::string MetadataResource() const override {
149+
return "TableMetadataV2ValidMinimal.json";
150+
}
151+
std::string TableName() const override { return "minimal_table"; }
135152
};
136153

137154
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,6 @@ std::string Transaction::MetadataFileLocation(std::string_view filename) const {
9292
}
9393

9494
Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
95-
ICEBERG_PRECHECK(update->kind() != PendingUpdate::Kind::kSnapshotManager,
96-
"SnapshotManager should not be added to the transaction");
9795
ICEBERG_CHECK(last_update_committed_,
9896
"Cannot add update when previous update is not committed");
9997

src/iceberg/transaction.h

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,13 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
5555
/// \return the location of the metadata file
5656
std::string MetadataFileLocation(std::string_view filename) const;
5757

58-
/// \brief Enable auto-commit for this transaction.
59-
void EnableAutoCommit() { auto_commit_ = true; }
60-
61-
/// \brief Disable auto-commit for this transaction.
62-
void DisableAutoCommit() { auto_commit_ = false; }
63-
6458
/// \brief Apply the pending changes from all actions and commit.
6559
///
6660
/// \return Updated table if the transaction was committed successfully, or an error:
6761
/// - ValidationFailed: if any update cannot be applied to the current table metadata.
6862
/// - CommitFailed: if the updates cannot be committed due to conflicts.
6963
Result<std::shared_ptr<Table>> Commit();
7064

71-
/// \brief Returns true if this transaction has been committed.
72-
bool is_committed() const { return committed_; }
73-
7465
/// \brief Create a new UpdatePartitionSpec to update the partition spec of this table
7566
/// and commit the changes.
7667
Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();
@@ -109,12 +100,6 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
109100
/// \brief Create a new SnapshotManager to manage snapshots.
110101
Result<std::shared_ptr<SnapshotManager>> NewSnapshotManager();
111102

112-
private:
113-
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
114-
std::unique_ptr<TableMetadataBuilder> metadata_builder);
115-
116-
Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
117-
118103
/// \brief Create a new SetSnapshot to set the current snapshot or rollback to a
119104
/// previous snapshot and commit the changes.
120105
Result<std::shared_ptr<SetSnapshot>> NewSetSnapshot();
@@ -123,6 +108,12 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
123108
/// and tags) and commit the changes.
124109
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();
125110

111+
private:
112+
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
113+
std::unique_ptr<TableMetadataBuilder> metadata_builder);
114+
115+
Status AddUpdate(const std::shared_ptr<PendingUpdate>& update);
116+
126117
/// \brief Apply the pending changes to current table.
127118
Status Apply(PendingUpdate& updates);
128119

@@ -141,7 +132,6 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
141132

142133
private:
143134
friend class PendingUpdate;
144-
friend class SnapshotManager;
145135

146136
// The table that this transaction will update.
147137
std::shared_ptr<Table> table_;

src/iceberg/update/pending_update.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4444
enum class Kind : uint8_t {
4545
kExpireSnapshots,
4646
kSetSnapshot,
47-
kSnapshotManager,
4847
kUpdateLocation,
4948
kUpdatePartitionSpec,
5049
kUpdatePartitionStatistics,

src/iceberg/update/snapshot_manager.cc

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,7 @@
1919

2020
#include "iceberg/update/snapshot_manager.h"
2121

22-
#include <memory>
23-
#include <string>
24-
2522
#include "iceberg/result.h"
26-
#include "iceberg/snapshot.h"
2723
#include "iceberg/table.h"
2824
#include "iceberg/table_metadata.h"
2925
#include "iceberg/transaction.h"
@@ -34,39 +30,27 @@
3430

3531
namespace iceberg {
3632

37-
namespace {
38-
39-
class AutoCommitGuard {
40-
public:
41-
AutoCommitGuard(std::shared_ptr<Transaction> transaction, bool auto_commit)
42-
: transaction_(std::move(transaction)), auto_commit_(auto_commit) {}
43-
44-
~AutoCommitGuard() {
45-
if (auto_commit_) {
46-
transaction_->EnableAutoCommit();
47-
} else {
48-
transaction_->DisableAutoCommit();
49-
}
50-
}
51-
52-
private:
53-
std::shared_ptr<Transaction> transaction_;
54-
bool auto_commit_;
55-
};
56-
57-
} // namespace
33+
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
34+
std::shared_ptr<Table> table) {
35+
ICEBERG_PRECHECK(table != nullptr, "Invalid input table: null");
36+
ICEBERG_ASSIGN_OR_RAISE(auto transaction,
37+
Transaction::Make(std::move(table), Transaction::Kind::kUpdate,
38+
/*auto_commit=*/false));
39+
return std::shared_ptr<SnapshotManager>(
40+
new SnapshotManager(std::move(transaction), /*is_external_transaction=*/false));
41+
}
5842

5943
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
6044
std::shared_ptr<Transaction> transaction) {
6145
ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
62-
return std::shared_ptr<SnapshotManager>(new SnapshotManager(std::move(transaction)));
46+
return std::shared_ptr<SnapshotManager>(
47+
new SnapshotManager(std::move(transaction), /*is_external_transaction=*/true));
6348
}
6449

65-
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction)
66-
: PendingUpdate(std::move(transaction)),
67-
original_auto_commit_(transaction_->auto_commit_) {
68-
transaction_->DisableAutoCommit();
69-
}
50+
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction,
51+
bool is_external_transaction)
52+
: transaction_(std::move(transaction)),
53+
is_external_transaction_(is_external_transaction) {}
7054

7155
SnapshotManager::~SnapshotManager() = default;
7256

@@ -102,13 +86,13 @@ SnapshotManager& SnapshotManager::RollbackTo(int64_t snapshot_id) {
10286
}
10387

10488
SnapshotManager& SnapshotManager::CreateBranch(const std::string& name) {
105-
if (base().current_snapshot_id != kInvalidSnapshotId) {
106-
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base().Snapshot());
89+
const auto& base = transaction_->current();
90+
if (base.current_snapshot_id != kInvalidSnapshotId) {
91+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_snapshot, base.Snapshot());
10792
ICEBERG_DCHECK(current_snapshot != nullptr, "Current snapshot should not be null");
10893
return CreateBranch(name, current_snapshot->snapshot_id);
10994
}
110-
const auto& current_refs = base().refs;
111-
ICEBERG_BUILDER_CHECK(!base().refs.contains(name), "Ref {} already exists", name);
95+
ICEBERG_BUILDER_CHECK(!base.refs.contains(name), "Ref {} already exists", name);
11296
ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto fast_append, transaction_->NewFastAppend());
11397
ICEBERG_BUILDER_RETURN_IF_ERROR(fast_append->SetTargetBranch(name).Commit());
11498
return *this;
@@ -197,11 +181,9 @@ SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
197181
}
198182

199183
Status SnapshotManager::Commit() {
200-
AutoCommitGuard auto_commit_guard(transaction_, original_auto_commit_);
201-
transaction_->EnableAutoCommit();
202184
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
203185
ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());
204-
if (!transaction_->is_committed()) {
186+
if (!is_external_transaction_) {
205187
ICEBERG_RETURN_UNEXPECTED(transaction_->Commit());
206188
}
207189
return {};

0 commit comments

Comments
 (0)