|
19 | 19 |
|
20 | 20 | #include "iceberg/catalog/rest/rest_util.h" |
21 | 21 |
|
| 22 | +#include <unistd.h> |
| 23 | + |
| 24 | +#include <array> |
| 25 | +#include <string> |
| 26 | +#include <thread> |
| 27 | + |
| 28 | +#include <arpa/inet.h> |
22 | 29 | #include <gtest/gtest.h> |
| 30 | +#include <netinet/in.h> |
| 31 | +#include <sys/socket.h> |
23 | 32 |
|
| 33 | +#include "iceberg/catalog/rest/catalog_properties.h" |
24 | 34 | #include "iceberg/catalog/rest/endpoint.h" |
| 35 | +#include "iceberg/catalog/rest/rest_catalog.h" |
25 | 36 | #include "iceberg/table_identifier.h" |
26 | 37 | #include "iceberg/test/matchers.h" |
| 38 | +#include "iceberg/test/std_io.h" |
27 | 39 |
|
28 | 40 | namespace iceberg::rest { |
29 | 41 |
|
| 42 | +namespace { |
| 43 | + |
| 44 | +class StubHttpServer { |
| 45 | + public: |
| 46 | + explicit StubHttpServer(std::string config_json) |
| 47 | + : config_json_(std::move(config_json)) {} |
| 48 | + |
| 49 | + ~StubHttpServer() { Join(); } |
| 50 | + |
| 51 | + void Start() { |
| 52 | + listen_fd_ = socket(AF_INET, SOCK_STREAM, 0); |
| 53 | + ASSERT_GE(listen_fd_, 0); |
| 54 | + |
| 55 | + int opt = 1; |
| 56 | + setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); |
| 57 | + |
| 58 | + sockaddr_in addr{}; |
| 59 | + addr.sin_family = AF_INET; |
| 60 | + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
| 61 | + addr.sin_port = htons(0); // ephemeral port |
| 62 | + ASSERT_EQ(bind(listen_fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)), 0); |
| 63 | + ASSERT_EQ(listen(listen_fd_, 16), 0); |
| 64 | + |
| 65 | + socklen_t len = sizeof(addr); |
| 66 | + ASSERT_EQ(getsockname(listen_fd_, reinterpret_cast<sockaddr*>(&addr), &len), 0); |
| 67 | + port_ = ntohs(addr.sin_port); |
| 68 | + |
| 69 | + server_thread_ = std::thread([this]() { ServeTwoRequests(); }); |
| 70 | + } |
| 71 | + |
| 72 | + void Join() { |
| 73 | + if (server_thread_.joinable()) { |
| 74 | + server_thread_.join(); |
| 75 | + } |
| 76 | + if (listen_fd_ >= 0) { |
| 77 | + shutdown(listen_fd_, SHUT_RDWR); |
| 78 | + close(listen_fd_); |
| 79 | + listen_fd_ = -1; |
| 80 | + } |
| 81 | + } |
| 82 | + |
| 83 | + uint16_t port() const { return port_; } |
| 84 | + |
| 85 | + const std::string& first_path() const { return first_path_; } |
| 86 | + const std::string& second_path() const { return second_path_; } |
| 87 | + |
| 88 | + private: |
| 89 | + static std::string buildJsonResponse(const std::string& body) { |
| 90 | + std::string resp = "HTTP/1.1 200 OK\r\n"; |
| 91 | + resp += "Content-Type: application/json\r\n"; |
| 92 | + resp += "Connection: close\r\n"; |
| 93 | + resp += "Content-Length: " + std::to_string(body.size()) + "\r\n"; |
| 94 | + resp += "\r\n"; |
| 95 | + resp += body; |
| 96 | + return resp; |
| 97 | + } |
| 98 | + |
| 99 | + static std::string extractPathFromRequest(const std::string& req) { |
| 100 | + auto line_end = req.find("\r\n"); |
| 101 | + std::string first_line = |
| 102 | + (line_end == std::string::npos) ? req : req.substr(0, line_end); |
| 103 | + auto sp1 = first_line.find(' '); |
| 104 | + auto sp2 = |
| 105 | + (sp1 == std::string::npos) ? std::string::npos : first_line.find(' ', sp1 + 1); |
| 106 | + if (sp1 != std::string::npos && sp2 != std::string::npos && sp2 > sp1 + 1) { |
| 107 | + return first_line.substr(sp1 + 1, sp2 - (sp1 + 1)); |
| 108 | + } |
| 109 | + return {}; |
| 110 | + } |
| 111 | + |
| 112 | + void ServeOneRequest(std::string& out_path) { |
| 113 | + sockaddr_in client_addr{}; |
| 114 | + socklen_t client_len = sizeof(client_addr); |
| 115 | + int fd = accept(listen_fd_, reinterpret_cast<sockaddr*>(&client_addr), &client_len); |
| 116 | + ASSERT_GE(fd, 0); |
| 117 | + |
| 118 | + std::string req; |
| 119 | + std::array<char, 4096> buf{}; |
| 120 | + ssize_t n = read(fd, buf.data(), buf.size()); |
| 121 | + if (n > 0) { |
| 122 | + req.assign(buf.data(), static_cast<size_t>(n)); |
| 123 | + } |
| 124 | + |
| 125 | + out_path = extractPathFromRequest(req); |
| 126 | + |
| 127 | + std::string resp; |
| 128 | + if (out_path == "/v1/config") { |
| 129 | + resp = buildJsonResponse(config_json_); |
| 130 | + } else { |
| 131 | + resp = buildJsonResponse(R"({"namespaces":[]})"); |
| 132 | + } |
| 133 | + |
| 134 | + (void)write(fd, resp.data(), resp.size()); |
| 135 | + shutdown(fd, SHUT_RDWR); |
| 136 | + close(fd); |
| 137 | + } |
| 138 | + |
| 139 | + void ServeTwoRequests() { |
| 140 | + ServeOneRequest(first_path_); |
| 141 | + ServeOneRequest(second_path_); |
| 142 | + } |
| 143 | + |
| 144 | + std::string config_json_; |
| 145 | + int listen_fd_ = -1; |
| 146 | + uint16_t port_ = 0; |
| 147 | + std::thread server_thread_; |
| 148 | + |
| 149 | + std::string first_path_; |
| 150 | + std::string second_path_; |
| 151 | +}; |
| 152 | + |
| 153 | +} // namespace |
| 154 | + |
30 | 155 | TEST(RestUtilTest, TrimTrailingSlash) { |
31 | 156 | EXPECT_EQ(TrimTrailingSlash("https://foo"), "https://foo"); |
32 | 157 | EXPECT_EQ(TrimTrailingSlash("https://foo/"), "https://foo"); |
@@ -154,4 +279,29 @@ TEST(RestUtilTest, MergeConfigs) { |
154 | 279 | EXPECT_EQ(merged_empty["key"], "value"); |
155 | 280 | } |
156 | 281 |
|
| 282 | +TEST(RestUtilTest, PrefixIsTakenFromFinalConfig) { |
| 283 | + StubHttpServer server(R"({"defaults":{},"overrides":{"prefix":"serverp"}})"); |
| 284 | + server.Start(); |
| 285 | + |
| 286 | + auto config = RestCatalogProperties::default_properties(); |
| 287 | + config |
| 288 | + ->Set(RestCatalogProperties::kUri, |
| 289 | + std::string("http://127.0.0.1:") + std::to_string(server.port())) |
| 290 | + .Set(RestCatalogProperties::kName, std::string("test_catalog")) |
| 291 | + .Set(RestCatalogProperties::kWarehouse, std::string("wh")) |
| 292 | + .Set(RestCatalogProperties::kPrefix, std::string("clientp")); |
| 293 | + |
| 294 | + auto catalog_result = RestCatalog::Make(*config, std::make_shared<test::StdFileIO>()); |
| 295 | + ASSERT_THAT(catalog_result, IsOk()); |
| 296 | + auto& catalog = catalog_result.value(); |
| 297 | + |
| 298 | + Namespace root{.levels = {}}; |
| 299 | + auto list_result = catalog->ListNamespaces(root); |
| 300 | + ASSERT_THAT(list_result, IsOk()); |
| 301 | + |
| 302 | + server.Join(); |
| 303 | + EXPECT_EQ(server.first_path(), "/v1/config"); |
| 304 | + EXPECT_EQ(server.second_path(), "/v1/serverp/namespaces"); |
| 305 | +} |
| 306 | + |
157 | 307 | } // namespace iceberg::rest |
0 commit comments