55
66package org .opensearch .dataprepper .plugins .source .otellogs ;
77
8+ import com .linecorp .armeria .common .ContentTooLargeException ;
89import com .linecorp .armeria .common .SessionProtocol ;
910import com .linecorp .armeria .common .grpc .GrpcExceptionHandlerFunction ;
1011import com .linecorp .armeria .server .Server ;
1415import com .linecorp .armeria .server .grpc .GrpcServiceBuilder ;
1516import com .linecorp .armeria .server .healthcheck .HealthCheckService ;
1617import com .linecorp .armeria .server .throttling .ThrottlingService ;
17-
18+ import io .grpc .ServerInterceptor ;
19+ import io .grpc .ServerInterceptors ;
20+ import io .grpc .protobuf .services .ProtoReflectionService ;
21+ import io .micrometer .core .instrument .Counter ;
22+ import io .opentelemetry .proto .collector .logs .v1 .LogsServiceGrpc ;
1823import org .opensearch .dataprepper .GrpcRequestExceptionHandler ;
1924import org .opensearch .dataprepper .armeria .authentication .GrpcAuthenticationProvider ;
2025import org .opensearch .dataprepper .http .LogThrottlingRejectHandler ;
5762import java .util .concurrent .ExecutionException ;
5863import java .util .concurrent .ScheduledThreadPoolExecutor ;
5964
60- import io .grpc .ServerInterceptor ;
61- import io .grpc .ServerInterceptors ;
62- import io .grpc .protobuf .services .ProtoReflectionService ;
63- import io .opentelemetry .proto .collector .logs .v1 .LogsServiceGrpc ;
64-
6565@ DataPrepperPlugin (name = "otel_logs_source" , pluginType = Source .class , pluginConfigurationType = OTelLogsSourceConfig .class )
6666public class OTelLogsSource implements Source <Record <Object >> {
6767 private static final Logger LOG = LoggerFactory .getLogger (OTelLogsSource .class );
@@ -75,6 +75,7 @@ public class OTelLogsSource implements Source<Record<Object>> {
7575 private final CertificateProviderFactory certificateProviderFactory ;
7676 private final ByteDecoder byteDecoder ;
7777 private final PluginFactory pluginFactory ;
78+ final Counter requestsTooLargeCounter ;
7879 private Server server ;
7980
8081 @ DataPrepperPluginConstructor
@@ -95,6 +96,7 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
9596 this .pipelineName = pipelineDescription .getPipelineName ();
9697 this .pluginFactory = pluginFactory ;
9798 this .byteDecoder = new OTelLogsDecoder (oTelLogsSourceConfig .getOutputFormat ());
99+ this .requestsTooLargeCounter = pluginMetrics .counter (HttpExceptionHandler .REQUESTS_TOO_LARGE );
98100 }
99101
100102 @ Override
@@ -144,6 +146,12 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
144146 serverBuilder .http (oTelLogsSourceConfig .getPort ());
145147 }
146148
149+ serverBuilder .accessLogWriter (log -> {
150+ if (log .responseCause () instanceof ContentTooLargeException ) {
151+ requestsTooLargeCounter .increment ();
152+ }
153+ }, true );
154+
147155 final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider (pluginFactory );
148156 authProvider .getHttpAuthenticationService ().ifPresent (serverBuilder ::decorator );
149157
0 commit comments