Skip to content

Commit 216c84e

Browse files
authored
Fix RpcUtils.retry() silently swallowing timeout and max-retry exceptions, returning null (#1878)
* Fix RpcUtils.retry() silently swallowing timeout and max-retry exceptions, returning null Signed-off-by: eye-gu <734164350@qq.com> * fix InterruptedException Signed-off-by: eye-gu <734164350@qq.com> --------- Signed-off-by: eye-gu <734164350@qq.com>
1 parent 3e6cf42 commit 216c84e

2 files changed

Lines changed: 187 additions & 36 deletions

File tree

sdk-core/src/main/java/io/milvus/v2/utils/RpcUtils.java

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.util.concurrent.Callable;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.function.Function;
3233

3334
public class RpcUtils {
3435

@@ -123,9 +124,8 @@ public <T> T retry(Callable<T> callable) {
123124
// method to check timeout
124125
long begin = System.currentTimeMillis();
125126
long maxRetryTimeoutMs = retryConfig.getMaxRetryTimeoutMs();
126-
Callable<Boolean> timeoutChecker = () -> {
127-
long current = System.currentTimeMillis();
128-
long cost = (current - begin);
127+
Function<Long, Boolean> timeoutChecker = (timePoint) -> {
128+
long cost = (timePoint - begin);
129129
if (maxRetryTimeoutMs > 0 && cost >= maxRetryTimeoutMs) {
130130
return Boolean.TRUE;
131131
}
@@ -154,24 +154,18 @@ public <T> T retry(Callable<T> callable) {
154154
// trigger topology refresh if connection is unavailable, and continue to retry
155155
handleGlobalConnectionError(e);
156156

157-
try {
158-
if (timeoutChecker.call() == Boolean.TRUE) {
159-
String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
160-
maxRetryTimeoutMs, maxRetryTimes, k, e.getMessage());
161-
logger.warn(msg);
162-
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exit retry for timeout
163-
}
164-
} catch (Exception ignored) {
157+
if (timeoutChecker.apply(System.currentTimeMillis()) == Boolean.TRUE) {
158+
String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
159+
maxRetryTimeoutMs, maxRetryTimes, k, e.getMessage());
160+
logger.warn(msg);
161+
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exit retry for timeout
165162
}
166163
} catch (MilvusClientException e) {
167-
try {
168-
if (timeoutChecker.call() == Boolean.TRUE) {
169-
String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
170-
maxRetryTimeoutMs, maxRetryTimes, k, e.getMessage());
171-
logger.warn(msg);
172-
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exit retry for timeout
173-
}
174-
} catch (Exception ignored) {
164+
if (timeoutChecker.apply(System.currentTimeMillis()) == Boolean.TRUE) {
165+
String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, reason: %s",
166+
maxRetryTimeoutMs, maxRetryTimes, k, e.getMessage());
167+
logger.warn(msg);
168+
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exit retry for timeout
175169
}
176170

177171
if (retryConfig.isRetryOnRateLimit() &&
@@ -188,27 +182,41 @@ public <T> T retry(Callable<T> callable) {
188182
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, e); // others error treated as client error
189183
}
190184

191-
try {
192-
if (k >= maxRetryTimes) {
193-
// finish retry loop, return the response of the last retry
194-
String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
185+
if (k >= maxRetryTimes) {
186+
// finish retry loop, return the response of the last retry
187+
String msg = String.format("Finish %d retry times, stop retry", maxRetryTimes);
188+
logger.warn(msg);
189+
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exceed max time, exit retry
190+
} else {
191+
// check if sleep would exceed maxRetryTimeoutMs, if so, directly throw timeout
192+
long futureTimePoint = System.currentTimeMillis() + retryIntervalMs;
193+
if (timeoutChecker.apply(futureTimePoint) == Boolean.TRUE) {
194+
String msg = String.format("Retry timeout: %dms, maxRetry:%d, retries: %d, "
195+
+ "elapsed time + next interval %dms would exceed timeout",
196+
maxRetryTimeoutMs, maxRetryTimes, k, retryIntervalMs);
195197
logger.warn(msg);
196-
throw new MilvusClientException(ErrorCode.TIMEOUT, msg); // exceed max time, exit retry
197-
} else {
198-
// sleep for interval
199-
// print log, follow the pymilvus logic
200-
if (k > 3) {
201-
logger.warn(String.format("Retry(%d) with interval %dms", k, retryIntervalMs));
202-
}
203-
TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
198+
throw new MilvusClientException(ErrorCode.TIMEOUT, msg);
204199
}
205200

206-
// reset the next interval value
207-
retryIntervalMs = retryIntervalMs * retryConfig.getBackOffMultiplier();
208-
if (retryIntervalMs > retryConfig.getMaxBackOffMs()) {
209-
retryIntervalMs = retryConfig.getMaxBackOffMs();
201+
// sleep for interval
202+
// print log, follow the pymilvus logic
203+
if (k > 3) {
204+
logger.warn(String.format("Retry(%d) with interval %dms", k, retryIntervalMs));
210205
}
211-
} catch (Exception ignored) {
206+
try {
207+
TimeUnit.MILLISECONDS.sleep(retryIntervalMs);
208+
} catch (InterruptedException e) {
209+
Thread.currentThread().interrupt();
210+
String msg = String.format("Retry sleep interrupted, aborting retry after %d attempts", k);
211+
logger.warn(msg);
212+
throw new MilvusClientException(ErrorCode.CLIENT_ERROR, msg);
213+
}
214+
}
215+
216+
// reset the next interval value
217+
retryIntervalMs = retryIntervalMs * retryConfig.getBackOffMultiplier();
218+
if (retryIntervalMs > retryConfig.getMaxBackOffMs()) {
219+
retryIntervalMs = retryConfig.getMaxBackOffMs();
212220
}
213221
}
214222

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.v2.utils;
21+
22+
import io.grpc.StatusRuntimeException;
23+
import io.milvus.v2.client.RetryConfig;
24+
import io.milvus.v2.exception.ErrorCode;
25+
import io.milvus.v2.exception.MilvusClientException;
26+
import org.junit.jupiter.api.Assertions;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
public class RpcUtilsTest {
32+
33+
@Test
34+
void testEarlyExitWhenPredictedBackoffExceedsMaxRetryTimeoutMs() {
35+
RpcUtils rpcUtils = new RpcUtils();
36+
long maxRetryTimeoutMs = 5000;
37+
rpcUtils.retryConfig(RetryConfig.builder()
38+
.maxRetryTimes(10)
39+
.maxRetryTimeoutMs(maxRetryTimeoutMs)
40+
.initialBackOffMs(10)
41+
.maxBackOffMs(3000)
42+
.backOffMultiplier(3)
43+
.build());
44+
45+
long start = System.currentTimeMillis();
46+
47+
MilvusClientException thrown = Assertions.assertThrows(MilvusClientException.class, () -> {
48+
rpcUtils.retry(() -> {
49+
throw new StatusRuntimeException(
50+
io.grpc.Status.UNAVAILABLE.withDescription("server unavailable"));
51+
});
52+
});
53+
54+
long elapsed = System.currentTimeMillis() - start;
55+
56+
Assertions.assertEquals(ErrorCode.TIMEOUT, thrown.getErrorCode(),
57+
"Should fail with TIMEOUT error code");
58+
// Backoff sequence (initial=10ms, multiplier=3, max=3000ms):
59+
// k=1 @~0ms → sleep 10ms
60+
// k=2 @~10ms → sleep 30ms
61+
// k=3 @~40ms → sleep 90ms
62+
// k=4 @~130ms → sleep 270ms
63+
// k=5 @~400ms → sleep 810ms
64+
// k=6 @~1210ms → sleep 2430ms
65+
// k=7 @~3640ms → next backoff 3000ms, 3640+3000=6640 > 5000ms → TIMEOUT
66+
Assertions.assertTrue(elapsed <= 4000,
67+
"Retry should respect maxRetryTimeoutMs(5000ms), but took " + elapsed + "ms");
68+
}
69+
70+
@Test
71+
void testMaxRetryTimes() {
72+
RpcUtils rpcUtils = new RpcUtils();
73+
int maxRetryTimes = 3;
74+
rpcUtils.retryConfig(RetryConfig.builder()
75+
.maxRetryTimes(maxRetryTimes)
76+
.maxRetryTimeoutMs(60000) // large timeout so retry times is the limiting factor
77+
.initialBackOffMs(10)
78+
.maxBackOffMs(100)
79+
.backOffMultiplier(2)
80+
.build());
81+
82+
AtomicInteger callCount = new AtomicInteger(0);
83+
84+
MilvusClientException thrown = Assertions.assertThrows(MilvusClientException.class, () -> {
85+
rpcUtils.retry(() -> {
86+
callCount.incrementAndGet();
87+
throw new StatusRuntimeException(
88+
io.grpc.Status.UNAVAILABLE.withDescription("server unavailable"));
89+
});
90+
});
91+
92+
Assertions.assertEquals(ErrorCode.TIMEOUT, thrown.getErrorCode(),
93+
"Should fail with TIMEOUT error code");
94+
Assertions.assertEquals(maxRetryTimes, callCount.get(),
95+
"Should have retried exactly maxRetryTimes(" + maxRetryTimes + ") times, but got " + callCount.get());
96+
}
97+
98+
@Test
99+
void testTimeoutAfterSlowCallExceedsMaxRetryTimeoutMs() {
100+
RpcUtils rpcUtils = new RpcUtils();
101+
long maxRetryTimeoutMs = 2000;
102+
rpcUtils.retryConfig(RetryConfig.builder()
103+
.maxRetryTimes(10)
104+
.maxRetryTimeoutMs(maxRetryTimeoutMs)
105+
.initialBackOffMs(50)
106+
.maxBackOffMs(500)
107+
.backOffMultiplier(2)
108+
.build());
109+
110+
AtomicInteger callCount = new AtomicInteger(0);
111+
112+
long start = System.currentTimeMillis();
113+
114+
MilvusClientException thrown = Assertions.assertThrows(MilvusClientException.class, () -> {
115+
rpcUtils.retry(() -> {
116+
callCount.incrementAndGet();
117+
// Simulate a slow RPC call that takes 500ms each time
118+
Thread.sleep(500);
119+
throw new StatusRuntimeException(
120+
io.grpc.Status.UNAVAILABLE.withDescription("server unavailable"));
121+
});
122+
});
123+
124+
long elapsed = System.currentTimeMillis() - start;
125+
126+
Assertions.assertEquals(ErrorCode.TIMEOUT, thrown.getErrorCode(),
127+
"Should fail with TIMEOUT error code when slow calls accumulate beyond maxRetryTimeoutMs");
128+
// Timeline (slow call sleep=500ms, backoff: initial=50ms, multiplier=2, max=500ms):
129+
// k=1 @~0ms → sleep 500ms, call ends @500ms
130+
// backoff 50ms, total ~550ms < 2000ms → continue
131+
// k=2 @~550ms → sleep 500ms, call ends @~1050ms
132+
// backoff 100ms, total ~1150ms < 2000ms → continue
133+
// k=3 @~1150ms → sleep 500ms, call ends @~1650ms
134+
// backoff 200ms, total ~1850ms < 2000ms → continue
135+
// k=4 @~1850ms → sleep 500ms, call ends @~2350ms
136+
// elapsed(2350ms) >= maxRetryTimeoutMs(2000ms)
137+
Assertions.assertEquals(4, callCount.get(), "Should have 4 times, but got " + callCount.get());
138+
Assertions.assertTrue(elapsed > maxRetryTimeoutMs,
139+
"Elapsed time should greater than maxRetryTimeoutMs, but was " + elapsed + "ms");
140+
Assertions.assertTrue(elapsed < maxRetryTimeoutMs + 1000,
141+
"Should not exceed maxRetryTimeoutMs by too much, elapsed was " + elapsed + "ms");
142+
}
143+
}

0 commit comments

Comments
 (0)