Skip to content

Commit b940b65

Browse files
authored
RATIS-2400. Support timeout and interrupt handling in GrpcClientRpc. (#1342)
1 parent b02fed5 commit b940b65

2 files changed

Lines changed: 104 additions & 16 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
*/
1818
package org.apache.ratis.grpc.client;
1919

20+
import org.apache.ratis.client.RaftClientConfigKeys;
2021
import org.apache.ratis.client.impl.ClientProtoUtils;
2122
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
2223
import org.apache.ratis.conf.RaftProperties;
2324
import org.apache.ratis.grpc.GrpcConfigKeys;
2425
import org.apache.ratis.grpc.GrpcUtil;
2526
import org.apache.ratis.protocol.*;
2627
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
28+
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
29+
import org.apache.ratis.thirdparty.io.grpc.Status;
2730
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
2831
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
2932
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -39,26 +42,33 @@
3942
import org.apache.ratis.util.IOUtils;
4043
import org.apache.ratis.util.JavaUtils;
4144
import org.apache.ratis.util.PeerProxyMap;
45+
import org.apache.ratis.util.TimeDuration;
4246
import org.slf4j.Logger;
4347
import org.slf4j.LoggerFactory;
4448

4549
import java.io.IOException;
4650
import java.io.InterruptedIOException;
4751
import java.util.concurrent.CompletableFuture;
4852
import java.util.concurrent.ExecutionException;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.TimeoutException;
4955

5056
public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClient> {
5157
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
5258

5359
private final ClientId clientId;
5460
private final int maxMessageSize;
61+
private final TimeDuration requestTimeoutDuration;
62+
private final TimeDuration watchRequestTimeoutDuration;
5563

5664
public GrpcClientRpc(ClientId clientId, RaftProperties properties,
5765
SslContext adminSslContext, SslContext clientSslContext) {
5866
super(new PeerProxyMap<>(clientId.toString(),
5967
p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext)));
6068
this.clientId = clientId;
6169
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
70+
this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
71+
this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
6272
}
6373

6474
@Override
@@ -121,24 +131,11 @@ public RaftClientReply sendRequest(RaftClientRequest request)
121131
((LeaderElectionManagementRequest) request);
122132
return ClientProtoUtils.toRaftClientReply(proxy.leaderElectionManagement(proto));
123133
} else {
124-
final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
125-
// TODO: timeout support
126-
try {
127-
return f.get();
128-
} catch (InterruptedException e) {
129-
Thread.currentThread().interrupt();
130-
throw new InterruptedIOException(
131-
"Interrupted while waiting for response of request " + request);
132-
} catch (ExecutionException e) {
133-
if (LOG.isTraceEnabled()) {
134-
LOG.trace(clientId + ": failed " + request, e);
135-
}
136-
throw IOUtils.toIOException(e);
137-
}
134+
return sendRequest(request, proxy);
138135
}
139136
}
140137

141-
private CompletableFuture<RaftClientReply> sendRequest(
138+
private RaftClientReply sendRequest(
142139
RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
143140
final RaftClientRequestProto requestProto =
144141
toRaftClientRequestProto(request);
@@ -167,7 +164,44 @@ public void onCompleted() {
167164
requestObserver.onNext(requestProto);
168165
requestObserver.onCompleted();
169166

170-
return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply);
167+
final TimeDuration timeout = getTimeoutDuration(request);
168+
try {
169+
return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply)
170+
.get(timeout.getDuration(), timeout.getUnit());
171+
} catch (InterruptedException e) {
172+
Thread.currentThread().interrupt();
173+
replyFuture.cancel(true);
174+
final InterruptedIOException ioe = new InterruptedIOException(clientId + ": Interrupted " + request);
175+
sendOnError(requestObserver, Status.CANCELLED, ioe.getMessage());
176+
throw ioe;
177+
} catch (TimeoutException e) {
178+
replyFuture.cancel(true);
179+
final TimeoutIOException ioe =
180+
new TimeoutIOException(clientId + ": Timed out " + timeout + " for " + request, e);
181+
sendOnError(requestObserver, Status.DEADLINE_EXCEEDED, ioe.getMessage());
182+
throw ioe;
183+
} catch (ExecutionException e) {
184+
if (LOG.isTraceEnabled()) {
185+
LOG.trace("{} : failed {}", clientId, request, e);
186+
}
187+
throw IOUtils.toIOException(e);
188+
}
189+
}
190+
191+
private void sendOnError(StreamObserver<RaftClientRequestProto> requestObserver, Status status, String message) {
192+
try {
193+
requestObserver.onError(status.withDescription(message).asException());
194+
} catch (Exception ignored) {
195+
// the stream already closed.
196+
}
197+
}
198+
199+
private TimeDuration getTimeoutDuration(RaftClientRequest request) {
200+
final long timeoutMs = request.getTimeoutMs();
201+
if (timeoutMs > 0) {
202+
return TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
203+
}
204+
return request.is(RaftClientRequestProto.TypeCase.WATCH) ? watchRequestTimeoutDuration : requestTimeoutDuration;
171205
}
172206

173207
private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) throws IOException {

ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import javax.net.ssl.KeyManager;
7070
import javax.net.ssl.TrustManager;
7171
import java.io.IOException;
72+
import java.io.InterruptedIOException;
7273
import java.nio.channels.OverlappingFileLockException;
7374
import java.util.ArrayList;
7475
import java.util.Arrays;
@@ -81,6 +82,7 @@
8182
import java.util.concurrent.ThreadLocalRandom;
8283
import java.util.concurrent.TimeUnit;
8384
import java.util.concurrent.atomic.AtomicLong;
85+
import java.util.concurrent.atomic.AtomicReference;
8486

8587
public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
8688
{
@@ -238,6 +240,58 @@ public void testRaftClientMetrics(Boolean separateHeartbeat) throws Exception {
238240
runWithNewCluster(3, this::testRaftClientRequestMetrics);
239241
}
240242

243+
@ParameterizedTest
244+
@MethodSource("data")
245+
public void testGrpcClientRpcSyncTimeout(Boolean separateHeartbeat) throws Exception {
246+
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
247+
runWithNewCluster(3, cluster -> {
248+
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
249+
try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
250+
final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(cluster.getLeader());
251+
stateMachine.blockStartTransaction();
252+
try {
253+
Assertions.assertThrows(TimeoutIOException.class,
254+
() -> client.io().send(new SimpleMessage("sync-timeout")));
255+
} finally {
256+
stateMachine.unblockStartTransaction();
257+
}
258+
}
259+
});
260+
}
261+
262+
@ParameterizedTest
263+
@MethodSource("data")
264+
public void testGrpcClientRpcSyncCancelOnInterrupt(Boolean separateHeartbeat) throws Exception {
265+
RaftClientConfigKeys.Rpc.setRequestTimeout(getProperties(), TimeDuration.valueOf(10, TimeUnit.SECONDS));
266+
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
267+
runWithNewCluster(3, cluster -> {
268+
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
269+
try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
270+
final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(cluster.getLeader());
271+
stateMachine.blockStartTransaction();
272+
try {
273+
final AtomicReference<Throwable> error = new AtomicReference<>();
274+
final Thread t = new Thread(() -> {
275+
try {
276+
client.io().send(new SimpleMessage("sync-cancel"));
277+
} catch (Throwable e) {
278+
error.set(e);
279+
}
280+
});
281+
t.start();
282+
Thread.sleep(200);
283+
t.interrupt();
284+
t.join(5000);
285+
Assertions.assertFalse(t.isAlive(), "request thread should exit after interrupt");
286+
Assertions.assertTrue(error.get() instanceof InterruptedIOException,
287+
"expected InterruptedIOException but got " + error.get());
288+
} finally {
289+
stateMachine.unblockStartTransaction();
290+
}
291+
}
292+
});
293+
}
294+
241295
@ParameterizedTest
242296
@MethodSource("data")
243297
public void testRaftServerMetrics(Boolean separateHeartbeat) throws Exception {

0 commit comments

Comments
 (0)