Refactor Retry Handler To Move Into Source Crawler Package #6275

Merged
graytaylor0 merged 4 commits into opensearch-project:main from eatulban:feature/RetryHandler
Dec 15, 2025
Conversation

@eatulban

Contributor

Description

This change refactors the retry mechanism by moving it into the source_crawler package.
The new implementation introduces modular strategies for handling retries.

How

We introduce a strategy-based retry framework that allows flexible configuration and improved control over retry behavior:

Introduced new strategy classes

  • DefaultRetryStrategy: Handles generic retry attempts with exponential backoff.
  • RetryAfterHeaderStrategy: Reads and honors the Retry-After header from API responses for dynamic wait times.
  • DefaultStatusCodeHandler: Determines retry eligibility based on HTTP status codes (e.g., 429, 503).
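
How these pieces might fit together can be sketched as follows. This is a minimal, dependency-free illustration and not the code merged in this PR: the interface shapes, the names SleepStrategy, StatusHandler, SketchRetryHandler and HttpStatusException, and the use of plain int status codes instead of Spring's HttpStatus are all assumptions made for the example.

```java
import java.util.function.Supplier;

/** Hypothetical exception carrying an HTTP status code (stand-in for a real client exception). */
class HttpStatusException extends RuntimeException {
    final int status;

    HttpStatusException(int status, String message) {
        super(message);
        this.status = status;
    }
}

/** Hypothetical strategy: decides how long to wait and how many retries are allowed. */
interface SleepStrategy {
    long sleepMillis(Exception ex, int retryCount);

    int maxRetries();
}

/** Hypothetical handler: decides whether a failure is worth retrying at all. */
interface StatusHandler {
    boolean shouldRetry(Exception ex, int retryCount);
}

class SketchRetryHandler {
    private final SleepStrategy strategy;
    private final StatusHandler handler;

    SketchRetryHandler(SleepStrategy strategy, StatusHandler handler) {
        this.strategy = strategy;
        this.handler = handler;
    }

    <T> T executeWithRetry(Supplier<T> operation) {
        int retryCount = 0;
        while (true) {
            try {
                return operation.get();
            } catch (RuntimeException ex) {
                // Give up when the handler says stop or the retry budget is spent.
                if (!handler.shouldRetry(ex, retryCount) || retryCount >= strategy.maxRetries()) {
                    throw ex;
                }
                sleep(strategy.sleepMillis(ex, retryCount));
                retryCount++;
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", ie);
        }
    }
}
```

Keeping the wait-time decision and the retry-eligibility decision behind separate interfaces is what would let a connector swap one without touching the other.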

Refactored key components

  • Updated Office365RestClient and Office365AuthenticationProvider to use the new retry strategies instead of the old static retry logic.
  • Introduced unit tests for the new retry logic and strategy classes to ensure expected retry/backoff behavior.

Is this change backward compatible?

Yes.
The new retry framework maintains compatibility with existing connectors by preserving the same retry behavior.

Testing

Unit Tests

Added tests covering:

  • DefaultRetryStrategy
  • RetryAfterHeaderStrategy
  • DefaultStatusCodeHandler

Integration Verification

Successfully executed Office365 source connector end-to-end:

  • Verified correct retry/backoff on 429 (Rate Limit Exceeded) responses.
  • Confirmed that the connector resumed fetching audit logs after backoff.

Local pipeline run succeeded:

2025-11-10T18:09:32,209 [pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider - Received new access token. Expires in 3599 seconds
2025-11-10T18:09:32,209 [pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.auth.AuthenticationInterface - Credentials initialized successfully
2025-11-10T18:09:32,209 [pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient - Starting Office 365 subscriptions for audit logs
2025-11-10T18:09:32,930 [pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin - Starting microsoft_office365 Source Plugin
2025-11-10T18:09:32,959 [pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.core.pipeline.Pipeline - Pipeline [pipeline] - Submitting request to initiate the pipeline processing
2025-11-10T18:09:32,959 [pool-4-thread-3] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-10T18:09:32,959 [pool-4-thread-3] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-10T18:09:32,959 [pool-4-thread-4] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-10T18:09:32,959 [pool-4-thread-4] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-10T18:09:32,959 [pool-4-thread-2] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-10T18:09:32,959 [pool-4-thread-2] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-10T18:09:32,959 [pool-4-thread-5] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Worker thread started
2025-11-10T18:09:32,959 [pool-4-thread-5] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler - Processing Partitions
2025-11-10T18:09:32,964 [pool-4-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler - Running as a LEADER node
2025-11-10T18:09:32,975 [pool-4-thread-1] INFO  org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler - Total partitions created in this crawl: 30.0
2025-11-10T18:09:42,969 [pool-4-thread-4] ERROR org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler - Hitting API rate limit. Backing off.
org.springframework.web.client.HttpClientErrorException: 429 Rate limit exceeded
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient.lambda$searchAuditLogs$1(Office365RestClient.java:195)
	at org.opensearch.dataprepper.plugins.source.source_crawler.utils.RetryHandler.executeWithRetry(RetryHandler.java:78)
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient.lambda$searchAuditLogs$2(Office365RestClient.java:189)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:69)
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient.searchAuditLogs(Office365RestClient.java:187)
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.service.Office365Service.searchAuditLogs(Office365Service.java:74)
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.executePartition(Office365CrawlerClient.java:116)
	at org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365CrawlerClient.executePartition(Office365CrawlerClient.java:51)
	at org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.lambda$executePartition$0(DimensionalTimeSliceCrawler.java:89)
	at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
	at org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.executePartition(DimensionalTimeSliceCrawler.java:89)
	at org.opensearch.dataprepper.plugins.source.source_crawler.base.DimensionalTimeSliceCrawler.executePartition(DimensionalTimeSliceCrawler.java:36)
	at org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.processPartition(WorkerScheduler.java:116)
	at org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.WorkerScheduler.run(WorkerScheduler.java:82)
	at java.base/java.lang.Thread.run(Thread.java:834)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
{"CreationTime":"2025-11-10T11:23:40","Id":"a8dcfcfe-c112-446f-b36f-20530afa0478","Operation":"Update user.","OrganizationId":"e822651b-5027-4253-83f5-904854601a3b","RecordType":8,"ResultStatus":"Success","UserKey":"Not Available","UserType":4,"Version":1,"Workload":"AzureActiveDirectory","ObjectId":"demo.m3connector@trianzazuresb.onmicrosoft.com","UserId":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","AzureActiveDirectoryEventType":1,"ExtendedProperties":[{"Name":"additionalDetails","Value":"{\"UserType\":\"Member\",\"User-Agent\":\"Apache-HttpClient/4.5.13 (Java/17.0.16)\"}"},{"Name":"extendedAuditEventCategory","Value":"User"}],"ModifiedProperties":[{"Name":"JobTitle","NewValue":"[\r\n  \"Updated by Canary test at 2025-11-10T11:23:39.624864386Z\"\r\n]","OldValue":"[\r\n  \"Updated by Canary test at 2025-11-10T10:23:14.807519780Z\"\r\n]"},{"Name":"Included Updated Properties","NewValue":"JobTitle","OldValue":""},{"Name":"TargetId.UserType","NewValue":"Member","OldValue":""},{"Name":"ActorId.ServicePrincipalNames","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""},{"Name":"SPN","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""}],"Actor":[{"ID":"entraId_app","Type":1},{"ID":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","Type":2},{"ID":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"ServicePrincipal","Type":2}],"ActorContextId":"e822651b-5027-4253-83f5-904854601a3b","InterSystemsId":"24e8de88-c2fd-4c31-9e98-dbe83fad6852","IntraSystemId":"6a345b97-216a-484a-94d2-1d9fe57b56d2","SupportTicketId":"","Target":[{"ID":"User_0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"User","Type":2},{"ID":"demo.m3connector@trianzazuresb.onmicrosoft.com","Type":5},{"ID":"1003200511619FF9","Type":3}],"TargetContextId":"e822651b-5027-4253-83f5-904854601a3b"}
{"CreationTime":"2025-11-10T12:22:53","Id":"f1e0e4a7-1b4d-4e97-b987-2a64556221f8","Operation":"Update user.","OrganizationId":"e822651b-5027-4253-83f5-904854601a3b","RecordType":8,"ResultStatus":"Success","UserKey":"Not Available","UserType":4,"Version":1,"Workload":"AzureActiveDirectory","ObjectId":"demo.m3connector@trianzazuresb.onmicrosoft.com","UserId":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","AzureActiveDirectoryEventType":1,"ExtendedProperties":[{"Name":"additionalDetails","Value":"{\"UserType\":\"Member\",\"User-Agent\":\"Apache-HttpClient/4.5.13 (Java/17.0.16)\"}"},{"Name":"extendedAuditEventCategory","Value":"User"}],"ModifiedProperties":[{"Name":"JobTitle","NewValue":"[\r\n  \"Updated by Canary test at 2025-11-10T12:22:53.589770791Z\"\r\n]","OldValue":"[\r\n  \"Updated by Canary test at 2025-11-10T11:35:47.145496529Z\"\r\n]"},{"Name":"Included Updated Properties","NewValue":"JobTitle","OldValue":""},{"Name":"TargetId.UserType","NewValue":"Member","OldValue":""},{"Name":"ActorId.ServicePrincipalNames","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""},{"Name":"SPN","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""}],"Actor":[{"ID":"entraId_app","Type":1},{"ID":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","Type":2},{"ID":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"ServicePrincipal","Type":2}],"ActorContextId":"e822651b-5027-4253-83f5-904854601a3b","InterSystemsId":"20045a00-d4b1-4f90-88cf-66e0ffa0693a","IntraSystemId":"9d1f2d7f-7d6c-4344-8a97-b9372fc54bc3","SupportTicketId":"","Target":[{"ID":"User_0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"User","Type":2},{"ID":"demo.m3connector@trianzazuresb.onmicrosoft.com","Type":5},{"ID":"1003200511619FF9","Type":3}],"TargetContextId":"e822651b-5027-4253-83f5-904854601a3b"}
{"CreationTime":"2025-11-10T10:22:42","Id":"8be1a7c2-7589-4f7c-9f92-ef22015e051c","Operation":"Update user.","OrganizationId":"e822651b-5027-4253-83f5-904854601a3b","RecordType":8,"ResultStatus":"Success","UserKey":"Not Available","UserType":4,"Version":1,"Workload":"AzureActiveDirectory","ObjectId":"demo.m3connector@trianzazuresb.onmicrosoft.com","UserId":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","AzureActiveDirectoryEventType":1,"ExtendedProperties":[{"Name":"additionalDetails","Value":"{\"UserType\":\"Member\",\"User-Agent\":\"Apache-HttpClient/4.5.13 (Java/17.0.16)\"}"},{"Name":"extendedAuditEventCategory","Value":"User"}],"ModifiedProperties":[{"Name":"JobTitle","NewValue":"[\r\n  \"Updated by Canary test at 2025-11-10T10:22:41.312064944Z\"\r\n]","OldValue":"[\r\n  \"Updated by Canary test at 2025-11-10T09:33:05.569747406Z\"\r\n]"},{"Name":"Included Updated Properties","NewValue":"JobTitle","OldValue":""},{"Name":"TargetId.UserType","NewValue":"Member","OldValue":""},{"Name":"ActorId.ServicePrincipalNames","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""},{"Name":"SPN","NewValue":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","OldValue":""}],"Actor":[{"ID":"entraId_app","Type":1},{"ID":"fb6b0f13-8f1e-4a28-a772-d32d3133da23","Type":2},{"ID":"ServicePrincipal_3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"3616d279-e97d-48d3-af3e-74ed7de78faf","Type":2},{"ID":"ServicePrincipal","Type":2}],"ActorContextId":"e822651b-5027-4253-83f5-904854601a3b","InterSystemsId":"601fabcd-a791-41db-aeff-e367479d7732","IntraSystemId":"76cca66a-3732-4a4e-b23b-c3ca00e3c3da","SupportTicketId":"","Target":[{"ID":"User_0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"0fba2a0b-2680-45c1-9ae6-d20b74edb3ec","Type":2},{"ID":"User","Type":2},{"ID":"demo.m3connector@trianzazuresb.onmicrosoft.com","Type":5},{"ID":"1003200511619FF9","Type":3}],"TargetContextId":"e822651b-5027-4253-83f5-904854601a3b"}
{"AppAccessContext":{},"CreationTime":"2025-11-10T10:20:40","Id":"7f04f6ed-796d-44eb-05bd-08de2042ccd9","Operation":"Set-ConditionalAccessPolicy","OrganizationId":"e822651b-5027-4253-83f5-904854601a3b","RecordType":1,"ResultStatus":"True","UserKey":"NT SERVICE\\MSExchangeServiceHostNetCore (Microsoft.Exchange.ServiceHost)","UserType":3,"Version":1,"Workload":"Exchange","ObjectId":"trianzazuresb.onmicrosoft.com\\058b600e-79d2-4c34-8ac8-95fe8ce13fc0","UserId":"NT SERVICE\\MSExchangeServiceHostNetCore (Microsoft.Exchange.ServiceHost)","AppId":"","AppPoolName":"MSExchangeServiceHostNetCore","ClientAppId":"","CorrelationID":"","ExternalAccess":true,"OrganizationName":"trianzazuresb.onmicrosoft.com","OriginatingServer":"MA1PR01MB3305 (15.20.9298.012)","Parameters":[{"Name":"Identity","Value":"trianzazuresb.onmicrosoft.com\\058b600e-79d2-4c34-8ac8-95fe8ce13fc0"},{"Name":"PolicyDetails","Value":"{\"Version\":0,\"State\":\"Disabled\"}"},{"Name":"PolicyLastUpdatedTime","Value":"11/10/2025 10:20:40 AM"},{"Name":"TenantDefaultPolicy","Value":"18"},{"Name":"DisplayName","Value":"Default Policy"},{"Name":"PolicyIdentifierString","Value":"6/24/2025 6:55:41 AM"}],"RequestId":"5227db16-6ff5-4406-af12-811680d58571"}
{"AppAccessContext":{"IssuedAtTime":"2025-11-10T11:11:25"},"CreationTime":"2025-11-10T11:11:37","Id":"43145a15-1b5f-4b62-65ed-08de2049eaa9","Operation":"Set-Mailbox","OrganizationId":"e822651b-5027-4253-83f5-904854601a3b","RecordType":1,"ResultStatus":"True","UserKey":"NT AUTHORITY\\SYSTEM (Microsoft.Exchange.AdminApi.NetCore)","UserType":3,"Version":1,"Workload":"Exchange","ClientIP":"[2603:1046:a00:e::7]:11252","ObjectId":"INDP287A003.PROD.OUTLOOK.COM/Microsoft Exchange Hosted Organizations/trianzazuresb.onmicrosoft.com/entityserve_tenant_acronyms_IND_replica_2","UserId":"NT AUTHORITY\\SYSTEM (Microsoft.Exchange.AdminApi.NetCore)","AppId":"66a88757-258c-4c72-893c-3e8bed4d6899","AppPoolName":"MSExchangeAdminApiNetCore","ClientAppId":"66a88757-258c-4c72-893c-3e8bed4d6899","CorrelationID":"","ExternalAccess":true,"OrganizationName":"trianzazuresb.onmicrosoft.com","OriginatingServer":"MAUPR01MB11744 (15.20.9298.012)","Parameters":[{"Name":"Identity","Value":"e822651b-5027-4253-83f5-904854601a3b\\6821f1cb-798e-4cdb-ac8b-56e9c1e494fb"},{"Name":"Arbitration","Value":"True"},{"Name":"Force","Value":"True"},{"Name":"DomainController","Value":"MA1P287A03DC006.INDP287A003.PROD.OUTLOOK.COM"}],"RequestId":"b88fc62f-3f46-5ece-06e7-efd201bea839","SessionId":"","TokenTenantId":"e822651b-5027-4253-83f5-904854601a3b"}

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

this.errorTypeMetricCounterMap = getErrorTypeMetricCounterMap(pluginMetrics);
this.retryHandler = new RetryHandler(
        new DefaultRetryStrategy(),
        new DefaultStatusCodeHandler());

Contributor

What do you think about passing "pluginMetrics" into RetryHandler here, so we do not have to pass Optional.of(auditLogRequestsFailedCounter) on line #261? It could also be an Optional param.

IMO, it would keep the executeWithRetry clean and concise.

Contributor Author

We don't think it would be beneficial. Even if we pass pluginMetrics into the RetryHandler constructor, we would still need to provide the metric name at line 261. Additionally, inside RetryHandler we would have to iterate through pluginMetrics to locate that specific metric. Please correct us if we are wrong.

Contributor

You're right. I was thinking of passing this searchRequestsFailedCounter directly in the constructor, e.g.:

new RetryHandler(
        new DefaultRetryStrategy(),
        new DefaultStatusCodeHandler(),
        Optional.of(searchRequestsFailedCounter)  // Optional<Counter> failedCounter
);

Not a big blocker and will let others chime in.

@Override
public RetryDecision handleStatusCode(Exception ex, int retryCount,
Runnable credentialRenewal) {
HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null);

Contributor

This already returns an Optional, per static Optional<HttpStatus> getStatusCode(final Exception ex). Should we use the Optional's own methods to check for empty instead of doing a null check here? Please correct me if I am wrong.

Contributor Author

Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.

} catch (NumberFormatException e) {
log.warn(NOISY, "Failed to parse retry-after header: {}", e.getMessage());
}
return null;

Contributor

Should we use java.util.Optional here instead of returning null?

Contributor Author

Addressed - The method now uses Optional instead of returning null.
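
A minimal sketch of what an Optional-returning header parser could look like is shown here. The class and method names are hypothetical and not taken from the PR. It handles only the delta-seconds form of Retry-After; the HTTP-date form that the header also permits is deliberately left out.

```java
import java.util.Optional;

class RetryAfterParser {
    /**
     * Parses a Retry-After header value given in delta-seconds.
     * Returns Optional.empty() when the header is missing, non-numeric, or negative,
     * so callers never have to deal with null.
     */
    static Optional<Long> parseRetryAfterSeconds(String headerValue) {
        if (headerValue == null || headerValue.isBlank()) {
            return Optional.empty();
        }
        try {
            long seconds = Long.parseLong(headerValue.trim());
            return seconds >= 0 ? Optional.of(seconds) : Optional.empty();
        } catch (NumberFormatException e) {
            // An unparseable header is treated the same as a missing one.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The caller picks its own fallback delay when the header is unusable.
        long delaySeconds = parseRetryAfterSeconds("abc").orElse(5L);
        System.out.println(delaySeconds); // prints 5
    }
}
```

With this shape the fallback to a fixed backoff becomes a single orElse call at the call site rather than a null check.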

}

public static RetryDecision retry() {
return new RetryDecision(false, null);

Contributor

Should we use Java's Optional instead of null here?

@eatulban eatulban Nov 20, 2025

Contributor Author

Addressed - The method now uses Optional instead of null.
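
For illustration, a decision object that carries an Optional exception instead of null might look like the sketch below. The class name RetryDecisionSketch and the field names shouldStop and exception are assumptions; only the factory method names retry, stop and stopWithException appear in this conversation.

```java
import java.util.Optional;

/** Hypothetical value object; field and accessor names are assumptions for illustration. */
final class RetryDecisionSketch {
    private final boolean shouldStop;
    private final Optional<Exception> exception;

    private RetryDecisionSketch(boolean shouldStop, Optional<Exception> exception) {
        this.shouldStop = shouldStop;
        this.exception = exception;
    }

    /** Keep retrying; there is no exception to surface. */
    static RetryDecisionSketch retry() {
        return new RetryDecisionSketch(false, Optional.empty());
    }

    /** Stop retrying and let the caller rethrow the original failure. */
    static RetryDecisionSketch stop() {
        return new RetryDecisionSketch(true, Optional.empty());
    }

    /** Stop retrying and surface a specific exception; Optional.of rejects null. */
    static RetryDecisionSketch stopWithException(Exception ex) {
        return new RetryDecisionSketch(true, Optional.of(ex));
    }

    boolean shouldStop() {
        return shouldStop;
    }

    Optional<Exception> exception() {
        return exception;
    }
}
```

A caller can then write decision.exception().ifPresent(...) without a null check.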

*/
public <T> T executeWithRetry(Supplier<T> operation, Runnable credentialRenewal, Optional<Counter> failureCounter) {
if (operation == null) {
throw new IllegalArgumentException("Operation cannot be null");

Contributor

Should we throw SaaSCrawlerException similar to this?

Same comment for the IllegalArgumentException below. You might also have to update the RestClient to properly throw with the correct isRetryable state.

Contributor Author

Addressed - The implementation now throws SaaSCrawlerException.


@Override
public long calculateSleepTime(Exception ex, int retryCount) {
HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null);

Contributor

Contributor Author

Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.


@Override
public long calculateSleepTime(Exception ex, int retryCount) {
HttpStatus statusCode = RetryStrategy.getStatusCode(ex).orElse(null);

Contributor

Contributor Author

Addressed - We are now using the Optional returned by getStatusCode() to check for emptiness instead of performing a null check.

}

// Fallback to fixed backoff
List<Integer> sleepTimes = (statusCode == HttpStatus.TOO_MANY_REQUESTS || statusCode == HttpStatus.FORBIDDEN

@vecheka vecheka Nov 19, 2025

Contributor

Can you remind me again why we're only checking HttpStatus.TOO_MANY_REQUESTS in DefaultRetryStrategy.calculateSleepTime(...) but also FORBIDDEN & SERVICE_UNAVAILABLE here?

Also, I think we should abstract this condition into a function:

statusCode == HttpStatus.TOO_MANY_REQUESTS || statusCode == HttpStatus.FORBIDDEN
                || statusCode == HttpStatus.SERVICE_UNAVAILABLE)

Contributor Author

DefaultRetryStrategy is invoked only when the Retry-After header is not present. In that case, we fall back to checking only HttpStatus.TOO_MANY_REQUESTS.
However, some connectors return rate-limit errors using FORBIDDEN or SERVICE_UNAVAILABLE while still including a Retry-After header.

Also, we have now abstracted this logic into a separate function.


switch (statusCode) {
case UNAUTHORIZED:
log.error(NOISY, "Token expired. Attempting to renew credentials.", ex);

Contributor

Is it possible to have an authFailures metric incremented here? We need it to map to some of ATP's metrics and I believe we don't emit it anywhere. Source

Contributor

We're already doing it here.

This is the metric mapping:

errorTypeMetricCounterMap.put(HttpStatus.UNAUTHORIZED.getReasonPhrase(), pluginMetrics.counter(REQUEST_ACCESS_DENIED));

Please correct me if you're referring to something else.

* Default status code handling - covers common HTTP scenarios
*/
@Slf4j
public class DefaultStatusCodeHandler implements StatusCodeHandler {

@vecheka vecheka Nov 25, 2025

Contributor

We recently found an issue in the WindowEvents connector where "tokenExpired" is returned as 403 (FORBIDDEN) instead of 401 (UNAUTHORIZED), so we did not run credential renewal for it. This caused the connector to stop retrieving data afterwards.

Should we consider allowing connectors to specify whether to run credential renewal on UNAUTHORIZED, FORBIDDEN, or both? E.g.:

Map<HttpStatus, Boolean> authStatusCredentialRenewalMap = new HashMap<>();
authStatusCredentialRenewalMap.put(FORBIDDEN, true);
new DefaultStatusCodeHandler(authStatusCredentialRenewalMap);


.......


case FORBIDDEN:
    log.error(NOISY, "Access forbidden: {}", statusMessage, ex);
    // getOrDefault avoids an NPE when FORBIDDEN is not present in the map
    if (authStatusCredentialRenewalMap.getOrDefault(FORBIDDEN, false)) {
        credentialRenewal.run();
        return RetryDecision.retry();
    }
    return RetryDecision.stopWithException(
            new SecurityException("Access forbidden: " + statusMessage));

Just one idea, please let me know what you think.

Contributor Author

Yes, for cases like this, we have a CustomApiStatusCodeHandler where we can add all status codes that require special handling or are not covered inside DefaultStatusCodeHandler. For all other cases, we simply call super.handleStatusCode as shown below:

class CustomApiStatusCodeHandler extends DefaultStatusCodeHandler {

    @Override
    public RetryDecision handleStatusCode(Exception ex, int retryCount,
                                          Runnable credentialRenewal) {
        Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex);
        String statusMessage = ex.getMessage();

        if (statusCode.isEmpty()) {
            return RetryDecision.stop();
        }

        switch (statusCode.get()) {
            case FORBIDDEN:
                credentialRenewal.run();
                return RetryDecision.retry();

            default:
                // Delegate to default handler for all standard status codes
                return super.handleStatusCode(ex, retryCount, credentialRenewal);
        }
    }
}

And in the RestClient:

this.retryHandler = new RetryHandler(
        new RetryAfterHeaderStrategy(),
        new CustomApiStatusCodeHandler()
);

We have covered all such scenarios in the design document, please check: https://quip-amazon.com/vWbpATaYxCeS/Design-And-Refactor-Retry-Handler-To-Move-Into-Source-Crawler-Package.

Contributor

Okay makes sense, thanks! That's super helpful.

List<Integer> DEFAULT_RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40);
List<Integer> DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME = List.of(5, 10, 30, 60, 120, 300);
int SLEEP_TIME_MULTIPLIER_MS = 1000;
int MAX_RETRIES = DEFAULT_RETRY_ATTEMPT_SLEEP_TIME.size();

Contributor

this refactor looks good, but we need configurable retry counts for unit tests.

current issue: unit tests take a while due to 6 retries with long delays.

can you add constructor parameter to RetryStrategy implementations?

this allows tests to use 1 retry while production keeps 6.

please let me know if you have any concerns

Contributor Author

Addressed - Added a constructor parameter to the RetryStrategy implementations to allow configuring the maxRetries.
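
One way such a constructor parameter could look is sketched below. The sleep-time lists and the multiplier are copied from the constants in the reviewed snippet; the class name BackoffScheduleSketch, the boolean rateLimited parameter, and the choice to clamp the index to the last schedule entry are assumptions made for this example.

```java
import java.util.List;

class BackoffScheduleSketch {
    // Values copied from the constants shown in the reviewed snippet (in seconds).
    static final List<Integer> DEFAULT_RETRY_ATTEMPT_SLEEP_TIME = List.of(1, 2, 5, 10, 20, 40);
    static final List<Integer> DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME = List.of(5, 10, 30, 60, 120, 300);
    static final int SLEEP_TIME_MULTIPLIER_MS = 1000;

    private final int maxRetries;

    /** Production default: one retry per entry in the schedule. */
    BackoffScheduleSketch() {
        this(DEFAULT_RETRY_ATTEMPT_SLEEP_TIME.size());
    }

    /** Tests can pass a small value such as 1 to avoid long waits. */
    BackoffScheduleSketch(int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        this.maxRetries = maxRetries;
    }

    int getMaxRetries() {
        return maxRetries;
    }

    long calculateSleepTimeMs(boolean rateLimited, int retryCount) {
        List<Integer> schedule = rateLimited
                ? DEFAULT_RATE_LIMIT_RETRY_SLEEP_TIME
                : DEFAULT_RETRY_ATTEMPT_SLEEP_TIME;
        // Assumption: a retry count outside the schedule is clamped to its first or last entry.
        int index = Math.min(Math.max(retryCount, 0), schedule.size() - 1);
        return (long) schedule.get(index) * SLEEP_TIME_MULTIPLIER_MS;
    }
}
```

A unit test would then construct new BackoffScheduleSketch(1), while production code keeps the no-argument constructor and its six retries.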

import java.util.function.Supplier;

@Slf4j
public class RetryHandler {

Contributor

Can you also move RetryHandler to retry folder?

Contributor Author

Addressed - Moved to retry folder.

public long calculateSleepTime(Exception ex, int retryCount) {
Optional<HttpStatus> statusCode = RetryStrategy.getStatusCode(ex);

List<Integer> sleepTimes = (statusCode.isPresent() && statusCode.get() == HttpStatus.TOO_MANY_REQUESTS)

Contributor

Please always put HttpStatus.TOO_MANY_REQUESTS on the left-hand side of the comparison to avoid an NPE.

I know there is a check on statusCode.isPresent(), but having the constant on the left of the equality check is always a best practice.

Contributor Author

Addressed - We now keep a list of status codes and check whether the list contains the given status code.

private boolean isRateLimited(final HttpStatus status) {
return status == HttpStatus.TOO_MANY_REQUESTS ||
status == HttpStatus.FORBIDDEN ||
status == HttpStatus.SERVICE_UNAVAILABLE;

Contributor

Which vendor marks HttpStatus.SERVICE_UNAVAILABLE as a rate limit?

Contributor Author

In PingOne we receive a Retry-After header along with HttpStatus.SERVICE_UNAVAILABLE:
https://apidocs.pingidentity.com/pingone/platform/v1/api/#forward-compatibility-guidance-for-pingone-client-developers

By default we now treat only 429 as a rate limit; different status codes can be passed in connector by connector.

return maxRetries;
}

private boolean isRateLimited(final HttpStatus status) {

Contributor

I can see this varies connector by connector.

Do you think it makes sense to make it a variable called rateLimitStatusCode?

Same for the default retryHandler

Contributor Author

Addressed - Created a list of status codes that can be passed via the constructor.
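
A constructor-supplied set of rate-limit status codes could be sketched like this. The class name RateLimitStatusSketch and the use of int codes in a Set are assumptions; the conversation only states that 429 is the default and that connectors can pass other codes.

```java
import java.util.Set;

class RateLimitStatusSketch {
    static final int TOO_MANY_REQUESTS = 429;
    static final int SERVICE_UNAVAILABLE = 503;

    private final Set<Integer> rateLimitStatusCodes;

    /** Default: only 429 counts as rate limiting. */
    RateLimitStatusSketch() {
        this(Set.of(TOO_MANY_REQUESTS));
    }

    /** Connectors whose vendor signals throttling differently pass their own codes. */
    RateLimitStatusSketch(Set<Integer> rateLimitStatusCodes) {
        // Defensive, immutable copy so later changes by the caller have no effect.
        this.rateLimitStatusCodes = Set.copyOf(rateLimitStatusCodes);
    }

    boolean isRateLimited(int status) {
        return rateLimitStatusCodes.contains(status);
    }
}
```

A connector for a vendor that sends Retry-After with 503 would construct it with Set.of(TOO_MANY_REQUESTS, SERVICE_UNAVAILABLE).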

*/
@Slf4j
public class RetryAfterHeaderStrategy implements RetryStrategy {
private static final String RATE_LIMIT_REMAINING = "X-RateLimit-Remaining";

Contributor

Are there many vendors using the same header?

If this is just for a specific connector, we should move it to that connector's directory with a specific name.

Contributor Author

Yes, most vendors send the same headers.

return new RetryDecision(true, Optional.empty());
}

public static RetryDecision stopWithException(RuntimeException exception) {

Contributor

Shall we make it generic to use Exception instead of RuntimeException?

Contributor Author

Addressed - Changed to Exception.

graytaylor0
graytaylor0 previously approved these changes Dec 10, 2025
@graytaylor0

Member

This is a great change thanks! I see there are merge conflicts in Office365RestClient that need to be resolved.

san81
san81 previously approved these changes Dec 11, 2025
Signed-off-by: eatulban <eatulban@amazon.com>

# Conflicts:
#	data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java
Signed-off-by: eatulban <eatulban@amazon.com>
…performance

Signed-off-by: eatulban <eatulban@amazon.com>
@eatulban eatulban force-pushed the feature/RetryHandler branch from 2cef08d to 5057d70 Compare December 11, 2025 18:16
@graytaylor0

Member

Thanks for making this change!

@graytaylor0 graytaylor0 merged commit a26ff23 into opensearch-project:main Dec 15, 2025
45 of 47 checks passed
wandna-amazon pushed a commit to wandna-amazon/data-prepper that referenced this pull request Jan 8, 2026
…h-project#6275)

Signed-off-by: eatulban <eatulban@amazon.com>
Signed-off-by: Nathan Wand <wandna@amazon.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
…h-project#6275)

Signed-off-by: eatulban <eatulban@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>

6 participants