55
66package org .opensearch .dataprepper .plugins .server ;
77
8-
98import com .linecorp .armeria .common .grpc .GrpcExceptionHandlerFunction ;
109import com .linecorp .armeria .common .util .BlockingTaskExecutor ;
1110import com .linecorp .armeria .server .HttpService ;
4039import java .io .ByteArrayInputStream ;
4140import java .nio .charset .StandardCharsets ;
4241import java .time .Duration ;
42+ import java .util .ArrayList ;
4343import java .util .Collections ;
4444import java .util .List ;
4545import java .util .Optional ;
4646import java .util .concurrent .ScheduledThreadPoolExecutor ;
4747import java .util .function .Function ;
4848
49-
5049public class CreateServer {
5150 private final ServerConfiguration serverConfiguration ;
5251 private final Logger LOG ;
53- private final PluginMetrics pluginMetrics ;
52+ private final PluginMetrics pluginMetrics ;
5453 private String sourceName ;
5554 private String pipelineName ;
5655
@@ -68,7 +67,53 @@ public CreateServer(final ServerConfiguration serverConfiguration, final Logger
6867 this .pipelineName = pipelineName ;
6968 }
7069
71- public <K , V > Server createGRPCServer (final GrpcAuthenticationProvider authenticationProvider , final BindableService grpcService , final CertificateProvider certificateProvider , final MethodDescriptor <K , V > methodDescriptor ) {
70+ /**
71+ * Service configuration class for GRPC services
72+ * @param <K> Request type
73+ * @param <V> Response type
74+ */
75+ public static class GRPCServiceConfig <K , V > {
76+ private final BindableService service ;
77+ private final String path ;
78+ private final MethodDescriptor <K , V > methodDescriptor ;
79+
80+ public GRPCServiceConfig (BindableService service , String path , MethodDescriptor <K , V > methodDescriptor ) {
81+ this .service = service ;
82+ this .path = path ;
83+ this .methodDescriptor = methodDescriptor ;
84+ }
85+
86+ public GRPCServiceConfig (BindableService service ) {
87+ this .service = service ;
88+ this .path = null ;
89+ this .methodDescriptor = null ;
90+ }
91+
92+ public BindableService getService () {
93+ return service ;
94+ }
95+
96+ public String getPath () {
97+ return path ;
98+ }
99+
100+ public MethodDescriptor <K , V > getMethodDescriptor () {
101+ return methodDescriptor ;
102+ }
103+ }
104+
105+ /**
106+ * Creates a GRPC server with multiple services, each with its own optional path and method descriptor
107+ * @param authenticationProvider Provider for authentication
108+ * @param grpcServiceConfigs List of service configurations
109+ * @param certificateProvider Provider for SSL/TLS certificates
110+ * @return Configured server
111+ */
112+ public Server createGRPCServer (
113+ final GrpcAuthenticationProvider authenticationProvider ,
114+ final List <GRPCServiceConfig <?, ?>> grpcServiceConfigs ,
115+ final CertificateProvider certificateProvider ) {
116+
72117 final List <ServerInterceptor > serverInterceptors = getAuthenticationInterceptor (authenticationProvider );
73118
74119 final GrpcServiceBuilder grpcServiceBuilder = GrpcService
@@ -77,13 +122,20 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
77122 .useBlockingTaskExecutor (true )
78123 .exceptionHandler (createGrpExceptionHandler ());
79124
80- final String sourcePath = serverConfiguration .getPath ();
81- if (sourcePath != null ) {
82- final String transformedSourcePath = sourcePath .replace (PIPELINE_NAME_PLACEHOLDER , pipelineName );
83- grpcServiceBuilder .addService (transformedSourcePath ,
84- ServerInterceptors .intercept (grpcService , serverInterceptors ), methodDescriptor );
85- } else {
86- grpcServiceBuilder .addService (ServerInterceptors .intercept (grpcService , serverInterceptors ));
125+ // Add each service with its own path and method descriptor
126+ for (GRPCServiceConfig <?, ?> serviceConfig : grpcServiceConfigs ) {
127+ if (serviceConfig .getPath () != null && serviceConfig .getMethodDescriptor () != null ) {
128+ final String transformedPath = serviceConfig .getPath ().replace (PIPELINE_NAME_PLACEHOLDER , pipelineName );
129+ grpcServiceBuilder .addService (
130+ transformedPath ,
131+ ServerInterceptors .intercept (serviceConfig .getService (), serverInterceptors ),
132+ serviceConfig .getMethodDescriptor ());
133+ LOG .info ("Adding service with path: {}" , transformedPath );
134+ } else {
135+ grpcServiceBuilder .addService (
136+ ServerInterceptors .intercept (serviceConfig .getService (), serverInterceptors ));
137+ LOG .info ("Adding service without specific path" );
138+ }
87139 }
88140
89141 if (serverConfiguration .hasHealthCheck ()) {
@@ -151,7 +203,35 @@ public <K, V> Server createGRPCServer(final GrpcAuthenticationProvider authentic
151203 return sb .build ();
152204 }
153205
154- public Server createHTTPServer (final Buffer <Record <Log >> buffer , final CertificateProviderFactory certificateProviderFactory , final ArmeriaHttpAuthenticationProvider authenticationProvider , final HttpRequestExceptionHandler httpRequestExceptionHandler , final Object logService ) {
206+ /**
207+ * Creates a GRPC server with a single service
208+ * @param authenticationProvider Provider for authentication
209+ * @param grpcService Service to be added
210+ * @param certificateProvider Provider for SSL/TLS certificates
211+ * @param methodDescriptor Method descriptor for the service
212+ * @return Configured server
213+ */
214+ public <K , V > Server createGRPCServer (
215+ final GrpcAuthenticationProvider authenticationProvider ,
216+ final BindableService grpcService ,
217+ final CertificateProvider certificateProvider ,
218+ final MethodDescriptor <K , V > methodDescriptor ) {
219+
220+ List <GRPCServiceConfig <?, ?>> serviceConfigs = new ArrayList <>();
221+ if (serverConfiguration .getPath () != null ) {
222+ serviceConfigs .add (new GRPCServiceConfig <>(grpcService , serverConfiguration .getPath (), methodDescriptor ));
223+ } else {
224+ serviceConfigs .add (new GRPCServiceConfig <>(grpcService ));
225+ }
226+
227+ return createGRPCServer (authenticationProvider , serviceConfigs , certificateProvider );
228+ }
229+
230+
231+ public Server createHTTPServer (final Buffer <Record <Log >> buffer ,
232+ final CertificateProviderFactory certificateProviderFactory ,
233+ final ArmeriaHttpAuthenticationProvider authenticationProvider ,
234+ final HttpRequestExceptionHandler httpRequestExceptionHandler , final Object logService ) {
155235 final ServerBuilder sb = Server .builder ();
156236
157237 sb .disableServerHeader ();
@@ -163,17 +243,16 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
163243 // TODO: enable encrypted key with password
164244 sb .https (serverConfiguration .getPort ()).tls (
165245 new ByteArrayInputStream (certificate .getCertificate ().getBytes (StandardCharsets .UTF_8 )),
166- new ByteArrayInputStream (certificate .getPrivateKey ().getBytes (StandardCharsets .UTF_8 )
167- )
168- );
246+ new ByteArrayInputStream (certificate .getPrivateKey ().getBytes (StandardCharsets .UTF_8 )));
169247 } else {
170248 LOG .warn ("Creating " + sourceName + " without SSL/TLS. This is not secure." );
171249 LOG .warn ("In order to set up TLS for the " + sourceName + ", go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/http-source#ssl" );
172250 sb .http (serverConfiguration .getPort ());
173251 }
174252
175- if (serverConfiguration .getAuthentication () != null ) {
176- final Optional <Function <? super HttpService , ? extends HttpService >> optionalAuthDecorator = authenticationProvider .getAuthenticationDecorator ();
253+ if (serverConfiguration .getAuthentication () != null ) {
254+ final Optional <Function <? super HttpService , ? extends HttpService >> optionalAuthDecorator = authenticationProvider
255+ .getAuthenticationDecorator ();
177256
178257 if (serverConfiguration .isUnauthenticatedHealthCheck ()) {
179258 optionalAuthDecorator .ifPresent (authDecorator -> sb .decorator (REGEX_HEALTH , authDecorator ));
@@ -184,7 +263,7 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
184263
185264 sb .maxNumConnections (serverConfiguration .getMaxConnectionCount ());
186265 sb .requestTimeout (Duration .ofMillis (serverConfiguration .getRequestTimeoutInMillis ()));
187- if (serverConfiguration .getMaxRequestLength () != null ) {
266+ if (serverConfiguration .getMaxRequestLength () != null ) {
188267 sb .maxRequestLength (serverConfiguration .getMaxRequestLength ().getBytes ());
189268 }
190269 final int threads = serverConfiguration .getThreadCount ();
@@ -193,15 +272,17 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer, final Certifica
193272 final int maxPendingRequests = serverConfiguration .getMaxPendingRequests ();
194273 final LogThrottlingStrategy logThrottlingStrategy = new LogThrottlingStrategy (
195274 maxPendingRequests , blockingTaskExecutor .getQueue ());
196- final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler (maxPendingRequests , pluginMetrics );
275+ final LogThrottlingRejectHandler logThrottlingRejectHandler = new LogThrottlingRejectHandler (maxPendingRequests ,
276+ pluginMetrics );
197277
198278 final String httpSourcePath = serverConfiguration .getPath ().replace (PIPELINE_NAME_PLACEHOLDER , pipelineName );
199279 sb .decorator (httpSourcePath , ThrottlingService .newDecorator (logThrottlingStrategy , logThrottlingRejectHandler ));
200280
201281 if (CompressionOption .NONE .equals (serverConfiguration .getCompression ())) {
202282 sb .annotatedService (httpSourcePath , logService , httpRequestExceptionHandler );
203283 } else {
204- sb .annotatedService (httpSourcePath , logService , DecodingService .newDecorator (), httpRequestExceptionHandler );
284+ sb .annotatedService (httpSourcePath , logService , DecodingService .newDecorator (),
285+ httpRequestExceptionHandler );
205286 }
206287
207288 if (serverConfiguration .hasHealthCheck ()) {
@@ -220,12 +301,12 @@ private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
220301 return new GrpcRequestExceptionHandler (pluginMetrics , retryInfo .getMinDelay (), retryInfo .getMaxDelay ());
221302 }
222303
223- private List <ServerInterceptor > getAuthenticationInterceptor (final GrpcAuthenticationProvider authenticationProvider ) {
304+ private List <ServerInterceptor > getAuthenticationInterceptor (
305+ final GrpcAuthenticationProvider authenticationProvider ) {
224306 final ServerInterceptor authenticationInterceptor = authenticationProvider .getAuthenticationInterceptor ();
225307 if (authenticationInterceptor == null ) {
226308 return Collections .emptyList ();
227309 }
228310 return Collections .singletonList (authenticationInterceptor );
229311 }
230- }
231- //
312+ }
0 commit comments