2020import java .util .concurrent .TimeUnit ;
2121
2222import io .grpc .ManagedChannel ;
23- import io .grpc .ManagedChannelBuilder ;
2423import org .apache .commons .logging .Log ;
2524import org .apache .commons .logging .LogFactory ;
2625import reactor .core .publisher .Flux ;
2726
27+ import org .springframework .beans .factory .annotation .Autowired ;
28+ import org .springframework .beans .factory .annotation .Qualifier ;
2829import org .springframework .beans .factory .annotation .Value ;
2930import org .springframework .boot .ApplicationRunner ;
3031import org .springframework .context .annotation .Bean ;
3132import org .springframework .context .annotation .Configuration ;
33+ import org .springframework .context .annotation .Lazy ;
34+ import org .springframework .grpc .client .GrpcChannelFactory ;
3235import org .springframework .integration .channel .DirectChannel ;
3336import org .springframework .integration .channel .FluxMessageChannel ;
3437import org .springframework .integration .channel .QueueChannel ;
38+ import org .springframework .integration .config .EnableIntegration ;
3539import org .springframework .integration .dsl .IntegrationFlow ;
3640import org .springframework .integration .grpc .dsl .Grpc ;
37- import org .springframework .integration .grpc .outbound .GrpcOutboundGateway ;
38- import org .springframework .integration .grpc .proto .HelloReply ;
39- import org .springframework .integration .grpc .proto .HelloRequest ;
40- import org .springframework .integration .grpc .proto .HelloWorldServiceGrpc ;
41+ import org .springframework .integration .samples .grpc .proto .HelloReply ;
42+ import org .springframework .integration .samples .grpc .proto .HelloRequest ;
43+ import org .springframework .integration .samples .grpc .proto .HelloWorldServiceGrpc ;
4144import org .springframework .integration .support .MessageBuilder ;
4245import org .springframework .messaging .Message ;
4346import org .springframework .messaging .MessageChannel ;
4447
4548/**
46- * Configuration class for the gRPC client sample.
47- * Configures the gRPC channel, message channels, and integration flows
49+ * Configure the gRPC client sample.
50+ * <p>
51+ * Define the gRPC channel, message channels, and integration flows
4852 * for both single response and streaming response scenarios.
4953 *
5054 * @author Glenn Renfro
5155 */
5256@ Configuration
57+ @ EnableIntegration
5358class ClientHelloWorldConfiguration {
5459
5560 private static final Log LOG = LogFactory .getLog (ClientHelloWorldConfiguration .class );
5661
5762 /**
58- * Creates a managed gRPC channel for communication with the server.
59- *
60- * @param host the gRPC server host (default: localhost)
61- * @param port the gRPC server port (default: 9090)
63+ * Create a managed gRPC channel for communication with the server.
64+ * @param factory used to create the managed channel
6265 * @return the configured managed channel
6366 */
64- @ Bean (destroyMethod = "shutdownNow" )
65- ManagedChannel managedChannel (@ Value ("${grpc.server.host:localhost}" ) String host ,
66- @ Value ("${grpc.server.port:9090}" ) int port ) {
67- return ManagedChannelBuilder .forAddress (host , port )
68- .usePlaintext ()
69- .build ();
70- }
71-
72- /**
73- * Creates a message channel for single response gRPC requests.
74- *
75- * @return the direct message channel
76- */
7767 @ Bean
78- MessageChannel grpcInputChannelSingleResponse ( ) {
79- return new DirectChannel ( );
68+ ManagedChannel managedChannel ( GrpcChannelFactory factory ) {
69+ return factory . createChannel ( "local" );
8070 }
8171
8272 /**
83- * Creates a message channel for streaming response gRPC requests.
84- *
73+ * Create a message channel for single response gRPC requests.
8574 * @return the direct message channel
8675 */
8776 @ Bean
88- MessageChannel grpcInputChannelStreamResponse () {
77+ MessageChannel grpcInputChannelSingleResponse () {
8978 return new DirectChannel ();
9079 }
9180
9281 /**
93- * Creates a FluxMessageChannel for output.
94- *
82+ * Create a FluxMessageChannel for output.
9583 * @return the flux message channel
9684 */
9785 @ Bean
@@ -100,15 +88,16 @@ FluxMessageChannel grpcStreamOutputChannel() {
10088 }
10189
10290 /**
103- * Creates an application runner that sends a single gRPC request and receives a single response.
104- *
91+ * Create an application runner that sends a single gRPC request and receives a single response.
10592 * @param grpcInputChannelSingleResponse the message channel for single response requests
106- * @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds.
93+ * @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds
10794 * @return the application runner
10895 */
10996 @ Bean
110- ApplicationRunner grpcClientSingleResponse (MessageChannel grpcInputChannelSingleResponse ,
97+ ApplicationRunner grpcClientSingleResponse (@ Autowired @ Qualifier ("grpcOutboundFlowSingleResponse.input" ) @ Lazy
98+ MessageChannel grpcInputChannelSingleResponse ,
11199 @ Value ("${grpc.client.single.reply.timeout:10000}" ) long replyTimeout ) {
100+
112101 return args -> {
113102 HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
114103 QueueChannel replyChannel = new QueueChannel ();
@@ -127,26 +116,23 @@ ApplicationRunner grpcClientSingleResponse(MessageChannel grpcInputChannelSingle
127116 }
128117
129118 /**
130- * Creates an application runner that sends a gRPC request and receives a stream of responses.
131- *
119+ * Create an application runner that sends a gRPC request and receives a stream of responses.
132120 * @param grpcInputChannelStreamResponse the message channel for streaming response requests
133121 * @param grpcStreamOutputChannel channel that contains the responses
134- * @param replyTimeout the time in seconds to await for the response. Defaults to 1 second.
122+ * @param replyTimeout the time in seconds to await for the response. Defaults to 1000 milliseconds
135123 * @return the application runner
136124 */
137125 @ Bean
138- ApplicationRunner grpcClientStreamResponse (MessageChannel grpcInputChannelStreamResponse ,
126+ ApplicationRunner grpcClientStreamResponse (@ Autowired @ Qualifier ("grpcOutboundFlowStreamResponse.input" )
127+ @ Lazy MessageChannel grpcInputChannelStreamResponse ,
139128 FluxMessageChannel grpcStreamOutputChannel ,
140- @ Value ("${grpc.client.stream.reply.timeout:1}" ) int replyTimeout ) {
129+ @ Value ("${grpc.client.stream.reply.timeout:1000}" ) int replyTimeout ) {
130+
141131 return args -> {
142132 CountDownLatch latch = new CountDownLatch (1 );
133+ HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
143134
144135 Flux .from (grpcStreamOutputChannel )
145- .doOnSubscribe (subscription -> {
146- HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
147- Message <?> requestMessage = MessageBuilder .withPayload (request ).build ();
148- grpcInputChannelStreamResponse .send (requestMessage );
149- })
150136 .map (message -> (HelloReply ) message .getPayload ())
151137 .map (HelloReply ::getMessage )
152138 .doOnNext (msg -> LOG .info ("Stream received reply: " + msg ))
@@ -157,45 +143,44 @@ ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStream
157143 })
158144 .subscribe ();
159145
160- latch .await (replyTimeout , TimeUnit .SECONDS );
146+ Message <?> requestMessage = MessageBuilder
147+ .withPayload (request )
148+ .setReplyChannel (grpcStreamOutputChannel )
149+ .build ();
150+ grpcInputChannelStreamResponse .send (requestMessage );
151+
152+ latch .await (replyTimeout , TimeUnit .MILLISECONDS );
161153 };
162154 }
163155
164156 /**
165- * Creates an integration flow for outbound gRPC requests with single responses.
166- *
157+ * Create an integration flow for outbound gRPC requests with single responses.
167158 * @param managedChannel the gRPC managed channel
168159 * @param grpcInputChannelSingleResponse the input message channel
169160 * @return the integration flow
170161 */
171162 @ Bean
172163 IntegrationFlow grpcOutboundFlowSingleResponse (ManagedChannel managedChannel ,
173164 MessageChannel grpcInputChannelSingleResponse ) {
174- return IntegrationFlow . from ( grpcInputChannelSingleResponse )
165+ return flow -> flow
175166 .handle (Grpc .outboundGateway (managedChannel , HelloWorldServiceGrpc .class )
176- .methodName ("SayHello" ))
177- .get ();
167+ .methodName ("SayHello" ));
178168 }
179169
180170 /**
181- * Creates an integration flow for outbound gRPC requests with streaming responses.
182- *
171+ * Create an integration flow for outbound gRPC requests with streaming responses.
183172 * @param managedChannel the gRPC managed channel
184- * @param grpcInputChannelStreamResponse the input message channel
185173 * @param grpcStreamOutputChannel channel containing the results
186174 * @return the integration flow
187175 */
188176 @ Bean
189177 IntegrationFlow grpcOutboundFlowStreamResponse (ManagedChannel managedChannel ,
190- MessageChannel grpcInputChannelStreamResponse ,
191178 FluxMessageChannel grpcStreamOutputChannel ) {
192- GrpcOutboundGateway gateway = new GrpcOutboundGateway (managedChannel , HelloWorldServiceGrpc .class );
193- gateway .setMethodName ("StreamSayHello" );
194179
195- return IntegrationFlow . from ( grpcInputChannelStreamResponse )
196- .handle (gateway )
197- . channel ( grpcStreamOutputChannel )
198- .get ( );
180+ return flow -> flow
181+ .handle (Grpc . outboundGateway ( managedChannel , HelloWorldServiceGrpc . class )
182+ . methodName ( "StreamSayHello" ) )
183+ .channel ( grpcStreamOutputChannel );
199184 }
200185
201186}
0 commit comments