diff --git a/.gitignore b/.gitignore index 9c80aec7..c29aac2e 100644 --- a/.gitignore +++ b/.gitignore @@ -23,4 +23,5 @@ pom.xml.versionsBackup */.run/** .run/** .run -.claude/ +.claude +.planning diff --git a/ce/src/main/java/org/thingsboard/client/RetryingHttpClient.java b/ce/src/main/java/org/thingsboard/client/RetryingHttpClient.java new file mode 100644 index 00000000..20e35c9a --- /dev/null +++ b/ce/src/main/java/org/thingsboard/client/RetryingHttpClient.java @@ -0,0 +1,239 @@ +/** + * Copyright © 2026-2026 ThingsBoard, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client; + +import lombok.extern.java.Log; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +/** + * An {@link HttpClient} wrapper that automatically retries requests that receive a retriable + * HTTP status code (429 Too Many Requests by default) using exponential backoff with jitter. + * + *

The {@code Retry-After} response header is honoured when present: if it contains a + * non-negative integer, that number of seconds is used as the retry delay (capped to + * {@code maxDelayMs}). + * + *

After exhausting all retry attempts the final (still-retriable) response is returned to the + * caller so that the upstream code (e.g. {@code ThingsboardApi}) can throw an + * {@link ApiException} with the correct HTTP status code. + * + *

Obtain an instance via the static factory: + *

{@code
+ * RetryingHttpClient client = RetryingHttpClient.wrap(HttpClient.newHttpClient(), 3, 1000L, 30_000L);
+ * }
+ */ +@Log +public class RetryingHttpClient extends HttpClient { + + private final HttpClient delegate; + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + private final Random random = new Random(); + + private RetryingHttpClient(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + this.delegate = delegate; + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + /** + * Creates a new {@code RetryingHttpClient} that wraps the given delegate. + * + * @param delegate the underlying {@link HttpClient} to delegate to + * @param maxRetries maximum number of retry attempts (not counting the initial request) + * @param initialDelayMs initial backoff delay in milliseconds + * @param maxDelayMs maximum backoff delay in milliseconds + * @return a new {@code RetryingHttpClient} + */ + public static RetryingHttpClient wrap(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + return new RetryingHttpClient(delegate, maxRetries, initialDelayMs, maxDelayMs); + } + + /** + * Returns {@code true} if the given status code should trigger a retry. + * Override in subclasses to add additional retriable status codes. + * + * @param statusCode the HTTP response status code + * @return {@code true} for retriable status codes (429 by default) + */ + protected boolean isRetriable(int statusCode) { + return statusCode == 429; + } + + @Override + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + HttpResponse response = delegate.send(request, responseBodyHandler); + + if (!isRetriable(response.statusCode())) { + return response; + } + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Thread.sleep(delayMs); + + response = delegate.send(request, responseBodyHandler); + if (!isRetriable(response.statusCode())) { + return response; + } + } + + // Exhausted retries — return the last response so the caller can throw ApiException + return response; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + return sendAsyncWithRetry(request, responseBodyHandler, 1); + } + + // Push-promise variant delegates without retry: server-push semantics are incompatible + // with request-level retry, and this overload is not used by the generated API code. + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return delegate.sendAsync(request, responseBodyHandler, pushPromiseHandler); + } + + private CompletableFuture> sendAsyncWithRetry( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler, int attempt) { + return delegate.sendAsync(request, responseBodyHandler).thenCompose(response -> { + if (!isRetriable(response.statusCode()) || attempt > maxRetries) { + return CompletableFuture.completedFuture(response); + } + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Executor delayedExecutor = CompletableFuture.delayedExecutor( + delayMs, java.util.concurrent.TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayedExecutor) + .thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1)); + }); + } + + /** + * Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses) + * to free the underlying connection before retrying. + */ + private static void closeBody(HttpResponse response) { + if (response.body() instanceof AutoCloseable c) { + try { + c.close(); + } catch (Exception ignored) { + } + } + } + + /** + * Computes the delay in milliseconds before the next retry attempt. + * Honours the {@code Retry-After} header when present (integer seconds, non-negative). + * Falls back to exponential backoff with ±20% jitter. + */ + private long computeDelay(HttpResponse response, int attempt) { + Optional retryAfter = response.headers().firstValue("Retry-After"); + if (retryAfter.isPresent()) { + try { + long seconds = Long.parseLong(retryAfter.get().trim()); + if (seconds >= 0) { + return Math.min(seconds * 1000L, maxDelayMs); + } + } catch (NumberFormatException ignored) { + // Not an integer — fall through to exponential backoff + } + } + // Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs + int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values + long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs); + // ±20% jitter, clamped so maxDelayMs remains a hard ceiling + double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2] + return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter)))); + } + + // ------------------------------------------------------------------------- + // Delegation of all remaining abstract HttpClient methods + // ------------------------------------------------------------------------- + + @Override + public Optional cookieHandler() { + return delegate.cookieHandler(); + } + + @Override + public Optional connectTimeout() { + return delegate.connectTimeout(); + } + + @Override + public Redirect followRedirects() { + return delegate.followRedirects(); + } + + @Override + public Optional proxy() { + return delegate.proxy(); + } + + @Override + public SSLContext sslContext() { + return delegate.sslContext(); + } + + @Override + public SSLParameters sslParameters() { + return delegate.sslParameters(); + } + + @Override + public Optional authenticator() { + return delegate.authenticator(); + } + + @Override + public Version version() { + return delegate.version(); + } + + @Override + public Optional executor() { + return delegate.executor(); + } + +} diff --git a/ce/src/main/java/org/thingsboard/client/ThingsboardClient.java b/ce/src/main/java/org/thingsboard/client/ThingsboardClient.java index a301289b..3901a580 100644 --- a/ce/src/main/java/org/thingsboard/client/ThingsboardClient.java +++ b/ce/src/main/java/org/thingsboard/client/ThingsboardClient.java @@ -41,6 +41,7 @@ *
  • Automatic token refresh before expiry
  • *
  • Automatic re-login when the refresh token expires
  • *
  • Client-server clock skew compensation
  • + *
  • Automatic retry on HTTP 429 (Too Many Requests) with exponential backoff
  • * * *
    {@code
    @@ -56,6 +57,22 @@
      *         .apiKey("your-api-key")
      *         .build();
      *
    + * // Tuning rate-limit retry behaviour (retry is enabled by default)
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .maxRetries(5)
    + *         .initialRetryDelayMs(500L)
    + *         .maxRetryDelayMs(60_000L)
    + *         .build();
    + *
    + * // Disable rate-limit retry
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .retryOnRateLimit(false)
    + *         .build();
    + *
      * // All generated API methods are available directly
      * Device device = client.getDeviceById(deviceId);
      * }
    @@ -96,6 +113,10 @@ public static class Builder { private String username; private String password; private String apiKey; + private boolean retryOnRateLimit = true; + private int maxRetries = 3; + private long initialRetryDelayMs = 1000L; + private long maxRetryDelayMs = 30_000L; private Builder() {} @@ -115,16 +136,54 @@ public Builder apiKey(String apiKey) { return this; } + /** + * Enables or disables automatic retry on rate-limit (HTTP 429) responses. + * Enabled by default. + */ + public Builder retryOnRateLimit(boolean retryOnRateLimit) { + this.retryOnRateLimit = retryOnRateLimit; + return this; + } + + /** + * Maximum number of retry attempts after an HTTP 429 response. + * Default: 3. + */ + public Builder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Initial backoff delay in milliseconds for the first retry. + * Subsequent retries use exponential backoff with ±20% jitter. + * Default: 1000 ms. + */ + public Builder initialRetryDelayMs(long initialRetryDelayMs) { + this.initialRetryDelayMs = initialRetryDelayMs; + return this; + } + + /** + * Maximum backoff delay in milliseconds. The computed delay is capped at this value. + * Default: 30 000 ms. + */ + public Builder maxRetryDelayMs(long maxRetryDelayMs) { + this.maxRetryDelayMs = maxRetryDelayMs; + return this; + } + public ThingsboardClient build() throws ApiException { if (url == null) { throw new IllegalArgumentException("url is required"); } - if (apiKey != null) { - return new ThingsboardClient(new AuthManager(url, AuthType.API_KEY, apiKey)); - } - AuthManager auth = new AuthManager(url, AuthType.JWT, null); + ApiClient apiClient = retryOnRateLimit + ? new RetryableApiClient(maxRetries, initialRetryDelayMs, maxRetryDelayMs) + : new ApiClient(); + AuthType authType = apiKey != null ? AuthType.API_KEY : AuthType.JWT; + AuthManager auth = new AuthManager(url, authType, apiKey, apiClient); ThingsboardClient client = new ThingsboardClient(auth); - if (username != null) { + if (authType == AuthType.JWT && username != null) { client.login(username, password); } return client; @@ -132,6 +191,27 @@ public ThingsboardClient build() throws ApiException { } + /** + * ApiClient subclass that wraps every built HttpClient with retry-on-429 logic. + */ + private static class RetryableApiClient extends ApiClient { + + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + + RetryableApiClient(int maxRetries, long initialDelayMs, long maxDelayMs) { + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + @Override + public HttpClient getHttpClient() { + return RetryingHttpClient.wrap(super.getHttpClient(), maxRetries, initialDelayMs, maxDelayMs); + } + } + /** * Authenticates with ThingsBoard using username and password. * The JWT token is automatically applied to all subsequent API calls. @@ -178,9 +258,9 @@ private record TokenInfo(String token, String refreshToken, long tokenExpTs, private volatile String password; private volatile boolean refreshing; - AuthManager(String url, AuthType authType, String apiKey) { + AuthManager(String url, AuthType authType, String apiKey, ApiClient apiClient) { this.authType = authType; - this.apiClient = new ApiClient(); + this.apiClient = apiClient; apiClient.updateBaseUri(url); this.baseUrl = apiClient.getBaseUri(); this.httpClient = apiClient.getHttpClient(); diff --git a/common/src/main/java/org/thingsboard/client/RetryingHttpClient.java b/common/src/main/java/org/thingsboard/client/RetryingHttpClient.java new file mode 100644 index 00000000..20e35c9a --- /dev/null +++ b/common/src/main/java/org/thingsboard/client/RetryingHttpClient.java @@ -0,0 +1,239 @@ +/** + * Copyright © 2026-2026 ThingsBoard, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client; + +import lombok.extern.java.Log; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +/** + * An {@link HttpClient} wrapper that automatically retries requests that receive a retriable + * HTTP status code (429 Too Many Requests by default) using exponential backoff with jitter. + * + *

    The {@code Retry-After} response header is honoured when present: if it contains a + * non-negative integer, that number of seconds is used as the retry delay (capped to + * {@code maxDelayMs}). + * + *

    After exhausting all retry attempts the final (still-retriable) response is returned to the + * caller so that the upstream code (e.g. {@code ThingsboardApi}) can throw an + * {@link ApiException} with the correct HTTP status code. + * + *

    Obtain an instance via the static factory: + *

    {@code
    + * RetryingHttpClient client = RetryingHttpClient.wrap(HttpClient.newHttpClient(), 3, 1000L, 30_000L);
    + * }
    + */ +@Log +public class RetryingHttpClient extends HttpClient { + + private final HttpClient delegate; + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + private final Random random = new Random(); + + private RetryingHttpClient(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + this.delegate = delegate; + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + /** + * Creates a new {@code RetryingHttpClient} that wraps the given delegate. + * + * @param delegate the underlying {@link HttpClient} to delegate to + * @param maxRetries maximum number of retry attempts (not counting the initial request) + * @param initialDelayMs initial backoff delay in milliseconds + * @param maxDelayMs maximum backoff delay in milliseconds + * @return a new {@code RetryingHttpClient} + */ + public static RetryingHttpClient wrap(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + return new RetryingHttpClient(delegate, maxRetries, initialDelayMs, maxDelayMs); + } + + /** + * Returns {@code true} if the given status code should trigger a retry. + * Override in subclasses to add additional retriable status codes. + * + * @param statusCode the HTTP response status code + * @return {@code true} for retriable status codes (429 by default) + */ + protected boolean isRetriable(int statusCode) { + return statusCode == 429; + } + + @Override + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + HttpResponse response = delegate.send(request, responseBodyHandler); + + if (!isRetriable(response.statusCode())) { + return response; + } + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Thread.sleep(delayMs); + + response = delegate.send(request, responseBodyHandler); + if (!isRetriable(response.statusCode())) { + return response; + } + } + + // Exhausted retries — return the last response so the caller can throw ApiException + return response; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + return sendAsyncWithRetry(request, responseBodyHandler, 1); + } + + // Push-promise variant delegates without retry: server-push semantics are incompatible + // with request-level retry, and this overload is not used by the generated API code. + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return delegate.sendAsync(request, responseBodyHandler, pushPromiseHandler); + } + + private CompletableFuture> sendAsyncWithRetry( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler, int attempt) { + return delegate.sendAsync(request, responseBodyHandler).thenCompose(response -> { + if (!isRetriable(response.statusCode()) || attempt > maxRetries) { + return CompletableFuture.completedFuture(response); + } + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Executor delayedExecutor = CompletableFuture.delayedExecutor( + delayMs, java.util.concurrent.TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayedExecutor) + .thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1)); + }); + } + + /** + * Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses) + * to free the underlying connection before retrying. + */ + private static void closeBody(HttpResponse response) { + if (response.body() instanceof AutoCloseable c) { + try { + c.close(); + } catch (Exception ignored) { + } + } + } + + /** + * Computes the delay in milliseconds before the next retry attempt. + * Honours the {@code Retry-After} header when present (integer seconds, non-negative). + * Falls back to exponential backoff with ±20% jitter. + */ + private long computeDelay(HttpResponse response, int attempt) { + Optional retryAfter = response.headers().firstValue("Retry-After"); + if (retryAfter.isPresent()) { + try { + long seconds = Long.parseLong(retryAfter.get().trim()); + if (seconds >= 0) { + return Math.min(seconds * 1000L, maxDelayMs); + } + } catch (NumberFormatException ignored) { + // Not an integer — fall through to exponential backoff + } + } + // Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs + int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values + long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs); + // ±20% jitter, clamped so maxDelayMs remains a hard ceiling + double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2] + return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter)))); + } + + // ------------------------------------------------------------------------- + // Delegation of all remaining abstract HttpClient methods + // ------------------------------------------------------------------------- + + @Override + public Optional cookieHandler() { + return delegate.cookieHandler(); + } + + @Override + public Optional connectTimeout() { + return delegate.connectTimeout(); + } + + @Override + public Redirect followRedirects() { + return delegate.followRedirects(); + } + + @Override + public Optional proxy() { + return delegate.proxy(); + } + + @Override + public SSLContext sslContext() { + return delegate.sslContext(); + } + + @Override + public SSLParameters sslParameters() { + return delegate.sslParameters(); + } + + @Override + public Optional authenticator() { + return delegate.authenticator(); + } + + @Override + public Version version() { + return delegate.version(); + } + + @Override + public Optional executor() { + return delegate.executor(); + } + +} diff --git a/common/src/main/java/org/thingsboard/client/ThingsboardClient.java b/common/src/main/java/org/thingsboard/client/ThingsboardClient.java index a301289b..3901a580 100644 --- a/common/src/main/java/org/thingsboard/client/ThingsboardClient.java +++ b/common/src/main/java/org/thingsboard/client/ThingsboardClient.java @@ -41,6 +41,7 @@ *
  • Automatic token refresh before expiry
  • *
  • Automatic re-login when the refresh token expires
  • *
  • Client-server clock skew compensation
  • + *
  • Automatic retry on HTTP 429 (Too Many Requests) with exponential backoff
  • * * *
    {@code
    @@ -56,6 +57,22 @@
      *         .apiKey("your-api-key")
      *         .build();
      *
    + * // Tuning rate-limit retry behaviour (retry is enabled by default)
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .maxRetries(5)
    + *         .initialRetryDelayMs(500L)
    + *         .maxRetryDelayMs(60_000L)
    + *         .build();
    + *
    + * // Disable rate-limit retry
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .retryOnRateLimit(false)
    + *         .build();
    + *
      * // All generated API methods are available directly
      * Device device = client.getDeviceById(deviceId);
      * }
    @@ -96,6 +113,10 @@ public static class Builder { private String username; private String password; private String apiKey; + private boolean retryOnRateLimit = true; + private int maxRetries = 3; + private long initialRetryDelayMs = 1000L; + private long maxRetryDelayMs = 30_000L; private Builder() {} @@ -115,16 +136,54 @@ public Builder apiKey(String apiKey) { return this; } + /** + * Enables or disables automatic retry on rate-limit (HTTP 429) responses. + * Enabled by default. + */ + public Builder retryOnRateLimit(boolean retryOnRateLimit) { + this.retryOnRateLimit = retryOnRateLimit; + return this; + } + + /** + * Maximum number of retry attempts after an HTTP 429 response. + * Default: 3. + */ + public Builder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Initial backoff delay in milliseconds for the first retry. + * Subsequent retries use exponential backoff with ±20% jitter. + * Default: 1000 ms. + */ + public Builder initialRetryDelayMs(long initialRetryDelayMs) { + this.initialRetryDelayMs = initialRetryDelayMs; + return this; + } + + /** + * Maximum backoff delay in milliseconds. The computed delay is capped at this value. + * Default: 30 000 ms. + */ + public Builder maxRetryDelayMs(long maxRetryDelayMs) { + this.maxRetryDelayMs = maxRetryDelayMs; + return this; + } + public ThingsboardClient build() throws ApiException { if (url == null) { throw new IllegalArgumentException("url is required"); } - if (apiKey != null) { - return new ThingsboardClient(new AuthManager(url, AuthType.API_KEY, apiKey)); - } - AuthManager auth = new AuthManager(url, AuthType.JWT, null); + ApiClient apiClient = retryOnRateLimit + ? new RetryableApiClient(maxRetries, initialRetryDelayMs, maxRetryDelayMs) + : new ApiClient(); + AuthType authType = apiKey != null ? AuthType.API_KEY : AuthType.JWT; + AuthManager auth = new AuthManager(url, authType, apiKey, apiClient); ThingsboardClient client = new ThingsboardClient(auth); - if (username != null) { + if (authType == AuthType.JWT && username != null) { client.login(username, password); } return client; @@ -132,6 +191,27 @@ public ThingsboardClient build() throws ApiException { } + /** + * ApiClient subclass that wraps every built HttpClient with retry-on-429 logic. + */ + private static class RetryableApiClient extends ApiClient { + + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + + RetryableApiClient(int maxRetries, long initialDelayMs, long maxDelayMs) { + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + @Override + public HttpClient getHttpClient() { + return RetryingHttpClient.wrap(super.getHttpClient(), maxRetries, initialDelayMs, maxDelayMs); + } + } + /** * Authenticates with ThingsBoard using username and password. * The JWT token is automatically applied to all subsequent API calls. @@ -178,9 +258,9 @@ private record TokenInfo(String token, String refreshToken, long tokenExpTs, private volatile String password; private volatile boolean refreshing; - AuthManager(String url, AuthType authType, String apiKey) { + AuthManager(String url, AuthType authType, String apiKey, ApiClient apiClient) { this.authType = authType; - this.apiClient = new ApiClient(); + this.apiClient = apiClient; apiClient.updateBaseUri(url); this.baseUrl = apiClient.getBaseUri(); this.httpClient = apiClient.getHttpClient(); diff --git a/common/src/test/java/org/thingsboard/client/RetryingHttpClientTest.java b/common/src/test/java/org/thingsboard/client/RetryingHttpClientTest.java new file mode 100644 index 00000000..805fd948 --- /dev/null +++ b/common/src/test/java/org/thingsboard/client/RetryingHttpClientTest.java @@ -0,0 +1,243 @@ +/** + * Copyright © 2026-2026 ThingsBoard, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client; + +import org.junit.jupiter.api.Test; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; +import java.io.IOException; +import java.io.InputStream; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RetryingHttpClientTest { + + // Reusable no-op request to pass into send() + private static final HttpRequest DUMMY_REQUEST = HttpRequest.newBuilder() + .uri(URI.create("http://localhost/test")) + .GET() + .build(); + + @Test + void testNoRetryOn200() throws Exception { + StubHttpClient stub = StubHttpClient.ofStatusCodes(List.of(200)); + RetryingHttpClient client = RetryingHttpClient.wrap(stub, 3, 0L, 0L); + + HttpResponse response = client.send(DUMMY_REQUEST, HttpResponse.BodyHandlers.ofInputStream()); + + assertEquals(200, response.statusCode()); + assertEquals(1, stub.callCount(), "send() should have been called exactly once"); + } + + @Test + void testRetriesOn429ThenSucceeds() throws Exception { + // 429, 429, 200 — succeeds on the third attempt + StubHttpClient stub = StubHttpClient.ofStatusCodes(List.of(429, 429, 200)); + RetryingHttpClient client = RetryingHttpClient.wrap(stub, 3, 0L, 0L); + + HttpResponse response = client.send(DUMMY_REQUEST, HttpResponse.BodyHandlers.ofInputStream()); + + assertEquals(200, response.statusCode()); + assertEquals(3, stub.callCount(), "send() should have been called exactly 3 times"); + } + + @Test + void testExhaustsRetries() throws Exception { + // Always returns 429; maxRetries=2 means 1 original + 2 retries = 3 calls total + StubHttpClient stub = StubHttpClient.ofStatusCodes(List.of(429, 429, 429, 429)); + RetryingHttpClient client = RetryingHttpClient.wrap(stub, 2, 0L, 0L); + + HttpResponse response = client.send(DUMMY_REQUEST, HttpResponse.BodyHandlers.ofInputStream()); + + assertEquals(429, response.statusCode(), "Final response should still be 429 after exhausting retries"); + assertEquals(3, stub.callCount(), "send() should have been called 3 times (1 original + 2 retries)"); + } + + @Test + void testRespectsRetryAfterHeader() throws Exception { + // First response: 429 with Retry-After: 0 (instant retry). Second: 200. + StubHttpClient stub = StubHttpClient.ofResponses( + List.of(new StubHttpResponse(429, Map.of("Retry-After", List.of("0"))), + new StubHttpResponse(200, Map.of()))); + RetryingHttpClient client = RetryingHttpClient.wrap(stub, 3, 0L, 0L); + + HttpResponse response = client.send(DUMMY_REQUEST, HttpResponse.BodyHandlers.ofInputStream()); + + assertEquals(200, response.statusCode()); + assertEquals(2, stub.callCount(), "send() should have been called exactly twice"); + } + + @Test + void testIsRetriableOnlyFor429() { + RetryingHttpClient client = RetryingHttpClient.wrap(HttpClient.newHttpClient(), 3, 0L, 0L); + + assertFalse(client.isRetriable(200), "200 should not be retriable"); + assertTrue(client.isRetriable(429), "429 should be retriable"); + assertFalse(client.isRetriable(503), "503 should not be retriable"); + } + + // --------------------------------------------------------------------------- + // Stubs + // --------------------------------------------------------------------------- + + /** + * A stub {@link HttpClient} that returns responses from a pre-programmed sequence. + * Tracks how many times {@link #send} was called. + */ + private static class StubHttpClient extends HttpClient { + + private final List responses; + private final AtomicInteger index = new AtomicInteger(0); + + /** Private constructor; use {@link #ofStatusCodes} or {@link #ofResponses}. */ + private StubHttpClient(List responses) { + this.responses = responses; + } + + /** Factory: status codes only, no special headers. */ + static StubHttpClient ofStatusCodes(List statusCodes) { + return new StubHttpClient(statusCodes.stream() + .map(code -> new StubHttpResponse(code, Map.of())) + .toList()); + } + + /** Factory: full response objects (use for Retry-After header tests). */ + static StubHttpClient ofResponses(List responses) { + return new StubHttpClient(responses); + } + + int callCount() { + return index.get(); + } + + @Override + @SuppressWarnings("unchecked") + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + int i = index.getAndIncrement(); + if (i >= responses.size()) { + throw new IllegalStateException("StubHttpClient exhausted responses (called " + (i + 1) + " times)"); + } + return (HttpResponse) responses.get(i); + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + try { + return CompletableFuture.completedFuture(send(request, responseBodyHandler)); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return sendAsync(request, responseBodyHandler); + } + + // Remaining abstract methods — minimal no-op implementations + @Override + public Optional cookieHandler() {return Optional.empty();} + + @Override + public Optional connectTimeout() {return Optional.empty();} + + @Override + public Redirect followRedirects() {return Redirect.NORMAL;} + + @Override + public Optional proxy() {return Optional.empty();} + + @Override + public SSLContext sslContext() {try {return SSLContext.getDefault();} catch (Exception e) {throw new RuntimeException(e);}} + + @Override + public SSLParameters sslParameters() {return new SSLParameters();} + + @Override + public Optional authenticator() {return Optional.empty();} + + @Override + public Version version() {return Version.HTTP_1_1;} + + @Override + public Optional executor() {return Optional.empty();} + + } + + /** + * A stub {@link HttpResponse} that returns a fixed status code and configurable headers. + */ + private static class StubHttpResponse implements HttpResponse { + + private final int statusCode; + private final HttpHeaders headers; + + StubHttpResponse(int statusCode, Map> headers) { + this.statusCode = statusCode; + this.headers = HttpHeaders.of(headers, (k, v) -> true); + } + + @Override + public int statusCode() {return statusCode;} + + @Override + public HttpHeaders headers() {return headers;} + + @Override + public InputStream body() {return InputStream.nullInputStream();} + + @Override + public HttpRequest request() {return DUMMY_REQUEST;} + + @Override + public Optional> previousResponse() {return Optional.empty();} + + @Override + public Optional sslSession() {return Optional.empty();} + + @Override + public URI uri() {return DUMMY_REQUEST.uri();} + + @Override + public HttpClient.Version version() {return HttpClient.Version.HTTP_1_1;} + + } + +} diff --git a/paas/src/main/java/org/thingsboard/client/RetryingHttpClient.java b/paas/src/main/java/org/thingsboard/client/RetryingHttpClient.java new file mode 100644 index 00000000..20e35c9a --- /dev/null +++ b/paas/src/main/java/org/thingsboard/client/RetryingHttpClient.java @@ -0,0 +1,239 @@ +/** + * Copyright © 2026-2026 ThingsBoard, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client; + +import lombok.extern.java.Log; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +/** + * An {@link HttpClient} wrapper that automatically retries requests that receive a retriable + * HTTP status code (429 Too Many Requests by default) using exponential backoff with jitter. + * + *

    The {@code Retry-After} response header is honoured when present: if it contains a + * non-negative integer, that number of seconds is used as the retry delay (capped to + * {@code maxDelayMs}). + * + *

    After exhausting all retry attempts the final (still-retriable) response is returned to the + * caller so that the upstream code (e.g. {@code ThingsboardApi}) can throw an + * {@link ApiException} with the correct HTTP status code. + * + *

    Obtain an instance via the static factory: + *

    {@code
    + * RetryingHttpClient client = RetryingHttpClient.wrap(HttpClient.newHttpClient(), 3, 1000L, 30_000L);
    + * }
    + */ +@Log +public class RetryingHttpClient extends HttpClient { + + private final HttpClient delegate; + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + private final Random random = new Random(); + + private RetryingHttpClient(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + this.delegate = delegate; + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + /** + * Creates a new {@code RetryingHttpClient} that wraps the given delegate. + * + * @param delegate the underlying {@link HttpClient} to delegate to + * @param maxRetries maximum number of retry attempts (not counting the initial request) + * @param initialDelayMs initial backoff delay in milliseconds + * @param maxDelayMs maximum backoff delay in milliseconds + * @return a new {@code RetryingHttpClient} + */ + public static RetryingHttpClient wrap(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + return new RetryingHttpClient(delegate, maxRetries, initialDelayMs, maxDelayMs); + } + + /** + * Returns {@code true} if the given status code should trigger a retry. + * Override in subclasses to add additional retriable status codes. + * + * @param statusCode the HTTP response status code + * @return {@code true} for retriable status codes (429 by default) + */ + protected boolean isRetriable(int statusCode) { + return statusCode == 429; + } + + @Override + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + HttpResponse response = delegate.send(request, responseBodyHandler); + + if (!isRetriable(response.statusCode())) { + return response; + } + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Thread.sleep(delayMs); + + response = delegate.send(request, responseBodyHandler); + if (!isRetriable(response.statusCode())) { + return response; + } + } + + // Exhausted retries — return the last response so the caller can throw ApiException + return response; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + return sendAsyncWithRetry(request, responseBodyHandler, 1); + } + + // Push-promise variant delegates without retry: server-push semantics are incompatible + // with request-level retry, and this overload is not used by the generated API code. + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return delegate.sendAsync(request, responseBodyHandler, pushPromiseHandler); + } + + private CompletableFuture> sendAsyncWithRetry( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler, int attempt) { + return delegate.sendAsync(request, responseBodyHandler).thenCompose(response -> { + if (!isRetriable(response.statusCode()) || attempt > maxRetries) { + return CompletableFuture.completedFuture(response); + } + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Executor delayedExecutor = CompletableFuture.delayedExecutor( + delayMs, java.util.concurrent.TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayedExecutor) + .thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1)); + }); + } + + /** + * Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses) + * to free the underlying connection before retrying. + */ + private static void closeBody(HttpResponse response) { + if (response.body() instanceof AutoCloseable c) { + try { + c.close(); + } catch (Exception ignored) { + } + } + } + + /** + * Computes the delay in milliseconds before the next retry attempt. + * Honours the {@code Retry-After} header when present (integer seconds, non-negative). + * Falls back to exponential backoff with ±20% jitter. + */ + private long computeDelay(HttpResponse response, int attempt) { + Optional retryAfter = response.headers().firstValue("Retry-After"); + if (retryAfter.isPresent()) { + try { + long seconds = Long.parseLong(retryAfter.get().trim()); + if (seconds >= 0) { + return Math.min(seconds * 1000L, maxDelayMs); + } + } catch (NumberFormatException ignored) { + // Not an integer — fall through to exponential backoff + } + } + // Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs + int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values + long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs); + // ±20% jitter, clamped so maxDelayMs remains a hard ceiling + double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2] + return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter)))); + } + + // ------------------------------------------------------------------------- + // Delegation of all remaining abstract HttpClient methods + // ------------------------------------------------------------------------- + + @Override + public Optional cookieHandler() { + return delegate.cookieHandler(); + } + + @Override + public Optional connectTimeout() { + return delegate.connectTimeout(); + } + + @Override + public Redirect followRedirects() { + return delegate.followRedirects(); + } + + @Override + public Optional proxy() { + return delegate.proxy(); + } + + @Override + public SSLContext sslContext() { + return delegate.sslContext(); + } + + @Override + public SSLParameters sslParameters() { + return delegate.sslParameters(); + } + + @Override + public Optional authenticator() { + return delegate.authenticator(); + } + + @Override + public Version version() { + return delegate.version(); + } + + @Override + public Optional executor() { + return delegate.executor(); + } + +} diff --git a/paas/src/main/java/org/thingsboard/client/ThingsboardClient.java b/paas/src/main/java/org/thingsboard/client/ThingsboardClient.java index a301289b..3901a580 100644 --- a/paas/src/main/java/org/thingsboard/client/ThingsboardClient.java +++ b/paas/src/main/java/org/thingsboard/client/ThingsboardClient.java @@ -41,6 +41,7 @@ *
  • Automatic token refresh before expiry
  • *
  • Automatic re-login when the refresh token expires
  • *
  • Client-server clock skew compensation
  • + *
  • Automatic retry on HTTP 429 (Too Many Requests) with exponential backoff
  • * * *
    {@code
    @@ -56,6 +57,22 @@
      *         .apiKey("your-api-key")
      *         .build();
      *
    + * // Tuning rate-limit retry behaviour (retry is enabled by default)
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .maxRetries(5)
    + *         .initialRetryDelayMs(500L)
    + *         .maxRetryDelayMs(60_000L)
    + *         .build();
    + *
    + * // Disable rate-limit retry
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .retryOnRateLimit(false)
    + *         .build();
    + *
      * // All generated API methods are available directly
      * Device device = client.getDeviceById(deviceId);
      * }
    @@ -96,6 +113,10 @@ public static class Builder { private String username; private String password; private String apiKey; + private boolean retryOnRateLimit = true; + private int maxRetries = 3; + private long initialRetryDelayMs = 1000L; + private long maxRetryDelayMs = 30_000L; private Builder() {} @@ -115,16 +136,54 @@ public Builder apiKey(String apiKey) { return this; } + /** + * Enables or disables automatic retry on rate-limit (HTTP 429) responses. + * Enabled by default. + */ + public Builder retryOnRateLimit(boolean retryOnRateLimit) { + this.retryOnRateLimit = retryOnRateLimit; + return this; + } + + /** + * Maximum number of retry attempts after an HTTP 429 response. + * Default: 3. + */ + public Builder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Initial backoff delay in milliseconds for the first retry. + * Subsequent retries use exponential backoff with ±20% jitter. + * Default: 1000 ms. + */ + public Builder initialRetryDelayMs(long initialRetryDelayMs) { + this.initialRetryDelayMs = initialRetryDelayMs; + return this; + } + + /** + * Maximum backoff delay in milliseconds. The computed delay is capped at this value. + * Default: 30 000 ms. + */ + public Builder maxRetryDelayMs(long maxRetryDelayMs) { + this.maxRetryDelayMs = maxRetryDelayMs; + return this; + } + public ThingsboardClient build() throws ApiException { if (url == null) { throw new IllegalArgumentException("url is required"); } - if (apiKey != null) { - return new ThingsboardClient(new AuthManager(url, AuthType.API_KEY, apiKey)); - } - AuthManager auth = new AuthManager(url, AuthType.JWT, null); + ApiClient apiClient = retryOnRateLimit + ? new RetryableApiClient(maxRetries, initialRetryDelayMs, maxRetryDelayMs) + : new ApiClient(); + AuthType authType = apiKey != null ? AuthType.API_KEY : AuthType.JWT; + AuthManager auth = new AuthManager(url, authType, apiKey, apiClient); ThingsboardClient client = new ThingsboardClient(auth); - if (username != null) { + if (authType == AuthType.JWT && username != null) { client.login(username, password); } return client; @@ -132,6 +191,27 @@ public ThingsboardClient build() throws ApiException { } + /** + * ApiClient subclass that wraps every built HttpClient with retry-on-429 logic. + */ + private static class RetryableApiClient extends ApiClient { + + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + + RetryableApiClient(int maxRetries, long initialDelayMs, long maxDelayMs) { + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + @Override + public HttpClient getHttpClient() { + return RetryingHttpClient.wrap(super.getHttpClient(), maxRetries, initialDelayMs, maxDelayMs); + } + } + /** * Authenticates with ThingsBoard using username and password. * The JWT token is automatically applied to all subsequent API calls. @@ -178,9 +258,9 @@ private record TokenInfo(String token, String refreshToken, long tokenExpTs, private volatile String password; private volatile boolean refreshing; - AuthManager(String url, AuthType authType, String apiKey) { + AuthManager(String url, AuthType authType, String apiKey, ApiClient apiClient) { this.authType = authType; - this.apiClient = new ApiClient(); + this.apiClient = apiClient; apiClient.updateBaseUri(url); this.baseUrl = apiClient.getBaseUri(); this.httpClient = apiClient.getHttpClient(); diff --git a/pe/src/main/java/org/thingsboard/client/RetryingHttpClient.java b/pe/src/main/java/org/thingsboard/client/RetryingHttpClient.java new file mode 100644 index 00000000..20e35c9a --- /dev/null +++ b/pe/src/main/java/org/thingsboard/client/RetryingHttpClient.java @@ -0,0 +1,239 @@ +/** + * Copyright © 2026-2026 ThingsBoard, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.client; + +import lombok.extern.java.Log; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.logging.Level; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +/** + * An {@link HttpClient} wrapper that automatically retries requests that receive a retriable + * HTTP status code (429 Too Many Requests by default) using exponential backoff with jitter. + * + *

    The {@code Retry-After} response header is honoured when present: if it contains a + * non-negative integer, that number of seconds is used as the retry delay (capped to + * {@code maxDelayMs}). + * + *

    After exhausting all retry attempts the final (still-retriable) response is returned to the + * caller so that the upstream code (e.g. {@code ThingsboardApi}) can throw an + * {@link ApiException} with the correct HTTP status code. + * + *

    Obtain an instance via the static factory: + *

    {@code
    + * RetryingHttpClient client = RetryingHttpClient.wrap(HttpClient.newHttpClient(), 3, 1000L, 30_000L);
    + * }
    + */ +@Log +public class RetryingHttpClient extends HttpClient { + + private final HttpClient delegate; + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + private final Random random = new Random(); + + private RetryingHttpClient(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + this.delegate = delegate; + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + /** + * Creates a new {@code RetryingHttpClient} that wraps the given delegate. + * + * @param delegate the underlying {@link HttpClient} to delegate to + * @param maxRetries maximum number of retry attempts (not counting the initial request) + * @param initialDelayMs initial backoff delay in milliseconds + * @param maxDelayMs maximum backoff delay in milliseconds + * @return a new {@code RetryingHttpClient} + */ + public static RetryingHttpClient wrap(HttpClient delegate, int maxRetries, long initialDelayMs, long maxDelayMs) { + return new RetryingHttpClient(delegate, maxRetries, initialDelayMs, maxDelayMs); + } + + /** + * Returns {@code true} if the given status code should trigger a retry. + * Override in subclasses to add additional retriable status codes. + * + * @param statusCode the HTTP response status code + * @return {@code true} for retriable status codes (429 by default) + */ + protected boolean isRetriable(int statusCode) { + return statusCode == 429; + } + + @Override + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + HttpResponse response = delegate.send(request, responseBodyHandler); + + if (!isRetriable(response.statusCode())) { + return response; + } + + for (int attempt = 1; attempt <= maxRetries; attempt++) { + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Thread.sleep(delayMs); + + response = delegate.send(request, responseBodyHandler); + if (!isRetriable(response.statusCode())) { + return response; + } + } + + // Exhausted retries — return the last response so the caller can throw ApiException + return response; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + return sendAsyncWithRetry(request, responseBodyHandler, 1); + } + + // Push-promise variant delegates without retry: server-push semantics are incompatible + // with request-level retry, and this overload is not used by the generated API code. + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return delegate.sendAsync(request, responseBodyHandler, pushPromiseHandler); + } + + private CompletableFuture> sendAsyncWithRetry( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler, int attempt) { + return delegate.sendAsync(request, responseBodyHandler).thenCompose(response -> { + if (!isRetriable(response.statusCode()) || attempt > maxRetries) { + return CompletableFuture.completedFuture(response); + } + long delayMs = computeDelay(response, attempt); + log.log(Level.WARNING, "HTTP {0} received, retrying in {1}ms (attempt {2}/{3})", + new Object[]{response.statusCode(), delayMs, attempt, maxRetries}); + closeBody(response); + Executor delayedExecutor = CompletableFuture.delayedExecutor( + delayMs, java.util.concurrent.TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayedExecutor) + .thenCompose(ignored -> sendAsyncWithRetry(request, responseBodyHandler, attempt + 1)); + }); + } + + /** + * Closes the response body if it is {@link AutoCloseable} (e.g. {@code InputStream}-backed responses) + * to free the underlying connection before retrying. + */ + private static void closeBody(HttpResponse response) { + if (response.body() instanceof AutoCloseable c) { + try { + c.close(); + } catch (Exception ignored) { + } + } + } + + /** + * Computes the delay in milliseconds before the next retry attempt. + * Honours the {@code Retry-After} header when present (integer seconds, non-negative). + * Falls back to exponential backoff with ±20% jitter. + */ + private long computeDelay(HttpResponse response, int attempt) { + Optional retryAfter = response.headers().firstValue("Retry-After"); + if (retryAfter.isPresent()) { + try { + long seconds = Long.parseLong(retryAfter.get().trim()); + if (seconds >= 0) { + return Math.min(seconds * 1000L, maxDelayMs); + } + } catch (NumberFormatException ignored) { + // Not an integer — fall through to exponential backoff + } + } + // Exponential backoff: initialDelayMs * 2^(attempt-1), capped at maxDelayMs + int shift = Math.min(attempt - 1, 30); // prevent long overflow on large attempt values + long base = Math.min(initialDelayMs * (1L << shift), maxDelayMs); + // ±20% jitter, clamped so maxDelayMs remains a hard ceiling + double jitter = (random.nextDouble() * 0.4) - 0.2; // range [-0.2, 0.2] + return Math.min(maxDelayMs, Math.max(0, Math.round(base * (1.0 + jitter)))); + } + + // ------------------------------------------------------------------------- + // Delegation of all remaining abstract HttpClient methods + // ------------------------------------------------------------------------- + + @Override + public Optional cookieHandler() { + return delegate.cookieHandler(); + } + + @Override + public Optional connectTimeout() { + return delegate.connectTimeout(); + } + + @Override + public Redirect followRedirects() { + return delegate.followRedirects(); + } + + @Override + public Optional proxy() { + return delegate.proxy(); + } + + @Override + public SSLContext sslContext() { + return delegate.sslContext(); + } + + @Override + public SSLParameters sslParameters() { + return delegate.sslParameters(); + } + + @Override + public Optional authenticator() { + return delegate.authenticator(); + } + + @Override + public Version version() { + return delegate.version(); + } + + @Override + public Optional executor() { + return delegate.executor(); + } + +} diff --git a/pe/src/main/java/org/thingsboard/client/ThingsboardClient.java b/pe/src/main/java/org/thingsboard/client/ThingsboardClient.java index a301289b..3901a580 100644 --- a/pe/src/main/java/org/thingsboard/client/ThingsboardClient.java +++ b/pe/src/main/java/org/thingsboard/client/ThingsboardClient.java @@ -41,6 +41,7 @@ *
  • Automatic token refresh before expiry
  • *
  • Automatic re-login when the refresh token expires
  • *
  • Client-server clock skew compensation
  • + *
  • Automatic retry on HTTP 429 (Too Many Requests) with exponential backoff
  • * * *
    {@code
    @@ -56,6 +57,22 @@
      *         .apiKey("your-api-key")
      *         .build();
      *
    + * // Tuning rate-limit retry behaviour (retry is enabled by default)
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .maxRetries(5)
    + *         .initialRetryDelayMs(500L)
    + *         .maxRetryDelayMs(60_000L)
    + *         .build();
    + *
    + * // Disable rate-limit retry
    + * ThingsboardClient client = ThingsboardClient.builder()
    + *         .url("http://localhost:8080")
    + *         .credentials("tenant@thingsboard.org", "password")
    + *         .retryOnRateLimit(false)
    + *         .build();
    + *
      * // All generated API methods are available directly
      * Device device = client.getDeviceById(deviceId);
      * }
    @@ -96,6 +113,10 @@ public static class Builder { private String username; private String password; private String apiKey; + private boolean retryOnRateLimit = true; + private int maxRetries = 3; + private long initialRetryDelayMs = 1000L; + private long maxRetryDelayMs = 30_000L; private Builder() {} @@ -115,16 +136,54 @@ public Builder apiKey(String apiKey) { return this; } + /** + * Enables or disables automatic retry on rate-limit (HTTP 429) responses. + * Enabled by default. + */ + public Builder retryOnRateLimit(boolean retryOnRateLimit) { + this.retryOnRateLimit = retryOnRateLimit; + return this; + } + + /** + * Maximum number of retry attempts after an HTTP 429 response. + * Default: 3. + */ + public Builder maxRetries(int maxRetries) { + this.maxRetries = maxRetries; + return this; + } + + /** + * Initial backoff delay in milliseconds for the first retry. + * Subsequent retries use exponential backoff with ±20% jitter. + * Default: 1000 ms. + */ + public Builder initialRetryDelayMs(long initialRetryDelayMs) { + this.initialRetryDelayMs = initialRetryDelayMs; + return this; + } + + /** + * Maximum backoff delay in milliseconds. The computed delay is capped at this value. + * Default: 30 000 ms. + */ + public Builder maxRetryDelayMs(long maxRetryDelayMs) { + this.maxRetryDelayMs = maxRetryDelayMs; + return this; + } + public ThingsboardClient build() throws ApiException { if (url == null) { throw new IllegalArgumentException("url is required"); } - if (apiKey != null) { - return new ThingsboardClient(new AuthManager(url, AuthType.API_KEY, apiKey)); - } - AuthManager auth = new AuthManager(url, AuthType.JWT, null); + ApiClient apiClient = retryOnRateLimit + ? new RetryableApiClient(maxRetries, initialRetryDelayMs, maxRetryDelayMs) + : new ApiClient(); + AuthType authType = apiKey != null ? AuthType.API_KEY : AuthType.JWT; + AuthManager auth = new AuthManager(url, authType, apiKey, apiClient); ThingsboardClient client = new ThingsboardClient(auth); - if (username != null) { + if (authType == AuthType.JWT && username != null) { client.login(username, password); } return client; @@ -132,6 +191,27 @@ public ThingsboardClient build() throws ApiException { } + /** + * ApiClient subclass that wraps every built HttpClient with retry-on-429 logic. + */ + private static class RetryableApiClient extends ApiClient { + + private final int maxRetries; + private final long initialDelayMs; + private final long maxDelayMs; + + RetryableApiClient(int maxRetries, long initialDelayMs, long maxDelayMs) { + this.maxRetries = maxRetries; + this.initialDelayMs = initialDelayMs; + this.maxDelayMs = maxDelayMs; + } + + @Override + public HttpClient getHttpClient() { + return RetryingHttpClient.wrap(super.getHttpClient(), maxRetries, initialDelayMs, maxDelayMs); + } + } + /** * Authenticates with ThingsBoard using username and password. * The JWT token is automatically applied to all subsequent API calls. @@ -178,9 +258,9 @@ private record TokenInfo(String token, String refreshToken, long tokenExpTs, private volatile String password; private volatile boolean refreshing; - AuthManager(String url, AuthType authType, String apiKey) { + AuthManager(String url, AuthType authType, String apiKey, ApiClient apiClient) { this.authType = authType; - this.apiClient = new ApiClient(); + this.apiClient = apiClient; apiClient.updateBaseUri(url); this.baseUrl = apiClient.getBaseUri(); this.httpClient = apiClient.getHttpClient();