Skip to content

Commit f35c0ff

Browse files
committed
Refactor Retry Handler To Move Into Source Crawler Package
Signed-off-by: eatulban <eatulban@amazon.com> # Conflicts: # data-prepper-plugins/saas-source-plugins/microsoft-office365-source/src/main/java/org/opensearch/dataprepper/plugins/source/microsoft_office365/Office365RestClient.java
1 parent 8231699 commit f35c0ff

2 files changed

Lines changed: 466 additions & 0 deletions

File tree

  • data-prepper-plugins/saas-source-plugins/source-crawler/src
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.source_crawler.utils;
12+
13+
import io.micrometer.core.instrument.Counter;
14+
import lombok.extern.slf4j.Slf4j;
15+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryDecision;
16+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryStrategy;
17+
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.StatusCodeHandler;
18+
import org.springframework.web.client.HttpClientErrorException;
19+
import org.springframework.web.client.HttpServerErrorException;
20+
21+
import java.util.Optional;
22+
import java.util.function.Supplier;
23+
24+
@Slf4j
25+
public class RetryHandler {
26+
private final RetryStrategy retryStrategy;
27+
private final StatusCodeHandler statusCodeHandler;
28+
29+
/**
30+
* Constructor
31+
*
32+
* @param retryStrategy Strategy for determining retry behavior
33+
* @param statusCodeHandler Handler for HTTP status codes
34+
*/
35+
public RetryHandler(RetryStrategy retryStrategy, StatusCodeHandler statusCodeHandler) {
36+
this.retryStrategy = retryStrategy;
37+
this.statusCodeHandler = statusCodeHandler;
38+
}
39+
40+
/**
41+
* Executes the given operation with retry logic, optional credential renewal,
42+
* and failure counter.
43+
*
44+
* @param operation The operation to execute.
45+
* @param credentialRenewal The action to renew credentials if needed.
46+
*
47+
* @param <T> The return type of the operation.
48+
* @return The result of the operation.
49+
*/
50+
public <T> T executeWithRetry(Supplier<T> operation, Runnable credentialRenewal) {
51+
return executeWithRetry(operation, credentialRenewal, Optional.empty());
52+
}
53+
54+
/**
55+
* Executes the given operation with retry logic, optional credential renewal,
56+
* and failure counter.
57+
*
58+
* @param operation The operation to execute.
59+
* @param credentialRenewal The action to renew credentials if needed.
60+
* @param failureCounter The counter to increment on each failed attempt
61+
* (optional).
62+
* @param <T> The return type of the operation.
63+
* @return The result of the operation.
64+
*/
65+
public <T> T executeWithRetry(Supplier<T> operation, Runnable credentialRenewal, Optional<Counter> failureCounter) {
66+
if (operation == null) {
67+
throw new IllegalArgumentException("Operation cannot be null");
68+
}
69+
if (credentialRenewal == null) {
70+
throw new IllegalArgumentException("Credential renewal cannot be null");
71+
}
72+
73+
final int maxRetries = retryStrategy.getMaxRetries();
74+
int retryCount = 0;
75+
76+
while (retryCount < maxRetries) {
77+
boolean operationSucceeded = false;
78+
try {
79+
T result = operation.get();
80+
operationSucceeded = true;
81+
return result;
82+
} catch (HttpClientErrorException | HttpServerErrorException ex) {
83+
RetryDecision decision = statusCodeHandler.handleStatusCode(
84+
ex, retryCount, credentialRenewal);
85+
86+
if (decision.isShouldStop()) {
87+
if (decision.getException() != null) {
88+
throw decision.getException();
89+
}
90+
throw ex;
91+
}
92+
93+
if (retryCount == maxRetries - 1) {
94+
log.error("Exceeded maximum retry attempts ({})", maxRetries, ex);
95+
throw ex;
96+
}
97+
98+
// Calculate sleep time and wait
99+
long sleepTimeMs = retryStrategy.calculateSleepTime(ex, retryCount);
100+
sleep(sleepTimeMs);
101+
} finally {
102+
if (!operationSucceeded) {
103+
failureCounter.ifPresent(Counter::increment);
104+
}
105+
}
106+
retryCount++;
107+
}
108+
throw new RuntimeException("Exceeded maximum retry attempts (" + maxRetries + ")");
109+
}
110+
111+
private void sleep(long milliseconds) {
112+
try {
113+
Thread.sleep(milliseconds);
114+
} catch (InterruptedException ie) {
115+
Thread.currentThread().interrupt();
116+
throw new RuntimeException("Retry interrupted", ie);
117+
}
118+
}
119+
}

0 commit comments

Comments
 (0)