Skip to content

Commit 6a644a0

Browse files
committed
fix: handle auto commit logic inside SnapshotManager
1 parent db23355 commit 6a644a0

File tree

5 files changed

+101
-21
lines changed

5 files changed

+101
-21
lines changed

src/iceberg/table.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ Result<std::shared_ptr<UpdatePartitionStatistics>> Table::NewUpdatePartitionStat
226226
Result<std::shared_ptr<SnapshotManager>> Table::NewSnapshotManager() {
227227
ICEBERG_ASSIGN_OR_RAISE(
228228
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
229-
/*auto_commit=*/false));
229+
/*auto_commit=*/true));
230230
return SnapshotManager::Make(std::move(transaction));
231231
}
232232

src/iceberg/test/snapshot_manager_test.cc

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ class SnapshotManagerTest : public UpdateTestBase {
5050
int64_t oldest_snapshot_id_{};
5151
};
5252

53+
class SnapshotManagerMinimalTableTest : public MinimalUpdateTestBase {};
54+
5355
TEST_F(SnapshotManagerTest, CreateBranch) {
5456
ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
5557
manager->CreateBranch("branch1", current_snapshot_id_);
@@ -76,12 +78,12 @@ TEST_F(SnapshotManagerTest, CreateBranchWithoutSnapshotId) {
7678
EXPECT_EQ(ref->snapshot_id, current_snapshot_id_);
7779
}
7880

79-
TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) {
81+
TEST_F(SnapshotManagerMinimalTableTest, CreateBranchOnEmptyTable) {
8082
ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager());
8183
manager->CreateBranch("branch1");
8284
EXPECT_THAT(manager->Commit(), IsOk());
8385

84-
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(minimal_table_ident_));
86+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
8587
EXPECT_FALSE(
8688
reloaded->metadata()->refs.contains(std::string(SnapshotRef::kMainBranch)));
8789
auto it = reloaded->metadata()->refs.find("branch1");
@@ -90,13 +92,13 @@ TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTable) {
9092
EXPECT_EQ(ref->type(), SnapshotRefType::kBranch);
9193
}
9294

93-
TEST_F(SnapshotManagerTest, CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) {
95+
TEST_F(SnapshotManagerMinimalTableTest,
96+
CreateBranchOnEmptyTableFailsWhenRefAlreadyExists) {
9497
ICEBERG_UNWRAP_OR_FAIL(auto manager, minimal_table_->NewSnapshotManager());
9598
manager->CreateBranch("branch1");
9699
EXPECT_THAT(manager->Commit(), IsOk());
97100

98-
ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch,
99-
catalog_->LoadTable(minimal_table_ident_));
101+
ICEBERG_UNWRAP_OR_FAIL(auto table_with_branch, catalog_->LoadTable(table_ident_));
100102
ICEBERG_UNWRAP_OR_FAIL(auto manager2, table_with_branch->NewSnapshotManager());
101103
manager2->CreateBranch("branch1");
102104
auto result = manager2->Commit();
@@ -509,4 +511,32 @@ TEST_F(SnapshotManagerTest, SnapshotManagerThroughTransaction) {
509511
EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_);
510512
}
511513

514+
TEST_F(SnapshotManagerTest, SnapshotManagerFromTableAllowsMultipleSnapshotOperations) {
515+
ICEBERG_UNWRAP_OR_FAIL(auto manager, table_->NewSnapshotManager());
516+
517+
manager->SetCurrentSnapshot(oldest_snapshot_id_);
518+
manager->SetCurrentSnapshot(current_snapshot_id_);
519+
manager->RollbackTo(oldest_snapshot_id_);
520+
EXPECT_THAT(manager->Commit(), IsOk());
521+
522+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
523+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
524+
EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_);
525+
}
526+
527+
TEST_F(SnapshotManagerTest,
528+
SnapshotManagerFromTransactionAllowsMultipleSnapshotOperations) {
529+
ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction());
530+
ICEBERG_UNWRAP_OR_FAIL(auto manager, SnapshotManager::Make(txn));
531+
532+
manager->SetCurrentSnapshot(oldest_snapshot_id_);
533+
manager->SetCurrentSnapshot(current_snapshot_id_);
534+
manager->RollbackTo(oldest_snapshot_id_);
535+
EXPECT_THAT(txn->Commit(), IsOk());
536+
537+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
538+
ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
539+
EXPECT_EQ(current_snapshot->snapshot_id, oldest_snapshot_id_);
540+
}
541+
512542
} // namespace iceberg

src/iceberg/test/update_test_base.h

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ class UpdateTestBase : public ::testing::Test {
4343
void SetUp() override {
4444
InitializeFileIO();
4545
RegisterTableFromResource("TableMetadataV2Valid.json");
46-
RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json");
4746
}
4847

4948
/// \brief Initialize file IO and create necessary directories.
@@ -57,7 +56,7 @@ class UpdateTestBase : public ::testing::Test {
5756
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
5857
ASSERT_TRUE(arrow_fs != nullptr);
5958
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
60-
ASSERT_TRUE(arrow_fs->CreateDir(minimal_table_location_ + "/metadata").ok());
59+
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
6160
}
6261

6362
/// \brief Register a table from a metadata resource file.
@@ -80,34 +79,58 @@ class UpdateTestBase : public ::testing::Test {
8079
catalog_->RegisterTable(table_ident_, metadata_location));
8180
}
8281

82+
const TableIdentifier table_ident_{.name = "test_table"};
83+
const std::string table_location_{"/warehouse/test_table"};
84+
std::shared_ptr<FileIO> file_io_;
85+
std::shared_ptr<InMemoryCatalog> catalog_;
86+
std::shared_ptr<Table> table_;
87+
};
88+
89+
// Base test fixture for table update operations on minimal table metadata.
90+
class MinimalUpdateTestBase : public ::testing::Test {
91+
protected:
92+
void SetUp() override {
93+
InitializeFileIO();
94+
RegisterMinimalTableFromResource("TableMetadataV2ValidMinimal.json");
95+
}
96+
97+
/// \brief Initialize file IO and create necessary directories.
98+
void InitializeFileIO() {
99+
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
100+
catalog_ =
101+
InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", /*properties=*/{});
102+
103+
// Arrow MockFS cannot automatically create directories.
104+
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
105+
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
106+
ASSERT_TRUE(arrow_fs != nullptr);
107+
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
108+
}
109+
83110
/// \brief Register a minimal table from a metadata resource file.
84111
///
85112
/// \param resource_name The name of the metadata resource file
86113
void RegisterMinimalTableFromResource(const std::string& resource_name) {
87114
// Drop existing table if it exists
88-
std::ignore = catalog_->DropTable(minimal_table_ident_, /*purge=*/false);
115+
std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
89116

90117
// Write table metadata to the table location.
91-
auto metadata_location =
92-
std::format("{}/metadata/00001-{}.metadata.json", minimal_table_location_,
93-
Uuid::GenerateV7().ToString());
118+
auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
119+
table_location_, Uuid::GenerateV7().ToString());
94120
ICEBERG_UNWRAP_OR_FAIL(auto metadata, ReadTableMetadataFromResource(resource_name));
95-
metadata->location = minimal_table_location_;
121+
metadata->location = table_location_;
96122
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location, *metadata),
97123
IsOk());
98124

99125
// Register the table in the catalog.
100-
ICEBERG_UNWRAP_OR_FAIL(
101-
minimal_table_, catalog_->RegisterTable(minimal_table_ident_, metadata_location));
126+
ICEBERG_UNWRAP_OR_FAIL(minimal_table_,
127+
catalog_->RegisterTable(table_ident_, metadata_location));
102128
}
103129

104-
const TableIdentifier table_ident_{.name = "test_table"};
105-
const std::string table_location_{"/warehouse/test_table"};
106-
const TableIdentifier minimal_table_ident_{.name = "minimal_table"};
107-
const std::string minimal_table_location_{"/warehouse/minimal_table"};
130+
const TableIdentifier table_ident_{.name = "minimal_table"};
131+
const std::string table_location_{"/warehouse/minimal_table"};
108132
std::shared_ptr<FileIO> file_io_;
109133
std::shared_ptr<InMemoryCatalog> catalog_;
110-
std::shared_ptr<Table> table_;
111134
std::shared_ptr<Table> minimal_table_;
112135
};
113136

src/iceberg/update/snapshot_manager.cc

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,39 @@
3434

3535
namespace iceberg {
3636

37+
namespace {
38+
39+
class AutoCommitGuard {
40+
public:
41+
AutoCommitGuard(std::shared_ptr<Transaction> transaction, bool auto_commit)
42+
: transaction_(std::move(transaction)), auto_commit_(auto_commit) {}
43+
44+
~AutoCommitGuard() {
45+
if (auto_commit_) {
46+
transaction_->EnableAutoCommit();
47+
} else {
48+
transaction_->DisableAutoCommit();
49+
}
50+
}
51+
52+
private:
53+
std::shared_ptr<Transaction> transaction_;
54+
bool auto_commit_;
55+
};
56+
57+
} // namespace
58+
3759
Result<std::shared_ptr<SnapshotManager>> SnapshotManager::Make(
3860
std::shared_ptr<Transaction> transaction) {
3961
ICEBERG_PRECHECK(transaction != nullptr, "Invalid input transaction: null");
4062
return std::shared_ptr<SnapshotManager>(new SnapshotManager(std::move(transaction)));
4163
}
4264

4365
SnapshotManager::SnapshotManager(std::shared_ptr<Transaction> transaction)
44-
: PendingUpdate(std::move(transaction)) {}
66+
: PendingUpdate(std::move(transaction)),
67+
original_auto_commit_(transaction_->auto_commit_) {
68+
transaction_->DisableAutoCommit();
69+
}
4570

4671
SnapshotManager::~SnapshotManager() = default;
4772

@@ -172,6 +197,7 @@ SnapshotManager& SnapshotManager::SetMaxRefAgeMs(const std::string& name,
172197
}
173198

174199
Status SnapshotManager::Commit() {
200+
AutoCommitGuard auto_commit_guard(transaction_, original_auto_commit_);
175201
transaction_->EnableAutoCommit();
176202
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
177203
ICEBERG_RETURN_UNEXPECTED(CommitIfRefUpdatesExist());

src/iceberg/update/snapshot_manager.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ class ICEBERG_EXPORT SnapshotManager : public PendingUpdate {
186186
/// \brief Commit any pending reference updates if they exist.
187187
Status CommitIfRefUpdatesExist();
188188

189+
bool original_auto_commit_;
189190
std::shared_ptr<UpdateSnapshotReference> update_snap_refs_;
190191
};
191192

0 commit comments

Comments
 (0)