Skip to content

Commit f159675

Browse files
authored
feat: implement update location (#508)
1 parent 34d5a1d commit f159675

File tree

13 files changed

+288
-1
lines changed

13 files changed

+288
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ set(ICEBERG_SOURCES
8787
update/expire_snapshots.cc
8888
update/pending_update.cc
8989
update/snapshot_update.cc
90+
update/update_location.cc
9091
update/update_partition_spec.cc
9192
update/update_properties.cc
9293
update/update_schema.cc

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ iceberg_sources = files(
105105
'update/expire_snapshots.cc',
106106
'update/pending_update.cc',
107107
'update/snapshot_update.cc',
108+
'update/update_location.cc',
108109
'update/update_partition_spec.cc',
109110
'update/update_properties.cc',
110111
'update/update_schema.cc',

src/iceberg/table.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,13 @@ Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
192192
return transaction->NewExpireSnapshots();
193193
}
194194

195+
Result<std::shared_ptr<UpdateLocation>> Table::NewUpdateLocation() {
196+
ICEBERG_ASSIGN_OR_RAISE(
197+
auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
198+
/*auto_commit=*/true));
199+
return transaction->NewUpdateLocation();
200+
}
201+
195202
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
196203
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
197204
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/table.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "iceberg/snapshot.h"
3030
#include "iceberg/table_identifier.h"
3131
#include "iceberg/type_fwd.h"
32+
#include "iceberg/update/update_location.h"
3233
#include "iceberg/util/timepoint.h"
3334

3435
namespace iceberg {
@@ -151,6 +152,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
151152
/// changes.
152153
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
153154

155+
/// \brief Create a new UpdateLocation to update the table location and commit the
156+
/// changes.
157+
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
158+
154159
protected:
155160
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
156161
std::string metadata_location, std::shared_ptr<FileIO> io,

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
172172
SOURCES
173173
expire_snapshots_test.cc
174174
transaction_test.cc
175+
update_location_test.cc
175176
update_partition_spec_test.cc
176177
update_properties_test.cc
177178
update_schema_test.cc
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/update/update_location.h"
21+
22+
#include <memory>
23+
#include <string>
24+
25+
#include <arrow/filesystem/mockfs.h>
26+
#include <gmock/gmock.h>
27+
#include <gtest/gtest.h>
28+
29+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
30+
#include "iceberg/result.h"
31+
#include "iceberg/test/matchers.h"
32+
#include "iceberg/test/update_test_base.h"
33+
34+
namespace iceberg {
35+
36+
class UpdateLocationTest : public UpdateTestBase {};
37+
38+
TEST_F(UpdateLocationTest, SetLocationSuccess) {
39+
const std::string new_location = "/warehouse/new_location";
40+
41+
// Create metadata directory for the new location
42+
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
43+
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
44+
ASSERT_TRUE(arrow_fs != nullptr);
45+
ASSERT_TRUE(arrow_fs->CreateDir(new_location + "/metadata").ok());
46+
47+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
48+
update->SetLocation(new_location);
49+
50+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
51+
EXPECT_EQ(result, new_location);
52+
53+
// Commit and verify the location was persisted
54+
EXPECT_THAT(update->Commit(), IsOk());
55+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
56+
EXPECT_EQ(reloaded->location(), new_location);
57+
}
58+
59+
TEST_F(UpdateLocationTest, SetLocationMultipleTimes) {
60+
// Test that setting location multiple times uses the last value
61+
const std::string final_location = "/warehouse/final_location";
62+
63+
// Create metadata directory for the new location
64+
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
65+
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
66+
ASSERT_TRUE(arrow_fs != nullptr);
67+
ASSERT_TRUE(arrow_fs->CreateDir(final_location + "/metadata").ok());
68+
69+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
70+
update->SetLocation("/warehouse/first_location")
71+
.SetLocation("/warehouse/second_location")
72+
.SetLocation(final_location);
73+
74+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
75+
EXPECT_EQ(result, final_location);
76+
77+
// Commit and verify the final location was persisted
78+
EXPECT_THAT(update->Commit(), IsOk());
79+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
80+
EXPECT_EQ(reloaded->location(), final_location);
81+
}
82+
83+
TEST_F(UpdateLocationTest, SetEmptyLocation) {
84+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
85+
update->SetLocation("");
86+
87+
auto result = update->Apply();
88+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
89+
EXPECT_THAT(result, HasErrorMessage("Location cannot be empty"));
90+
}
91+
92+
TEST_F(UpdateLocationTest, ApplyWithoutSettingLocation) {
93+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
94+
95+
auto result = update->Apply();
96+
EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
97+
EXPECT_THAT(result, HasErrorMessage("Location must be set before applying"));
98+
}
99+
100+
TEST_F(UpdateLocationTest, MultipleUpdatesSequentially) {
101+
// Get arrow_fs for creating directories
102+
auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
103+
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
104+
ASSERT_TRUE(arrow_fs != nullptr);
105+
106+
// First update
107+
const std::string first_location = "/warehouse/first";
108+
ASSERT_TRUE(arrow_fs->CreateDir(first_location + "/metadata").ok());
109+
110+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation());
111+
update->SetLocation(first_location);
112+
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
113+
EXPECT_EQ(result, first_location);
114+
EXPECT_THAT(update->Commit(), IsOk());
115+
116+
// Reload and verify
117+
ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
118+
EXPECT_EQ(reloaded->location(), first_location);
119+
120+
// Second update
121+
const std::string second_location = "/warehouse/second";
122+
ASSERT_TRUE(arrow_fs->CreateDir(second_location + "/metadata").ok());
123+
124+
ICEBERG_UNWRAP_OR_FAIL(update, reloaded->NewUpdateLocation());
125+
update->SetLocation(second_location);
126+
ICEBERG_UNWRAP_OR_FAIL(result, update->Apply());
127+
EXPECT_EQ(result, second_location);
128+
EXPECT_THAT(update->Commit(), IsOk());
129+
130+
// Reload and verify
131+
ICEBERG_UNWRAP_OR_FAIL(reloaded, catalog_->LoadTable(table_ident_));
132+
EXPECT_EQ(reloaded->location(), second_location);
133+
}
134+
135+
} // namespace iceberg

src/iceberg/transaction.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include "iceberg/update/expire_snapshots.h"
3434
#include "iceberg/update/pending_update.h"
3535
#include "iceberg/update/snapshot_update.h"
36+
#include "iceberg/update/update_location.h"
3637
#include "iceberg/update/update_partition_spec.h"
3738
#include "iceberg/update/update_properties.h"
3839
#include "iceberg/update/update_schema.h"
@@ -183,6 +184,11 @@ Status Transaction::Apply(PendingUpdate& update) {
183184
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
184185
}
185186
} break;
187+
case PendingUpdate::Kind::kUpdateLocation: {
188+
auto& update_location = internal::checked_cast<UpdateLocation&>(update);
189+
ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply());
190+
metadata_builder_->SetLocation(location);
191+
} break;
186192
default:
187193
return NotSupported("Unsupported pending update: {}",
188194
static_cast<int32_t>(update.kind()));
@@ -280,4 +286,11 @@ Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
280286
return expire_snapshots;
281287
}
282288

289+
Result<std::shared_ptr<UpdateLocation>> Transaction::NewUpdateLocation() {
290+
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateLocation> update_location,
291+
UpdateLocation::Make(shared_from_this()));
292+
ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location));
293+
return update_location;
294+
}
295+
283296
} // namespace iceberg

src/iceberg/transaction.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
8282
/// changes.
8383
Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
8484

85+
/// \brief Create a new UpdateLocation to update the table location and commit the
86+
/// changes.
87+
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
88+
8589
private:
8690
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
8791
std::unique_ptr<TableMetadataBuilder> metadata_builder);

src/iceberg/type_fwd.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,13 +187,14 @@ class TableUpdateContext;
187187
class Transaction;
188188

189189
/// \brief Update family.
190+
class ExpireSnapshots;
190191
class PendingUpdate;
191192
class SnapshotUpdate;
193+
class UpdateLocation;
192194
class UpdatePartitionSpec;
193195
class UpdateProperties;
194196
class UpdateSchema;
195197
class UpdateSortOrder;
196-
class ExpireSnapshots;
197198

198199
/// ----------------------------------------------------------------------------
199200
/// TODO: Forward declarations below are not added yet.

src/iceberg/update/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
install_headers(
1919
[
20+
'expire_snapshots.h',
2021
'pending_update.h',
2122
'snapshot_update.h',
23+
'update_location.h',
2224
'update_partition_spec.h',
2325
'update_schema.h',
2426
'update_sort_order.h',

0 commit comments

Comments
 (0)