Skip to content

Commit 506a04f

Browse files
authored
Improves error handling for peer-forwarder. (opensearch-project#6833)
Test scenarios where the input is incorrect and verify the correct response codes. Fix a 500 error when the target pipeline and plugin do not exist by returning 400 instead. Signed-off-by: David Venable <dlv@amazon.com>
1 parent 08acf57 commit 506a04f

6 files changed

Lines changed: 158 additions & 2 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.core.peerforwarder.exception;
11+
12+
/**
13+
* This exception is thrown when the peer forwarder receives a request
14+
* with a destination pipeline or plugin that is not registered.
15+
*/
16+
public class NoPeerForwarderTargetException extends RuntimeException {
17+
18+
public NoPeerForwarderTargetException(final String errorMessage) {
19+
super(errorMessage);
20+
}
21+
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/server/PeerForwarderHttpService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
2121
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer;
2222
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
23+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
2324
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
2425
import org.opensearch.dataprepper.metrics.PluginMetrics;
2526
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -126,7 +127,21 @@ private PeerForwarderReceiveBuffer<Record<Event>> getPeerForwarderBuffer(final S
126127
final Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap =
127128
peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap();
128129

129-
return pipelinePeerForwarderReceiveBufferMap
130-
.get(destinationPipelineName).get(destinationPluginId);
130+
final Map<String, PeerForwarderReceiveBuffer<Record<Event>>> pluginBufferMap =
131+
pipelinePeerForwarderReceiveBufferMap.get(destinationPipelineName);
132+
if (pluginBufferMap == null) {
133+
throw new NoPeerForwarderTargetException(
134+
"Unable to find a peer-forwarder target with destinationPluginId='" + destinationPluginId +
135+
"' and destinationPipelineName='" + destinationPipelineName + "'");
136+
}
137+
138+
final PeerForwarderReceiveBuffer<Record<Event>> buffer = pluginBufferMap.get(destinationPluginId);
139+
if (buffer == null) {
140+
throw new NoPeerForwarderTargetException(
141+
"Unable to find a peer-forwarder target with destinationPluginId='" + destinationPluginId +
142+
"' and destinationPipelineName='" + destinationPipelineName + "'");
143+
}
144+
145+
return buffer;
131146
}
132147
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/server/ResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.linecorp.armeria.common.HttpStatus;
1414
import com.linecorp.armeria.common.MediaType;
1515
import io.micrometer.core.instrument.Counter;
16+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
1617
import org.opensearch.dataprepper.metrics.PluginMetrics;
1718
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
1819

@@ -55,6 +56,11 @@ public HttpResponse handleException(final Exception e, final String message) {
5556
return HttpResponse.of(HttpStatus.REQUEST_TIMEOUT, MediaType.ANY_TYPE, message);
5657
}
5758

59+
if (e instanceof NoPeerForwarderTargetException) {
60+
badRequestsCounter.increment();
61+
return HttpResponse.of(HttpStatus.BAD_REQUEST, MediaType.ANY_TYPE, e.getMessage());
62+
}
63+
5864
if (e instanceof NullPointerException) {
5965
requestsUnprocessableCounter.increment();
6066
return HttpResponse.of(HttpStatus.UNPROCESSABLE_ENTITY, MediaType.ANY_TYPE, message);

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,15 @@
1111

1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import com.linecorp.armeria.client.UnprocessedRequestException;
14+
import com.linecorp.armeria.client.WebClient;
1415
import com.linecorp.armeria.common.AggregatedHttpResponse;
1516
import com.linecorp.armeria.common.ClosedSessionException;
17+
import com.linecorp.armeria.common.HttpData;
18+
import com.linecorp.armeria.common.HttpMethod;
19+
import com.linecorp.armeria.common.HttpRequest;
1620
import com.linecorp.armeria.common.HttpStatus;
21+
import com.linecorp.armeria.common.MediaType;
22+
import com.linecorp.armeria.common.RequestHeaders;
1723
import com.linecorp.armeria.server.Server;
1824
import io.netty.handler.ssl.NotSslRecordException;
1925
import org.junit.jupiter.api.AfterEach;
@@ -27,6 +33,7 @@
2733
import org.opensearch.dataprepper.core.peerforwarder.codec.JavaPeerForwarderCodec;
2834
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
2935
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodecAppConfig;
36+
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
3037
import org.opensearch.dataprepper.core.peerforwarder.discovery.DiscoveryMode;
3138
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderHttpServerProvider;
3239
import org.opensearch.dataprepper.core.peerforwarder.server.PeerForwarderHttpService;
@@ -340,6 +347,65 @@ void send_Events_to_server_when_expecting_SSL_should_throw(final boolean binaryC
340347
assertThat(receivedRecords, notNullValue());
341348
assertThat(receivedRecords, is(empty()));
342349
}
350+
351+
@ParameterizedTest
352+
@ValueSource(booleans = {true, false})
353+
void send_invalid_content_type_with_valid_body_returns_ok(final boolean binaryCodec) throws Exception {
354+
setUpServer(binaryCodec);
355+
final PeerForwarderCodec codec = applicationContext.getBean(PeerForwarderCodec.class);
356+
final List<Event> eventList = outgoingRecords.stream().map(Record::getData).collect(Collectors.toList());
357+
final PeerForwardingEvents peerForwardingEvents = new PeerForwardingEvents(eventList, pluginId, pipelineName);
358+
final byte[] serializedBody = codec.serialize(peerForwardingEvents);
359+
360+
final WebClient client = WebClient.of("http://" + LOCALHOST + ":4994");
361+
362+
final AggregatedHttpResponse response = client.execute(
363+
HttpRequest.of(
364+
RequestHeaders.builder()
365+
.method(HttpMethod.POST)
366+
.path(PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI)
367+
.contentType(MediaType.parse("application/x-www-form-urlencoded"))
368+
.build(),
369+
HttpData.wrap(serializedBody)
370+
)
371+
).aggregate().join();
372+
373+
assertThat(response.status(), equalTo(HttpStatus.OK));
374+
}
375+
376+
@ParameterizedTest
377+
@ValueSource(booleans = {true, false})
378+
void send_malformed_body_returns_bad_request(final boolean binaryCodec) {
379+
setUpServer(binaryCodec);
380+
final String body = "this is not valid json or yaml";
381+
382+
final WebClient client = WebClient.of("http://" + LOCALHOST + ":4994");
383+
384+
final AggregatedHttpResponse response = client.execute(
385+
HttpRequest.of(
386+
RequestHeaders.builder()
387+
.method(HttpMethod.POST)
388+
.path(PeerForwarderConfiguration.DEFAULT_PEER_FORWARDING_URI)
389+
.build(),
390+
HttpData.ofUtf8(body)
391+
)
392+
).aggregate().join();
393+
394+
assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST));
395+
}
396+
397+
@ParameterizedTest
398+
@ValueSource(booleans = {true, false})
399+
void send_Events_to_unknown_destination_returns_bad_request(final boolean binaryCodec) throws ExecutionException, InterruptedException {
400+
setUpServer(binaryCodec);
401+
final PeerForwarderClient client = createClient(peerForwarderConfiguration);
402+
403+
final CompletableFuture<AggregatedHttpResponse> httpResponseFuture =
404+
client.serializeRecordsAndSendHttpRequest(outgoingRecords, LOCALHOST, UUID.randomUUID().toString(), UUID.randomUUID().toString());
405+
final AggregatedHttpResponse httpResponse = httpResponseFuture.get();
406+
407+
assertThat(httpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
408+
}
343409
}
344410

345411
@Nested

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/server/PeerForwarderHttpServiceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderProvider;
3030
import org.opensearch.dataprepper.core.peerforwarder.PeerForwarderReceiveBuffer;
3131
import org.opensearch.dataprepper.core.peerforwarder.codec.PeerForwarderCodec;
32+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
3233
import org.opensearch.dataprepper.core.peerforwarder.model.PeerForwardingEvents;
3334
import org.opensearch.dataprepper.metrics.PluginMetrics;
3435
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
@@ -167,6 +168,35 @@ void test_doPost_with_HTTP_request_size_greater_than_buffer_size_should_return_R
167168
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.REQUEST_ENTITY_TOO_LARGE));
168169
}
169170

171+
@Test
172+
void test_doPost_with_unknown_pipeline_should_return_BAD_REQUEST() throws Exception {
173+
lenient().when(peerForwardingEvents.getDestinationPipelineName()).thenReturn("unknown_pipeline");
174+
when(responseHandler.handleException(any(NoPeerForwarderTargetException.class), anyString())).thenReturn(HttpResponse.of(HttpStatus.BAD_REQUEST));
175+
final HashMap<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
176+
when(peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap()).thenReturn(pipelinePeerForwarderReceiveBufferMap);
177+
178+
final PeerForwarderHttpService objectUnderTest = createObjectUnderTest();
179+
180+
final AggregatedHttpResponse aggregatedHttpResponse = objectUnderTest.doPost(aggregatedHttpRequest).aggregate().get();
181+
182+
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
183+
}
184+
185+
@Test
186+
void test_doPost_with_unknown_plugin_should_return_BAD_REQUEST() throws Exception {
187+
lenient().when(peerForwardingEvents.getDestinationPluginId()).thenReturn("unknown_plugin");
188+
when(responseHandler.handleException(any(NoPeerForwarderTargetException.class), anyString())).thenReturn(HttpResponse.of(HttpStatus.BAD_REQUEST));
189+
final HashMap<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> pipelinePeerForwarderReceiveBufferMap = new HashMap<>();
190+
pipelinePeerForwarderReceiveBufferMap.put(PIPELINE_NAME, new HashMap<>());
191+
when(peerForwarderProvider.getPipelinePeerForwarderReceiveBufferMap()).thenReturn(pipelinePeerForwarderReceiveBufferMap);
192+
193+
final PeerForwarderHttpService objectUnderTest = createObjectUnderTest();
194+
195+
final AggregatedHttpResponse aggregatedHttpResponse = objectUnderTest.doPost(aggregatedHttpRequest).aggregate().get();
196+
197+
assertThat(aggregatedHttpResponse.status(), equalTo(HttpStatus.BAD_REQUEST));
198+
}
199+
170200
private List<Event> generateEvents(final int numEvents) {
171201
final List<Event> events = new ArrayList<>();
172202
for (int i = 0; i < numEvents; i++) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/server/ResponseHandlerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.junit.jupiter.api.extension.ExtendWith;
2121
import org.mockito.Mock;
2222
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.core.peerforwarder.exception.NoPeerForwarderTargetException;
2324
import org.opensearch.dataprepper.metrics.PluginMetrics;
2425
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;
2526

@@ -134,6 +135,23 @@ void test_UnknownException() throws ExecutionException, InterruptedException {
134135
verify(badRequestsCounter).increment();
135136
}
136137

138+
@Test
139+
void test_NoPeerForwarderTargetException() throws ExecutionException, InterruptedException {
140+
final ResponseHandler objectUnderTest = createObjectUnderTest();
141+
final String exceptionMessage = "Unable to find a peer-forwarder target with destinationPluginId='myPlugin' and destinationPipelineName='myPipeline'";
142+
final NoPeerForwarderTargetException noPeerForwarderTargetException = new NoPeerForwarderTargetException(exceptionMessage);
143+
144+
final String testMessage = "test exception message";
145+
146+
final HttpResponse httpResponse = objectUnderTest.handleException(noPeerForwarderTargetException, testMessage);
147+
final AggregatedHttpResponse aggregatedHttpResponse = httpResponse.aggregate().get();
148+
149+
assertEquals(HttpStatus.BAD_REQUEST, aggregatedHttpResponse.status());
150+
assertEquals(exceptionMessage, aggregatedHttpResponse.contentUtf8());
151+
152+
verify(badRequestsCounter).increment();
153+
}
154+
137155
@Test
138156
void test_NullPointerException() throws ExecutionException, InterruptedException {
139157
final ResponseHandler objectUnderTest = createObjectUnderTest();

0 commit comments

Comments
 (0)