|
20 | 20 | import io.streamnative.pulsar.handlers.amqp.QueueService; |
21 | 21 | import io.streamnative.pulsar.handlers.amqp.admin.model.VhostBean; |
22 | 22 | import java.net.URI; |
23 | | -import java.net.URL; |
24 | 23 | import java.util.ArrayList; |
25 | 24 | import java.util.List; |
26 | 25 | import java.util.concurrent.CompletableFuture; |
@@ -143,40 +142,36 @@ private PulsarService pulsar() { |
143 | 142 | return aop().getBrokerService().getPulsar(); |
144 | 143 | } |
145 | 144 |
|
146 | | - public boolean isRequestHttps() { |
147 | | - return "https".equalsIgnoreCase(httpRequest.getScheme()); |
148 | | - } |
149 | | - |
150 | 145 | protected CompletableFuture<Void> validateTopicOwnershipAsync(TopicName topicName, boolean authoritative) { |
151 | 146 | NamespaceService nsService = pulsar().getNamespaceService(); |
152 | 147 |
|
153 | 148 | LookupOptions options = LookupOptions.builder() |
154 | 149 | .authoritative(authoritative) |
155 | | - .requestHttps(isRequestHttps()) |
156 | 150 | .readOnly(false) |
157 | 151 | .loadTopicsInBundle(false) |
158 | 152 | .build(); |
159 | 153 |
|
160 | | - return nsService.getWebServiceUrlAsync(topicName, options) |
161 | | - .thenApply(webUrl -> { |
| 154 | + return nsService.getLookupResultForWebRequestAsync(topicName, options) |
| 155 | + .thenApply(lookupResult -> { |
162 | 156 | // Ensure we get a url |
163 | | - if (webUrl == null || !webUrl.isPresent()) { |
164 | | - log.info("Unable to get web service url"); |
| 157 | + if (lookupResult.isEmpty()) { |
| 158 | + log.warn("Unable to get lookup result for topic: {}, authoritative: {}", |
| 159 | + topicName, authoritative); |
165 | 160 | throw new RestException(Response.Status.PRECONDITION_FAILED, |
166 | 161 | "Failed to find ownership for topic:" + topicName); |
167 | 162 | } |
168 | | - return webUrl.get(); |
| 163 | + return lookupResult.get(); |
169 | 164 | }).thenCompose(webUrl -> nsService.isServiceUnitOwnedAsync(topicName) |
170 | 165 | .thenApply(isTopicOwned -> Pair.of(webUrl, isTopicOwned)) |
171 | 166 | ).thenAccept(pair -> { |
172 | | - URL webUrl = pair.getLeft(); |
| 167 | + URI webUri = pair.getLeft().toLookupRedirectUri(uri.getRequestUri()); |
173 | 168 | boolean isTopicOwned = pair.getRight(); |
174 | 169 |
|
175 | 170 | if (!isTopicOwned) { |
176 | 171 | boolean newAuthoritative = isLeaderBroker(pulsar()); |
177 | 172 | // Replace the host and port of the current request and redirect |
178 | 173 | URI redirect = UriBuilder.fromUri(uri.getRequestUri()) |
179 | | - .host(webUrl.getHost()) |
| 174 | + .host(webUri.getHost()) |
180 | 175 | .port(aop().getAmqpConfig().getAmqpAdminPort()) |
181 | 176 | .replaceQueryParam("authoritative", newAuthoritative) |
182 | 177 | .build(); |
|
0 commit comments