Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
public class RpcUtils {

protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);
private static final String GLOBAL_ROUTING_ERROR = "STREAMING_CODE_REPLICATE_VIOLATION";
private RetryConfig retryConfig = RetryConfig.builder().build();
private Runnable globalRefreshTrigger;

Expand All @@ -44,6 +45,37 @@ public void setGlobalRefreshTrigger(Runnable trigger) {
this.globalRefreshTrigger = trigger;
}

/**
 * Runs the global topology refresh trigger when a gRPC call failed with
 * status code UNAVAILABLE.
 *
 * Does nothing when no trigger has been configured or when the status code is
 * anything other than UNAVAILABLE. A failure inside the trigger is logged and
 * never propagated to the caller.
 *
 * @param e the gRPC exception raised by the failed call
 */
private void handleGlobalConnectionError(StatusRuntimeException e) {
    if (globalRefreshTrigger == null) {
        return;
    }
    if (e.getStatus().getCode() != io.grpc.Status.UNAVAILABLE.getCode()) {
        return;
    }

    logger.info("Connection unavailable, triggering global topology refresh: {}", e.getMessage());
    try {
        globalRefreshTrigger.run();
    } catch (Exception ex) {
        // Best effort: a failed refresh must not mask the original RPC error.
        // The exception is passed as the trailing argument so the logger records
        // the full stack trace and cause, not just the (possibly null) message.
        logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage(), ex);
    }
}

/**
 * Detects a global-cluster routing error and, if found, runs the global
 * topology refresh trigger.
 *
 * An exception counts as a routing error when its message contains
 * {@code GLOBAL_ROUTING_ERROR}. A failure inside the trigger is logged and
 * never propagated; the method still returns {@code true} in that case.
 *
 * @param e the exception to inspect
 * @return {@code true} if a trigger is configured and the exception message
 *         contains the routing error marker; {@code false} otherwise
 */
private boolean handleGlobalRoutingError(Exception e) {
    if (globalRefreshTrigger == null) {
        return false;
    }

    String message = e.getMessage();
    if (message == null || !message.contains(GLOBAL_ROUTING_ERROR)) {
        return false;
    }

    logger.info("Detected {}, triggering global topology refresh", GLOBAL_ROUTING_ERROR);
    try {
        globalRefreshTrigger.run();
    } catch (Exception ex) {
        // Best effort: the routing error is still reported as handled.
        // The exception is passed as the trailing argument so the logger records
        // the full stack trace and cause, not just the (possibly null) message.
        logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage(), ex);
    }
    return true;
}

public void handleResponse(String requestInfo, Status status) {
// the server made a change for error code:
// for 2.2.x, error code is status.getErrorCode()
Expand Down Expand Up @@ -119,14 +151,8 @@ public <T> T retry(Callable<T> callable) {
throw new MilvusClientException(ErrorCode.RPC_ERROR, msg); // throw rpc error
}

// For UNAVAILABLE errors, trigger global topology refresh if configured
if (code == io.grpc.Status.UNAVAILABLE.getCode() && globalRefreshTrigger != null) {
try {
globalRefreshTrigger.run();
} catch (Exception ex) {
logger.warn("Failed to trigger global topology refresh: {}", ex.getMessage());
}
}
// trigger topology refresh if connection is unavailable, and continue to retry
handleGlobalConnectionError(e);

try {
if (timeoutChecker.call() == Boolean.TRUE) {
Expand All @@ -148,12 +174,13 @@ public <T> T retry(Callable<T> callable) {
} catch (Exception ignored) {
}

// for server-side returned error, only retry for rate limit
// in new error codes of v2.3, rate limit error value is 8
if (retryConfig.isRetryOnRateLimit() &&
(e.getLegacyServerCode() == io.milvus.grpc.ErrorCode.RateLimit.getNumber() ||
e.getServerErrCode() == 8)) {
// cannot be retried
// for server-side returned error, only retry for rate limit
// in new error codes of v2.3, rate limit error value is 8
} else if (handleGlobalRoutingError(e)) {
// for global cluster routing errors, immediately trigger topology refresh and continue to retry
} else {
throw e; // exit retry, throw the error
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,78 @@ public void testRpcUtils_globalRefreshTrigger() {
AtomicReference<Boolean> triggered = new AtomicReference<>(false);
rpcUtils.setGlobalRefreshTrigger(() -> triggered.set(true));

// The trigger is set but only called internally on UNAVAILABLE errors.
// Just verify it can be set without error.
// The trigger is set but only called internally on UNAVAILABLE errors
// or STREAMING_CODE_REPLICATE_VIOLATION. Just verify it can be set without error.
assertFalse(triggered.get());
}

@Test
public void testRpcUtils_replicateViolationTriggersRefresh() {
    // Default retry config is used, so a single failure leaves room for a retry.
    io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();

    // Records whether the refresh hook was invoked by the retry logic.
    AtomicReference<Boolean> refreshTriggered = new AtomicReference<>(false);
    rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true));

    // Counts invocations of the callable: the first attempt fails with the
    // routing violation, every later attempt succeeds.
    final int[] attempts = {0};
    try {
        rpcUtils.retry(() -> {
            attempts[0]++;
            if (attempts[0] != 1) {
                return "success";
            }
            throw new io.milvus.v2.exception.MilvusClientException(
                    io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
                    "error: STREAMING_CODE_REPLICATE_VIOLATION occurred");
        });
    } catch (Exception unexpected) {
        // The second attempt succeeds, so nothing should escape retry().
        fail("Should not throw: " + unexpected.getMessage());
    }

    assertTrue(refreshTriggered.get(), "Global refresh should have been triggered");
    assertEquals(2, attempts[0], "Should have retried after routing error");
}

@Test
public void testRpcUtils_normalServerErrorNotTriggersRefresh() {
    io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();

    AtomicReference<Boolean> refreshTriggered = new AtomicReference<>(false);
    rpcUtils.setGlobalRefreshTrigger(() -> refreshTriggered.set(true));

    // A normal server error without the routing violation string should be
    // rethrown by retry() and should NOT trigger a refresh.
    boolean thrown = false;
    try {
        rpcUtils.retry(() -> {
            throw new io.milvus.v2.exception.MilvusClientException(
                    io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
                    "some normal error");
        });
    } catch (Exception e) {
        thrown = true;
    }

    // Assert the throw explicitly; otherwise a regression that swallows the
    // error would let this test pass silently.
    assertTrue(thrown, "A normal server error should be rethrown by retry()");
    assertFalse(refreshTriggered.get(), "Global refresh should NOT have been triggered for normal errors");
}

@Test
public void testRpcUtils_noRefreshTriggerSet() {
    io.milvus.v2.utils.RpcUtils rpcUtils = new io.milvus.v2.utils.RpcUtils();
    // No globalRefreshTrigger set

    // With no trigger configured, a routing violation is not retried: the error
    // must surface to the caller, and it must not be a NullPointerException
    // caused by the missing trigger.
    boolean thrown = false;
    try {
        rpcUtils.retry(() -> {
            throw new io.milvus.v2.exception.MilvusClientException(
                    io.milvus.v2.exception.ErrorCode.SERVER_ERROR,
                    "error: STREAMING_CODE_REPLICATE_VIOLATION occurred");
        });
    } catch (Exception e) {
        thrown = true;
        // catch (Exception) would also swallow an NPE, so check for it explicitly.
        assertFalse(e instanceof NullPointerException,
                "Missing refresh trigger must not cause a NullPointerException");
    }

    assertTrue(thrown, "Routing error should be rethrown when no refresh trigger is set");
}

// ==================== End-to-end parse + getPrimary test ====================

@Test
Expand Down
Loading