2020import java .util .concurrent .TimeUnit ;
2121
2222import io .grpc .ManagedChannel ;
23- import io .grpc .ManagedChannelBuilder ;
2423import org .apache .commons .logging .Log ;
2524import org .apache .commons .logging .LogFactory ;
2625import reactor .core .publisher .Flux ;
2726
27+ import org .springframework .beans .factory .annotation .Autowired ;
28+ import org .springframework .beans .factory .annotation .Qualifier ;
2829import org .springframework .beans .factory .annotation .Value ;
2930import org .springframework .boot .ApplicationRunner ;
3031import org .springframework .context .annotation .Bean ;
3132import org .springframework .context .annotation .Configuration ;
33+ import org .springframework .context .annotation .Lazy ;
34+ import org .springframework .grpc .client .GrpcChannelFactory ;
3235import org .springframework .integration .channel .DirectChannel ;
3336import org .springframework .integration .channel .FluxMessageChannel ;
3437import org .springframework .integration .channel .QueueChannel ;
38+ import org .springframework .integration .config .EnableIntegration ;
3539import org .springframework .integration .dsl .IntegrationFlow ;
3640import org .springframework .integration .grpc .dsl .Grpc ;
37- import org .springframework .integration .grpc .outbound .GrpcOutboundGateway ;
38- import org .springframework .integration .grpc .proto .HelloReply ;
39- import org .springframework .integration .grpc .proto .HelloRequest ;
40- import org .springframework .integration .grpc .proto .HelloWorldServiceGrpc ;
41+ import org .springframework .integration .samples .grpc .proto .HelloReply ;
42+ import org .springframework .integration .samples .grpc .proto .HelloRequest ;
43+ import org .springframework .integration .samples .grpc .proto .HelloWorldServiceGrpc ;
4144import org .springframework .integration .support .MessageBuilder ;
4245import org .springframework .messaging .Message ;
4346import org .springframework .messaging .MessageChannel ;
5053 * @author Glenn Renfro
5154 */
5255@ Configuration
56+ @ EnableIntegration
5357class ClientHelloWorldConfiguration {
5458
5559 private static final Log LOG = LogFactory .getLog (ClientHelloWorldConfiguration .class );
5660
5761 /**
5862 * Creates a managed gRPC channel for communication with the server.
59- *
60- * @param host the gRPC server host (default: localhost)
61- * @param port the gRPC server port (default: 9090)
63+ * @param factory used to create the managed channel
6264 * @return the configured managed channel
6365 */
64- @ Bean (destroyMethod = "shutdownNow" )
65- ManagedChannel managedChannel (@ Value ("${grpc.server.host:localhost}" ) String host ,
66- @ Value ("${grpc.server.port:9090}" ) int port ) {
67- return ManagedChannelBuilder .forAddress (host , port )
68- .usePlaintext ()
69- .build ();
66+ @ Bean
67+ ManagedChannel managedChannel (GrpcChannelFactory factory ) {
68+ return factory .createChannel ("local" );
7069 }
7170
7271 /**
7372 * Creates a message channel for single response gRPC requests.
74- *
7573 * @return the direct message channel
7674 */
7775 @ Bean
7876 MessageChannel grpcInputChannelSingleResponse () {
7977 return new DirectChannel ();
8078 }
8179
82- /**
83- * Creates a message channel for streaming response gRPC requests.
84- *
85- * @return the direct message channel
86- */
87- @ Bean
88- MessageChannel grpcInputChannelStreamResponse () {
89- return new DirectChannel ();
90- }
9180
9281 /**
9382 * Creates a FluxMessageChannel for output.
94- *
9583 * @return the flux message channel
9684 */
9785 @ Bean
@@ -101,14 +89,15 @@ FluxMessageChannel grpcStreamOutputChannel() {
10189
10290 /**
10391 * Creates an application runner that sends a single gRPC request and receives a single response.
104- *
10592 * @param grpcInputChannelSingleResponse the message channel for single response requests
10693 * @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds.
10794 * @return the application runner
10895 */
10996 @ Bean
110- ApplicationRunner grpcClientSingleResponse (MessageChannel grpcInputChannelSingleResponse ,
97+ ApplicationRunner grpcClientSingleResponse (@ Autowired @ Qualifier ("grpcOutboundFlowSingleResponse.input" ) @ Lazy
98+ MessageChannel grpcInputChannelSingleResponse ,
11199 @ Value ("${grpc.client.single.reply.timeout:10000}" ) long replyTimeout ) {
100+
112101 return args -> {
113102 HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
114103 QueueChannel replyChannel = new QueueChannel ();
@@ -131,22 +120,20 @@ ApplicationRunner grpcClientSingleResponse(MessageChannel grpcInputChannelSingle
131120 *
132121 * @param grpcInputChannelStreamResponse the message channel for streaming response requests
133122 * @param grpcStreamOutputChannel channel that contains the responses
134- * @param replyTimeout the time in seconds to await for the response. Defaults to 1 second .
123+ * @param replyTimeout the time in seconds to await for the response. Defaults to 1000 milliseconds .
135124 * @return the application runner
136125 */
137126 @ Bean
138- ApplicationRunner grpcClientStreamResponse (MessageChannel grpcInputChannelStreamResponse ,
127+ ApplicationRunner grpcClientStreamResponse (@ Autowired @ Qualifier ("grpcOutboundFlowStreamResponse.input" )
128+ @ Lazy MessageChannel grpcInputChannelStreamResponse ,
139129 FluxMessageChannel grpcStreamOutputChannel ,
140- @ Value ("${grpc.client.stream.reply.timeout:1}" ) int replyTimeout ) {
130+ @ Value ("${grpc.client.stream.reply.timeout:1000}" ) int replyTimeout ) {
131+
141132 return args -> {
142133 CountDownLatch latch = new CountDownLatch (1 );
134+ HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
143135
144136 Flux .from (grpcStreamOutputChannel )
145- .doOnSubscribe (subscription -> {
146- HelloRequest request = HelloRequest .newBuilder ().setName ("Jack" ).build ();
147- Message <?> requestMessage = MessageBuilder .withPayload (request ).build ();
148- grpcInputChannelStreamResponse .send (requestMessage );
149- })
150137 .map (message -> (HelloReply ) message .getPayload ())
151138 .map (HelloReply ::getMessage )
152139 .doOnNext (msg -> LOG .info ("Stream received reply: " + msg ))
@@ -157,7 +144,13 @@ ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStream
157144 })
158145 .subscribe ();
159146
160- latch .await (replyTimeout , TimeUnit .SECONDS );
147+ Message <?> requestMessage = MessageBuilder
148+ .withPayload (request )
149+ .setReplyChannel (grpcStreamOutputChannel )
150+ .build ();
151+ grpcInputChannelStreamResponse .send (requestMessage );
152+
153+ latch .await (replyTimeout , TimeUnit .MILLISECONDS );
161154 };
162155 }
163156
@@ -171,31 +164,26 @@ ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStream
171164 @ Bean
172165 IntegrationFlow grpcOutboundFlowSingleResponse (ManagedChannel managedChannel ,
173166 MessageChannel grpcInputChannelSingleResponse ) {
174- return IntegrationFlow . from ( grpcInputChannelSingleResponse )
167+ return flow -> flow
175168 .handle (Grpc .outboundGateway (managedChannel , HelloWorldServiceGrpc .class )
176- .methodName ("SayHello" ))
177- .get ();
169+ .methodName ("SayHello" ));
178170 }
179171
180172 /**
181173 * Creates an integration flow for outbound gRPC requests with streaming responses.
182174 *
183175 * @param managedChannel the gRPC managed channel
184- * @param grpcInputChannelStreamResponse the input message channel
185176 * @param grpcStreamOutputChannel channel containing the results
186177 * @return the integration flow
187178 */
188179 @ Bean
189180 IntegrationFlow grpcOutboundFlowStreamResponse (ManagedChannel managedChannel ,
190- MessageChannel grpcInputChannelStreamResponse ,
191181 FluxMessageChannel grpcStreamOutputChannel ) {
192- GrpcOutboundGateway gateway = new GrpcOutboundGateway (managedChannel , HelloWorldServiceGrpc .class );
193- gateway .setMethodName ("StreamSayHello" );
194182
195- return IntegrationFlow . from ( grpcInputChannelStreamResponse )
196- .handle (gateway )
197- . channel ( grpcStreamOutputChannel )
198- .get ( );
183+ return flow -> flow
184+ .handle (Grpc . outboundGateway ( managedChannel , HelloWorldServiceGrpc . class )
185+ . methodName ( "StreamSayHello" ) )
186+ .channel ( grpcStreamOutputChannel );
199187 }
200188
201189}
0 commit comments