Skip to content

Commit eebd821

Browse files
authored
[improve][ws] Support subscribing multi/pattern topic for Websocket (#21379)
1 parent 957337b commit eebd821

10 files changed

Lines changed: 296 additions & 13 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@
174174
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
175175
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
176176
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
177+
import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet;
177178
import org.apache.pulsar.websocket.WebSocketProducerServlet;
178179
import org.apache.pulsar.websocket.WebSocketReaderServlet;
179180
import org.apache.pulsar.websocket.WebSocketService;
@@ -1081,6 +1082,11 @@ private void addWebSocketServiceHandler(WebService webService,
10811082
new ServletHolder(readerWebSocketServlet), true, attributeMap);
10821083
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
10831084
new ServletHolder(readerWebSocketServlet), true, attributeMap);
1085+
1086+
final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
1087+
new WebSocketMultiTopicConsumerServlet(webSocketService);
1088+
webService.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,
1089+
new ServletHolder(multiTopicConsumerWebSocketServlet), true, attributeMap);
10841090
}
10851091
}
10861092

pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,7 @@ public void ackBatchMessageTest() throws Exception {
886886

887887
WebSocketClient consumerClient = new WebSocketClient();
888888
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
889+
@Cleanup
889890
Producer<byte[]> producer = pulsarClient.newProducer()
890891
.topic(topic)
891892
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
@@ -933,6 +934,7 @@ public void consumeEncryptedMessages() throws Exception {
933934
final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==";
934935
final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==";
935936

937+
@Cleanup
936938
Producer<byte[]> producer = pulsarClient.newProducer()
937939
.topic(topic)
938940
.enableBatching(false)
@@ -1051,5 +1053,71 @@ private void stopWebSocketClient(WebSocketClient... clients) {
10511053
log.info("proxy clients are stopped successfully");
10521054
}
10531055

1056+
@Test
1057+
public void testMultiTopics() throws Exception {
1058+
final String subscription1 = "my-sub1";
1059+
final String subscription2 = "my-sub2";
1060+
final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
1061+
final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID();
1062+
final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
1063+
"/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2;
1064+
1065+
final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() +
1066+
"/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*";
1067+
1068+
int messages = 10;
1069+
WebSocketClient consumerClient1 = new WebSocketClient();
1070+
WebSocketClient consumerClient2 = new WebSocketClient();
1071+
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
1072+
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
1073+
@Cleanup
1074+
Producer<byte[]> producer1 = pulsarClient.newProducer()
1075+
.topic(topic1)
1076+
.batchingMaxMessages(1)
1077+
.create();
1078+
@Cleanup
1079+
Producer<byte[]> producer2 = pulsarClient.newProducer()
1080+
.topic(topic2)
1081+
.batchingMaxMessages(1)
1082+
.create();
1083+
1084+
try {
1085+
consumerClient1.start();
1086+
consumerClient2.start();
1087+
ClientUpgradeRequest consumerRequest1 = new ClientUpgradeRequest();
1088+
ClientUpgradeRequest consumerRequest2 = new ClientUpgradeRequest();
1089+
Future<Session> consumerFuture1 = consumerClient1.connect(consumeSocket1, URI.create(consumerUri1), consumerRequest1);
1090+
Future<Session> consumerFuture2 = consumerClient2.connect(consumeSocket2, URI.create(consumerUri2), consumerRequest2);
1091+
1092+
assertTrue(consumerFuture1.get().isOpen());
1093+
assertTrue(consumerFuture2.get().isOpen());
1094+
assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
1095+
assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);
1096+
1097+
for (int i = 1; i <= messages; i ++) {
1098+
producer1.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
1099+
producer2.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
1100+
}
1101+
producer1.flush();
1102+
producer2.flush();
1103+
1104+
consumeSocket1.sendPermits(2 * messages);
1105+
Awaitility.await().untilAsserted(() ->
1106+
assertEquals(consumeSocket1.getReceivedMessagesCount(), 2 * messages));
1107+
Awaitility.await().untilAsserted(() ->
1108+
assertEquals(admin.topics().getStats(topic1).getSubscriptions()
1109+
.get(subscription1).getMsgBacklog(), 0));
1110+
Awaitility.await().untilAsserted(() ->
1111+
assertEquals(admin.topics().getStats(topic2).getSubscriptions()
1112+
.get(subscription1).getMsgBacklog(), 0));
1113+
1114+
consumeSocket2.sendPermits(2 * messages);
1115+
Awaitility.await().untilAsserted(() ->
1116+
assertEquals(consumeSocket2.getReceivedMessagesCount(), 2 * messages));
1117+
} finally {
1118+
stopWebSocketClient(consumerClient1, consumerClient2);
1119+
}
1120+
}
1121+
10541122
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
10551123
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
5353
import org.apache.pulsar.proxy.stats.ProxyStats;
5454
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
55+
import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet;
5556
import org.apache.pulsar.websocket.WebSocketProducerServlet;
5657
import org.apache.pulsar.websocket.WebSocketReaderServlet;
5758
import org.apache.pulsar.websocket.WebSocketService;
@@ -336,6 +337,11 @@ public static void addWebServerHandlers(WebServer server,
336337
new ServletHolder(readerWebSocketServlet));
337338
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
338339
new ServletHolder(readerWebSocketServlet));
340+
341+
final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet =
342+
new WebSocketMultiTopicConsumerServlet(webSocketService);
343+
server.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH,
344+
new ServletHolder(multiTopicConsumerWebSocketServlet));
339345
}
340346
}
341347

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
6767
protected final WebSocketService service;
6868
protected final HttpServletRequest request;
6969

70-
protected final TopicName topic;
70+
protected TopicName topic;
7171
protected final Map<String, String> queryParams;
7272
private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name";
7373
protected final ObjectReader consumerCommandReader =
@@ -80,12 +80,12 @@ public AbstractWebSocketHandler(WebSocketService service,
8080
ServletUpgradeResponse response) {
8181
this.service = service;
8282
this.request = new WebSocketHttpServletRequestWrapper(request);
83-
this.topic = extractTopicName(request);
8483

8584
this.queryParams = new TreeMap<>();
8685
request.getParameterMap().forEach((key, values) -> {
8786
queryParams.put(key, values[0]);
8887
});
88+
extractTopicName(request);
8989
}
9090

9191
protected boolean checkAuth(ServletUpgradeResponse response) {
@@ -244,7 +244,7 @@ protected String checkAuthentication() {
244244
return null;
245245
}
246246

247-
private TopicName extractTopicName(HttpServletRequest request) {
247+
protected void extractTopicName(HttpServletRequest request) {
248248
String uri = request.getRequestURI();
249249
List<String> parts = Splitter.on("/").splitToList(uri);
250250

@@ -287,7 +287,7 @@ private TopicName extractTopicName(HttpServletRequest request) {
287287
}
288288
final String name = Codec.decode(topicName.toString());
289289

290-
return TopicName.get(domain, namespace, name);
290+
topic = TopicName.get(domain, namespace, name);
291291
}
292292

293293
@VisibleForTesting

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,12 @@
4141
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
4242
import org.apache.pulsar.client.api.DeadLetterPolicy;
4343
import org.apache.pulsar.client.api.MessageId;
44-
import org.apache.pulsar.client.api.MessageIdAdv;
4544
import org.apache.pulsar.client.api.PulsarClient;
4645
import org.apache.pulsar.client.api.PulsarClientException;
4746
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
4847
import org.apache.pulsar.client.api.SubscriptionMode;
4948
import org.apache.pulsar.client.api.SubscriptionType;
50-
import org.apache.pulsar.client.api.TopicMessageId;
5149
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
52-
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
5350
import org.apache.pulsar.common.policies.data.TopicOperation;
5451
import org.apache.pulsar.common.util.Codec;
5552
import org.apache.pulsar.common.util.DateFormatter;
@@ -75,7 +72,7 @@
7572
*/
7673
public class ConsumerHandler extends AbstractWebSocketHandler {
7774

78-
private String subscription = null;
75+
protected String subscription = null;
7976
private SubscriptionType subscriptionType;
8077
private SubscriptionMode subscriptionMode;
8178
private Consumer<byte[]> consumer;
@@ -88,6 +85,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
8885
private final LongAdder numBytesDelivered;
8986
private final LongAdder numMsgsAcked;
9087
private volatile long msgDeliveredCounter = 0;
88+
89+
protected String topicsPattern;
90+
91+
protected String topics;
9192
private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER =
9293
AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");
9394

@@ -123,7 +124,14 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
123124
return;
124125
}
125126

126-
this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
127+
if (topicsPattern != null) {
128+
this.consumer = builder.topicsPattern(topicsPattern).subscriptionName(subscription).subscribe();
129+
} else if (topics != null) {
130+
this.consumer = builder.topics(Splitter.on(",").splitToList(topics))
131+
.subscriptionName(subscription).subscribe();
132+
} else {
133+
this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe();
134+
}
127135
if (!this.service.addConsumer(this)) {
128136
log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(),
129137
request.getRemotePort(), topic);
@@ -299,8 +307,7 @@ private void checkResumeReceive() {
299307

300308
private void handleAck(ConsumerCommand command) throws IOException {
301309
// We should have received an ack
302-
TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(),
303-
(MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)));
310+
MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId));
304311
if (log.isDebugEnabled()) {
305312
log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(),
306313
subscription, msgId, getRemote().getInetSocketAddress().toString());
@@ -490,7 +497,7 @@ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authent
490497
}
491498
}
492499

493-
public static String extractSubscription(HttpServletRequest request) {
500+
public String extractSubscription(HttpServletRequest request) {
494501
String uri = request.getRequestURI();
495502
List<String> parts = Splitter.on("/").splitToList(uri);
496503

0 commit comments

Comments
 (0)