Skip to content

Commit 0d023dc

Browse files
Fixes #25666: Add retry-with-backoff to getServiceStatus for transient failure recovery
1 parent 35fbefa commit 0d023dc

File tree

3 files changed

+122
-44
lines changed

3 files changed

+122
-44
lines changed

openmetadata-service/src/main/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClient.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -458,26 +458,36 @@ public PipelineServiceClientResponse getServiceStatusInternal() {
458458
if (response.statusCode() == 200) {
459459
JSONObject responseJSON = new JSONObject(response.body());
460460
String ingestionVersion = responseJSON.getString("version");
461-
return validServerClientVersions(ingestionVersion, SERVER_VERSION)
462-
? buildHealthyStatus(ingestionVersion)
463-
: buildUnhealthyStatus(
464-
buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION));
461+
if (validServerClientVersions(ingestionVersion, SERVER_VERSION)) {
462+
return buildHealthyStatus(ingestionVersion);
463+
}
464+
// Version mismatch is a config error, not transient — use 422 to skip retries
465+
return new PipelineServiceClientResponse()
466+
.withCode(422)
467+
.withReason(buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION))
468+
.withPlatform(this.getPlatform());
465469
}
466470

467-
// Auth error when accessing the APIs
471+
// Auth error when accessing the APIs — not retryable (use 401/403 code)
468472
if (response.statusCode() == 401 || response.statusCode() == 403) {
469-
return buildUnhealthyStatus(
470-
String.format(
471-
"Authentication failed for user [%s] trying to access the Airflow APIs at [%s]",
472-
this.username, serviceURL.toString()));
473+
return new PipelineServiceClientResponse()
474+
.withCode(response.statusCode())
475+
.withReason(
476+
String.format(
477+
"Authentication failed for user [%s] trying to access the Airflow APIs at [%s]",
478+
this.username, serviceURL.toString()))
479+
.withPlatform(this.getPlatform());
473480
}
474481

475-
// APIs URL not found
482+
// APIs URL not found — not retryable (use 404 code)
476483
if (response.statusCode() == 404) {
477-
return buildUnhealthyStatus(
478-
String.format(
479-
"Airflow APIs not found at [%s]. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s",
480-
serviceURL.toString(), DOCS_LINK));
484+
return new PipelineServiceClientResponse()
485+
.withCode(response.statusCode())
486+
.withReason(
487+
String.format(
488+
"Airflow APIs not found at [%s]. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s",
489+
serviceURL.toString(), DOCS_LINK))
490+
.withPlatform(this.getPlatform());
481491
}
482492

483493
return buildUnhealthyStatus(

openmetadata-service/src/test/java/org/openmetadata/service/clients/pipeline/airflow/AirflowRESTClientTest.java

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,14 @@ void getServiceStatusReportsAuthFailuresAndMissingPlugins() throws Exception {
224224
server.enqueue("GET", healthPath, 401, "{\"detail\":\"denied\"}");
225225
AirflowRESTClient authFailureClient = newClient(server, basePath);
226226
PipelineServiceClientResponse authFailure = authFailureClient.getServiceStatusInternal();
227-
assertEquals(500, authFailure.getCode());
227+
assertEquals(401, authFailure.getCode());
228228
assertTrue(authFailure.getReason().contains("Authentication failed"));
229229

230230
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
231231
server.enqueue("GET", healthPath, 404, "{\"detail\":\"missing\"}");
232232
AirflowRESTClient missingPluginClient = newClient(server, basePath);
233233
PipelineServiceClientResponse missingPlugin = missingPluginClient.getServiceStatusInternal();
234-
assertEquals(500, missingPlugin.getCode());
234+
assertEquals(404, missingPlugin.getCode());
235235
assertTrue(missingPlugin.getReason().contains("Airflow APIs not found"));
236236
}
237237
}
@@ -249,7 +249,7 @@ void getServiceStatusReportsVersionMismatch() throws Exception {
249249
AirflowRESTClient client = newClient(server, basePath);
250250
PipelineServiceClientResponse status = client.getServiceStatusInternal();
251251

252-
assertEquals(500, status.getCode());
252+
assertEquals(422, status.getCode());
253253
assertTrue(
254254
status.getReason().contains("upgrade your server")
255255
|| status.getReason().contains("upgrade your ingestion client"));
@@ -500,6 +500,54 @@ void runPipelineThrowsDeploymentExceptionWhenAirflowReturnsAnError() throws Exce
500500
}
501501
}
502502

503+
@Test
504+
void getServiceStatusRecoversFromTransientFailure() throws Exception {
505+
try (AirflowTestServer server = new AirflowTestServer()) {
506+
String basePath = "/airflow";
507+
String healthPath = basePath + "/pluginsv2/api/v2/openmetadata/health-auth";
508+
String version = PipelineServiceClient.getServerVersion();
509+
510+
// First call: detection probe succeeds
511+
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
512+
// Second call (first getServiceStatus attempt): transient 500
513+
server.enqueue("GET", healthPath, 500, "{\"error\":\"selector manager closed\"}");
514+
// Third call (retry): healthy response
515+
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
516+
517+
AirflowRESTClient client = newClient(server, basePath);
518+
519+
PipelineServiceClientResponse status = client.getServiceStatus();
520+
521+
assertEquals(200, status.getCode());
522+
assertEquals(version, status.getVersion());
523+
List<RequestRecord> healthRequests = server.requests("GET", healthPath);
524+
assertEquals(3, healthRequests.size());
525+
}
526+
}
527+
528+
@Test
529+
void getServiceStatusDoesNotRetryOnAuthError() throws Exception {
530+
try (AirflowTestServer server = new AirflowTestServer()) {
531+
String basePath = "/airflow";
532+
String healthPath = basePath + "/pluginsv2/api/v2/openmetadata/health-auth";
533+
String version = PipelineServiceClient.getServerVersion();
534+
535+
// Detection probe succeeds
536+
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
537+
// Auth failure — should NOT be retried
538+
server.enqueue("GET", healthPath, 401, "{\"error\":\"unauthorized\"}");
539+
540+
AirflowRESTClient client = newClient(server, basePath);
541+
542+
PipelineServiceClientResponse status = client.getServiceStatus();
543+
544+
assertEquals(401, status.getCode());
545+
// Only 2 requests: detection probe + single 401 (no retry)
546+
List<RequestRecord> healthRequests = server.requests("GET", healthPath);
547+
assertEquals(2, healthRequests.size());
548+
}
549+
}
550+
503551
private static AirflowRESTClient newClient(AirflowTestServer server, String basePath)
504552
throws KeyStoreException {
505553
Parameters parameters = new Parameters();
@@ -512,7 +560,12 @@ private static AirflowRESTClient newClient(AirflowTestServer server, String base
512560
config.setApiEndpoint(server.url(basePath));
513561
config.setMetadataApiEndpoint("http://localhost:8585/api");
514562
config.setParameters(parameters);
515-
return new AirflowRESTClient(config);
563+
return new AirflowRESTClient(config) {
564+
@Override
565+
protected long getRetryBackoffMillis() {
566+
return 0L;
567+
}
568+
};
516569
}
517570

518571
private static IngestionPipeline ingestionPipeline(String name, boolean enabled) {
@@ -527,9 +580,18 @@ private static Map<String, List<String>> cookieHeaders(String... values) {
527580
}
528581

529582
private static void invokePrivate(Object target, String methodName) throws Exception {
530-
Method method = target.getClass().getDeclaredMethod(methodName);
531-
method.setAccessible(true);
532-
method.invoke(target);
583+
Class<?> clazz = target.getClass();
584+
while (clazz != null) {
585+
try {
586+
Method method = clazz.getDeclaredMethod(methodName);
587+
method.setAccessible(true);
588+
method.invoke(target);
589+
return;
590+
} catch (NoSuchMethodException e) {
591+
clazz = clazz.getSuperclass();
592+
}
593+
}
594+
throw new NoSuchMethodException(target.getClass().getName() + "." + methodName + "()");
533595
}
534596

535597
private record RequestRecord(

openmetadata-spec/src/main/java/org/openmetadata/service/clients/pipeline/PipelineServiceClient.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.List;
2828
import java.util.Map;
2929
import java.util.Properties;
30-
import java.util.function.Supplier;
3130
import java.util.regex.Pattern;
3231
import lombok.Getter;
3332
import lombok.Setter;
@@ -66,7 +65,7 @@ public abstract class PipelineServiceClient implements PipelineServiceClientInte
6665
protected static final String CONTENT_HEADER = "Content-Type";
6766
protected static final String CONTENT_TYPE = "application/json";
6867
private static final Integer MAX_ATTEMPTS = 3;
69-
private static final Integer BACKOFF_TIME_SECONDS = 5;
68+
private static final long DEFAULT_BACKOFF_MILLIS = 5_000L;
7069
private static final String DISABLED_STATUS = "disabled";
7170

7271
protected static final String SERVER_VERSION;
@@ -196,38 +195,45 @@ private Response getHostIpInternal() {
196195

197196
/**
198197
* Check the pipeline service status with an exception backoff to make sure we don't raise any
199-
* false positives.
198+
* false positives. Delegates to {@link #getServiceStatus()}, which already retries transient
199+
* failures.
200200
*/
201201
public String getServiceStatusBackoff() {
202+
PipelineServiceClientResponse status = getServiceStatus();
203+
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
204+
}
205+
206+
/**
207+
* Check the status of pipeline service to ensure it is healthy. Retries up to {@link
208+
* #MAX_ATTEMPTS} times with backoff so that a single transient failure (e.g. Airflow restart,
209+
* network blip) does not permanently mark the agent as unavailable. Responses with status codes
210+
* &ge; 500 are retried; 4xx responses (auth, config, version mismatch) are returned immediately.
211+
* Thrown exceptions are also retried by the underlying Resilience4j configuration.
212+
*/
213+
public PipelineServiceClientResponse getServiceStatus() {
214+
if (!pipelineServiceClientEnabled) {
215+
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
216+
}
217+
202218
RetryConfig retryConfig =
203-
RetryConfig.<String>custom()
219+
RetryConfig.<PipelineServiceClientResponse>custom()
204220
.maxAttempts(MAX_ATTEMPTS)
205-
.waitDuration(Duration.ofMillis(BACKOFF_TIME_SECONDS * 1_000L))
206-
.retryOnResult(response -> !HEALTHY_STATUS.equals(response))
221+
.waitDuration(Duration.ofMillis(getRetryBackoffMillis()))
222+
.retryOnResult(response -> response != null && response.getCode() >= 500)
207223
.failAfterMaxAttempts(false)
208224
.build();
209225

210226
Retry retry = Retry.of("getServiceStatus", retryConfig);
211227

212-
Supplier<String> responseSupplier =
213-
() -> {
214-
try {
215-
PipelineServiceClientResponse status = getServiceStatus();
216-
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
217-
} catch (Exception e) {
218-
throw new RuntimeException(e);
219-
}
220-
};
221-
222-
return retry.executeSupplier(responseSupplier);
228+
PipelineServiceClientResponse response = retry.executeSupplier(this::getServiceStatusInternal);
229+
return response != null
230+
? response
231+
: buildUnhealthyStatus("Pipeline service returned no response");
223232
}
224233

225-
/* Check the status of pipeline service to ensure it is healthy */
226-
public PipelineServiceClientResponse getServiceStatus() {
227-
if (pipelineServiceClientEnabled) {
228-
return getServiceStatusInternal();
229-
}
230-
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
234+
/** Returns the wait duration between retry attempts in milliseconds. */
235+
protected long getRetryBackoffMillis() {
236+
return DEFAULT_BACKOFF_MILLIS;
231237
}
232238

233239
public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {

0 commit comments

Comments
 (0)