Skip to content

Commit bf60fd1

Browse files
authored
HDDS-14212. Make maxFailovers in GrpcOmTransport apply on a per-request basis. (#9546)
1 parent 6d4cd1c commit bf60fd1

4 files changed

Lines changed: 358 additions & 15 deletions

File tree

hadoop-hdds/common/src/main/resources/ozone-default.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3116,9 +3116,9 @@
31163116
Expert only. Ozone RpcClient attempts talking to each OzoneManager
31173117
ipc.client.connect.max.retries (default = 10) number of times before
31183118
failing over to another OzoneManager, if available. This parameter
3119-
represents the number of times the client will failover before giving
3120-
up. This value is kept high so that client does not give up trying to
3121-
connect to OMs easily.
3119+
represents the number of times per request the client will failover
3120+
before giving up. This value is kept high so that client does not
3121+
give up trying to connect to OMs easily.
31223122
</description>
31233123
</property>
31243124
<property>

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,11 @@ public class GrpcOmTransport implements OmTransport {
8181
private ConfigurationSource conf;
8282

8383
private AtomicReference<String> host;
84-
private AtomicInteger syncFailoverCount;
84+
private AtomicInteger globalFailoverCount;
8585
private final int maxSize;
8686
private SecurityConfig secConfig;
8787

8888
private RetryPolicy retryPolicy;
89-
private int failoverCount = 0;
9089
private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
9190
omFailoverProxyProvider;
9291

@@ -102,9 +101,7 @@ public GrpcOmTransport(ConfigurationSource conf,
102101
this.clients = new HashMap<>();
103102
this.conf = conf;
104103
this.host = new AtomicReference();
105-
this.failoverCount = 0;
106-
this.syncFailoverCount = new AtomicInteger();
107-
104+
this.globalFailoverCount = new AtomicInteger();
108105

109106
secConfig = new SecurityConfig(conf);
110107
maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
@@ -175,12 +172,13 @@ public void start() throws IOException {
175172
@Override
176173
public OMResponse submitRequest(OMRequest payload) throws IOException {
177174
AtomicReference<OMResponse> resp = new AtomicReference<>();
175+
int requestFailoverCount = 0;
178176
boolean tryOtherHost = true;
179177
int expectedFailoverCount = 0;
180178
ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
181179
while (tryOtherHost) {
182180
tryOtherHost = false;
183-
expectedFailoverCount = syncFailoverCount.get();
181+
expectedFailoverCount = globalFailoverCount.get();
184182
try {
185183
InetAddress inetAddress = InetAddress.getLocalHost();
186184
Context.current()
@@ -201,7 +199,7 @@ public OMResponse submitRequest(OMRequest payload) throws IOException {
201199
}
202200
Exception exp = new Exception(e);
203201
tryOtherHost = shouldRetry(unwrapException(exp),
204-
expectedFailoverCount);
202+
expectedFailoverCount, ++requestFailoverCount);
205203
if (!tryOtherHost) {
206204
throw new OMException(resultCode);
207205
}
@@ -251,11 +249,11 @@ private Exception unwrapException(Exception ex) {
251249
return grpcException;
252250
}
253251

254-
private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
252+
private boolean shouldRetry(Exception ex, int expectedFailoverCount, int requestFailoverCount) {
255253
boolean retry = false;
256254
RetryPolicy.RetryAction action = null;
257255
try {
258-
action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true);
256+
action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true);
259257
LOG.debug("grpc failover retry action {}", action.action);
260258
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
261259
retry = false;
@@ -273,9 +271,9 @@ private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
273271
}
274272
}
275273
// switch om host to current proxy OMNodeId
276-
if (syncFailoverCount.get() == expectedFailoverCount) {
274+
if (globalFailoverCount.get() == expectedFailoverCount) {
277275
omFailoverProxyProvider.performFailover(null);
278-
syncFailoverCount.getAndIncrement();
276+
globalFailoverCount.getAndIncrement();
279277
} else {
280278
LOG.warn("A failover has occurred since the start of current" +
281279
" thread retry, NOT failover using current proxy");
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.om.protocolPB;
19+
20+
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
21+
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_PORT_KEY;
22+
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
23+
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
24+
import static org.junit.jupiter.api.Assertions.assertEquals;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
import static org.mockito.AdditionalAnswers.delegatesTo;
27+
import static org.mockito.Mockito.mock;
28+
29+
import io.grpc.Server;
30+
import io.grpc.ServerBuilder;
31+
import io.grpc.Status;
32+
import io.grpc.StatusRuntimeException;
33+
import io.grpc.stub.StreamObserver;
34+
import java.io.IOException;
35+
import java.util.HashMap;
36+
import java.util.Map;
37+
import java.util.StringJoiner;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.CyclicBarrier;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.TimeUnit;
43+
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
45+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
46+
import org.apache.hadoop.ozone.OzoneConfigKeys;
47+
import org.apache.hadoop.ozone.ha.ConfUtils;
48+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
49+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
50+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
51+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
52+
import org.apache.hadoop.security.UserGroupInformation;
53+
import org.junit.jupiter.api.AfterEach;
54+
import org.junit.jupiter.api.BeforeEach;
55+
import org.junit.jupiter.api.Test;
56+
import org.slf4j.Logger;
57+
import org.slf4j.LoggerFactory;
58+
59+
/**
60+
* Concurrent test for GrpcOmTransport client.
61+
*/
62+
public class TestGrpcOmTransportConcurrentFailover {
63+
private static final Logger LOG =
64+
LoggerFactory.getLogger(TestGrpcOmTransportConcurrentFailover.class);
65+
66+
private static final String OM_SERVICE_ID = "om-service-test";
67+
private static final String NODE_ID_BASE = "om";
68+
private static final int NUM_OMS = 3;
69+
private static final int BASE_PORT = 19860;
70+
71+
private Map<String, MockOMServer> mockServers;
72+
private GrpcOmTransport transport;
73+
74+
@BeforeEach
75+
public void setUp() throws Exception {
76+
mockServers = new HashMap<>();
77+
OzoneConfiguration conf = new OzoneConfiguration();
78+
79+
conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 250);
80+
conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
81+
conf.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
82+
83+
StringJoiner omNodes = new StringJoiner(",");
84+
85+
for (int i = 0; i < NUM_OMS; i++) {
86+
String nodeId = NODE_ID_BASE + i;
87+
omNodes.add(nodeId);
88+
89+
int port = BASE_PORT + i;
90+
MockOMServer server = new MockOMServer(nodeId, port);
91+
server.start();
92+
mockServers.put(nodeId, server);
93+
94+
conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID, nodeId),
95+
"localhost");
96+
conf.setInt(ConfUtils.addKeySuffixes(OZONE_OM_GRPC_PORT_KEY, OM_SERVICE_ID, nodeId),
97+
port);
98+
}
99+
100+
conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
101+
omNodes.toString());
102+
103+
failover("om0", "om1", "om2");
104+
105+
transport = new GrpcOmTransport(conf, UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
106+
}
107+
108+
@AfterEach
109+
public void tearDown() throws Exception {
110+
if (transport != null) {
111+
transport.close();
112+
}
113+
for (MockOMServer server : mockServers.values()) {
114+
server.stop();
115+
}
116+
}
117+
118+
@Test
119+
public void testConcurrentFailoverTriesAllOMs() throws Exception {
120+
final int numThreads = 500;
121+
final int requestsPerThread = 10;
122+
123+
sendInitialOmRequestsBeforeFailover();
124+
failover("om2", "om0", "om1");
125+
runConcurrentClientRequests(numThreads, requestsPerThread);
126+
127+
int omsWithFailuresCount = omRequestFailoverDistributionReport();
128+
int om0FailuresCount = mockServers.get("om0").getFailureCount();
129+
int om2SuccessesCount = mockServers.get("om2").getSuccessCount();
130+
131+
assertTrue(omsWithFailuresCount >= 1,
132+
"At least 1 OMs should receive failed requests during failover. Got: " + omsWithFailuresCount);
133+
assertTrue(om0FailuresCount > 0, "om0 should receive failed requests");
134+
assertEquals(numThreads * requestsPerThread, om2SuccessesCount,
135+
"All requests should eventually succeed on om2 (leader)");
136+
}
137+
138+
private int omRequestFailoverDistributionReport() {
139+
int totalRequests = 0;
140+
int totalFailures = 0;
141+
int totalSuccesses = 0;
142+
int omsWithFailures = 0;
143+
144+
for (int i = 0; i < NUM_OMS; i++) {
145+
String omId = NODE_ID_BASE + i;
146+
MockOMServer server = mockServers.get(omId);
147+
totalRequests += server.getRequestCount();
148+
totalFailures += server.getFailureCount();
149+
totalSuccesses += server.getSuccessCount();
150+
if (server.getFailureCount() > 0) {
151+
omsWithFailures++;
152+
}
153+
}
154+
155+
LOG.info("Total requests: {} (failures: {}, successes: {})", totalRequests, totalFailures, totalSuccesses);
156+
LOG.info("OMs that received failed requests: {}/{}", omsWithFailures, NUM_OMS);
157+
158+
LOG.info("--- Failed Requests (Failover Attempts) ---");
159+
for (int i = 0; i < NUM_OMS; i++) {
160+
String omId = NODE_ID_BASE + i;
161+
int failures = mockServers.get(omId).getFailureCount();
162+
double percentage = totalFailures > 0 ? (failures * 100.0 / totalFailures) : 0;
163+
String status = failures == 0 ? " NEVER TRIED!" : "";
164+
LOG.info(" {}: {} failures ({} %){}", omId, failures, String.format("%.1f", percentage), status);
165+
}
166+
167+
LOG.info("--- Successful Requests ---");
168+
for (int i = 0; i < NUM_OMS; i++) {
169+
String omId = NODE_ID_BASE + i;
170+
int successes = mockServers.get(omId).getSuccessCount();
171+
double percentage = totalSuccesses > 0 ? (successes * 100.0 / totalSuccesses) : 0;
172+
String status = successes > 0 ? " LEADER" : "";
173+
LOG.info(" {}: {} successes ({} %){}", omId, successes, String.format("%.1f", percentage), status);
174+
}
175+
return omsWithFailures;
176+
}
177+
178+
private void failover(String leader, String follower1, String follower2) {
179+
mockServers.get(leader).setAsLeader(true);
180+
mockServers.get(follower1).setAsLeader(false);
181+
mockServers.get(follower2).setAsLeader(false);
182+
}
183+
184+
private void runConcurrentClientRequests(int numThreads, int requestsPerThread) throws InterruptedException {
185+
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
186+
CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
187+
CountDownLatch completionLatch = new CountDownLatch(numThreads);
188+
189+
for (int threadId = 0; threadId < numThreads; threadId++) {
190+
final int id = threadId;
191+
executor.submit(() -> {
192+
try {
193+
startBarrier.await();
194+
195+
for (int attempt = 0; attempt < requestsPerThread; attempt++) {
196+
OMRequest request = OMRequest.newBuilder()
197+
.setCmdType(Type.ListVolume)
198+
.setClientId("test-client")
199+
.build();
200+
201+
try {
202+
transport.submitRequest(request);
203+
} catch (Exception e) {
204+
LOG.error("Thread: {}, Request {} failed: {}", id, attempt + 1, e.getMessage());
205+
}
206+
207+
Thread.sleep(1);
208+
}
209+
} catch (Exception e) {
210+
LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
211+
} finally {
212+
completionLatch.countDown();
213+
}
214+
});
215+
}
216+
217+
if (!completionLatch.await(30, TimeUnit.SECONDS)) {
218+
LOG.info("Latch didn't count down before timeout");
219+
}
220+
executor.shutdown();
221+
}
222+
223+
private void sendInitialOmRequestsBeforeFailover() throws IOException {
224+
for (int i = 0; i < 5; i++) {
225+
OMRequest request = OMRequest.newBuilder()
226+
.setCmdType(Type.ListVolume)
227+
.setClientId("test-client")
228+
.build();
229+
transport.submitRequest(request);
230+
}
231+
}
232+
233+
private static class MockOMServer {
234+
private final String nodeId;
235+
private final int port;
236+
private final AtomicInteger requestCount = new AtomicInteger(0);
237+
private final AtomicInteger successCount = new AtomicInteger(0);
238+
private final AtomicInteger failureCount = new AtomicInteger(0);
239+
private final AtomicBoolean isLeader = new AtomicBoolean(false);
240+
private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase serviceImpl =
241+
mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
242+
delegatesTo(new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
243+
@Override
244+
public void submitRequest(OMRequest request, StreamObserver<OMResponse> responseObserver) {
245+
requestCount.incrementAndGet();
246+
247+
if (!isLeader.get()) {
248+
failureCount.incrementAndGet();
249+
String errorMsg = "org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException: " +
250+
"OM:" + nodeId + " is not the leader. Suggested leader: om2";
251+
252+
responseObserver.onError(new StatusRuntimeException(
253+
Status.INTERNAL.withDescription(errorMsg)));
254+
} else {
255+
successCount.incrementAndGet();
256+
OMResponse response = OMResponse.newBuilder()
257+
.setCmdType(request.getCmdType())
258+
.setStatus(org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK)
259+
.setMessage("Success from " + nodeId)
260+
.build();
261+
262+
responseObserver.onNext(response);
263+
responseObserver.onCompleted();
264+
}
265+
}
266+
}));
267+
private Server server;
268+
269+
MockOMServer(String nodeId, int port) {
270+
this.nodeId = nodeId;
271+
this.port = port;
272+
}
273+
274+
public void start() throws Exception {
275+
server = ServerBuilder.forPort(port)
276+
.addService(serviceImpl)
277+
.build()
278+
.start();
279+
}
280+
281+
public void stop() throws Exception {
282+
if (server != null) {
283+
server.shutdown();
284+
server.awaitTermination(5, TimeUnit.SECONDS);
285+
}
286+
}
287+
288+
public void setAsLeader(boolean leader) {
289+
this.isLeader.set(leader);
290+
}
291+
292+
public int getRequestCount() {
293+
return requestCount.get();
294+
}
295+
296+
public int getSuccessCount() {
297+
return successCount.get();
298+
}
299+
300+
public int getFailureCount() {
301+
return failureCount.get();
302+
}
303+
}
304+
}
305+

0 commit comments

Comments
 (0)