Skip to content

Commit 318138b

Browse files
committed
Resolve Code Review issues
1 parent 558353e commit 318138b

6 files changed

Lines changed: 322 additions & 38 deletions

File tree

basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/configuration/ClientHelloWorldConfiguration.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,8 @@ IntegrationFlow grpcOutboundFlowStreamResponse(ManagedChannel managedChannel,
191191
FluxMessageChannel grpcStreamOutputChannel) {
192192
//TODO: Need advice, there has to be a better way than what I have below (or is this a bug in the
193193
// GrpcOutboundGateway?
194-
GrpcOutboundGateway gateway = new GrpcOutboundGateway(managedChannel, HelloWorldServiceGrpc.class) {
195-
@Override
196-
protected boolean shouldSplitOutput(Iterable<?> reply) {
197-
return false;
198-
}
199-
};
194+
GrpcOutboundGateway gateway = new GrpcOutboundGateway(managedChannel, HelloWorldServiceGrpc.class);
200195
gateway.setMethodName("StreamSayHello");
201-
gateway.setAsync(true);
202196

203197
return IntegrationFlow.from(grpcInputChannelStreamResponse)
204198
.handle(gateway)
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.samples.grpc;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import io.grpc.Server;
26+
import io.grpc.ServerBuilder;
27+
import io.grpc.stub.StreamObserver;
28+
import org.junit.jupiter.api.AfterAll;
29+
import org.junit.jupiter.api.Test;
30+
import reactor.core.publisher.Flux;
31+
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.boot.ApplicationRunner;
34+
import org.springframework.boot.test.context.SpringBootTest;
35+
import org.springframework.boot.test.context.TestConfiguration;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Primary;
38+
import org.springframework.integration.channel.FluxMessageChannel;
39+
import org.springframework.integration.channel.QueueChannel;
40+
import org.springframework.integration.grpc.proto.HelloReply;
41+
import org.springframework.integration.grpc.proto.HelloRequest;
42+
import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc;
43+
import org.springframework.integration.support.MessageBuilder;
44+
import org.springframework.messaging.Message;
45+
import org.springframework.messaging.MessageChannel;
46+
import org.springframework.test.context.DynamicPropertyRegistry;
47+
import org.springframework.test.context.DynamicPropertySource;
48+
49+
import static org.assertj.core.api.Assertions.assertThat;
50+
51+
/**
52+
* @author Glenn Renfro
53+
*/
54+
@SpringBootTest(properties = {
55+
"spring.main.web-application-type=none",
56+
"spring.main.allow-bean-definition-overriding=true"
57+
})
58+
class GrpcClientTests {
59+
60+
private static Server mockGrpcServer;
61+
62+
private static int serverPort;
63+
64+
@Autowired
65+
private MessageChannel grpcInputChannelSingleResponse;
66+
67+
@Autowired
68+
private MessageChannel grpcInputChannelStreamResponse;
69+
70+
@Autowired
71+
private FluxMessageChannel grpcStreamOutputChannel;
72+
73+
@DynamicPropertySource
74+
static void grpcServerProperties(DynamicPropertyRegistry registry) throws IOException {
75+
mockGrpcServer = ServerBuilder.forPort(0)
76+
.addService(new MockHelloWorldService())
77+
.build()
78+
.start();
79+
80+
serverPort = mockGrpcServer.getPort();
81+
82+
registry.add("grpc.server.host", () -> "localhost");
83+
registry.add("grpc.server.port", () -> serverPort);
84+
}
85+
86+
@AfterAll
87+
static void tearDown() {
88+
if (mockGrpcServer != null) {
89+
mockGrpcServer.shutdownNow();
90+
}
91+
}
92+
93+
@Test
94+
void shouldSendSingleRequestAndReceiveSingleResponse() {
95+
HelloRequest request = HelloRequest.newBuilder()
96+
.setName("TestUser")
97+
.build();
98+
99+
QueueChannel replyChannel = new QueueChannel();
100+
Message<?> requestMessage = MessageBuilder.withPayload(request)
101+
.setReplyChannel(replyChannel)
102+
.build();
103+
104+
this.grpcInputChannelSingleResponse.send(requestMessage);
105+
106+
Message<?> reply = replyChannel.receive(5000);
107+
108+
assertThat(reply).isNotNull();
109+
assertThat(reply.getPayload()).isInstanceOf(HelloReply.class);
110+
111+
HelloReply helloReply = (HelloReply) reply.getPayload();
112+
assertThat(helloReply.getMessage()).isEqualTo("Hello TestUser");
113+
}
114+
115+
@Test
116+
void shouldSendStreamRequestAndReceiveMultipleResponses() throws InterruptedException {
117+
HelloRequest request = HelloRequest.newBuilder()
118+
.setName("StreamUser")
119+
.build();
120+
121+
List<HelloReply> receivedReplies = new ArrayList<>();
122+
CountDownLatch latch = new CountDownLatch(1);
123+
124+
Flux.from(this.grpcStreamOutputChannel)
125+
.doOnSubscribe(subscription -> {
126+
Message<?> requestMessage = MessageBuilder.withPayload(request).build();
127+
this.grpcInputChannelStreamResponse.send(requestMessage);
128+
}).take(2)
129+
.map(message -> (HelloReply) message.getPayload())
130+
.doOnNext(receivedReplies::add)
131+
.doOnComplete(latch::countDown)
132+
.doOnError(error -> latch.countDown())
133+
.subscribe();
134+
135+
boolean completed = latch.await(10, TimeUnit.SECONDS);
136+
137+
assertThat(completed).isTrue();
138+
assertThat(receivedReplies).hasSize(2).extracting(HelloReply::getMessage)
139+
.containsExactly("Hello StreamUser", "Hello again!");
140+
}
141+
142+
/**
143+
* Mock gRPC service implementation for testing.
144+
*/
145+
private static class MockHelloWorldService extends HelloWorldServiceGrpc.HelloWorldServiceImplBase {
146+
147+
@Override
148+
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
149+
HelloReply reply = HelloReply.newBuilder()
150+
.setMessage("Hello " + request.getName())
151+
.build();
152+
responseObserver.onNext(reply);
153+
responseObserver.onCompleted();
154+
}
155+
156+
@Override
157+
public void streamSayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
158+
HelloReply reply1 = HelloReply.newBuilder()
159+
.setMessage("Hello " + request.getName())
160+
.build();
161+
HelloReply reply2 = HelloReply.newBuilder()
162+
.setMessage("Hello again!")
163+
.build();
164+
165+
responseObserver.onNext(reply1);
166+
responseObserver.onNext(reply2);
167+
responseObserver.onCompleted();
168+
}
169+
170+
}
171+
172+
@TestConfiguration
173+
private static class GrpcClientTestConfiguration {
174+
175+
@Bean
176+
@Primary
177+
public ApplicationRunner grpcClientSingleResponse() {
178+
return args -> {
179+
// No-op for tests
180+
};
181+
}
182+
183+
@Bean
184+
@Primary
185+
public ApplicationRunner grpcClientStreamResponse() {
186+
return args -> {
187+
// No-op for tests
188+
};
189+
}
190+
191+
}
192+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
spring.grpc.server.port=9090
1+
spring.grpc.server.port=9090
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.samples.grpc;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import io.grpc.ManagedChannel;
25+
import io.grpc.ManagedChannelBuilder;
26+
import io.grpc.stub.StreamObserver;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.boot.test.context.SpringBootTest;
33+
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
34+
import org.springframework.grpc.server.lifecycle.GrpcServerLifecycle;
35+
import org.springframework.integration.grpc.proto.HelloReply;
36+
import org.springframework.integration.grpc.proto.HelloRequest;
37+
import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc;
38+
import org.springframework.test.context.TestPropertySource;
39+
import org.springframework.test.util.ReflectionTestUtils;
40+
41+
import static org.assertj.core.api.Assertions.assertThat;
42+
43+
/**
44+
* @author Glenn Renfro
45+
*/
46+
@SpringBootTest(webEnvironment = WebEnvironment.NONE)
47+
@TestPropertySource(properties = "spring.grpc.server.port=0")
48+
class GrpcServerTests {
49+
50+
@Autowired
51+
private GrpcServerLifecycle grpcServerLifecycle;
52+
53+
private int grpcServerPort;
54+
55+
private ManagedChannel channel;
56+
57+
private HelloWorldServiceGrpc.HelloWorldServiceBlockingStub blockingStub;
58+
59+
private HelloWorldServiceGrpc.HelloWorldServiceStub asyncStub;
60+
61+
@BeforeEach
62+
void setUp() {
63+
Object server = ReflectionTestUtils.getField(this.grpcServerLifecycle, "server");
64+
this.grpcServerPort = ReflectionTestUtils.invokeMethod(server, "getPort");
65+
66+
this.channel = ManagedChannelBuilder.forAddress("localhost", this.grpcServerPort)
67+
.usePlaintext()
68+
.build();
69+
this.blockingStub = HelloWorldServiceGrpc.newBlockingStub(this.channel);
70+
this.asyncStub = HelloWorldServiceGrpc.newStub(this.channel);
71+
}
72+
73+
@AfterEach
74+
void tearDown() throws InterruptedException {
75+
if (this.channel != null) {
76+
this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
77+
}
78+
}
79+
80+
@Test
81+
void testSayHelloBlockingCall() {
82+
HelloRequest request = HelloRequest.newBuilder()
83+
.setName("Integration Test")
84+
.build();
85+
86+
HelloReply response = this.blockingStub.sayHello(request);
87+
88+
assertThat(response).isNotNull().extracting(HelloReply::getMessage).isEqualTo("Hello Integration Test");
89+
}
90+
91+
@Test
92+
void testStreamSayHello() throws InterruptedException {
93+
HelloRequest request = HelloRequest.newBuilder()
94+
.setName("Streaming Test")
95+
.build();
96+
97+
List<HelloReply> responses = new ArrayList<>();
98+
CountDownLatch latch = new CountDownLatch(1);
99+
100+
StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {
101+
@Override
102+
public void onNext(HelloReply value) {
103+
responses.add(value);
104+
}
105+
106+
@Override
107+
public void onError(Throwable t) {
108+
latch.countDown();
109+
}
110+
111+
@Override
112+
public void onCompleted() {
113+
latch.countDown();
114+
}
115+
};
116+
117+
this.asyncStub.streamSayHello(request, responseObserver);
118+
119+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
120+
assertThat(responses).hasSize(2).extracting(HelloReply::getMessage)
121+
.containsExactly("Hello Streaming Test", "Hello again!");
122+
}
123+
124+
}

0 commit comments

Comments
 (0)