Skip to content

Commit 7e6357f

Browse files
rest: rebuild resource paths from final config
1 parent d328d0a commit 7e6357f

2 files changed

Lines changed: 150 additions & 2 deletions

File tree

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
117117
ICEBERG_ASSIGN_OR_RAISE(auto server_config, FetchServerConfig(*paths, config));
118118

119119
std::unique_ptr<RestCatalogProperties> final_config = RestCatalogProperties::FromMap(
120-
MergeConfigs(server_config.overrides, config.configs(), server_config.defaults));
120+
MergeConfigs(server_config.defaults, config.configs(), server_config.overrides));
121121

122122
std::unordered_set<Endpoint> endpoints;
123123
if (!server_config.endpoints.empty()) {
@@ -132,7 +132,11 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
132132

133133
// Update resource paths based on the final config
134134
ICEBERG_ASSIGN_OR_RAISE(auto final_uri, final_config->Uri());
135-
ICEBERG_RETURN_UNEXPECTED(paths->SetBaseUri(std::string(TrimTrailingSlash(final_uri))));
135+
ICEBERG_ASSIGN_OR_RAISE(
136+
auto final_paths,
137+
ResourcePaths::Make(std::string(TrimTrailingSlash(final_uri)),
138+
final_config->Get(RestCatalogProperties::kPrefix)));
139+
paths = std::move(final_paths);
136140

137141
return std::shared_ptr<RestCatalog>(
138142
new RestCatalog(std::move(final_config), std::move(file_io), std::move(paths),

src/iceberg/test/rest_util_test.cc

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,134 @@
1919

2020
#include "iceberg/catalog/rest/rest_util.h"
2121

22+
#include <arpa/inet.h>
2223
#include <gtest/gtest.h>
24+
#include <netinet/in.h>
25+
#include <sys/socket.h>
26+
#include <unistd.h>
2327

28+
#include <string>
29+
#include <thread>
30+
31+
#include "iceberg/catalog/rest/catalog_properties.h"
2432
#include "iceberg/catalog/rest/endpoint.h"
33+
#include "iceberg/catalog/rest/rest_catalog.h"
2534
#include "iceberg/table_identifier.h"
2635
#include "iceberg/test/matchers.h"
36+
#include "iceberg/test/std_io.h"
2737

2838
namespace iceberg::rest {
2939

40+
namespace {
41+
42+
class StubHttpServer {
43+
public:
44+
explicit StubHttpServer(std::string config_json) : config_json_(std::move(config_json)) {}
45+
46+
~StubHttpServer() { Join(); }
47+
48+
void Start() {
49+
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
50+
ASSERT_GE(listen_fd_, 0);
51+
52+
int opt = 1;
53+
setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
54+
55+
sockaddr_in addr{};
56+
addr.sin_family = AF_INET;
57+
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
58+
addr.sin_port = htons(0); // ephemeral port
59+
ASSERT_EQ(bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0);
60+
ASSERT_EQ(listen(listen_fd_, 16), 0);
61+
62+
socklen_t len = sizeof(addr);
63+
ASSERT_EQ(getsockname(listen_fd_, reinterpret_cast<sockaddr*>(&addr), &len), 0);
64+
port_ = ntohs(addr.sin_port);
65+
66+
server_thread_ = std::thread([this]() { ServeTwoRequests(); });
67+
}
68+
69+
void Join() {
70+
if (server_thread_.joinable()) {
71+
server_thread_.join();
72+
}
73+
if (listen_fd_ >= 0) {
74+
shutdown(listen_fd_, SHUT_RDWR);
75+
close(listen_fd_);
76+
listen_fd_ = -1;
77+
}
78+
}
79+
80+
uint16_t port() const { return port_; }
81+
82+
const std::string& first_path() const { return first_path_; }
83+
const std::string& second_path() const { return second_path_; }
84+
85+
private:
86+
static std::string buildJsonResponse(const std::string& body) {
87+
std::string resp = "HTTP/1.1 200 OK\r\n";
88+
resp += "Content-Type: application/json\r\n";
89+
resp += "Connection: close\r\n";
90+
resp += "Content-Length: " + std::to_string(body.size()) + "\r\n";
91+
resp += "\r\n";
92+
resp += body;
93+
return resp;
94+
}
95+
96+
static std::string extractPathFromRequest(const std::string& req) {
97+
auto line_end = req.find("\r\n");
98+
std::string first_line = (line_end == std::string::npos) ? req : req.substr(0, line_end);
99+
auto sp1 = first_line.find(' ');
100+
auto sp2 = (sp1 == std::string::npos) ? std::string::npos : first_line.find(' ', sp1 + 1);
101+
if (sp1 != std::string::npos && sp2 != std::string::npos && sp2 > sp1 + 1) {
102+
return first_line.substr(sp1 + 1, sp2 - (sp1 + 1));
103+
}
104+
return {};
105+
}
106+
107+
void ServeOneRequest(std::string& out_path) {
108+
sockaddr_in client_addr{};
109+
socklen_t client_len = sizeof(client_addr);
110+
int fd = accept(listen_fd_, reinterpret_cast<sockaddr*>(&client_addr), &client_len);
111+
ASSERT_GE(fd, 0);
112+
113+
std::string req;
114+
char buf[4096];
115+
ssize_t n = read(fd, buf, sizeof(buf));
116+
if (n > 0) {
117+
req.assign(buf, buf + n);
118+
}
119+
120+
out_path = extractPathFromRequest(req);
121+
122+
std::string resp;
123+
if (out_path == "/v1/config") {
124+
resp = buildJsonResponse(config_json_);
125+
} else {
126+
resp = buildJsonResponse(R"({"namespaces":[]})");
127+
}
128+
129+
(void)write(fd, resp.data(), resp.size());
130+
shutdown(fd, SHUT_RDWR);
131+
close(fd);
132+
}
133+
134+
void ServeTwoRequests() {
135+
ServeOneRequest(first_path_);
136+
ServeOneRequest(second_path_);
137+
}
138+
139+
std::string config_json_;
140+
int listen_fd_ = -1;
141+
uint16_t port_ = 0;
142+
std::thread server_thread_;
143+
144+
std::string first_path_;
145+
std::string second_path_;
146+
};
147+
148+
} // namespace
149+
30150
TEST(RestUtilTest, TrimTrailingSlash) {
31151
EXPECT_EQ(TrimTrailingSlash("https://foo"), "https://foo");
32152
EXPECT_EQ(TrimTrailingSlash("https://foo/"), "https://foo");
@@ -154,4 +274,28 @@ TEST(RestUtilTest, MergeConfigs) {
154274
EXPECT_EQ(merged_empty["key"], "value");
155275
}
156276

277+
TEST(RestUtilTest, PrefixIsTakenFromFinalConfig) {
278+
StubHttpServer server(R"({"defaults":{},"overrides":{"prefix":"serverp"}})");
279+
server.Start();
280+
281+
auto config = RestCatalogProperties::default_properties();
282+
config->Set(RestCatalogProperties::kUri,
283+
std::string("http://127.0.0.1:") + std::to_string(server.port()))
284+
.Set(RestCatalogProperties::kName, std::string("test_catalog"))
285+
.Set(RestCatalogProperties::kWarehouse, std::string("wh"))
286+
.Set(RestCatalogProperties::kPrefix, std::string("clientp"));
287+
288+
auto catalog_result = RestCatalog::Make(*config, std::make_shared<test::StdFileIO>());
289+
ASSERT_THAT(catalog_result, IsOk());
290+
auto& catalog = catalog_result.value();
291+
292+
Namespace root{.levels = {}};
293+
auto list_result = catalog->ListNamespaces(root);
294+
ASSERT_THAT(list_result, IsOk());
295+
296+
server.Join();
297+
EXPECT_EQ(server.first_path(), "/v1/config");
298+
EXPECT_EQ(server.second_path(), "/v1/serverp/namespaces");
299+
}
300+
157301
} // namespace iceberg::rest

0 commit comments

Comments
 (0)