Skip to content

Commit cfa8192

Browse files
committed
[fix][sec] Upgrade to async-http-client 2.14.5 to address CVE-2026-40490 (#25546)
(cherry picked from commit a1613bc)
1 parent 470b0e6 commit cfa8192

8 files changed

Lines changed: 405 additions & 13 deletions

File tree

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,8 @@ The Apache Software License, Version 2.0
392392
* AirCompressor
393393
- io.airlift-aircompressor-2.0.3.jar
394394
* AsyncHttpClient
395-
- org.asynchttpclient-async-http-client-2.12.4.jar
396-
- org.asynchttpclient-async-http-client-netty-utils-2.12.4.jar
395+
- org.asynchttpclient-async-http-client-2.14.5.jar
396+
- org.asynchttpclient-async-http-client-netty-utils-2.14.5.jar
397397
* Jetty
398398
- org.eclipse.jetty-jetty-client-9.4.58.v20250814.jar
399399
- org.eclipse.jetty-jetty-continuation-9.4.58.v20250814.jar
@@ -611,7 +611,7 @@ Eclipse Public License - v2.0 -- ../licenses/LICENSE-EPL-2.0.txt
611611
* Jakarta Injection -- org.glassfish.hk2.external-jakarta.inject-2.6.1.jar
612612

613613
Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
614-
* Reactive Streams -- org.reactivestreams-reactive-streams-1.0.3.jar
614+
* Reactive Streams -- org.reactivestreams-reactive-streams-1.0.4.jar
615615

616616
Creative Commons Attribution License
617617
* Jcip -- ../licenses/LICENSE-jcip.txt

distribution/shell/src/assemble/LICENSE.bin.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,8 +398,8 @@ The Apache Software License, Version 2.0
398398
* AirCompressor
399399
- aircompressor-2.0.3.jar
400400
* AsyncHttpClient
401-
- async-http-client-2.12.4.jar
402-
- async-http-client-netty-utils-2.12.4.jar
401+
- async-http-client-2.14.5.jar
402+
- async-http-client-netty-utils-2.14.5.jar
403403
* Jetty
404404
- jetty-client-9.4.58.v20250814.jar
405405
- jetty-http-9.4.58.v20250814.jar
@@ -462,7 +462,7 @@ Eclipse Public License - v2.0 -- ../licenses/LICENSE-EPL-2.0.txt
462462
* Jakarta Injection -- jakarta.inject-2.6.1.jar
463463

464464
Public Domain (CC0) -- ../licenses/LICENSE-CC0.txt
465-
* Reactive Streams -- reactive-streams-1.0.3.jar
465+
* Reactive Streams -- reactive-streams-1.0.4.jar
466466

467467
Creative Commons Attribution License
468468
* Jcip -- ../licenses/LICENSE-jcip.txt

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ flexible messaging model and an intuitive client API.</description>
258258
<prometheus-jmx.version>0.16.1</prometheus-jmx.version>
259259
<confluent.version>7.9.2</confluent.version>
260260
<aircompressor.version>2.0.3</aircompressor.version>
261-
<asynchttpclient.version>2.12.4</asynchttpclient.version>
261+
<asynchttpclient.version>2.14.5</asynchttpclient.version>
262262
<commons-lang3.version>3.19.0</commons-lang3.version>
263263
<commons-io.version>2.21.0</commons-io.version>
264264
<commons-codec.version>1.20.0</commons-codec.version>

pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.pulsar.client.admin.internal.http;
2020

2121
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
2223
import static com.github.tomakehurst.wiremock.client.WireMock.get;
24+
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
2325
import static com.github.tomakehurst.wiremock.client.WireMock.post;
2426
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
2527
import static org.testng.Assert.assertEquals;
@@ -282,6 +284,60 @@ private void doTestRedirect(String location) throws InterruptedException, Execut
282284
assertEquals(response.getResponseBody(), "OK");
283285
}
284286

287+
/**
288+
* Locks in that AsyncHttpConnector forwards the {@code Authorization} header across a cross-origin
289+
* HTTP redirect (different host:port — serverA → serverB). The admin connector disables AHC's
290+
* built-in follow-redirect and runs its own redirect loop that copies the original request headers,
291+
* so it must remain unaffected by the async-http-client &gt;= 2.14.5 {@code Authorization} stripping
292+
* on cross-origin redirects (CVE-2026-40490 fix). Regressing {@code setFollowRedirect(true)} would
293+
* break this test.
294+
*/
295+
@Test
296+
void testAuthorizationHeaderOnCrossOriginRedirect() throws ExecutionException, InterruptedException {
297+
WireMockServer serverB = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
298+
serverB.start();
299+
try {
300+
server.stubFor(get(urlEqualTo("/admin/v2/clusters"))
301+
.willReturn(aResponse()
302+
.withStatus(307)
303+
.withHeader("Location",
304+
"http://127.0.0.1:" + serverB.port() + "/admin/v2/clusters")));
305+
306+
serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
307+
.atPriority(2)
308+
.willReturn(aResponse().withStatus(401).withBody("missing auth")));
309+
serverB.stubFor(get(urlEqualTo("/admin/v2/clusters"))
310+
.atPriority(1)
311+
.withHeader("Authorization", equalTo("Bearer test-token"))
312+
.willReturn(aResponse()
313+
.withStatus(200)
314+
.withHeader("Content-Type", "application/json")
315+
.withBody("[\"test-cluster\"]")));
316+
317+
ClientConfigurationData conf = new ClientConfigurationData();
318+
conf.setServiceUrl("http://127.0.0.1:" + server.port());
319+
320+
@Cleanup
321+
AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000,
322+
5000, 0, conf, false);
323+
324+
Request request = new RequestBuilder("GET")
325+
.setUrl("http://127.0.0.1:" + server.port() + "/admin/v2/clusters")
326+
.addHeader("Authorization", "Bearer test-token")
327+
.build();
328+
329+
Response response = connector.executeRequest(request).get();
330+
331+
assertEquals(response.getStatusCode(), 200,
332+
"cross-origin redirect should forward Authorization and return the stubbed body");
333+
assertEquals(response.getResponseBody(), "[\"test-cluster\"]");
334+
serverB.verify(getRequestedFor(urlEqualTo("/admin/v2/clusters"))
335+
.withHeader("Authorization", equalTo("Bearer test-token")));
336+
} finally {
337+
serverB.stop();
338+
}
339+
}
340+
285341
@Test
286342
void testRedirectWithBody() throws ExecutionException, InterruptedException {
287343
server.stubFor(post(urlEqualTo("/path1"))

pulsar-client/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,12 @@
228228
<artifactId>fastutil</artifactId>
229229
</dependency>
230230

231+
<dependency>
232+
<groupId>com.github.tomakehurst</groupId>
233+
<artifactId>wiremock-jre8-standalone</artifactId>
234+
<version>${wiremock.version}</version>
235+
<scope>test</scope>
236+
</dependency>
231237
</dependencies>
232238

233239
<build>

pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.asynchttpclient.DefaultAsyncHttpClient;
5353
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
5454
import org.asynchttpclient.Request;
55+
import org.asynchttpclient.Response;
5556
import org.asynchttpclient.SslEngineFactory;
5657
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
5758

@@ -79,7 +80,12 @@ protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup
7980
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
8081
confBuilder.setCookieStore(null);
8182
confBuilder.setUseProxyProperties(true);
82-
confBuilder.setFollowRedirect(true);
83+
// Follow redirects manually in executeGet(...) so we can re-invoke authentication per hop and
84+
// carry the Authorization header across cross-origin redirects. async-http-client >= 2.14.5
85+
// (CVE-2026-40490 fix) strips the Authorization header when it follows redirects itself; Pulsar
86+
// HTTP lookups routinely redirect to another broker's httpUrl/httpUrlTls which is a different
87+
// host/port, i.e. cross-origin.
88+
confBuilder.setFollowRedirect(false);
8389
confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
8490
confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
8591
confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
@@ -156,7 +162,24 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
156162
try {
157163
URI hostUri = serviceNameResolver.resolveHostUri();
158164
String requestUrl = new URL(hostUri.toURL(), path).toString();
159-
String remoteHostName = hostUri.getHost();
165+
executeGet(requestUrl, clientConf.getMaxLookupRedirects(), future, clazz);
166+
} catch (Exception e) {
167+
log.warn("[{}] Failed to initiate HTTP get request: {}", path, e.getMessage());
168+
if (e instanceof PulsarClientException) {
169+
future.completeExceptionally(e);
170+
} else {
171+
future.completeExceptionally(new PulsarClientException(e));
172+
}
173+
}
174+
175+
return future;
176+
}
177+
178+
private <T> void executeGet(String requestUrl, int redirectsRemaining,
179+
CompletableFuture<T> future, Class<T> clazz) {
180+
try {
181+
URI currentUri = URI.create(requestUrl);
182+
String remoteHostName = currentUri.getHost();
160183
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
161184

162185
CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
@@ -207,11 +230,17 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
207230
return;
208231
}
209232

233+
int statusCode = response2.getStatusCode();
234+
if (isRedirectStatusCode(statusCode)) {
235+
handleRedirect(requestUrl, currentUri, response2, redirectsRemaining, future, clazz);
236+
return;
237+
}
238+
210239
// request not success
211-
if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) {
240+
if (statusCode != HttpURLConnection.HTTP_OK) {
212241
log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText());
213242
Exception e;
214-
if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
243+
if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) {
215244
e = new NotFoundException("Not found: " + response2.getStatusText());
216245
} else {
217246
e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText());
@@ -231,15 +260,46 @@ public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
231260
});
232261
});
233262
} catch (Exception e) {
234-
log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage());
263+
log.warn("[{}] HTTP request setup failed: {}", requestUrl, e.getMessage());
235264
if (e instanceof PulsarClientException) {
236265
future.completeExceptionally(e);
237266
} else {
238267
future.completeExceptionally(new PulsarClientException(e));
239268
}
240269
}
270+
}
241271

242-
return future;
272+
private <T> void handleRedirect(String requestUrl, URI currentUri, Response response,
273+
int redirectsRemaining, CompletableFuture<T> future, Class<T> clazz) {
274+
String location = response.getHeader("Location");
275+
if (location == null || location.isEmpty()) {
276+
future.completeExceptionally(new PulsarClientException(
277+
"HTTP redirect " + response.getStatusCode() + " without Location header: " + requestUrl));
278+
return;
279+
}
280+
if (redirectsRemaining <= 0) {
281+
future.completeExceptionally(new PulsarClientException(
282+
"Maximum redirects exceeded (" + clientConf.getMaxLookupRedirects()
283+
+ ") while following HTTP redirect for " + requestUrl));
284+
return;
285+
}
286+
String newUrl;
287+
try {
288+
newUrl = currentUri.resolve(location).toString();
289+
} catch (Exception e) {
290+
future.completeExceptionally(new PulsarClientException(
291+
"Invalid redirect Location \"" + location + "\" for " + requestUrl));
292+
return;
293+
}
294+
executeGet(newUrl, redirectsRemaining - 1, future, clazz);
295+
}
296+
297+
private static boolean isRedirectStatusCode(int statusCode) {
298+
return statusCode == HttpURLConnection.HTTP_MOVED_PERM // 301
299+
|| statusCode == HttpURLConnection.HTTP_MOVED_TEMP // 302
300+
|| statusCode == HttpURLConnection.HTTP_SEE_OTHER // 303
301+
|| statusCode == 307 // Temporary Redirect
302+
|| statusCode == 308; // Permanent Redirect
243303
}
244304

245305
protected PulsarSslConfiguration buildSslConfiguration(ClientConfigurationData config, String host)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl;
20+
21+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
23+
import static com.github.tomakehurst.wiremock.client.WireMock.get;
24+
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
25+
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
26+
import static org.testng.Assert.assertEquals;
27+
import static org.testng.Assert.assertNotNull;
28+
import com.github.tomakehurst.wiremock.WireMockServer;
29+
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
30+
import io.netty.channel.EventLoopGroup;
31+
import io.netty.channel.nio.NioEventLoopGroup;
32+
import io.netty.util.concurrent.DefaultThreadFactory;
33+
import java.util.concurrent.TimeUnit;
34+
import org.apache.pulsar.client.api.AuthenticationFactory;
35+
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
36+
import org.apache.pulsar.common.lookup.data.LookupData;
37+
import org.testng.annotations.AfterClass;
38+
import org.testng.annotations.AfterMethod;
39+
import org.testng.annotations.BeforeClass;
40+
import org.testng.annotations.BeforeMethod;
41+
import org.testng.annotations.Test;
42+
43+
/**
44+
* Verifies that {@link HttpClient} carries the {@code Authorization} header across cross-origin HTTP redirects.
45+
*
46+
* <p>Pulsar's HTTP lookup endpoint returns {@code 307 Temporary Redirect} to whichever broker owns the bundle for a
47+
* topic. The redirect target is that broker's {@code httpUrl}/{@code httpUrlTls}, i.e. typically a different host or
48+
* port from the original request. Auth plugins ({@code AuthenticationToken}, {@code AuthenticationBasic},
49+
* {@code AuthenticationOAuth2}, {@code AuthenticationAthenz}) inject the {@code Authorization} header — that header
50+
* must reach the redirect target for lookup to succeed.
51+
*
52+
* <p>async-http-client 2.14.5 strips {@code Authorization} on cross-origin redirects when its built-in follow-redirect
53+
* is enabled (CVE-2026-40490 fix). This test drives two WireMock servers on different ports to exercise that path.
54+
*/
55+
public class HttpClientTest {
56+
57+
private static final String LOOKUP_PATH = "/lookup/v2/topic/persistent/public/default/test-topic";
58+
private static final String EXPECTED_BODY = "{\"brokerUrl\":\"pulsar://broker-b:6650\","
59+
+ "\"brokerUrlTls\":\"pulsar+ssl://broker-b:6651\","
60+
+ "\"httpUrl\":\"http://broker-b:8080\","
61+
+ "\"httpUrlTls\":\"https://broker-b:8443\","
62+
+ "\"nativeUrl\":\"pulsar://broker-b:6650\"}";
63+
64+
private WireMockServer serverA;
65+
private WireMockServer serverB;
66+
private EventLoopGroup eventLoopGroup;
67+
68+
@BeforeClass(alwaysRun = true)
69+
void beforeClass() {
70+
eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("HttpClientTest"));
71+
}
72+
73+
@AfterClass(alwaysRun = true)
74+
void afterClass() {
75+
if (eventLoopGroup != null) {
76+
eventLoopGroup.shutdownGracefully();
77+
}
78+
}
79+
80+
@BeforeMethod(alwaysRun = true)
81+
void beforeMethod() {
82+
serverA = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
83+
serverB = new WireMockServer(WireMockConfiguration.wireMockConfig().port(0));
84+
serverA.start();
85+
serverB.start();
86+
}
87+
88+
@AfterMethod(alwaysRun = true)
89+
void afterMethod() {
90+
if (serverA != null) {
91+
serverA.stop();
92+
}
93+
if (serverB != null) {
94+
serverB.stop();
95+
}
96+
}
97+
98+
@Test
99+
public void testCrossOriginRedirectCarriesAuthorizationHeader() throws Exception {
100+
// serverA (origin host:port) returns 307 Temporary Redirect to serverB (different port -> cross-origin).
101+
serverA.stubFor(get(urlPathMatching(LOOKUP_PATH))
102+
.willReturn(aResponse()
103+
.withStatus(307)
104+
.withHeader("Location",
105+
"http://127.0.0.1:" + serverB.port() + LOOKUP_PATH)));
106+
107+
// serverB only responds 200 when the Authorization header is present with the expected token.
108+
// Priorities: lower = higher priority; the specific-Authorization stub must be checked first.
109+
serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
110+
.atPriority(2)
111+
.willReturn(aResponse().withStatus(401).withBody("missing auth")));
112+
serverB.stubFor(get(urlPathMatching(LOOKUP_PATH))
113+
.atPriority(1)
114+
.withHeader("Authorization", equalTo("Bearer test-token"))
115+
.willReturn(aResponse()
116+
.withStatus(200)
117+
.withHeader("Content-Type", "application/json")
118+
.withBody(EXPECTED_BODY)));
119+
120+
ClientConfigurationData conf = new ClientConfigurationData();
121+
conf.setServiceUrl("http://127.0.0.1:" + serverA.port());
122+
conf.setAuthentication(AuthenticationFactory.token("test-token"));
123+
124+
try (HttpClient httpClient = new HttpClient(conf, eventLoopGroup)) {
125+
LookupData result = httpClient.get(LOOKUP_PATH, LookupData.class)
126+
.get(30, TimeUnit.SECONDS);
127+
128+
assertNotNull(result, "Expected lookup payload after cross-origin redirect");
129+
assertEquals(result.getBrokerUrl(), "pulsar://broker-b:6650");
130+
assertEquals(result.getHttpUrl(), "http://broker-b:8080");
131+
132+
// Lock the invariant: the final hop on serverB must carry the Authorization header.
133+
serverB.verify(getRequestedFor(urlPathMatching(LOOKUP_PATH))
134+
.withHeader("Authorization", equalTo("Bearer test-token")));
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)