diff --git a/sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java b/sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java index 29a6ad5ce..dfcb00ae8 100644 --- a/sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java +++ b/sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java @@ -33,6 +33,7 @@ public class RpcUtils { protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class); + /** Substring that handleGlobalRoutingError searches for in an exception message to recognise a routing error. */ private static final String GLOBAL_ROUTING_ERROR = "STREAMING_CODE_REPLICATE_VIOLATION"; private RetryConfig retryConfig = RetryConfig.builder().build(); private Runnable globalRefreshTrigger; @@ -44,6 +45,37 @@ public void setGlobalRefreshTrigger(Runnable trigger) { this.globalRefreshTrigger = trigger; } + /** Runs the configured global refresh trigger when the given gRPC exception has status code UNAVAILABLE. Does nothing when no trigger is set or when the status code is anything else. An exception thrown by the trigger is logged at warn level and not rethrown. @param e gRPC exception whose status code is inspected */ private void handleGlobalConnectionError(StatusRuntimeException e) { + if (globalRefreshTrigger == null) { + return; + } + if (e.getStatus().getCode() == io.grpc.Status.UNAVAILABLE.getCode()) { + logger.info("Connection unavailable, triggering global topology refresh: {}", e.getMessage()); + try { + globalRefreshTrigger.run(); + } catch (Exception ex) { + logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage()); + } + } + } + + /** Runs the configured global refresh trigger when the exception message contains GLOBAL_ROUTING_ERROR. @param e exception whose message is inspected; a null message is treated as no match @return true when the marker was found; false when no trigger is set or the message does not contain the marker */ private boolean handleGlobalRoutingError(Exception e) { + if (globalRefreshTrigger == null) { + return false; + } + String message = e.getMessage(); + if (message != null && message.contains(GLOBAL_ROUTING_ERROR)) { + logger.info("Detected {}, triggering global topology refresh", GLOBAL_ROUTING_ERROR); + try { + globalRefreshTrigger.run(); + } catch (Exception ex) { + logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage()); + } + /* still reported as handled when the trigger itself threw; that failure is only logged above */ return true; + } + return false; + } + public void handleResponse(String requestInfo, Status status) { // the server made a change for error code: // for 2.2.x, error code is status.getErrorCode() @@ -119,14 +151,8 @@ public T retry(Callable callable) { throw new MilvusClientException(ErrorCode.RPC_ERROR, msg); // throw rpc error } - // For UNAVAILABLE errors, trigger 
global topology refresh if configured - if (code == io.grpc.Status.UNAVAILABLE.getCode() && globalRefreshTrigger != null) { - try { - globalRefreshTrigger.run(); - } catch (Exception ex) { - logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage()); - } - } + // trigger topology refresh if connection is unavailable, and continue to retry + handleGlobalConnectionError(e); try { if (timeoutChecker.call() == Boolean.TRUE) { @@ -148,12 +174,13 @@ public T retry(Callable callable) { } catch (Exception ignored) { } - // for server-side returned error, only retry for rate limit - // in new error codes of v2.3, rate limit error value is 8 if (retryConfig.isRetryOnRateLimit() && (e.getLegacyServerCode() == io.milvus.grpc.ErrorCode.RateLimit.getNumber() || e.getServerErrCode() == 8)) { - // cannot be retried + // for server-side returned error, only retry for rate limit + // in new error codes of v2.3, rate limit error value is 8 + } else if (handleGlobalRoutingError(e)) { + // for global cluster routing errors, immediately trigger topology refresh and continue to retry } else { throw e; // exit retry, throw the error } diff --git a/sdk-core/src/test/java/io/milvus/v2/client/globalcluster/GlobalClusterTest.java b/sdk-core/src/test/java/io/milvus/v2/client/globalcluster/GlobalClusterTest.java index 9f7eb470c..c3313cad6 100644 --- a/sdk-core/src/test/java/io/milvus/v2/client/globalcluster/GlobalClusterTest.java +++ b/sdk-core/src/test/java/io/milvus/v2/client/globalcluster/GlobalClusterTest.java @@ -321,11 +321,78 @@ public void testRpcUtils_globalRefreshTrigger() { AtomicReference triggered = new AtomicReference<>(false); rpcUtils.setGlobalRefreshTrigger(() -> triggered.set(true)); - // The trigger is set but only called internally on UNAVAILABLE errors. - // Just verify it can be set without error. + // The trigger is set but only called internally on UNAVAILABLE errors + // or STREAMING_CODE_REPLICATE_VIOLATION. Just verify it can be set without error. 
assertFalse(triggered.get()); } + /** Checks that retry runs the refresh trigger and invokes the callable a second time after a MilvusClientException whose message contains STREAMING_CODE_REPLICATE_VIOLATION. */ @Test + public void testRpcUtils_replicateViolationTriggersRefresh() { + io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils(); + // Use default retry config (maxRetryTimes=75) + + AtomicReference refreshTriggered = new AtomicReference<>(false); + rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true)); + + final int[] callCount = {0}; + // First call throws STREAMING_CODE_REPLICATE_VIOLATION, second call succeeds + try { + rpcUtils.retry(() -> { + callCount[0]++; + if (callCount[0] == 1) { + throw new io.milvus.v2.exception.MilvusClientException( + io.milvus.v2.exception.ErrorCode.SERVER_ERROR, + "error: STREAMING_CODE_REPLICATE_VIOLATION occurred"); + } + return "success"; + }); + } catch (Exception e) { + // Should not reach here since second call succeeds + fail("Should not throw: " + e.getMessage()); + } + + assertTrue(refreshTriggered.get(), "Global refresh should have been triggered"); + assertEquals(2, callCount[0], "Should have retried after routing error"); + } + + /** Checks that a MilvusClientException whose message lacks the routing marker leaves the refresh trigger unused. */ @Test + public void testRpcUtils_normalServerErrorNotTriggersRefresh() { + io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils(); + + AtomicReference refreshTriggered = new AtomicReference<>(false); + rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true)); + + // A normal server error without the routing violation string should NOT trigger refresh + try { + rpcUtils.retry(() -> { + throw new io.milvus.v2.exception.MilvusClientException( + io.milvus.v2.exception.ErrorCode.SERVER_ERROR, + "some normal error"); + }); + } catch (Exception e) { /* NOTE(review): nothing asserts that retry actually throws; this test also passes if retry returns normally */ + // Expected to throw + } + + assertFalse(refreshTriggered.get(), "Global refresh should NOT have been triggered for normal errors"); + } + + /** Calls retry with an exception carrying the routing marker while no refresh trigger has been set. */ @Test + public void testRpcUtils_noRefreshTriggerSet() { + io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils(); + // No globalRefreshTrigger set + + // Should not throw NPE even with routing violation 
error + try { + rpcUtils.retry(() -> { + throw new io.milvus.v2.exception.MilvusClientException( + io.milvus.v2.exception.ErrorCode.SERVER_ERROR, + "error: STREAMING_CODE_REPLICATE_VIOLATION occurred"); + }); + } catch (Exception e) { /* NOTE(review): catching Exception also swallows a NullPointerException, so this test cannot fail on the NPE it is meant to rule out, and it makes no assertion; consider asserting the expected exception type */ + // Expected to throw since no refresh trigger means no retry for this error + } + } + // ==================== End-to-end parse + getPrimary test ==================== @Test