Skip to content

Commit 68791c8

Browse files
Fixes #25666: Add retry-with-backoff to getServiceStatus for transient failure recovery
1 parent 9da7bea commit 68791c8

2 files changed

Lines changed: 45 additions & 23 deletions

File tree

openmetadata-service/src/test/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClientTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,31 @@ void runPipelineThrowsDeploymentExceptionWhenAirflowReturnsAnError() throws Exce
500500
}
501501
}
502502

503+
@Test
504+
void getServiceStatusRecoversFromTransientFailure() throws Exception {
505+
try (AirflowTestServer server = new AirflowTestServer()) {
506+
String basePath = "/airflow";
507+
String healthPath = basePath + "/pluginsv2/api/v2/openmetadata/health-auth";
508+
String version = PipelineServiceClient.getServerVersion();
509+
510+
// First call: detection probe succeeds
511+
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
512+
// Second call (first getServiceStatus attempt): transient 500
513+
server.enqueue("GET", healthPath, 500, "{\"error\":\"selector manager closed\"}");
514+
// Third call (retry): healthy response
515+
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
516+
517+
AirflowRESTClient client = newClient(server, basePath);
518+
519+
PipelineServiceClientResponse status = client.getServiceStatus();
520+
521+
assertEquals(200, status.getCode());
522+
assertEquals(version, status.getVersion());
523+
List<RequestRecord> healthRequests = server.requests("GET", healthPath);
524+
assertEquals(3, healthRequests.size());
525+
}
526+
}
527+
503528
private static AirflowRESTClient newClient(AirflowTestServer server, String basePath)
504529
throws KeyStoreException {
505530
Parameters parameters = new Parameters();

openmetadata-spec/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClient.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.Properties;
30-
import java.util.function.Supplier;
3130
import java.util.regex.Pattern;
3231
import lombok.Getter;
3332
import lombok.Setter;
@@ -196,38 +195,36 @@ private Response getHostIpInternal() {
196195

197196
/**
198197
* Check the pipeline service status with an exception backoff to make sure we don't raise any
199-
* false positives.
198+
* false positives. Delegates to {@link #getServiceStatus()}, which already retries transient
199+
* failures.
200200
*/
201201
public String getServiceStatusBackoff() {
202+
PipelineServiceClientResponse status = getServiceStatus();
203+
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
204+
}
205+
206+
/**
207+
* Check the status of pipeline service to ensure it is healthy. Retries up to {@link
208+
* #MAX_ATTEMPTS} times with {@link #BACKOFF_TIME_SECONDS}-second backoff so that a single
209+
* transient failure (e.g. Airflow restart, network blip) does not permanently mark the agent as
210+
* unavailable.
211+
*/
212+
public PipelineServiceClientResponse getServiceStatus() {
213+
if (!pipelineServiceClientEnabled) {
214+
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
215+
}
216+
202217
RetryConfig retryConfig =
203-
RetryConfig.<String>custom()
218+
RetryConfig.<PipelineServiceClientResponse>custom()
204219
.maxAttempts(MAX_ATTEMPTS)
205220
.waitDuration(Duration.ofMillis(BACKOFF_TIME_SECONDS * 1_000L))
206-
.retryOnResult(response -> !HEALTHY_STATUS.equals(response))
221+
.retryOnResult(response -> response.getCode() != 200)
207222
.failAfterMaxAttempts(false)
208223
.build();
209224

210225
Retry retry = Retry.of("getServiceStatus", retryConfig);
211226

212-
Supplier<String> responseSupplier =
213-
() -> {
214-
try {
215-
PipelineServiceClientResponse status = getServiceStatus();
216-
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
217-
} catch (Exception e) {
218-
throw new RuntimeException(e);
219-
}
220-
};
221-
222-
return retry.executeSupplier(responseSupplier);
223-
}
224-
225-
/* Check the status of pipeline service to ensure it is healthy */
226-
public PipelineServiceClientResponse getServiceStatus() {
227-
if (pipelineServiceClientEnabled) {
228-
return getServiceStatusInternal();
229-
}
230-
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
227+
return retry.executeSupplier(this::getServiceStatusInternal);
231228
}
232229

233230
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {

0 commit comments

Comments
 (0)