Skip to content

Commit 25af7cd

Browse files
pratt4lhotari
authored andcommitted
Return 400 for invalid reader messageId query parameter (#25865)
(cherry picked from commit b406518)
1 parent 579d807 commit 25af7cd

2 files changed

Lines changed: 121 additions & 12 deletions

File tree

pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,11 +121,17 @@ public ReaderHandler(WebSocketService service, HttpServletRequest request, Jetty
121121
}
122122
allowConnect = true;
123123
} catch (Exception e) {
124-
log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
125-
request.getRemotePort(), subscription, topic, e);
124+
int errorCode = getErrorCode(e);
125+
boolean isKnownError = errorCode != HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
126+
if (isKnownError) {
127+
log.warn("[{}:{}] Failed in creating reader {} on topic {}: {}", request.getRemoteAddr(),
128+
request.getRemotePort(), subscription, topic, e.getMessage());
129+
} else {
130+
log.error("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(),
131+
request.getRemotePort(), subscription, topic, e);
132+
}
126133
try {
127-
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
128-
"Failed to create reader: " + e.getMessage());
134+
response.sendError(errorCode, getErrorMessage(e));
129135
} catch (IOException e1) {
130136
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
131137
e1.getMessage(), e1);
@@ -340,13 +346,25 @@ private int getReceiverQueueSize() {
340346
return size;
341347
}
342348

343-
private MessageId getMessageId() throws IOException {
349+
private MessageId getMessageId() {
344350
MessageId messageId = MessageId.latest;
345-
if (isNotBlank(queryParams.get("messageId"))) {
346-
if (queryParams.get("messageId").equals("earliest")) {
351+
String messageIdParam = queryParams.get("messageId");
352+
if (isNotBlank(messageIdParam)) {
353+
if (messageIdParam.equals("earliest")) {
347354
messageId = MessageId.earliest;
348-
} else if (!queryParams.get("messageId").equals("latest")) {
349-
messageId = MessageIdImpl.fromByteArray(Base64.getDecoder().decode(queryParams.get("messageId")));
355+
} else if (!messageIdParam.equals("latest")) {
356+
final byte[] decoded;
357+
try {
358+
decoded = Base64.getDecoder().decode(messageIdParam);
359+
} catch (IllegalArgumentException e) {
360+
throw new IllegalArgumentException("Invalid messageId base64 value", e);
361+
}
362+
363+
try {
364+
messageId = MessageIdImpl.fromByteArray(decoded);
365+
} catch (IOException | RuntimeException e) {
366+
throw new IllegalArgumentException("Invalid messageId value", e);
367+
}
350368
}
351369
}
352370
return messageId;

pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,22 @@
2121
import static org.mockito.Mockito.any;
2222
import static org.mockito.Mockito.anyInt;
2323
import static org.mockito.Mockito.anyString;
24+
import static org.mockito.Mockito.eq;
2425
import static org.mockito.Mockito.mock;
2526
import static org.mockito.Mockito.spy;
2627
import static org.mockito.Mockito.times;
2728
import static org.mockito.Mockito.verify;
2829
import static org.mockito.Mockito.when;
2930
import java.io.IOException;
31+
import java.util.Base64;
32+
import java.util.HashMap;
3033
import java.util.List;
34+
import java.util.Map;
3135
import java.util.concurrent.CompletableFuture;
3236
import java.util.concurrent.TimeUnit;
3337
import java.util.function.Function;
3438
import javax.servlet.http.HttpServletRequest;
39+
import javax.servlet.http.HttpServletResponse;
3540
import org.apache.pulsar.client.api.Message;
3641
import org.apache.pulsar.client.api.MessageId;
3742
import org.apache.pulsar.client.api.PulsarClient;
@@ -43,12 +48,98 @@
4348
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
4449
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
4550
import org.apache.pulsar.client.impl.ReaderImpl;
51+
import org.apache.pulsar.common.api.proto.MessageIdData;
4652
import org.eclipse.jetty.ee8.websocket.server.JettyServerUpgradeResponse;
4753
import org.testng.Assert;
4854
import org.testng.annotations.Test;
4955

5056
public class ReaderHandlerTest {
5157

58+
@Test
59+
@SuppressWarnings("unchecked")
60+
public void testInvalidMessageIdBase64ReturnsBadRequest() throws IOException {
61+
WebSocketService wss = mock(WebSocketService.class);
62+
PulsarClient mockedClient = mock(PulsarClient.class);
63+
when(wss.getPulsarClient()).thenReturn(mockedClient);
64+
ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
65+
when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
66+
when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
67+
// Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds.
68+
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
69+
70+
Map<String, String[]> params = new HashMap<>();
71+
params.put("messageId", new String[] { "invalidMessageId" });
72+
73+
HttpServletRequest request = mock(HttpServletRequest.class);
74+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
75+
when(request.getParameterMap()).thenReturn(params);
76+
77+
JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class);
78+
new ReaderHandler(wss, request, servletUpgradeResponse);
79+
80+
verify(servletUpgradeResponse, times(1))
81+
.sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString());
82+
}
83+
84+
@Test
85+
@SuppressWarnings("unchecked")
86+
public void testInvalidMessageIdBytesReturnsBadRequest() throws IOException {
87+
WebSocketService wss = mock(WebSocketService.class);
88+
PulsarClient mockedClient = mock(PulsarClient.class);
89+
when(wss.getPulsarClient()).thenReturn(mockedClient);
90+
ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
91+
when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
92+
when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
93+
// Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds.
94+
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
95+
96+
// "AQID" is valid Base64, but it doesn't decode into a valid Pulsar MessageId structure.
97+
Map<String, String[]> params = new HashMap<>();
98+
params.put("messageId", new String[] { "AQID" });
99+
100+
HttpServletRequest request = mock(HttpServletRequest.class);
101+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
102+
when(request.getParameterMap()).thenReturn(params);
103+
104+
JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class);
105+
new ReaderHandler(wss, request, servletUpgradeResponse);
106+
107+
verify(servletUpgradeResponse, times(1))
108+
.sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString());
109+
}
110+
111+
@Test
112+
@SuppressWarnings("unchecked")
113+
public void testInvalidMessageIdRuntimeParseFailureReturnsBadRequest() throws IOException {
114+
WebSocketService wss = mock(WebSocketService.class);
115+
PulsarClient mockedClient = mock(PulsarClient.class);
116+
when(wss.getPulsarClient()).thenReturn(mockedClient);
117+
ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class);
118+
when(mockedClient.newReader()).thenReturn(mockedReaderBuilder);
119+
when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder);
120+
// Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds.
121+
when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder);
122+
123+
MessageIdData invalidBatchMessageId = new MessageIdData()
124+
.setLedgerId(1)
125+
.setEntryId(2)
126+
.setBatchIndex(0)
127+
.setBatchSize(-1);
128+
Map<String, String[]> params = new HashMap<>();
129+
params.put("messageId", new String[] {
130+
Base64.getEncoder().encodeToString(invalidBatchMessageId.toByteArray()) });
131+
132+
HttpServletRequest request = mock(HttpServletRequest.class);
133+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
134+
when(request.getParameterMap()).thenReturn(params);
135+
136+
JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class);
137+
new ReaderHandler(wss, request, servletUpgradeResponse);
138+
139+
verify(servletUpgradeResponse, times(1))
140+
.sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString());
141+
}
142+
52143
@Test
53144
@SuppressWarnings("unchecked")
54145
public void testCreateReaderImp() throws IOException {
@@ -68,7 +159,7 @@ public void testCreateReaderImp() throws IOException {
68159
when(consumerImp.getSubscription()).thenReturn(subName);
69160
when(mockedReader.getConsumer()).thenReturn(consumerImp);
70161
HttpServletRequest request = mock(HttpServletRequest.class);
71-
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
162+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
72163
// create reader handler
73164
JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class);
74165
ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
@@ -97,7 +188,7 @@ public void testCreateMultipleTopicReaderImp() throws IOException {
97188
when(consumerImp.getSubscription()).thenReturn(subName);
98189
when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp);
99190
HttpServletRequest request = mock(HttpServletRequest.class);
100-
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
191+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
101192
// create reader handler
102193
JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class);
103194
ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse);
@@ -122,7 +213,7 @@ public void testCreateIllegalReaderImp() throws IOException {
122213
IllegalReader illegalReader = new IllegalReader();
123214
when(mockedReaderBuilder.create()).thenReturn(illegalReader);
124215
HttpServletRequest request = mock(HttpServletRequest.class);
125-
when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic");
216+
when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic");
126217
// create reader handler
127218
JettyServerUpgradeResponse servletUpgradeResponse = spy(JettyServerUpgradeResponse.class);
128219
new ReaderHandler(wss, request, servletUpgradeResponse);

0 commit comments

Comments
 (0)