Skip to content

Commit db0b2b1

Browse files
eatulbansimonelbaz
authored andcommitted
Refactor Retry Handler To Move Into Source Crawler Package (opensearch-project#6275)
Signed-off-by: eatulban <eatulban@amazon.com>
1 parent 4ea61c3 commit db0b2b1

15 files changed

Lines changed: 2020 additions & 97 deletions

File tree

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
1717
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
1818
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
19+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryHandler;
20+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultRetryStrategy;
21+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler;
1922
import org.springframework.core.ParameterizedTypeReference;
2023
import org.springframework.http.HttpEntity;
2124
import org.springframework.http.HttpHeaders;
@@ -30,6 +33,7 @@
3033
import java.time.Instant;
3134
import java.util.List;
3235
import java.util.Map;
36+
import java.util.Optional;
3337

3438
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
3539
import static org.opensearch.dataprepper.plugins.source.microsoft_office365.utils.Constants.CONTENT_TYPES;
@@ -57,6 +61,7 @@ public class Office365RestClient {
5761
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
5862

5963
private final RestTemplate restTemplate = new RestTemplate();
64+
private final RetryHandler retryHandler;
6065
private final Office365AuthenticationInterface authConfig;
6166
private final Timer auditLogFetchLatencyTimer;
6267
private final Timer searchCallLatencyTimer;
@@ -76,6 +81,9 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
7681
this.auditLogsRequestedCounter = pluginMetrics.counter(AUDIT_LOGS_REQUESTED);
7782
this.apiCallsCounter = pluginMetrics.counter(API_CALLS);
7883
this.errorTypeMetricCounterMap = getErrorTypeMetricCounterMap(pluginMetrics);
84+
this.retryHandler = new RetryHandler(
85+
new DefaultRetryStrategy(),
86+
new DefaultStatusCodeHandler());
7987
}
8088

8189
/**
@@ -111,7 +119,7 @@ public void startSubscriptions() {
111119
authConfig.getTenantId(),
112120
contentType);
113121

114-
RetryHandler.executeWithRetry(() -> {
122+
retryHandler.executeWithRetry(() -> {
115123
try {
116124
headers.setBearerAuth(authConfig.getAccessToken());
117125
apiCallsCounter.increment();
@@ -168,7 +176,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
168176

169177
return searchCallLatencyTimer.record(() -> {
170178
try {
171-
return RetryHandler.executeWithRetry(
179+
return retryHandler.executeWithRetry(
172180
() -> {
173181
headers.setBearerAuth(authConfig.getAccessToken());
174182
apiCallsCounter.increment();
@@ -213,7 +221,7 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
213221
return new AuditLogsResponse(response.getBody(), nextPageUri);
214222
},
215223
authConfig::renewCredentials,
216-
provideSearchRequestFailureCounter(pluginMetrics)
224+
Optional.of(provideSearchRequestFailureCounter(pluginMetrics))
217225
);
218226
} catch (Exception e) {
219227
publishErrorTypeMetricCounter(e, this.errorTypeMetricCounterMap);
@@ -242,7 +250,7 @@ public String getAuditLog(String contentUri) {
242250

243251
return auditLogFetchLatencyTimer.record(() -> {
244252
try {
245-
String response = RetryHandler.executeWithRetry(() -> {
253+
String response = retryHandler.executeWithRetry(() -> {
246254
headers.setBearerAuth(authConfig.getAccessToken());
247255
apiCallsCounter.increment();
248256
ResponseEntity<String> responseEntity = restTemplate.exchange(
@@ -253,7 +261,7 @@ public String getAuditLog(String contentUri) {
253261
);
254262

255263
return responseEntity.getBody();
256-
}, authConfig::renewCredentials, provideGetRequestsFailureCounter(pluginMetrics));
264+
}, authConfig::renewCredentials, Optional.of(provideGetRequestsFailureCounter(pluginMetrics)));
257265

258266
// Log response details
259267
if (response == null) {

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/RetryHandler.java

Lines changed: 0 additions & 90 deletions
This file was deleted.

data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/auth/Office365AuthenticationProvider.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111

1212
import lombok.Getter;
1313
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
14-
import org.opensearch.dataprepper.plugins.source.microsoft_office365.RetryHandler;
14+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryHandler;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultRetryStrategy;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler;
1517
import org.slf4j.Logger;
1618
import org.slf4j.LoggerFactory;
1719
import org.springframework.http.HttpEntity;
@@ -40,6 +42,7 @@ public class Office365AuthenticationProvider implements Office365AuthenticationI
4042
"&scope=%s";
4143

4244
private final RestTemplate restTemplate = new RestTemplate();
45+
private final RetryHandler retryHandler;
4346
private final String tenantId;
4447
private final Office365SourceConfig office365SourceConfig;
4548
private String accessToken;
@@ -53,6 +56,9 @@ public class Office365AuthenticationProvider implements Office365AuthenticationI
5356
public Office365AuthenticationProvider(Office365SourceConfig config) {
5457
this.tenantId = config.getTenantId();
5558
this.office365SourceConfig = config;
59+
this.retryHandler = new RetryHandler(
60+
new DefaultRetryStrategy(),
61+
new DefaultStatusCodeHandler());
5662
}
5763

5864
@Override
@@ -76,7 +82,7 @@ public void renewCredentials() {
7682
HttpEntity<String> entity = new HttpEntity<>(payload, headers);
7783
String tokenEndpoint = String.format(TOKEN_URL, office365SourceConfig.getTenantId());
7884

79-
ResponseEntity<Map> response = RetryHandler.executeWithRetry(
85+
ResponseEntity<Map> response = retryHandler.executeWithRetry(
8086
() -> restTemplate.postForEntity(tokenEndpoint, entity, Map.class),
8187
() -> {
8288
} // No credential renewal for authentication endpoint
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry;
12+
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.springframework.http.HttpStatus;
15+
16+
import java.util.Arrays;
17+
import java.util.List;
18+
import java.util.Optional;
19+
20+
/**
21+
* Default retry strategy with fixed backoff times
22+
*/
23+
@Slf4j
24+
public class DefaultRetryStrategy implements RetryStrategy {
25+
private static final List<HttpStatus> DEFAULT_RATE_LIMIT_STATUS_CODES = Arrays.asList(HttpStatus.TOO_MANY_REQUESTS);
26+
27+
private final List<Integer> retryAttemptSleepTime;
28+
private final List<Integer> rateLimitRetrySleepTime;
29+
private final List<HttpStatus> rateLimitStatusCodes;
30+
private final int maxRetries;
31+
32+
/**
33+
* Constructor with default sleep times
34+
*/
35+
public DefaultRetryStrategy() {
36+
this.retryAttemptSleepTime = RetryStrategy.DEFAULT_RETRY_ATTEMPT_SLEEP_TIME;
37+
this.rateLimitRetrySleepTime = RetryStrategy.DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME;
38+
this.rateLimitStatusCodes = DEFAULT_RATE_LIMIT_STATUS_CODES;
39+
this.maxRetries = RetryStrategy.MAX_RETRIES;
40+
}
41+
42+
/**
43+
* Constructor with custom max retries
44+
*
45+
* @param maxRetries Maximum number of retries
46+
*/
47+
public DefaultRetryStrategy(final int maxRetries) {
48+
this.retryAttemptSleepTime = RetryStrategy.DEFAULT_RETRY_ATTEMPT_SLEEP_TIME;
49+
this.rateLimitRetrySleepTime = RetryStrategy.DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME;
50+
this.rateLimitStatusCodes = DEFAULT_RATE_LIMIT_STATUS_CODES;
51+
this.maxRetries = maxRetries;
52+
}
53+
54+
/**
55+
* Constructor with Custom sleep times for rate limit retries and custom rate limit status codes
56+
*
57+
* @param rateLimitRetrySleepTime Custom sleep times for rate limit retries (in
58+
* seconds)
59+
* @param rateLimitStatusCodes List of status codes that are considered rate limited
60+
*/
61+
public DefaultRetryStrategy(List<Integer> rateLimitRetrySleepTime, List<HttpStatus> rateLimitStatusCodes) {
62+
this.retryAttemptSleepTime = RetryStrategy.DEFAULT_RETRY_ATTEMPT_SLEEP_TIME;
63+
this.rateLimitRetrySleepTime = rateLimitRetrySleepTime != null
64+
? rateLimitRetrySleepTime
65+
: RetryStrategy.DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME;
66+
this.rateLimitStatusCodes = rateLimitStatusCodes != null
67+
? rateLimitStatusCodes
68+
: DEFAULT_RATE_LIMIT_STATUS_CODES;
69+
this.maxRetries = this.rateLimitRetrySleepTime.size();
70+
}
71+
72+
@Override
73+
public long calculateSleepTime(Exception ex, int retryCount) {
74+
Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex);
75+
76+
List<Integer> sleepTimes = (statusCode.isPresent() && rateLimitStatusCodes.contains(statusCode.get()))
77+
? rateLimitRetrySleepTime
78+
: retryAttemptSleepTime;
79+
80+
int sleepTimeSeconds = (retryCount < sleepTimes.size())
81+
? sleepTimes.get(retryCount)
82+
: sleepTimes.get(sleepTimes.size() - 1);
83+
84+
log.debug("Retrying in {} seconds (attempt {}/{})",
85+
sleepTimeSeconds, retryCount + 1, getMaxRetries());
86+
87+
return sleepTimeSeconds * RetryStrategy.SLEEP_TIME_MULTIPLIER_MS;
88+
}
89+
90+
@Override
91+
public int getMaxRetries() {
92+
return maxRetries;
93+
}
94+
95+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry;
12+
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.springframework.http.HttpStatus;
15+
import java.util.Optional;
16+
17+
import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY;
18+
19+
/**
20+
* Default status code handling - covers common HTTP scenarios
21+
*/
22+
@Slf4j
23+
public class DefaultStatusCodeHandler implements StatusCodeHandler {
24+
25+
@Override
26+
public RetryDecision handleStatusCode(Exception ex, int retryCount,
27+
Runnable credentialRenewal) {
28+
Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex);
29+
String statusMessage = ex.getMessage();
30+
31+
if (statusCode.isEmpty()) {
32+
return RetryDecision.stop();
33+
}
34+
35+
switch (statusCode.get()) {
36+
case UNAUTHORIZED:
37+
log.error(NOISY, "Token expired. Attempting to renew credentials.", ex);
38+
credentialRenewal.run();
39+
return RetryDecision.retry();
40+
41+
case FORBIDDEN:
42+
log.error(NOISY, "Access forbidden: {}", statusMessage, ex);
43+
return RetryDecision.stopWithException(
44+
new SecurityException("Access forbidden: " + statusMessage));
45+
46+
case NOT_FOUND:
47+
log.warn(NOISY, "Resource not found (404): {}. " +
48+
"This is expected for deleted/expired resources.", statusMessage);
49+
return RetryDecision.stop();
50+
51+
case TOO_MANY_REQUESTS:
52+
log.error(NOISY, "Hitting API rate limit. Backing off.", ex);
53+
return RetryDecision.retry();
54+
55+
case SERVICE_UNAVAILABLE:
56+
log.error(NOISY, "Service unavailable. Will retry.", ex);
57+
return RetryDecision.retry();
58+
59+
default:
60+
if (statusCode.get().is4xxClientError()) {
61+
log.error(NOISY, "Client error: {}. Will not retry.", statusCode, ex);
62+
return RetryDecision.stop();
63+
} else if (statusCode.get().is5xxServerError()) {
64+
log.error(NOISY, "Server error: {}. Will retry.", statusCode, ex);
65+
return RetryDecision.retry();
66+
} else {
67+
log.error(NOISY, "Unexpected status code: {}. Will not retry.",
68+
statusCode, ex);
69+
return RetryDecision.stop();
70+
}
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)