Skip to content

Commit 80ad6e4

Browse files
committed
feat: implement update location
1 parent deec370 commit 80ad6e4

11 files changed

Lines changed: 299 additions & 2 deletions

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
8888
update/update_properties.cc
8989
update/update_schema.cc
9090
update/update_sort_order.cc
91+
update/update_location.cc
9192
util/bucket_util.cc
9293
util/content_file_util.cc
9394
util/conversions.cc

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
192192
return transaction->NewExpireSnapshots();
193193
}
194194

195+
/// \brief Create an UpdateLocation backed by a fresh auto-committing transaction.
///
/// A Transaction of kind kUpdate is made for this table with auto_commit enabled, and
/// the UpdateLocation produced by that transaction is returned. An error from
/// Transaction::Make is propagated to the caller unchanged.
Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
  constexpr bool kAutoCommit = true;
  ICEBERG_ASSIGN_OR_RAISE(
      auto txn,
      Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, kAutoCommit));
  return txn->NewUpdateLocation();
}
201+
195202
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
196203
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
197204
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "iceberg/snapshot.h"
3030
#include "iceberg/table_identifier.h"
3131
#include "iceberg/type_fwd.h"
32+
#include "iceberg/update/update_location.h"
3233
#include "iceberg/util/timepoint.h"
3334

3435
namespace iceberg {
@@ -150,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
150151
/// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
151152
/// changes.
152153
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
154+
155+
/// \brief Create a new UpdateLocation to update the table location and commit the
156+
/// changes.
157+
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
153158

154159
protected:
155160
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ if(ICEBERG_BUILD_BUNDLE)
175175
update_partition_spec_test.cc
176176
update_properties_test.cc
177177
update_schema_test.cc
178-
update_sort_order_test.cc)
178+
update_sort_order_test.cc
179+
update_location_test.cc)
179180

180181
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
181182

src/iceberg/test/update_location_test.cc

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_location.h"
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include <arrow/filesystem/mockfs.h>
26+
#include <gmock/gmock.h>
27+
#include <gtest/gtest.h>
28+
29+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/test/matchers.h"
32+
#include "iceberg/test/update_test_base.h"
33+
34+
namespace iceberg {
35+
36+
// Fixture for the UpdateLocation tests. It adds nothing of its own; everything the
// tests use (table_, catalog_, file_io_, table_ident_) is inherited from UpdateTestBase.
class UpdateLocationTest : public UpdateTestBase {};
37+
38+
// A single SetLocation call followed by Apply yields exactly the requested location.
TEST_F(UpdateLocationTest, SetLocationSuccess) {
  const std::string expected_location{"/warehouse/new_location"};

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(expected_location);

  ICEBERG_UNWRAP_OR_FAIL(auto applied_location, location_update->Apply());
  EXPECT_EQ(applied_location, expected_location);
}
46+
47+
// When SetLocation is chained several times, Apply reports the value set last.
TEST_F(UpdateLocationTest, SetLocationMultipleTimes) {
  const std::string first_location{"/warehouse/first_location"};
  const std::string second_location{"/warehouse/second_location"};
  const std::string final_location{"/warehouse/final_location"};

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  // Keep the calls chained so the fluent return value is exercised as well.
  location_update->SetLocation(first_location)
      .SetLocation(second_location)
      .SetLocation(final_location);

  ICEBERG_UNWRAP_OR_FAIL(auto applied_location, location_update->Apply());
  EXPECT_EQ(applied_location, "/warehouse/final_location");
}
57+
58+
// An empty location is rejected: Apply reports a validation failure carrying the
// "Location cannot be empty" message.
TEST_F(UpdateLocationTest, SetEmptyLocation) {
  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation("");

  // The failure only becomes visible once Apply is called.
  auto apply_outcome = location_update->Apply();
  EXPECT_THAT(apply_outcome, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(apply_outcome, HasErrorMessage("Location cannot be empty"));
}
66+
67+
// Apply on an update that never received a location fails with kInvalidArgument.
TEST_F(UpdateLocationTest, ApplyWithoutSettingLocation) {
  ICEBERG_UNWRAP_OR_FAIL(auto untouched_update, table_->NewUpdateLocation());

  auto apply_outcome = untouched_update->Apply();
  EXPECT_THAT(apply_outcome, IsError(ErrorKind::kInvalidArgument));
  EXPECT_THAT(apply_outcome, HasErrorMessage("Location must be set before applying"));
}
74+
75+
// Covers two commits: one with no location set (expected to fail with
// kInvalidArgument) and one with a new location (expected to succeed and be visible
// after reloading the table from the catalog).
TEST_F(UpdateLocationTest, CommitSuccess) {
  // Part 1: committing without a location must be rejected.
  ICEBERG_UNWRAP_OR_FAIL(auto unset_update, table_->NewUpdateLocation());
  auto unset_commit_outcome = unset_update->Commit();
  EXPECT_THAT(unset_commit_outcome, IsError(ErrorKind::kInvalidArgument));

  // Part 2: committing an actual location change.
  const std::string target_location{"/warehouse/new_table_location"};
  const std::string target_metadata_dir = target_location + "/metadata";

  // With the mock filesystem the metadata directory at the new location has to exist
  // before the commit, so create it up front.
  auto& arrow_io = static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
  auto mock_fs =
      std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(arrow_io.fs());
  ASSERT_TRUE(mock_fs != nullptr);
  ASSERT_TRUE(mock_fs->CreateDir(target_metadata_dir).ok());

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(target_location);
  EXPECT_THAT(location_update->Commit(), IsOk());

  // Reload through the catalog to confirm the committed location.
  ICEBERG_UNWRAP_OR_FAIL(auto reloaded_table, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(reloaded_table->location(), target_location);
}
97+
98+
// A location without a leading '/' can be committed and is reported back verbatim by
// the reloaded table.
TEST_F(UpdateLocationTest, CommitWithRelativePath) {
  const std::string relative_location{"warehouse/relative_location"};
  const std::string relative_metadata_dir = relative_location + "/metadata";

  // The metadata directory for the new location is created on the mock filesystem
  // before committing.
  auto& arrow_io = static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
  auto mock_fs =
      std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(arrow_io.fs());
  ASSERT_TRUE(mock_fs != nullptr);
  ASSERT_TRUE(mock_fs->CreateDir(relative_metadata_dir).ok());

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(relative_location);

  EXPECT_THAT(location_update->Commit(), IsOk());

  ICEBERG_UNWRAP_OR_FAIL(auto reloaded_table, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(reloaded_table->location(), relative_location);
}
116+
117+
// Two location updates committed back to back: the second one is started from the
// table reloaded after the first commit, and each commit is checked via a reload.
TEST_F(UpdateLocationTest, MultipleUpdatesSequentially) {
  // Mock filesystem handle used to create the metadata directories below.
  auto& arrow_io = static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
  auto mock_fs =
      std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(arrow_io.fs());
  ASSERT_TRUE(mock_fs != nullptr);

  // Round 1: update issued from the fixture's table.
  const std::string location_one{"/warehouse/first"};
  const std::string metadata_dir_one = location_one + "/metadata";
  ASSERT_TRUE(mock_fs->CreateDir(metadata_dir_one).ok());

  ICEBERG_UNWRAP_OR_FAIL(auto first_update, table_->NewUpdateLocation());
  first_update->SetLocation(location_one);
  EXPECT_THAT(first_update->Commit(), IsOk());

  ICEBERG_UNWRAP_OR_FAIL(auto table_after_first, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(table_after_first->location(), location_one);

  // Round 2: update issued from the table reloaded after round 1.
  const std::string location_two{"/warehouse/second"};
  const std::string metadata_dir_two = location_two + "/metadata";
  ASSERT_TRUE(mock_fs->CreateDir(metadata_dir_two).ok());

  ICEBERG_UNWRAP_OR_FAIL(auto second_update, table_after_first->NewUpdateLocation());
  second_update->SetLocation(location_two);
  EXPECT_THAT(second_update->Commit(), IsOk());

  ICEBERG_UNWRAP_OR_FAIL(auto table_after_second, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(table_after_second->location(), location_two);
}
147+
148+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "iceberg/update/expire_snapshots.h"
3434
#include "iceberg/update/pending_update.h"
3535
#include "iceberg/update/snapshot_update.h"
36+
#include "iceberg/update/update_location.h"
3637
#include "iceberg/update/update_partition_spec.h"
3738
#include "iceberg/update/update_properties.h"
3839
#include "iceberg/update/update_schema.h"
@@ -183,6 +184,11 @@ Status Transaction::Apply(PendingUpdate& update) {
183184
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
184185
}
185186
} break;
187+
case PendingUpdate::Kind::kUpdateLocation: {
188+
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
189+
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
190+
metadata_builder_->SetLocation(location);
191+
} break;
186192
default:
187193
return NotSupported("Unsupported pending update: {}",
188194
static_cast<int32_t>(update.kind()));
@@ -280,4 +286,11 @@ Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
280286
return expire_snapshots;
281287
}
282288

289+
/// \brief Create an UpdateLocation bound to this transaction and register it.
///
/// The update is built via UpdateLocation::Make and then handed to AddUpdate. A
/// failure from either step is returned to the caller instead of the update.
Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
  auto self = shared_from_this();
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateLocation> pending_location,
                          UpdateLocation::Make(std::move(self)));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_location));
  return pending_location;
}
295+
283296
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8181
/// \brief Create a new ExpireSnapshots to remove expired snapshots and commit the
8282
/// changes.
8383
Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
84+
85+
/// \brief Create a new UpdateLocation to update the table location and commit the
86+
/// changes.
87+
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
8488

8589
private:
8690
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,

src/iceberg/type_fwd.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,14 @@ class TableUpdateContext;
187187
class Transaction;
188188

189189
/// \brief Update family.
190+
class ExpireSnapshots;
190191
class PendingUpdate;
191192
class SnapshotUpdate;
192193
class UpdatePartitionSpec;
193194
class UpdateProperties;
194195
class UpdateSchema;
195196
class UpdateSortOrder;
196-
class ExpireSnapshots;
197+
class UpdateLocation;
197198

198199
/// ----------------------------------------------------------------------------
199200
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4848
kUpdateSchema,
4949
kUpdateSnapshot,
5050
kUpdateSortOrder,
51+
kUpdateLocation,
5152
};
5253

5354
/// \brief Return the kind of this pending update.
src/iceberg/update/update_location.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_location.h"
21+
22+
#include <memory>
23+
#include <string>
24+
#include <string_view>
25+
26+
#include "iceberg/result.h"
27+
#include "iceberg/transaction.h"
28+
#include "iceberg/util/macros.h"
29+
30+
namespace iceberg {
31+
32+
/// \brief Factory for UpdateLocation.
///
/// \param transaction The transaction the update belongs to; must not be null.
/// \return The new UpdateLocation, or the error produced by the precondition check
/// when `transaction` is null.
Result<std::shared_ptr<UpdateLocation>> UpdateLocation::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateLocation without a transaction");
  // NOTE(review): constructed with `new` rather than std::make_shared, presumably
  // because the constructor is not public -- confirm against the header.
  std::shared_ptr<UpdateLocation> created(new UpdateLocation(std::move(transaction)));
  return created;
}
38+
39+
/// \brief Construct an UpdateLocation for the given transaction.
///
/// The transaction is moved into the PendingUpdate base; nothing else is initialized
/// in this constructor.
UpdateLocation::UpdateLocation(std::shared_ptr<Transaction> transaction)
40+
: PendingUpdate(std::move(transaction)) {}
41+
42+
/// \brief Defaulted destructor, defined out of line in this translation unit.
UpdateLocation::~UpdateLocation() = default;
43+
44+
/// \brief Record the location this update should apply.
///
/// An empty `location` fails the builder check with "Location cannot be empty" and
/// the stored location is left as it was. Otherwise the value is copied into this
/// update, replacing any location set earlier.
///
/// \param location The new table location.
/// \return Reference to this update so calls can be chained.
UpdateLocation& UpdateLocation::SetLocation(std::string_view location) {
  ICEBERG_BUILDER_CHECK(!location.empty(), "Location cannot be empty");
  // Take an owning copy: the view's backing storage is not ours to keep.
  std::string owned_location(location);
  location_ = std::move(owned_location);
  return *this;
}
49+
50+
/// \brief Produce the location to apply.
///
/// Errors reported by CheckErrors() are returned first. If none are pending and no
/// location has been stored, an InvalidArgument error is returned.
///
/// \return A copy of the stored location on success.
Result<std::string> UpdateLocation::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  const bool has_location = !location_.empty();
  if (has_location) {
    return location_;
  }
  return InvalidArgument("Location must be set before applying");
}
57+
58+
} // namespace iceberg

0 commit comments

Comments
 (0)