Skip to content

Commit 54ff51a

Browse files
committed
feat: implement update location
1 parent f869003 commit 54ff51a

File tree

11 files changed

+297
-1
lines changed

11 files changed

+297
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ set(ICEBERG_SOURCES
8787
update/update_properties.cc
8888
update/update_schema.cc
8989
update/update_sort_order.cc
90+
update/update_location.cc
9091
util/bucket_util.cc
9192
util/content_file_util.cc
9293
util/conversions.cc

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,13 @@ Result<std::shared_ptr<UpdateSchema>> Table::NewUpdateSchema() {
179179
return transaction->NewUpdateSchema();
180180
}
181181

182+
// Creates a location update for this table.
//
// A Transaction of kind kUpdate is made over this table with auto_commit
// enabled, and the UpdateLocation it hands out is returned. A failure from
// Transaction::Make is passed on to the caller by ICEBERG_ASSIGN_OR_RAISE
// (assumed from the macro's name and its use with Result -- confirm against
// the macro definition).
Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
                                            /*auto_commit=*/true));
  return txn->NewUpdateLocation();
}
188+
182189
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
183190
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
184191
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
144144
/// changes.
145145
virtual Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
146146

147+
/// \brief Create a new UpdateLocation to update the table location and commit the
148+
/// changes.
149+
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
150+
147151
protected:
148152
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
149153
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/test/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ if(ICEBERG_BUILD_BUNDLE)
173173
update_partition_spec_test.cc
174174
update_properties_test.cc
175175
update_schema_test.cc
176-
update_sort_order_test.cc)
176+
update_sort_order_test.cc
177+
update_location_test.cc)
177178

178179
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
179180

src/iceberg/test/update_location_test.cc

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_location.h"
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include <arrow/filesystem/mockfs.h>
26+
#include <gmock/gmock.h>
27+
#include <gtest/gtest.h>
28+
29+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/test/matchers.h"
32+
#include "iceberg/test/update_test_base.h"
33+
34+
namespace iceberg {
35+
36+
// Test fixture for UpdateLocation. It adds nothing of its own on top of
// UpdateTestBase; the table_, file_io_, catalog_ and table_ident_ members used
// by the tests below are presumably supplied by that base class.
class UpdateLocationTest : public UpdateTestBase {};
37+
38+
// Apply() on an update with a non-empty location returns that same location.
TEST_F(UpdateLocationTest, SetLocationSuccess) {
  const std::string expected_location = "/warehouse/new_location";

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(expected_location);

  ICEBERG_UNWRAP_OR_FAIL(auto applied, location_update->Apply());
  EXPECT_EQ(applied, expected_location);
}
46+
47+
// When SetLocation is chained several times, the last value is the one that
// Apply() reports.
TEST_F(UpdateLocationTest, SetLocationMultipleTimes) {
  const std::string last_location = "/warehouse/final_location";

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  // Kept as a single chained expression: the chaining itself is under test.
  location_update->SetLocation("/warehouse/first_location")
      .SetLocation("/warehouse/second_location")
      .SetLocation(last_location);

  ICEBERG_UNWRAP_OR_FAIL(auto applied, location_update->Apply());
  EXPECT_EQ(applied, last_location);
}
57+
58+
// An empty location is rejected: Apply() reports a validation failure that
// carries the "Location cannot be empty" message.
TEST_F(UpdateLocationTest, SetEmptyLocation) {
  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation("");

  auto apply_outcome = location_update->Apply();
  EXPECT_THAT(apply_outcome, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(apply_outcome, HasErrorMessage("Location cannot be empty"));
}
66+
67+
// Apply() on an update that never had SetLocation called must fail with
// kInvalidArgument and the "must be set" message.
TEST_F(UpdateLocationTest, ApplyWithoutSettingLocation) {
  ICEBERG_UNWRAP_OR_FAIL(auto untouched_update, table_->NewUpdateLocation());

  auto apply_outcome = untouched_update->Apply();
  EXPECT_THAT(apply_outcome, IsError(ErrorKind::kInvalidArgument));
  EXPECT_THAT(apply_outcome, HasErrorMessage("Location must be set before applying"));
}
74+
75+
// Covers both commit paths: committing without a location fails, and
// committing a new location is visible after the table is loaded again.
TEST_F(UpdateLocationTest, CommitSuccess) {
  // Committing an update whose location was never set is expected to fail.
  ICEBERG_UNWRAP_OR_FAIL(auto unset_update, table_->NewUpdateLocation());
  auto unset_commit_outcome = unset_update->Commit();
  EXPECT_THAT(unset_commit_outcome, IsError(ErrorKind::kInvalidArgument));

  // Now commit a real location change. With MockFS the metadata directory
  // under the new location has to exist before the commit.
  const std::string target_location = "/warehouse/new_table_location";
  auto mock_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
      static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
  ASSERT_TRUE(mock_fs != nullptr);
  ASSERT_TRUE(mock_fs->CreateDir(target_location + "/metadata").ok());

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(target_location);
  EXPECT_THAT(location_update->Commit(), IsOk());

  // The table loaded back from the catalog must report the committed location.
  ICEBERG_UNWRAP_OR_FAIL(auto refreshed, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(refreshed->location(), target_location);
}
97+
98+
// A location without a leading slash can be committed and is read back
// unchanged.
TEST_F(UpdateLocationTest, CommitWithRelativePath) {
  const std::string target_location = "warehouse/relative_location";

  // The metadata directory under the new location is created up front.
  auto mock_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
      static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
  ASSERT_TRUE(mock_fs != nullptr);
  ASSERT_TRUE(mock_fs->CreateDir(target_location + "/metadata").ok());

  ICEBERG_UNWRAP_OR_FAIL(auto location_update, table_->NewUpdateLocation());
  location_update->SetLocation(target_location);

  EXPECT_THAT(location_update->Commit(), IsOk());

  ICEBERG_UNWRAP_OR_FAIL(auto refreshed, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(refreshed->location(), target_location);
}
116+
117+
// Two location changes committed one after the other each take effect; the
// second one is started from the table loaded after the first commit.
TEST_F(UpdateLocationTest, MultipleUpdatesSequentially) {
  // The mock file system is needed to create the metadata directories.
  auto mock_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
      static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
  ASSERT_TRUE(mock_fs != nullptr);

  // Round one: move the table to the first location.
  const std::string location_a = "/warehouse/first";
  ASSERT_TRUE(mock_fs->CreateDir(location_a + "/metadata").ok());

  ICEBERG_UNWRAP_OR_FAIL(auto first_update, table_->NewUpdateLocation());
  first_update->SetLocation(location_a);
  EXPECT_THAT(first_update->Commit(), IsOk());

  // Load the table again and check the first location took effect.
  ICEBERG_UNWRAP_OR_FAIL(auto after_first, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(after_first->location(), location_a);

  // Round two: move it again, this time starting from the reloaded table.
  const std::string location_b = "/warehouse/second";
  ASSERT_TRUE(mock_fs->CreateDir(location_b + "/metadata").ok());

  ICEBERG_UNWRAP_OR_FAIL(auto second_update, after_first->NewUpdateLocation());
  second_update->SetLocation(location_b);
  EXPECT_THAT(second_update->Commit(), IsOk());

  // Load once more and check the second location took effect.
  ICEBERG_UNWRAP_OR_FAIL(auto after_second, catalog_->LoadTable(table_ident_));
  EXPECT_EQ(after_second->location(), location_b);
}
147+
148+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/table_update.h"
3333
#include "iceberg/update/pending_update.h"
3434
#include "iceberg/update/snapshot_update.h"
35+
#include "iceberg/update/update_location.h"
3536
#include "iceberg/update/update_partition_spec.h"
3637
#include "iceberg/update/update_properties.h"
3738
#include "iceberg/update/update_schema.h"
@@ -163,6 +164,11 @@ Status Transaction::Apply(PendingUpdate& update) {
163164
metadata_builder_->AssignUUID();
164165
}
165166
} break;
167+
case PendingUpdate::Kind::kUpdateLocation: {
168+
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
169+
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
170+
metadata_builder_->SetLocation(location);
171+
} break;
166172
default:
167173
return NotSupported("Unsupported pending update: {}",
168174
static_cast<int32_t>(update.kind()));
@@ -253,4 +259,11 @@ Result<std::shared_ptr<UpdateSchema>> Transaction::NewUpdateSchema() {
253259
return update_schema;
254260
}
255261

262+
// Creates an UpdateLocation bound to this transaction, registers it through
// AddUpdate, and returns it. A failure from either UpdateLocation::Make or
// AddUpdate is returned to the caller instead of the update.
Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
  ICEBERG_ASSIGN_OR_RAISE(auto pending_location,
                          UpdateLocation::Make(shared_from_this()));
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending_location));
  return pending_location;
}
268+
256269
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
7878
/// changes.
7979
Result<std::shared_ptr<UpdateSchema>> NewUpdateSchema();
8080

81+
/// \brief Create a new UpdateLocation to update the table location and commit the
82+
/// changes.
83+
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
84+
8185
private:
8286
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
8387
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ class UpdatePartitionSpec;
193193
class UpdateProperties;
194194
class UpdateSchema;
195195
class UpdateSortOrder;
196+
class UpdateLocation;
196197

197198
/// ----------------------------------------------------------------------------
198199
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/pending_update.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
4747
kUpdateSchema,
4848
kUpdateSnapshot,
4949
kUpdateSortOrder,
50+
kUpdateLocation,
5051
};
5152

5253
/// \brief Return the kind of this pending update.
src/iceberg/update/update_location.cc

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_location.h"
21+
22+
#include <memory>
23+
#include <string>
24+
#include <string_view>
25+
26+
#include "iceberg/result.h"
27+
#include "iceberg/transaction.h"
28+
#include "iceberg/util/macros.h"
29+
30+
namespace iceberg {
31+
32+
// Factory for UpdateLocation.
//
// A null transaction is rejected by ICEBERG_PRECHECK with the message
// "Cannot create UpdateLocation without a transaction". Otherwise the
// transaction is moved into a newly allocated UpdateLocation.
Result<std::shared_ptr<UpdateLocation>> UpdateLocation::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateLocation without a transaction");
  // Built with `new` rather than std::make_shared -- presumably because the
  // constructor is not publicly accessible; confirm against the header.
  std::shared_ptr<UpdateLocation> created(new UpdateLocation(std::move(transaction)));
  return created;
}
38+
39+
// Forwards the transaction to the PendingUpdate base-class constructor.
UpdateLocation::UpdateLocation(std::shared_ptr<Transaction> transaction)
40+
: PendingUpdate(std::move(transaction)) {}
41+
42+
// Defaulted destructor, defined out of line in this source file.
UpdateLocation::~UpdateLocation() = default;
43+
44+
// Stores the new table location and returns *this so calls can be chained.
//
// An empty location fails the ICEBERG_BUILDER_CHECK below with the message
// "Location cannot be empty". The macro presumably records the error for a
// later CheckErrors() and returns before location_ is touched -- confirm
// against the macro definition.
UpdateLocation& UpdateLocation::SetLocation(std::string_view location) {
  ICEBERG_BUILDER_CHECK(!location.empty(), "Location cannot be empty");
  location_.assign(location);
  return *this;
}
49+
50+
// Returns the location this update would set.
//
// Errors reported by CheckErrors() are returned first. If no location has been
// stored, the result is InvalidArgument("Location must be set before
// applying").
Result<std::string> UpdateLocation::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  if (!location_.empty()) {
    return location_;
  }
  return InvalidArgument("Location must be set before applying");
}
57+
58+
} // namespace iceberg

0 commit comments

Comments
 (0)