Skip to content

Commit 21775d9

Browse files
rest: rebuild resource paths from final config
1 parent d328d0a commit 21775d9

2 files changed

Lines changed: 155 additions & 2 deletions

File tree

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
117117
ICEBERG_ASSIGN_OR_RAISE(auto server_config, FetchServerConfig(*paths, config));
118118

119119
std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap(
120-
MergeConfigs(server_config.overrides, config.configs(), server_config.defaults));
120+
MergeConfigs(server_config.defaults, config.configs(), server_config.overrides));
121121

122122
std::unordered_set<Endpoint> endpoints;
123123
if (!server_config.endpoints.empty()) {
@@ -132,7 +132,11 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
132132

133133
// Update resource paths based on the final config
134134
ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
135-
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
135+
ICEBERG_ASSIGN_OR_RAISE(
136+
auto final_paths,
137+
ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
138+
final_config->Get(RestCatalogProperties::kPrefix)));
139+
paths = std::move(final_paths);
136140

137141
return std::shared_ptr<RestCatalog>(
138142
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),

src/iceberg/test/rest_util_test.cc

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,138 @@
1919

2020
#include "iceberg/catalog/rest/rest_util.h"
2121

22+
#include <unistd.h>
23+
24+
#include <string>
25+
#include <thread>
26+
27+
#include <arpa/inet.h>
2228
#include <gtest/gtest.h>
29+
#include <netinet/in.h>
30+
#include <sys/socket.h>
2331

32+
#include "iceberg/catalog/rest/catalog_properties.h"
2433
#include "iceberg/catalog/rest/endpoint.h"
34+
#include "iceberg/catalog/rest/rest_catalog.h"
2535
#include "iceberg/table_identifier.h"
2636
#include "iceberg/test/matchers.h"
37+
#include "iceberg/test/std_io.h"
2738

2839
namespace iceberg::rest {
2940

41+
namespace {
42+
43+
class StubHttpServer {
44+
public:
45+
explicit StubHttpServer(std::string config_json)
46+
: config_json_(std::move(config_json)) {}
47+
48+
~StubHttpServer() { Join(); }
49+
50+
void Start() {
51+
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
52+
ASSERT_GE(listen_fd_, 0);
53+
54+
int opt = 1;
55+
setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
56+
57+
sockaddr_in addr{};
58+
addr.sin_family = AF_INET;
59+
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
60+
addr.sin_port = htons(0); // ephemeral port
61+
ASSERT_EQ(bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0);
62+
ASSERT_EQ(listen(listen_fd_, 16), 0);
63+
64+
socklen_t len = sizeof(addr);
65+
ASSERT_EQ(getsockname(listen_fd_, reinterpret_cast<sockaddr*>(&addr), &len), 0);
66+
port_ = ntohs(addr.sin_port);
67+
68+
server_thread_ = std::thread([this]() { ServeTwoRequests(); });
69+
}
70+
71+
void Join() {
72+
if (server_thread_.joinable()) {
73+
server_thread_.join();
74+
}
75+
if (listen_fd_ >= 0) {
76+
shutdown(listen_fd_, SHUT_RDWR);
77+
close(listen_fd_);
78+
listen_fd_ = -1;
79+
}
80+
}
81+
82+
uint16_t port() const { return port_; }
83+
84+
const std::string& first_path() const { return first_path_; }
85+
const std::string& second_path() const { return second_path_; }
86+
87+
private:
88+
static std::string buildJsonResponse(const std::string& body) {
89+
std::string resp = "HTTP/1.1 200 OK\r\n";
90+
resp += "Content-Type: application/json\r\n";
91+
resp += "Connection: close\r\n";
92+
resp += "Content-Length: " + std::to_string(body.size()) + "\r\n";
93+
resp += "\r\n";
94+
resp += body;
95+
return resp;
96+
}
97+
98+
static std::string extractPathFromRequest(const std::string& req) {
99+
auto line_end = req.find("\r\n");
100+
std::string first_line =
101+
(line_end == std::string::npos) ? req : req.substr(0, line_end);
102+
auto sp1 = first_line.find(' ');
103+
auto sp2 =
104+
(sp1 == std::string::npos) ? std::string::npos : first_line.find(' ', sp1 + 1);
105+
if (sp1 != std::string::npos && sp2 != std::string::npos && sp2 > sp1 + 1) {
106+
return first_line.substr(sp1 + 1, sp2 - (sp1 + 1));
107+
}
108+
return {};
109+
}
110+
111+
void ServeOneRequest(std::string& out_path) {
112+
sockaddr_in client_addr{};
113+
socklen_t client_len = sizeof(client_addr);
114+
int fd = accept(listen_fd_, reinterpret_cast<sockaddr*>(&client_addr), &client_len);
115+
ASSERT_GE(fd, 0);
116+
117+
std::string req;
118+
char buf[4096];
119+
ssize_t n = read(fd, buf, sizeof(buf));
120+
if (n > 0) {
121+
req.assign(buf, buf + n);
122+
}
123+
124+
out_path = extractPathFromRequest(req);
125+
126+
std::string resp;
127+
if (out_path == "/v1/config") {
128+
resp = buildJsonResponse(config_json_);
129+
} else {
130+
resp = buildJsonResponse(R"({"namespaces":[]})");
131+
}
132+
133+
(void)write(fd, resp.data(), resp.size());
134+
shutdown(fd, SHUT_RDWR);
135+
close(fd);
136+
}
137+
138+
void ServeTwoRequests() {
139+
ServeOneRequest(first_path_);
140+
ServeOneRequest(second_path_);
141+
}
142+
143+
std::string config_json_;
144+
int listen_fd_ = -1;
145+
uint16_t port_ = 0;
146+
std::thread server_thread_;
147+
148+
std::string first_path_;
149+
std::string second_path_;
150+
};
151+
152+
} // namespace
153+
30154
TEST(RestUtilTest, TrimTrailingSlash) {
31155
EXPECT_EQ(TrimTrailingSlash("https://foo"), "https://foo");
32156
EXPECT_EQ(TrimTrailingSlash("https://foo/"), "https://foo");
@@ -154,4 +278,29 @@ TEST(RestUtilTest, MergeConfigs) {
154278
EXPECT_EQ(merged_empty["key"], "value");
155279
}
156280

281+
TEST(RestUtilTest, PrefixIsTakenFromFinalConfig) {
282+
StubHttpServer server(R"({"defaults":{},"overrides":{"prefix":"serverp"}})");
283+
server.Start();
284+
285+
auto config = RestCatalogProperties::default_properties();
286+
config
287+
->Set(RestCatalogProperties::kUri,
288+
std::string("http://127.0.0.1:") + std::to_string(server.port()))
289+
.Set(RestCatalogProperties::kName, std::string("test_catalog"))
290+
.Set(RestCatalogProperties::kWarehouse, std::string("wh"))
291+
.Set(RestCatalogProperties::kPrefix, std::string("clientp"));
292+
293+
auto catalog_result = RestCatalog::Make(*config, std::make_shared<test::StdFileIO>());
294+
ASSERT_THAT(catalog_result, IsOk());
295+
auto& catalog = catalog_result.value();
296+
297+
Namespace root{.levels = {}};
298+
auto list_result = catalog->ListNamespaces(root);
299+
ASSERT_THAT(list_result, IsOk());
300+
301+
server.Join();
302+
EXPECT_EQ(server.first_path(), "/v1/config");
303+
EXPECT_EQ(server.second_path(), "/v1/serverp/namespaces");
304+
}
305+
157306
} // namespace iceberg::rest

0 commit comments

Comments
 (0)