Skip to content

Commit 065bb98

Browse files
Address PR review: close response body on retry, clamp jitter, move test
- Close response body before retrying to avoid leaking connections - Clamp jittered delay to maxDelayMs so it remains a hard ceiling - Add comment explaining push-promise sendAsync skips retry - Move RetryingHttpClientTest from ce/ to common/ where the source lives
1 parent ddb9183 commit 065bb98

5 files changed

Lines changed: 127 additions & 36 deletions

File tree

ce/src/main/java/org/thingsboard/client/RetryingHttpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T>
103103
long delayMs = computeDelay(response, attempt);
104104
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
105105
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
106+
closeBody(response);
106107
Thread.sleep(delayMs);
107108

108109
response = delegate.send(request, responseBodyHandler);
@@ -121,6 +122,8 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(
121122
return sendAsyncWithRetry(request, responseBodyHandler, 1);
122123
}
123124

125+
// Push-promise variant delegates without retry: server-push semantics are incompatible
126+
// with request-level retry, and this overload is not used by the generated API code.
124127
@Override
125128
public <T> CompletableFuture<HttpResponse<T>> sendAsync(
126129
HttpRequest request,
@@ -138,13 +141,27 @@ private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetry(
138141
long delayMs = computeDelay(response, attempt);
139142
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
140143
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
144+
closeBody(response);
141145
Executor delayedExecutor = CompletableFuture.delayedExecutor(
142146
delayMs, java.util.concurrent.TimeUnit.MILLISECONDS);
143147
return CompletableFuture.supplyAsync(() -> null, delayedExecutor)
144148
.thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1));
145149
});
146150
}
147151

152+
/**
153+
* Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses)
154+
* to free the underlying connection before retrying.
155+
*/
156+
private static <T> void closeBody(HttpResponse<T> response) {
157+
if (response.body() instanceof AutoCloseable c) {
158+
try {
159+
c.close();
160+
} catch (Exception ignored) {
161+
}
162+
}
163+
}
164+
148165
/**
149166
* Computes the delay in milliseconds before the next retry attempt.
150167
* Honours the {@code Retry-After} header when present (integer seconds, non-negative).
@@ -165,9 +182,9 @@ private <T> long computeDelay(HttpResponse<T> response, int attempt) {
165182
// Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs
166183
int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values
167184
long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs);
168-
// ±20% jitter
185+
// ±20% jitter, clamped so maxDelayMs remains a hard ceiling
169186
double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2]
170-
return Math.max(0, Math.round(base * (1.0 + jitter)));
187+
return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter))));
171188
}
172189

173190
// -------------------------------------------------------------------------

common/src/main/java/org/thingsboard/client/RetryingHttpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T>
103103
long delayMs = computeDelay(response, attempt);
104104
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
105105
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
106+
closeBody(response);
106107
Thread.sleep(delayMs);
107108

108109
response = delegate.send(request, responseBodyHandler);
@@ -121,6 +122,8 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(
121122
return sendAsyncWithRetry(request, responseBodyHandler, 1);
122123
}
123124

125+
// Push-promise variant delegates without retry: server-push semantics are incompatible
126+
// with request-level retry, and this overload is not used by the generated API code.
124127
@Override
125128
public <T> CompletableFuture<HttpResponse<T>> sendAsync(
126129
HttpRequest request,
@@ -138,13 +141,27 @@ private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetry(
138141
long delayMs = computeDelay(response, attempt);
139142
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
140143
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
144+
closeBody(response);
141145
Executor delayedExecutor = CompletableFuture.delayedExecutor(
142146
delayMs, java.util.concurrent.TimeUnit.MILLISECONDS);
143147
return CompletableFuture.supplyAsync(() -> null, delayedExecutor)
144148
.thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1));
145149
});
146150
}
147151

152+
/**
153+
* Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses)
154+
* to free the underlying connection before retrying.
155+
*/
156+
private static <T> void closeBody(HttpResponse<T> response) {
157+
if (response.body() instanceof AutoCloseable c) {
158+
try {
159+
c.close();
160+
} catch (Exception ignored) {
161+
}
162+
}
163+
}
164+
148165
/**
149166
* Computes the delay in milliseconds before the next retry attempt.
150167
* Honours the {@code Retry-After} header when present (integer seconds, non-negative).
@@ -165,9 +182,9 @@ private <T> long computeDelay(HttpResponse<T> response, int attempt) {
165182
// Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs
166183
int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values
167184
long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs);
168-
// ±20% jitter
185+
// ±20% jitter, clamped so maxDelayMs remains a hard ceiling
169186
double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2]
170-
return Math.max(0, Math.round(base * (1.0 + jitter)));
187+
return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter))));
171188
}
172189

173190
// -------------------------------------------------------------------------

ce/src/test/java/org/thingsboard/client/RetryingHttpClientTest.java renamed to common/src/test/java/org/thingsboard/client/RetryingHttpClientTest.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,6 @@
4242
import static org.junit.jupiter.api.Assertions.assertFalse;
4343
import static org.junit.jupiter.api.Assertions.assertTrue;
4444

45-
/**
46-
* Unit tests for {@link RetryingHttpClient}.
47-
*
48-
* <p>These tests are pure unit tests — no running ThingsBoard instance is required.
49-
* All HTTP interactions are handled by a hand-rolled {@link StubHttpClient} stub
50-
* that returns pre-programmed sequences of status codes.
51-
*/
5245
class RetryingHttpClientTest {
5346

5447
// Reusable no-op request to pass into send()
@@ -57,10 +50,6 @@ class RetryingHttpClientTest {
5750
.GET()
5851
.build();
5952

60-
// ---------------------------------------------------------------------------
61-
// Tests
62-
// ---------------------------------------------------------------------------
63-
6453
@Test
6554
void testNoRetryOn200() throws Exception {
6655
StubHttpClient stub = StubHttpClient.ofStatusCodes(List.of(200));
@@ -183,15 +172,33 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(
183172
}
184173

185174
// Remaining abstract methods — minimal no-op implementations
186-
@Override public Optional<CookieHandler> cookieHandler() { return Optional.empty(); }
187-
@Override public Optional<Duration> connectTimeout() { return Optional.empty(); }
188-
@Override public Redirect followRedirects() { return Redirect.NORMAL; }
189-
@Override public Optional<ProxySelector> proxy() { return Optional.empty(); }
190-
@Override public SSLContext sslContext() { try { return SSLContext.getDefault(); } catch (Exception e) { throw new RuntimeException(e); } }
191-
@Override public SSLParameters sslParameters() { return new SSLParameters(); }
192-
@Override public Optional<Authenticator> authenticator() { return Optional.empty(); }
193-
@Override public Version version() { return Version.HTTP_1_1; }
194-
@Override public Optional<Executor> executor() { return Optional.empty(); }
175+
@Override
176+
public Optional<CookieHandler> cookieHandler() {return Optional.empty();}
177+
178+
@Override
179+
public Optional<Duration> connectTimeout() {return Optional.empty();}
180+
181+
@Override
182+
public Redirect followRedirects() {return Redirect.NORMAL;}
183+
184+
@Override
185+
public Optional<ProxySelector> proxy() {return Optional.empty();}
186+
187+
@Override
188+
public SSLContext sslContext() {try {return SSLContext.getDefault();} catch (Exception e) {throw new RuntimeException(e);}}
189+
190+
@Override
191+
public SSLParameters sslParameters() {return new SSLParameters();}
192+
193+
@Override
194+
public Optional<Authenticator> authenticator() {return Optional.empty();}
195+
196+
@Override
197+
public Version version() {return Version.HTTP_1_1;}
198+
199+
@Override
200+
public Optional<Executor> executor() {return Optional.empty();}
201+
195202
}
196203

197204
/**
@@ -207,14 +214,30 @@ private static class StubHttpResponse implements HttpResponse<InputStream> {
207214
this.headers = HttpHeaders.of(headers, (k, v) -> true);
208215
}
209216

210-
@Override public int statusCode() { return statusCode; }
211-
@Override public HttpHeaders headers() { return headers; }
212-
@Override public InputStream body() { return InputStream.nullInputStream(); }
213-
@Override public HttpRequest request() { return DUMMY_REQUEST; }
214-
@Override public Optional<HttpResponse<InputStream>> previousResponse() { return Optional.empty(); }
215-
@Override public Optional<SSLSession> sslSession() { return Optional.empty(); }
216-
@Override public URI uri() { return DUMMY_REQUEST.uri(); }
217-
@Override public HttpClient.Version version() { return HttpClient.Version.HTTP_1_1; }
217+
@Override
218+
public int statusCode() {return statusCode;}
219+
220+
@Override
221+
public HttpHeaders headers() {return headers;}
222+
223+
@Override
224+
public InputStream body() {return InputStream.nullInputStream();}
225+
226+
@Override
227+
public HttpRequest request() {return DUMMY_REQUEST;}
228+
229+
@Override
230+
public Optional<HttpResponse<InputStream>> previousResponse() {return Optional.empty();}
231+
232+
@Override
233+
public Optional<SSLSession> sslSession() {return Optional.empty();}
234+
235+
@Override
236+
public URI uri() {return DUMMY_REQUEST.uri();}
237+
238+
@Override
239+
public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;}
240+
218241
}
219242

220243
}

paas/src/main/java/org/thingsboard/client/RetryingHttpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T>
103103
long delayMs = computeDelay(response, attempt);
104104
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
105105
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
106+
closeBody(response);
106107
Thread.sleep(delayMs);
107108

108109
response = delegate.send(request, responseBodyHandler);
@@ -121,6 +122,8 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(
121122
return sendAsyncWithRetry(request, responseBodyHandler, 1);
122123
}
123124

125+
// Push-promise variant delegates without retry: server-push semantics are incompatible
126+
// with request-level retry, and this overload is not used by the generated API code.
124127
@Override
125128
public <T> CompletableFuture<HttpResponse<T>> sendAsync(
126129
HttpRequest request,
@@ -138,13 +141,27 @@ private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetry(
138141
long delayMs = computeDelay(response, attempt);
139142
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
140143
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
144+
closeBody(response);
141145
Executor delayedExecutor = CompletableFuture.delayedExecutor(
142146
delayMs, java.util.concurrent.TimeUnit.MILLISECONDS);
143147
return CompletableFuture.supplyAsync(() -> null, delayedExecutor)
144148
.thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1));
145149
});
146150
}
147151

152+
/**
153+
* Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses)
154+
* to free the underlying connection before retrying.
155+
*/
156+
private static <T> void closeBody(HttpResponse<T> response) {
157+
if (response.body() instanceof AutoCloseable c) {
158+
try {
159+
c.close();
160+
} catch (Exception ignored) {
161+
}
162+
}
163+
}
164+
148165
/**
149166
* Computes the delay in milliseconds before the next retry attempt.
150167
* Honours the {@code Retry-After} header when present (integer seconds, non-negative).
@@ -165,9 +182,9 @@ private <T> long computeDelay(HttpResponse<T> response, int attempt) {
165182
// Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs
166183
int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values
167184
long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs);
168-
// ±20% jitter
185+
// ±20% jitter, clamped so maxDelayMs remains a hard ceiling
169186
double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2]
170-
return Math.max(0, Math.round(base * (1.0 + jitter)));
187+
return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter))));
171188
}
172189

173190
// -------------------------------------------------------------------------

pe/src/main/java/org/thingsboard/client/RetryingHttpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T>
103103
long delayMs = computeDelay(response, attempt);
104104
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
105105
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
106+
closeBody(response);
106107
Thread.sleep(delayMs);
107108

108109
response = delegate.send(request, responseBodyHandler);
@@ -121,6 +122,8 @@ public <T> CompletableFuture<HttpResponse<T>> sendAsync(
121122
return sendAsyncWithRetry(request, responseBodyHandler, 1);
122123
}
123124

125+
// Push-promise variant delegates without retry: server-push semantics are incompatible
126+
// with request-level retry, and this overload is not used by the generated API code.
124127
@Override
125128
public <T> CompletableFuture<HttpResponse<T>> sendAsync(
126129
HttpRequest request,
@@ -138,13 +141,27 @@ private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetry(
138141
long delayMs = computeDelay(response, attempt);
139142
log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})",
140143
new Object[]{response.statusCode(), delayMs, attempt, maxRetries});
144+
closeBody(response);
141145
Executor delayedExecutor = CompletableFuture.delayedExecutor(
142146
delayMs, java.util.concurrent.TimeUnit.MILLISECONDS);
143147
return CompletableFuture.supplyAsync(() -> null, delayedExecutor)
144148
.thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1));
145149
});
146150
}
147151

152+
/**
153+
* Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses)
154+
* to free the underlying connection before retrying.
155+
*/
156+
private static <T> void closeBody(HttpResponse<T> response) {
157+
if (response.body() instanceof AutoCloseable c) {
158+
try {
159+
c.close();
160+
} catch (Exception ignored) {
161+
}
162+
}
163+
}
164+
148165
/**
149166
* Computes the delay in milliseconds before the next retry attempt.
150167
* Honours the {@code Retry-After} header when present (integer seconds, non-negative).
@@ -165,9 +182,9 @@ private <T> long computeDelay(HttpResponse<T> response, int attempt) {
165182
// Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs
166183
int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values
167184
long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs);
168-
// ±20% jitter
185+
// ±20% jitter, clamped so maxDelayMs remains a hard ceiling
169186
double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2]
170-
return Math.max(0, Math.round(base * (1.0 + jitter)));
187+
return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter))));
171188
}
172189

173190
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)