1+ /*
2+ * Copyright OpenSearch Contributors
3+ * SPDX-License-Identifier: Apache-2.0
4+ */
5+
6+ package org .opensearch .dataprepper .plugins .server ;
7+
8+
9+ import com .linecorp .armeria .common .grpc .GrpcExceptionHandlerFunction ;
10+ import com .linecorp .armeria .common .util .BlockingTaskExecutor ;
11+ import com .linecorp .armeria .server .HttpService ;
12+ import com .linecorp .armeria .server .Server ;
13+ import com .linecorp .armeria .server .ServerBuilder ;
14+ import com .linecorp .armeria .server .encoding .DecodingService ;
15+ import com .linecorp .armeria .server .grpc .GrpcService ;
16+ import com .linecorp .armeria .server .grpc .GrpcServiceBuilder ;
17+ import com .linecorp .armeria .server .healthcheck .HealthCheckService ;
18+ import com .linecorp .armeria .server .throttling .ThrottlingService ;
19+ import io .grpc .BindableService ;
20+ import io .grpc .MethodDescriptor ;
21+ import io .grpc .ServerInterceptor ;
22+ import io .grpc .ServerInterceptors ;
23+ import io .grpc .protobuf .services .ProtoReflectionService ;
24+ import org .opensearch .dataprepper .GrpcRequestExceptionHandler ;
25+ import org .opensearch .dataprepper .HttpRequestExceptionHandler ;
26+ import org .opensearch .dataprepper .armeria .authentication .ArmeriaHttpAuthenticationProvider ;
27+ import org .opensearch .dataprepper .armeria .authentication .GrpcAuthenticationProvider ;
28+ import org .opensearch .dataprepper .http .LogThrottlingRejectHandler ;
29+ import org .opensearch .dataprepper .http .LogThrottlingStrategy ;
30+ import org .opensearch .dataprepper .http .certificate .CertificateProviderFactory ;
31+ import org .opensearch .dataprepper .metrics .PluginMetrics ;
32+ import org .opensearch .dataprepper .model .buffer .Buffer ;
33+ import org .opensearch .dataprepper .model .log .Log ;
34+ import org .opensearch .dataprepper .model .record .Record ;
35+ import org .opensearch .dataprepper .plugins .certificate .CertificateProvider ;
36+ import org .opensearch .dataprepper .plugins .certificate .model .Certificate ;
37+ import org .opensearch .dataprepper .plugins .codec .CompressionOption ;
38+ import org .slf4j .Logger ;
39+
40+ import java .io .ByteArrayInputStream ;
41+ import java .nio .charset .StandardCharsets ;
42+ import java .time .Duration ;
43+ import java .util .Collections ;
44+ import java .util .List ;
45+ import java .util .Optional ;
46+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
47+ import java .util .function .Function ;
48+
49+
50+ public class CreateServer {
51+ private final ServerConfiguration serverConfiguration ;
52+ private final Logger LOG ;
53+ private final PluginMetrics pluginMetrics ;
54+ private String sourceName ;
55+ private String pipelineName ;
56+
57+ private static final String HTTP_HEALTH_CHECK_PATH = "/health" ;
58+ private static final String REGEX_HEALTH = "regex:^/(?!health$).*$" ;
59+ private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}" ;
60+
61+ private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig (Duration .ofMillis (100 ), Duration .ofMillis (2000 ));
62+
63+ public CreateServer (final ServerConfiguration serverConfiguration , final Logger LOG , final PluginMetrics pluginMetrics , final String sourceName , final String pipelineName ) {
64+ this .serverConfiguration = serverConfiguration ;
65+ this .LOG = LOG ;
66+ this .pluginMetrics = pluginMetrics ;
67+ this .sourceName = sourceName ;
68+ this .pipelineName = pipelineName ;
69+ }
70+
71+ public <K , V > Server createGRPCServer (final GrpcAuthenticationProvider authenticationProvider , final BindableService grpcService , final CertificateProvider certificateProvider , final MethodDescriptor <K , V > methodDescriptor ) {
72+ final List <ServerInterceptor > serverInterceptors = getAuthenticationInterceptor (authenticationProvider );
73+
74+ final GrpcServiceBuilder grpcServiceBuilder = GrpcService
75+ .builder ()
76+ .useClientTimeoutHeader (false )
77+ .useBlockingTaskExecutor (true )
78+ .exceptionHandler (createGrpExceptionHandler ());
79+
80+ final String sourcePath = serverConfiguration .getPath ();
81+ if (sourcePath != null ) {
82+ final String transformedSourcePath = sourcePath .replace (PIPELINE_NAME_PLACEHOLDER , pipelineName );
83+ grpcServiceBuilder .addService (transformedSourcePath ,
84+ ServerInterceptors .intercept (grpcService , serverInterceptors ), methodDescriptor );
85+ } else {
86+ grpcServiceBuilder .addService (ServerInterceptors .intercept (grpcService , serverInterceptors ));
87+ }
88+
89+ if (serverConfiguration .hasHealthCheck ()) {
90+ LOG .info ("Health check is enabled" );
91+ grpcServiceBuilder .addService (new HealthGrpcService ());
92+ }
93+
94+ if (serverConfiguration .hasProtoReflectionService ()) {
95+ LOG .info ("Proto reflection service is enabled" );
96+ grpcServiceBuilder .addService (ProtoReflectionService .newInstance ());
97+ }
98+
99+ grpcServiceBuilder .enableUnframedRequests (serverConfiguration .enableUnframedRequests ());
100+
101+ final ServerBuilder sb = Server .builder ();
102+ sb .disableServerHeader ();
103+ if (CompressionOption .NONE .equals (serverConfiguration .getCompression ())) {
104+ sb .service (grpcServiceBuilder .build ());
105+ } else {
106+ sb .service (grpcServiceBuilder .build (), DecodingService .newDecorator ());
107+ }
108+
109+ if (serverConfiguration .enableHttpHealthCheck ()) {
110+ sb .service (HTTP_HEALTH_CHECK_PATH , HealthCheckService .builder ().longPolling (0 ).build ());
111+ }
112+
113+ if (serverConfiguration .getAuthentication () != null ) {
114+ final Optional <Function <? super HttpService , ? extends HttpService >> optionalHttpAuthenticationService =
115+ authenticationProvider .getHttpAuthenticationService ();
116+
117+ if (serverConfiguration .isUnauthenticatedHealthCheck ()) {
118+ optionalHttpAuthenticationService .ifPresent (httpAuthenticationService ->
119+ sb .decorator (REGEX_HEALTH , httpAuthenticationService ));
120+ } else {
121+ optionalHttpAuthenticationService .ifPresent (sb ::decorator );
122+ }
123+ }
124+
125+ sb .requestTimeoutMillis (serverConfiguration .getRequestTimeoutInMillis ());
126+ if (serverConfiguration .getMaxRequestLength () != null ) {
127+ sb .maxRequestLength (serverConfiguration .getMaxRequestLength ().getBytes ());
128+ }
129+
130+ // ACM Cert for SSL takes preference
131+ if (serverConfiguration .isSsl () || serverConfiguration .useAcmCertForSSL ()) {
132+ LOG .info ("SSL/TLS is enabled." );
133+ final Certificate certificate = certificateProvider .getCertificate ();
134+ sb .https (serverConfiguration .getPort ()).tls (
135+ new ByteArrayInputStream (certificate .getCertificate ().getBytes (StandardCharsets .UTF_8 )),
136+ new ByteArrayInputStream (certificate .getPrivateKey ().getBytes (StandardCharsets .UTF_8 )
137+ )
138+ );
139+ } else {
140+ LOG .warn ("Creating " + sourceName + " without SSL/TLS. This is not secure." );
141+ LOG .warn ("In order to set up TLS for the " + sourceName + ", go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#ssl" );
142+ sb .http (serverConfiguration .getPort ());
143+ }
144+
145+ final BlockingTaskExecutor blockingTaskExecutor = BlockingTaskExecutor .builder ()
146+ .numThreads (serverConfiguration .getThreadCount ())
147+ .threadNamePrefix (pipelineName + sourceName )
148+ .build ();
149+ sb .blockingTaskExecutor (blockingTaskExecutor , true );
150+
151+ return sb .build ();
152+ }
153+
154+ public Server createHTTPServer (final Buffer <Record <Log >> buffer , final CertificateProviderFactory certificateProviderFactory , final ArmeriaHttpAuthenticationProvider authenticationProvider , final HttpRequestExceptionHandler httpRequestExceptionHandler , final Object logService ) {
155+ final ServerBuilder sb = Server .builder ();
156+
157+ sb .disableServerHeader ();
158+
159+ if (serverConfiguration .isSsl ()) {
160+ LOG .info ("Creating http source with SSL/TLS enabled." );
161+ final CertificateProvider certificateProvider = certificateProviderFactory .getCertificateProvider ();
162+ final Certificate certificate = certificateProvider .getCertificate ();
163+ // TODO: enable encrypted key with password
164+ sb .https (serverConfiguration .getPort ()).tls (
165+ new ByteArrayInputStream (certificate .getCertificate ().getBytes (StandardCharsets .UTF_8 )),
166+ new ByteArrayInputStream (certificate .getPrivateKey ().getBytes (StandardCharsets .UTF_8 )
167+ )
168+ );
169+ } else {
170+ LOG .warn ("Creating http source without SSL/TLS. This is not secure." );
171+ LOG .warn ("In order to set up TLS for the http source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl" );
172+ sb .http (serverConfiguration .getPort ());
173+ }
174+
175+ if (serverConfiguration .getAuthentication () != null ) {
176+ final Optional <Function <? super HttpService , ? extends HttpService >> optionalAuthDecorator = authenticationProvider .getAuthenticationDecorator ();
177+
178+ if (serverConfiguration .isUnauthenticatedHealthCheck ()) {
179+ optionalAuthDecorator .ifPresent (authDecorator -> sb .decorator (REGEX_HEALTH , authDecorator ));
180+ } else {
181+ optionalAuthDecorator .ifPresent (sb ::decorator );
182+ }
183+ }
184+
185+ sb .maxNumConnections (serverConfiguration .getMaxConnectionCount ());
186+ sb .requestTimeout (Duration .ofMillis (serverConfiguration .getRequestTimeoutInMillis ()));
187+ if (serverConfiguration .getMaxRequestLength () != null ) {
188+ sb .maxRequestLength (serverConfiguration .getMaxRequestLength ().getBytes ());
189+ }
190+ final int threads = serverConfiguration .getThreadCount ();
191+ final ScheduledThreadPoolExecutor blockingTaskExecutor = new ScheduledThreadPoolExecutor (threads );
192+ sb .blockingTaskExecutor (blockingTaskExecutor , true );
193+ final int maxPendingRequests = serverConfiguration .getMaxPendingRequests ();
194+ final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy (
195+ maxPendingRequests , blockingTaskExecutor .getQueue ());
196+ final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler (maxPendingRequests , pluginMetrics );
197+
198+ final String httpSourcePath = serverConfiguration .getPath ().replace (PIPELINE_NAME_PLACEHOLDER , pipelineName );
199+ sb .decorator (httpSourcePath , ThrottlingService .newDecorator (logThrottlingStrategy , logThrottlingRejectHandler ));
200+
201+ if (CompressionOption .NONE .equals (serverConfiguration .getCompression ())) {
202+ sb .annotatedService (httpSourcePath , logService , httpRequestExceptionHandler );
203+ } else {
204+ sb .annotatedService (httpSourcePath , logService , DecodingService .newDecorator (), httpRequestExceptionHandler );
205+ }
206+
207+ if (serverConfiguration .hasHealthCheck ()) {
208+ LOG .info ("HTTP source health check is enabled" );
209+ sb .service (HTTP_HEALTH_CHECK_PATH , HealthCheckService .builder ().longPolling (0 ).build ());
210+ }
211+
212+ return sb .build ();
213+ }
214+
215+ private GrpcExceptionHandlerFunction createGrpExceptionHandler () {
216+ RetryInfoConfig retryInfo = serverConfiguration .getRetryInfo () != null
217+ ? serverConfiguration .getRetryInfo ()
218+ : DEFAULT_RETRY_INFO ;
219+
220+ return new GrpcRequestExceptionHandler (pluginMetrics , retryInfo .getMinDelay (), retryInfo .getMaxDelay ());
221+ }
222+
223+ private List <ServerInterceptor > getAuthenticationInterceptor (final GrpcAuthenticationProvider authenticationProvider ) {
224+ final ServerInterceptor authenticationInterceptor = authenticationProvider .getAuthenticationInterceptor ();
225+ if (authenticationInterceptor == null ) {
226+ return Collections .emptyList ();
227+ }
228+ return Collections .singletonList (authenticationInterceptor );
229+ }
230+ }
231+ //
0 commit comments