Skip to content

Commit 0bbdb0f

Browse files
committed
Retry on REPLICATE_VIOLATION for global cluster region switch
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent c2c33bc commit 0bbdb0f

2 files changed

Lines changed: 107 additions & 13 deletions

File tree

sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
public class RpcUtils {
3434

3535
protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);
36+
private static final String GLOBAL_ROUTING_ERROR = "STREAMING_CODE_REPLICATE_VIOLATION";
3637
private RetryConfig retryConfig = RetryConfig.builder().build();
3738
private Runnable globalRefreshTrigger;
3839

@@ -44,6 +45,37 @@ public void setGlobalRefreshTrigger(Runnable trigger) {
4445
this.globalRefreshTrigger = trigger;
4546
}
4647

48+
private void handleGlobalConnectionError(StatusRuntimeException e) {
49+
if (globalRefreshTrigger == null) {
50+
return;
51+
}
52+
if (e.getStatus().getCode() == io.grpc.Status.UNAVAILABLE.getCode()) {
53+
logger.info("Connection unavailable, triggering global topology refresh: {}", e.getMessage());
54+
try {
55+
globalRefreshTrigger.run();
56+
} catch (Exception ex) {
57+
logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage());
58+
}
59+
}
60+
}
61+
62+
private boolean handleGlobalRoutingError(Exception e) {
63+
if (globalRefreshTrigger == null) {
64+
return false;
65+
}
66+
String message = e.getMessage();
67+
if (message != null && message.contains(GLOBAL_ROUTING_ERROR)) {
68+
logger.info("Detected {}, triggering global topology refresh", GLOBAL_ROUTING_ERROR);
69+
try {
70+
globalRefreshTrigger.run();
71+
} catch (Exception ex) {
72+
logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage());
73+
}
74+
return true;
75+
}
76+
return false;
77+
}
78+
4779
public void handleResponse(String requestInfo, Status status) {
4880
// the server made a change for error code:
4981
// for 2.2.x, error code is status.getErrorCode()
@@ -119,14 +151,8 @@ public <T> T retry(Callable<T> callable) {
119151
throw new MilvusClientException(ErrorCode.RPC_ERROR, msg); // throw rpc error
120152
}
121153

122-
// For UNAVAILABLE errors, trigger global topology refresh if configured
123-
if (code == io.grpc.Status.UNAVAILABLE.getCode() && globalRefreshTrigger != null) {
124-
try {
125-
globalRefreshTrigger.run();
126-
} catch (Exception ex) {
127-
logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage());
128-
}
129-
}
154+
// trigger topology refresh if connection is unavailable, and continue to retry
155+
handleGlobalConnectionError(e);
130156

131157
try {
132158
if (timeoutChecker.call() == Boolean.TRUE) {
@@ -148,12 +174,13 @@ public <T> T retry(Callable<T> callable) {
148174
} catch (Exception ignored) {
149175
}
150176

151-
// for server-side returned error, only retry for rate limit
152-
// in new error codes of v2.3, rate limit error value is 8
153177
if (retryConfig.isRetryOnRateLimit() &&
154178
(e.getLegacyServerCode() == io.milvus.grpc.ErrorCode.RateLimit.getNumber() ||
155179
e.getServerErrCode() == 8)) {
156-
// cannot be retried
180+
// for server-side returned error, only retry for rate limit
181+
// in new error codes of v2.3, rate limit error value is 8
182+
} else if (handleGlobalRoutingError(e)) {
183+
// for global cluster routing errors, immediately trigger topology refresh and continue to retry
157184
} else {
158185
throw e; // exit retry, throw the error
159186
}

sdk-core/src/test/java/io/milvus/v2/client/globalcluster/GlobalClusterTest.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,11 +321,78 @@ public void testRpcUtils_globalRefreshTrigger() {
321321
AtomicReference<Boolean> triggered = new AtomicReference<>(false);
322322
rpcUtils.setGlobalRefreshTrigger(() -> triggered.set(true));
323323

324-
// The trigger is set but only called internally on UNAVAILABLE errors.
325-
// Just verify it can be set without error.
324+
// The trigger is set but only called internally on UNAVAILABLE errors
325+
// or STREAMING_CODE_REPLICATE_VIOLATION. Just verify it can be set without error.
326326
assertFalse(triggered.get());
327327
}
328328

329+
@Test
330+
public void testRpcUtils_replicateViolationTriggersRefresh() {
331+
io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();
332+
// Use default retry config (maxRetryTimes=75)
333+
334+
AtomicReference<Boolean> refreshTriggered = new AtomicReference<>(false);
335+
rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true));
336+
337+
final int[] callCount = {0};
338+
// First call throws STREAMING_CODE_REPLICATE_VIOLATION, second call succeeds
339+
try {
340+
rpcUtils.retry(() -> {
341+
callCount[0]++;
342+
if (callCount[0] == 1) {
343+
throw new io.milvus.v2.exception.MilvusClientException(
344+
io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
345+
"error: STREAMING_CODE_REPLICATE_VIOLATION occurred");
346+
}
347+
return "success";
348+
});
349+
} catch (Exception e) {
350+
// Should not reach here since second call succeeds
351+
fail("Should not throw: " + e.getMessage());
352+
}
353+
354+
assertTrue(refreshTriggered.get(), "Global refresh should have been triggered");
355+
assertEquals(2, callCount[0], "Should have retried after routing error");
356+
}
357+
358+
@Test
359+
public void testRpcUtils_normalServerErrorNotTriggersRefresh() {
360+
io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();
361+
362+
AtomicReference<Boolean> refreshTriggered = new AtomicReference<>(false);
363+
rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true));
364+
365+
// A normal server error without the routing violation string should NOT trigger refresh
366+
try {
367+
rpcUtils.retry(() -> {
368+
throw new io.milvus.v2.exception.MilvusClientException(
369+
io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
370+
"some normal error");
371+
});
372+
} catch (Exception e) {
373+
// Expected to throw
374+
}
375+
376+
assertFalse(refreshTriggered.get(), "Global refresh should NOT have been triggered for normal errors");
377+
}
378+
379+
@Test
380+
public void testRpcUtils_noRefreshTriggerSet() {
381+
io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();
382+
// No globalRefreshTrigger set
383+
384+
// Should not throw NPE even with routing violation error
385+
try {
386+
rpcUtils.retry(() -> {
387+
throw new io.milvus.v2.exception.MilvusClientException(
388+
io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
389+
"error: STREAMING_CODE_REPLICATE_VIOLATION occurred");
390+
});
391+
} catch (Exception e) {
392+
// Expected to throw since no refresh trigger means no retry for this error
393+
}
394+
}
395+
329396
// ==================== End-to-end parse + getPrimary test ====================
330397

331398
@Test

0 commit comments

Comments
 (0)