Skip to content

Commit a06e899

Browse files
committed
Fix requestsTooLarge metric reporting when decompression buffer overflows on armeria in otel logs source
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 3e90b97 commit a06e899

2 files changed

Lines changed: 82 additions & 7 deletions

File tree

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.source.otellogs;
77

8+
import com.linecorp.armeria.common.ContentTooLargeException;
89
import com.linecorp.armeria.common.SessionProtocol;
910
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
1011
import com.linecorp.armeria.server.Server;
@@ -14,7 +15,11 @@
1415
import com.linecorp.armeria.server.grpc.GrpcServiceBuilder;
1516
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
1617
import com.linecorp.armeria.server.throttling.ThrottlingService;
17-
18+
import io.grpc.ServerInterceptor;
19+
import io.grpc.ServerInterceptors;
20+
import io.grpc.protobuf.services.ProtoReflectionService;
21+
import io.micrometer.core.instrument.Counter;
22+
import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
1823
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
1924
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
2025
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
@@ -57,11 +62,6 @@
5762
import java.util.concurrent.ExecutionException;
5863
import java.util.concurrent.ScheduledThreadPoolExecutor;
5964

60-
import io.grpc.ServerInterceptor;
61-
import io.grpc.ServerInterceptors;
62-
import io.grpc.protobuf.services.ProtoReflectionService;
63-
import io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
64-
6565
@DataPrepperPlugin(name = "otel_logs_source", pluginType = Source.class, pluginConfigurationType = OTelLogsSourceConfig.class)
6666
public class OTelLogsSource implements Source<Record<Object>> {
6767
private static final Logger LOG = LoggerFactory.getLogger(OTelLogsSource.class);
@@ -75,6 +75,7 @@ public class OTelLogsSource implements Source<Record<Object>> {
7575
private final CertificateProviderFactory certificateProviderFactory;
7676
private final ByteDecoder byteDecoder;
7777
private final PluginFactory pluginFactory;
78+
final Counter requestsTooLargeCounter;
7879
private Server server;
7980

8081
@DataPrepperPluginConstructor
@@ -95,6 +96,7 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
9596
this.pipelineName = pipelineDescription.getPipelineName();
9697
this.pluginFactory = pluginFactory;
9798
this.byteDecoder = new OTelLogsDecoder(oTelLogsSourceConfig.getOutputFormat());
99+
this.requestsTooLargeCounter = pluginMetrics.counter(HttpExceptionHandler.REQUESTS_TOO_LARGE);
98100
}
99101

100102
@Override
@@ -144,6 +146,12 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
144146
serverBuilder.http(oTelLogsSourceConfig.getPort());
145147
}
146148

149+
serverBuilder.accessLogWriter(log -> {
150+
if (log.responseCause() instanceof ContentTooLargeException) {
151+
requestsTooLargeCounter.increment();
152+
}
153+
}, true);
154+
147155
final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory);
148156
authProvider.getHttpAuthenticationService().ifPresent(serverBuilder::decorator);
149157

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.io.ByteArrayOutputStream;
4747
import java.io.IOException;
4848
import java.nio.charset.StandardCharsets;
49+
import java.util.UUID;
4950
import java.util.Base64;
5051
import java.util.List;
5152
import java.util.Map;
@@ -355,6 +356,72 @@ void httpRequest_requestBodyIsTooLarge_returns413() throws InvalidProtocolBuffer
355356
.join();
356357
}
357358

359+
@Test
360+
void httpRequest_compressedPayloadDecompressesTooLarge_incrementsRequestsTooLargeMetric() throws Exception {
361+
configureSource(createDefaultConfigBuilder()
362+
.httpPath(CONFIG_HTTP_PATH)
363+
.compression(CompressionOption.GZIP)
364+
.maxRequestLength(ByteCount.ofBytes(250))
365+
.build());
366+
SOURCE.start(buffer);
367+
368+
// ~720 bytes uncompressed, compresses well — compressed fits under maxRequestLength
369+
// but decompressed exceeds 250 bytes, triggering ContentTooLargeException
370+
String largePayload = UUID.randomUUID().toString().repeat(20);
371+
byte[] compressedPayload = createGZipCompressedPayload(largePayload);
372+
373+
WebClient.of().execute(
374+
getDefaultRequestHeadersBuilder()
375+
.add(HttpHeaderNames.CONTENT_ENCODING, "gzip")
376+
.build(),
377+
HttpData.copyOf(compressedPayload))
378+
.aggregate()
379+
.whenComplete((response, throwable) -> {
380+
assertThat("Http Response Throwable", throwable, is(nullValue()));
381+
assertThat("Expected 413 or 507 for too-large decompressed payload",
382+
response.status().code() == 413 || response.status().code() == 507, is(true));
383+
})
384+
.join();
385+
386+
Thread.sleep(200);
387+
388+
List<Measurement> measurements = MetricsTestUtil.getMeasurementList(
389+
"pipeline.otel_logs.requestsTooLarge");
390+
Measurement countMeasurement = MetricsTestUtil.getMeasurementFromList(
391+
measurements, Statistic.COUNT);
392+
assertEquals(1.0, countMeasurement.getValue());
393+
}
394+
395+
@Test
396+
void httpRequest_compressedPayloadWithinLimit_doesNotIncrementRequestsTooLargeMetric() throws Exception {
397+
configureSource(createDefaultConfigBuilder()
398+
.httpPath(CONFIG_HTTP_PATH)
399+
.compression(CompressionOption.GZIP)
400+
.build());
401+
SOURCE.start(buffer);
402+
403+
byte[] compressedPayload = createGZipCompressedPayload(
404+
JsonFormat.printer().print(createLogsServiceRequest()));
405+
406+
WebClient.of().execute(
407+
getDefaultRequestHeadersBuilder()
408+
.add(HttpHeaderNames.CONTENT_ENCODING, "gzip")
409+
.build(),
410+
compressedPayload)
411+
.aggregate()
412+
.whenComplete((response, throwable) ->
413+
assertSecureResponseWithStatusCode(response, HttpStatus.OK, throwable))
414+
.join();
415+
416+
Thread.sleep(200);
417+
418+
List<Measurement> measurements = MetricsTestUtil.getMeasurementList(
419+
"pipeline.otel_logs.requestsTooLarge");
420+
Measurement countMeasurement = MetricsTestUtil.getMeasurementFromList(
421+
measurements, Statistic.COUNT);
422+
assertEquals(0.0, countMeasurement.getValue());
423+
}
424+
358425
static class BufferExceptionToStatusArgumentsProvider implements ArgumentsProvider {
359426
@Override
360427
public Stream<? extends Arguments> provideArguments(final ExtensionContext context) {
@@ -389,7 +456,7 @@ private ExportLogsServiceRequest createExportLogsRequest() {
389456
private void assertSecureResponseWithStatusCode(final AggregatedHttpResponse response,
390457
final HttpStatus expectedStatus,
391458
final Throwable throwable) {
392-
assertThat("Http Status", response.status(), equalTo(expectedStatus));
459+
assertThat("Http Status", response.status(), equalTo(expectedStatus));
393460
assertThat("Http Response Throwable", throwable, is(nullValue()));
394461

395462
final List<String> headerKeys = response.headers()

0 commit comments

Comments
 (0)