Skip to content

Commit 64e4459

Browse files
mattisonchaoclaude
andcommitted
[improve][broker] Offload web response from metadata thread for list tenants/namespaces/clusters
The list tenants, list namespaces, and list clusters REST endpoints currently execute asyncResponse.resume() on the metadata store thread. This can block the metadata thread pool, leading to deadlocks or performance degradation. Offload the response handling (sorting, filtering, collecting) and asyncResponse.resume() to the web service executor thread pool by using thenAcceptAsync/thenApplyAsync with the web executor. - TenantsBase.getTenants(): thenAccept -> thenAcceptAsync - ClustersBase.getClusters(): thenApply+thenAccept -> thenAcceptAsync - Namespaces.getTenantNamespaces(): thenAccept -> thenAcceptAsync - WebService: expose webServiceExecutor via @Getter Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 08d89a2 commit 64e4459

5 files changed

Lines changed: 33 additions & 17 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,15 @@ public class ClustersBase extends AdminResource {
9292
})
9393
public void getClusters(@Suspended AsyncResponse asyncResponse) {
9494
clusterResources().listAsync()
95-
.thenApply(HashSet::new)
96-
.thenAccept(asyncResponse::resume)
97-
.exceptionally(ex -> {
98-
log.error("[{}] Failed to get clusters {}", clientAppId(), ex);
99-
resumeAsyncResponseExceptionally(asyncResponse, ex);
95+
.<Void>handleAsync((clusters, ex) -> {
96+
if (ex != null) {
97+
log.error("[{}] Failed to get clusters {}", clientAppId(), ex);
98+
resumeAsyncResponseExceptionally(asyncResponse, ex);
99+
return null;
100+
}
101+
asyncResponse.resume(clusters);
100102
return null;
101-
});
103+
}, webExecutor());
102104
}
103105

104106
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,18 @@ public void getTenants(@Suspended final AsyncResponse asyncResponse) {
6464
final String clientAppId = clientAppId();
6565
validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS)
6666
.thenCompose(__ -> tenantResources().listTenantsAsync())
67-
.thenAccept(tenants -> {
67+
.<Void>handleAsync((tenants, ex) -> {
68+
if (ex != null) {
69+
log.error("[{}] Failed to get tenants list", clientAppId, ex);
70+
resumeAsyncResponseExceptionally(asyncResponse, ex);
71+
return null;
72+
}
6873
// deep copy the tenants to avoid concurrent sort exception
6974
List<String> deepCopy = new ArrayList<>(tenants);
7075
deepCopy.sort(null);
7176
asyncResponse.resume(deepCopy);
72-
}).exceptionally(ex -> {
73-
log.error("[{}] Failed to get tenants list", clientAppId, ex);
74-
resumeAsyncResponseExceptionally(asyncResponse, ex);
7577
return null;
76-
});
78+
}, webExecutor());
7779
}
7880

7981
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,15 @@ public class Namespaces extends NamespacesBase {
106106
public void getTenantNamespaces(@Suspended final AsyncResponse response,
107107
@PathParam("tenant") String tenant) {
108108
internalGetTenantNamespaces(tenant)
109-
.thenAccept(response::resume)
110-
.exceptionally(ex -> {
111-
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex);
112-
resumeAsyncResponseExceptionally(response, ex);
109+
.<Void>handleAsync((namespaces, ex) -> {
110+
if (ex != null) {
111+
log.error("[{}] Failed to get namespaces list: {}", clientAppId(), ex);
112+
resumeAsyncResponseExceptionally(response, ex);
113+
return null;
114+
}
115+
response.resume(namespaces);
113116
return null;
114-
});
117+
}, webExecutor());
115118
}
116119

117120
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Optional;
3535
import java.util.Set;
3636
import java.util.concurrent.CompletableFuture;
37+
import java.util.concurrent.Executor;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.TimeUnit;
3940
import java.util.concurrent.TimeoutException;
@@ -133,6 +134,7 @@ public abstract class PulsarWebResource {
133134
protected UriInfo uri;
134135

135136
private PulsarService pulsar;
137+
private Executor webExecutor;
136138

137139
protected PulsarService pulsar() {
138140
if (pulsar == null) {
@@ -142,6 +144,13 @@ protected PulsarService pulsar() {
142144
return pulsar;
143145
}
144146

147+
protected Executor webExecutor() {
148+
if (webExecutor == null) {
149+
webExecutor = pulsar().getWebService().getWebServiceExecutor();
150+
}
151+
return webExecutor;
152+
}
153+
145154
protected ServiceConfiguration config() {
146155
return pulsar().getConfiguration();
147156
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ public class WebService implements AutoCloseable {
9494
@Deprecated
9595
private final WebExecutorStats executorStats;
9696
private final WebExecutorThreadPoolStats webExecutorThreadPoolStats;
97+
@Getter
9798
private final WebExecutorThreadPool webServiceExecutor;
98-
9999
private final ServerConnector httpConnector;
100100
private final ServerConnector httpsConnector;
101101
private final FilterInitializer filterInitializer;

0 commit comments

Comments
 (0)