-
Notifications
You must be signed in to change notification settings - Fork 2k
Fixes #25666: Add retry-with-backoff to getServiceStatus for transient failure recovery #27112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b0f9406
0b675f3
0686407
11381b3
9a5042c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,6 @@ | |
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
| import java.util.function.Supplier; | ||
| import java.util.regex.Pattern; | ||
| import lombok.Getter; | ||
| import lombok.Setter; | ||
|
|
@@ -66,9 +65,11 @@ public abstract class PipelineServiceClient implements PipelineServiceClientInte | |
| protected static final String CONTENT_HEADER = "Content-Type"; | ||
| protected static final String CONTENT_TYPE = "application/json"; | ||
| private static final Integer MAX_ATTEMPTS = 3; | ||
| private static final Integer BACKOFF_TIME_SECONDS = 5; | ||
| private static final long DEFAULT_BACKOFF_MILLIS = 5_000L; | ||
| private static final String DISABLED_STATUS = "disabled"; | ||
|
|
||
| private volatile Retry serviceStatusRetry; | ||
|
|
||
|
Comment on lines
67
to
+72
|
||
| protected static final String SERVER_VERSION; | ||
|
|
||
| static { | ||
|
|
@@ -196,38 +197,54 @@ private Response getHostIpInternal() { | |
|
|
||
| /** | ||
| * Check the pipeline service status with an exception backoff to make sure we don't raise any | ||
| * false positives. | ||
| * false positives. Delegates to {@link #getServiceStatus()}, which already retries transient | ||
| * failures. | ||
| */ | ||
| public String getServiceStatusBackoff() { | ||
| RetryConfig retryConfig = | ||
| RetryConfig.<String>custom() | ||
| .maxAttempts(MAX_ATTEMPTS) | ||
| .waitDuration(Duration.ofMillis(BACKOFF_TIME_SECONDS * 1_000L)) | ||
| .retryOnResult(response -> !HEALTHY_STATUS.equals(response)) | ||
| .failAfterMaxAttempts(false) | ||
| .build(); | ||
|
|
||
| Retry retry = Retry.of("getServiceStatus", retryConfig); | ||
|
|
||
| Supplier<String> responseSupplier = | ||
| () -> { | ||
| try { | ||
| PipelineServiceClientResponse status = getServiceStatus(); | ||
| return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS; | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| }; | ||
|
|
||
| return retry.executeSupplier(responseSupplier); | ||
| } | ||
|
|
||
| /* Check the status of pipeline service to ensure it is healthy */ | ||
| PipelineServiceClientResponse status = getServiceStatus(); | ||
| return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS; | ||
| } | ||
|
|
||
| /** | ||
| * Check the status of pipeline service to ensure it is healthy. Retries up to {@link | ||
| * #MAX_ATTEMPTS} times with backoff so that a single transient failure (e.g. Airflow restart, | ||
| * network blip) does not permanently mark the agent as unavailable. Responses with status codes | ||
| * ≥ 500 are retried; known non-transient 4xx responses (401, 403, 404, version mismatch) are | ||
| * returned immediately. Any other unexpected HTTP status code is passed through as-is and will | ||
| * not trigger retries. | ||
|
Comment on lines
+208
to
+214
|
||
| */ | ||
| public PipelineServiceClientResponse getServiceStatus() { | ||
| if (pipelineServiceClientEnabled) { | ||
| return getServiceStatusInternal(); | ||
| if (!pipelineServiceClientEnabled) { | ||
| return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS); | ||
| } | ||
| PipelineServiceClientResponse response = | ||
| retryForServiceStatus().executeSupplier(this::getServiceStatusInternal); | ||
|
Comment on lines
216
to
+221
|
||
| return response != null | ||
| ? response | ||
| : buildUnhealthyStatus("Pipeline service returned no response"); | ||
| } | ||
|
|
||
| /** Returns the wait duration between retry attempts in milliseconds. */ | ||
| protected long getRetryBackoffMillis() { | ||
| return DEFAULT_BACKOFF_MILLIS; | ||
| } | ||
|
|
||
| private Retry retryForServiceStatus() { | ||
| if (serviceStatusRetry == null) { | ||
| synchronized (this) { | ||
| if (serviceStatusRetry == null) { | ||
| var retryConfig = | ||
| RetryConfig.<PipelineServiceClientResponse>custom() | ||
| .maxAttempts(MAX_ATTEMPTS) | ||
| .waitDuration(Duration.ofMillis(getRetryBackoffMillis())) | ||
| .retryOnResult(response -> response == null || response.getCode() >= 500) | ||
| .failAfterMaxAttempts(false) | ||
| .build(); | ||
|
Comment on lines
+236
to
+242
|
||
| serviceStatusRetry = Retry.of("getServiceStatus", retryConfig); | ||
| } | ||
| } | ||
| } | ||
| return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS); | ||
| return serviceStatusRetry; | ||
| } | ||
|
|
||
| public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.