Refactor Retry Handler To Move Into Source Crawler Package#6275
Conversation
| this.errorTypeMetricCounterMap = getErrorTypeMetricCounterMap(pluginMetrics); | ||
| this.retryHandler = new RetryHandler( | ||
| new DefaultRetryStrategy(), | ||
| new DefaultStatusCodeHandler()); |
There was a problem hiding this comment.
What do you think if we pass "pluginMetrics" into RetryHandler here so we do not have to pass Optional.of(auditLogRequestsFailedCounter) on line #261? Can also be an Optional param.
IMO, it would keep the executeWithRetry clean and concise.
There was a problem hiding this comment.
We don't think it would be beneficial. Even if we pass pluginMetrics into the RetryHandler constructor, we would still need to provide the metric name at line 261. Additionally, inside RetryHandler we would have to iterate through pluginMetrics to locate that specific metric. Please correct us if we are wrong.
There was a problem hiding this comment.
You're right. I was thinking of passing this searchRequestsFailedCounter directly in the constructor. E.g
new RetryHandler(
new DefaultRetryStrategy(),
new DefaultStatusCodeHandler(),
Optional<Counter> failedCounter
);
Not a big blocker and will let others chime in.
| @Override | ||
| public RetryDecision handleStatusCode(Exception ex, int retryCount, | ||
| Runnable credentialRenewal) { | ||
| HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null); |
There was a problem hiding this comment.
This is already return Optional per static Optional<HttpStatus> getStatusCode(final Exception ex). Should we just use the Optional method to check for empty instead of doing null check here? Please correct me if I am wrong.
There was a problem hiding this comment.
Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.
| } catch (NumberFormatException e) { | ||
| log.warn(NOISY, "Failed to parse retry-after header: {}", e.getMessage()); | ||
| } | ||
| return null; |
There was a problem hiding this comment.
Should we use Java.Optional here instead of return null?
There was a problem hiding this comment.
Addressed - The method now uses Optional instead of returning null.
| } | ||
|
|
||
| public static RetryDecision retry() { | ||
| return new RetryDecision(false, null); |
There was a problem hiding this comment.
Should we use Java's Optional instead of null here?
There was a problem hiding this comment.
Addressed - The method now uses Optional instead of null.
| */ | ||
| public <T> T executeWithRetry(Supplier<T> operation, Runnable credentialRenewal, Optional<Counter> failureCounter) { | ||
| if (operation == null) { | ||
| throw new IllegalArgumentException("Operation cannot be null"); |
There was a problem hiding this comment.
Should we throw SaaSCrawlerException similar to this?
Same comment IllegalArgumentException below. You might also have to update the RestClient to properly throw with the correct isRetryable state.
There was a problem hiding this comment.
Addressed - The implementation now throws SaaSCrawlerException
|
|
||
| @Override | ||
| public long calculateSleepTime(Exception ex, int retryCount) { | ||
| HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null); |
There was a problem hiding this comment.
There was a problem hiding this comment.
Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.
|
|
||
| @Override | ||
| public long calculateSleepTime(Exception ex, int retryCount) { | ||
| HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null); |
There was a problem hiding this comment.
There was a problem hiding this comment.
Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.
| } | ||
|
|
||
| // Fallback to fixed backoff | ||
| List<Integer> sleepTimes = (statusCode == HttpStatus.TOO_MANY_REQUESTS || statusCode == HttpStatus.FORBIDDEN |
There was a problem hiding this comment.
Can you remind me again why we're only checking HttpStatus.TOO_MANY_REQUESTS in DefaultRetryStrategy.calculateSleepTime(...) but also FORBIDDEN & SERVICE_UNAVAILABLE here?
Also, I think we should abstract into a fuction.
statusCode == HttpStatus.TOO_MANY_REQUESTS || statusCode == HttpStatus.FORBIDDEN
|| statusCode == HttpStatus.SERVICE_UNAVAILABLE)
There was a problem hiding this comment.
DefaultRetryStrategy is invoked only when Retry-After header is not present. In that case, we fallback to checking only HttpStatus.TOO_MANY_REQUESTS.
However, some connectors return rate-limit errors using FORBIDDEN or SERVICE_UNAVAILABLE while still including a Retry-After header.
Also, we have now abstracted this logic into a separate function.
|
|
||
| switch (statusCode) { | ||
| case UNAUTHORIZED: | ||
| log.error(NOISY, "Token expired. Attempting to renew credentials.", ex); |
There was a problem hiding this comment.
Is it possible to have an authFailures metric incremented here? We need it to map to some of ATP's metrics and I believe we don't emit it anywhere. Source
There was a problem hiding this comment.
We're already doing it here.
This is the metricMapping ->
. Please correct me if you're referring to something else.| * Default status code handling - covers common HTTP scenarios | ||
| */ | ||
| @Slf4j | ||
| public class DefaultStatusCodeHandler implements StatusCodeHandler { |
There was a problem hiding this comment.
We recently found an issue in WindowEvents connector where "tokenExpired" throws 403(FORBIDDEN) instead of 401(UNAUTHORIZED), and we did not run credentialsRenew on it. This caused the connector to stop retrieving data after.
Should we consider allowing connectors to pass information whether to retry credsRenewal on UNAUTHORIZED, FORBIDDEN or both? E.g
Map<HttpStatus, Boolean> authStatusCredentialRenewalMap = new HashMap<>();
authStatusCredentialRenewalMap.put(FORBIDDEN, true);
new DefaultStatusCodeHandler(authStatusCredentialRenewalMap);
.......
case FORBIDDEN:
log.error(NOISY, "Access forbidden: {}", statusMessage, ex);
if (authStatusCredentialRenewalMap.get(FORBIDDEN)) {
credentialRenewal.run();
return RetryDecision.retry();
}
return RetryDecision.stopWithException(
new SecurityException("Access forbidden: " + statusMessage));
Just one idea, please let me know what you think.
There was a problem hiding this comment.
Yes, for cases like this, we have a CustomApiStatusCodeHandler where we can add all status codes that require special handling or are not covered inside DefaultStatusCodeHandler. For all other cases, we simply call super.handleStatusCode as shown below:
class CustomApiStatusCodeHandler extends DefaultStatusCodeHandler {
@Override
public RetryDecision handleStatusCode(Exception ex, int retryCount,
Runnable credentialRenewal) {
Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex);
String statusMessage = ex.getMessage();
if (statusCode.isEmpty()) {
return RetryDecision.stop();
}
switch (statusCode.get()) {
case FORBIDDEN:
credentialRenewal.run();
return RetryDecision.retry();
default:
// Delegate to default handler for all standard status codes
return super.handleStatusCode(ex, retryCount, credentialRenewal);
}
}
}
And in the RestClient:
this.retryHandler = new RetryHandler(
new RetryAfterHeaderStrategy(),
new CustomApiStatusCodeHandler()
);
We have covered all such scenarios in the design document, please check: https://quip-amazon.com/vWbpATaYxCeS/Design-And-Refactor-Retry-Handler-To-Move-Into-Source-Crawler-Package.
There was a problem hiding this comment.
Okay makes sense, thanks! That's super helpful.
| List<Integer> DEFAULT_RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40); | ||
| List<Integer> DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME = List.of(5, 10, 30, 60, 120, 300); | ||
| int SLEEP_TIME_MULTIPLIER_MS = 1000; | ||
| int MAX_RETRIES = DEFAULT_RETRY_ATTEMPT_SLEEP_TIME.size(); |
There was a problem hiding this comment.
this refactor looks good, but we need configurable retry counts for unit tests.
current issue: unit tests take a while due to 6 retries with long delays.
can you add constructor parameter to RetryStrategy implementations?
this allows tests to use 1 retry while production keeps 6.
please let me know if you have any concerns
There was a problem hiding this comment.
Addressed - Added a constructor parameter to the RetryStrategy implementations to allow configuring the maxRetries.
| import java.util.function.Supplier; | ||
|
|
||
| @Slf4j | ||
| public class RetryHandler { |
There was a problem hiding this comment.
Can you also move RetryHandler to retry folder?
There was a problem hiding this comment.
Addressed - Moved to retry folder.
| public long calculateSleepTime(Exception ex, int retryCount) { | ||
| Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex); | ||
|
|
||
| List<Integer> sleepTimes = (statusCode.isPresent() && statusCode.get() == HttpStatus.TOO_MANY_REQUESTS) |
There was a problem hiding this comment.
Please always have HttpStatus.TOO_MANY_REQUESTS in the beginning to avoid NPE.
I know there is check on statusCode.isPresent(), but having real value on the left of equal is always a best practice.
There was a problem hiding this comment.
Addressed - Now we are having list and we are checking with list weather containing status code or not.
| private boolean isRateLimited(final HttpStatus status) { | ||
| return status == HttpStatus.TOO_MANY_REQUESTS || | ||
| status == HttpStatus.FORBIDDEN || | ||
| status == HttpStatus.SERVICE_UNAVAILABLE; |
There was a problem hiding this comment.
Which vendor mark HttpStatus.SERVICE_UNAVAILABLE as rate limit?
There was a problem hiding this comment.
In pingone we are getting Retry-After in case of HttpStatus.SERVICE_UNAVAILABLE
https://apidocs.pingidentity.com/pingone/platform/v1/api/#forward-compatibility-guidance-for-pingone-client-developers
Now by default we are keeping 429 only, from connector to connector we can pass different status codes.
| return maxRetries; | ||
| } | ||
|
|
||
| private boolean isRateLimited(final HttpStatus status) { |
There was a problem hiding this comment.
I can see this varies connector by connect.
Do you think it makes sense to make it a variable called rateLimitStatusCode?
Same for the default retryHandler
There was a problem hiding this comment.
Addressed - Created one list for status code that can be passed via constructor.
| */ | ||
| @Slf4j | ||
| public class RetryAfterHeaderStrategy implements RetryStrategy { | ||
| private static final String RATE_LIMIT_REMAINING = "X-RateLimit-Remaining"; |
There was a problem hiding this comment.
Are there many vendors using the same HEADER?
If this is just for specific connector, we should move to that connector directory with a specific name.
There was a problem hiding this comment.
Yes, most of the vendors sending same headers.
| return new RetryDecision(true, Optional.empty()); | ||
| } | ||
|
|
||
| public static RetryDecision stopWithException(RuntimeException exception) { |
There was a problem hiding this comment.
Shall we make it generic to use Exception instead of RuntimeException?
There was a problem hiding this comment.
Addressed - changed to exception.
|
This is a great change thanks! I see there are merge conflicts in Office365RestClient that need to be resolved. |
Signed-off-by: eatulban <eatulban@amazon.com> # Conflicts: # data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java
Signed-off-by: eatulban <eatulban@amazon.com>
…performance Signed-off-by: eatulban <eatulban@amazon.com>
Signed-off-by: eatulban <eatulban@amazon.com>
2cef08d to
5057d70
Compare
|
Thanks for making this change! |
…h-project#6275) Signed-off-by: eatulban <eatulban@amazon.com> Signed-off-by: Nathan Wand <wandna@amazon.com>
…h-project#6275) Signed-off-by: eatulban <eatulban@amazon.com> Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
…h-project#6275) Signed-off-by: eatulban <eatulban@amazon.com>
…h-project#6275) Signed-off-by: eatulban <eatulban@amazon.com>
Description
This change refactors the retry mechanism by moving it into the
source_crawlerpackage.The new implementation introduces modular strategies for handling retries.
How
We introduce a strategy-based retry framework that allows flexible configuration and improved control over retry behavior:
Introduced new strategy classes
Retry-Afterheader from API responses for dynamic wait times.Refactored key components
Office365RestClientandOffice365AuthenticationProviderto use the new retry strategies instead of the old static retry logic.Is this change backward compatible?
Yes.
The new retry framework maintains compatibility with existing connectors by preserving the same retry behavior.
Testing
Unit Tests
Added tests covering:
DefaultRetryStrategyRetryAfterHeaderStrategyDefaultStatusCodeHandlerIntegration Verification
Successfully executed Office365 source connector end-to-end:
Local pipeline run succeeded:
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.