Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,34 +491,56 @@ public PipelineServiceClientResponse getServiceStatusInternal() {
if (response.statusCode() == 200) {
JSONObject responseJSON = new JSONObject(response.body());
String ingestionVersion = responseJSON.getString("version");
return validServerClientVersions(ingestionVersion, SERVER_VERSION)
? buildHealthyStatus(ingestionVersion)
: buildUnhealthyStatus(
buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION));
if (validServerClientVersions(ingestionVersion, SERVER_VERSION)) {
return buildHealthyStatus(ingestionVersion);
}
// Version mismatch is a config error, not transient — use 422 to skip retries
return new PipelineServiceClientResponse()
.withCode(422)
.withReason(buildVersionMismatchErrorMessage(ingestionVersion, SERVER_VERSION))
.withPlatform(this.getPlatform());
}

// Auth error when accessing the APIs
// Auth error when accessing the APIs — not retryable (use 401/403 code)
if (response.statusCode() == 401 || response.statusCode() == 403) {
return buildUnhealthyStatus(
String.format(
"Authentication failed for user [%s] trying to access the Airflow APIs at [%s]",
this.username, serviceURL.toString()));
return new PipelineServiceClientResponse()
.withCode(response.statusCode())
.withReason(
String.format(
"Authentication failed for user [%s] trying to access the Airflow APIs at [%s]",
this.username, serviceURL.toString()))
.withPlatform(this.getPlatform());
}

// APIs URL not found
// APIs URL not found — not retryable (use 404 code)
if (response.statusCode() == 404) {
return buildUnhealthyStatus(
String.format(
"Airflow APIs not found at [%s]. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s",
serviceURL.toString(), DOCS_LINK));
return new PipelineServiceClientResponse()
.withCode(response.statusCode())
.withReason(
String.format(
"Airflow APIs not found at [%s]. Please validate if the OpenMetadata Airflow plugin is installed correctly. %s",
serviceURL.toString(), DOCS_LINK))
.withPlatform(this.getPlatform());
}

return buildUnhealthyStatus(
String.format(
"Unexpected status response at [%s]: code [%s] - [%s]",
serviceURL.toString(), response.statusCode(), response.body()));

} catch (IOException | URISyntaxException e) {
return new PipelineServiceClientResponse()
.withCode(response.statusCode())
.withReason(
String.format(
"Unexpected status response at [%s]: code [%s] - [%s]",
serviceURL.toString(), response.statusCode(), response.body()))
.withPlatform(this.getPlatform());

} catch (URISyntaxException e) {
// Malformed URL is a config error, not transient — use 422 to skip retries
return new PipelineServiceClientResponse()
.withCode(422)
.withReason(
String.format(
"Invalid Airflow URL [%s]: %s %s",
serviceURL.toString(), e.getMessage(), DOCS_LINK))
.withPlatform(this.getPlatform());
} catch (IOException e) {
String exceptionMsg;
if (e.getMessage() != null) {
exceptionMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,14 @@ void getServiceStatusReportsAuthFailuresAndMissingPlugins() throws Exception {
server.enqueue("GET", healthPath, 401, "{\"detail\":\"denied\"}");
AirflowRESTClient authFailureClient = newClient(server, basePath);
PipelineServiceClientResponse authFailure = authFailureClient.getServiceStatusInternal();
assertEquals(500, authFailure.getCode());
assertEquals(401, authFailure.getCode());
assertTrue(authFailure.getReason().contains("Authentication failed"));

server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
server.enqueue("GET", healthPath, 404, "{\"detail\":\"missing\"}");
AirflowRESTClient missingPluginClient = newClient(server, basePath);
PipelineServiceClientResponse missingPlugin = missingPluginClient.getServiceStatusInternal();
assertEquals(500, missingPlugin.getCode());
assertEquals(404, missingPlugin.getCode());
assertTrue(missingPlugin.getReason().contains("Airflow APIs not found"));
}
}
Expand All @@ -249,7 +249,7 @@ void getServiceStatusReportsVersionMismatch() throws Exception {
AirflowRESTClient client = newClient(server, basePath);
PipelineServiceClientResponse status = client.getServiceStatusInternal();

assertEquals(500, status.getCode());
assertEquals(422, status.getCode());
assertTrue(
status.getReason().contains("upgrade your server")
|| status.getReason().contains("upgrade your ingestion client"));
Expand Down Expand Up @@ -500,6 +500,54 @@ void runPipelineThrowsDeploymentExceptionWhenAirflowReturnsAnError() throws Exce
}
}

@Test
void getServiceStatusRecoversFromTransientFailure() throws Exception {
try (AirflowTestServer server = new AirflowTestServer()) {
String basePath = "/airflow";
String healthPath = basePath + "/pluginsv2/api/v2/openmetadata/health-auth";
String version = PipelineServiceClient.getServerVersion();

// First call: detection probe succeeds
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
// Second call (first getServiceStatus attempt): transient 500
server.enqueue("GET", healthPath, 500, "{\"error\":\"selector manager closed\"}");
// Third call (retry): healthy response
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");

AirflowRESTClient client = newClient(server, basePath);

PipelineServiceClientResponse status = client.getServiceStatus();

assertEquals(200, status.getCode());
assertEquals(version, status.getVersion());
List<RequestRecord> healthRequests = server.requests("GET", healthPath);
assertEquals(3, healthRequests.size());
}
Comment thread
RajdeepKushwaha5 marked this conversation as resolved.
}

@Test
void getServiceStatusDoesNotRetryOnAuthError() throws Exception {
try (AirflowTestServer server = new AirflowTestServer()) {
String basePath = "/airflow";
String healthPath = basePath + "/pluginsv2/api/v2/openmetadata/health-auth";
String version = PipelineServiceClient.getServerVersion();

// Detection probe succeeds
server.enqueue("GET", healthPath, 200, "{\"version\":\"" + version + "\"}");
// Auth failure — should NOT be retried
server.enqueue("GET", healthPath, 401, "{\"error\":\"unauthorized\"}");

AirflowRESTClient client = newClient(server, basePath);

PipelineServiceClientResponse status = client.getServiceStatus();

assertEquals(401, status.getCode());
// Only 2 requests: detection probe + single 401 (no retry)
List<RequestRecord> healthRequests = server.requests("GET", healthPath);
assertEquals(2, healthRequests.size());
}
}

@Test
void constructorHandlesStringTimeout() throws Exception {
try (AirflowTestServer server = new AirflowTestServer()) {
Expand Down Expand Up @@ -594,7 +642,12 @@ private static AirflowRESTClient newClient(
config.setApiEndpoint(server.url(basePath));
config.setMetadataApiEndpoint("http://localhost:8585/api");
config.setParameters(parameters);
return new AirflowRESTClient(config);
return new AirflowRESTClient(config) {
@Override
protected long getRetryBackoffMillis() {
return 0L;
}
};
}

private static IngestionPipeline ingestionPipeline(String name, boolean enabled) {
Expand All @@ -609,9 +662,18 @@ private static Map<String, List<String>> cookieHeaders(String... values) {
}

private static void invokePrivate(Object target, String methodName) throws Exception {
Method method = target.getClass().getDeclaredMethod(methodName);
method.setAccessible(true);
method.invoke(target);
Class<?> clazz = target.getClass();
while (clazz != null) {
try {
Method method = clazz.getDeclaredMethod(methodName);
method.setAccessible(true);
method.invoke(target);
return;
} catch (NoSuchMethodException e) {
clazz = clazz.getSuperclass();
}
}
throw new NoSuchMethodException(target.getClass().getName() + "." + methodName + "()");
}

private record RequestRecord(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -66,9 +65,11 @@ public abstract class PipelineServiceClient implements PipelineServiceClientInte
protected static final String CONTENT_HEADER = "Content-Type";
protected static final String CONTENT_TYPE = "application/json";
private static final Integer MAX_ATTEMPTS = 3;
private static final Integer BACKOFF_TIME_SECONDS = 5;
private static final long DEFAULT_BACKOFF_MILLIS = 5_000L;
private static final String DISABLED_STATUS = "disabled";

private volatile Retry serviceStatusRetry;

Comment on lines 67 to +72
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_BACKOFF_MILLIS hardcodes a 5s sleep between retries, meaning getServiceStatus() can block for ~10s on repeated retryable failures. This will directly impact REST endpoints that call this method (e.g., validation/status checks) and can also slow down unit tests that exercise unhealthy paths unless they override getRetryBackoffMillis(). Consider making the backoff configurable via PipelineServiceClientConfiguration (or similar) so production can keep 5s while tests/interactive calls can use a smaller value.

Copilot uses AI. Check for mistakes.
protected static final String SERVER_VERSION;

static {
Expand Down Expand Up @@ -196,38 +197,54 @@ private Response getHostIpInternal() {

/**
* Check the pipeline service status with an exception backoff to make sure we don't raise any
* false positives.
* false positives. Delegates to {@link #getServiceStatus()}, which already retries transient
* failures.
*/
public String getServiceStatusBackoff() {
RetryConfig retryConfig =
RetryConfig.<String>custom()
.maxAttempts(MAX_ATTEMPTS)
.waitDuration(Duration.ofMillis(BACKOFF_TIME_SECONDS * 1_000L))
.retryOnResult(response -> !HEALTHY_STATUS.equals(response))
.failAfterMaxAttempts(false)
.build();

Retry retry = Retry.of("getServiceStatus", retryConfig);

Supplier<String> responseSupplier =
() -> {
try {
PipelineServiceClientResponse status = getServiceStatus();
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
} catch (Exception e) {
throw new RuntimeException(e);
}
};

return retry.executeSupplier(responseSupplier);
}

/* Check the status of pipeline service to ensure it is healthy */
PipelineServiceClientResponse status = getServiceStatus();
return status.getCode() != 200 ? UNHEALTHY_STATUS : HEALTHY_STATUS;
}

/**
* Check the status of pipeline service to ensure it is healthy. Retries up to {@link
* #MAX_ATTEMPTS} times with backoff so that a single transient failure (e.g. Airflow restart,
* network blip) does not permanently mark the agent as unavailable. Responses with status codes
* &ge; 500 are retried; known non-transient 4xx responses (401, 403, 404, version mismatch) are
* returned immediately. Any other unexpected HTTP status code is passed through as-is and will
* not trigger retries.
Comment on lines +208 to +214
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc implies that auth/plugin/version mismatch cases will come back as 4xx and therefore bypass retries, but getServiceStatus() only decides retries based on response.getCode() >= 500. Implementations that translate permanent config/auth/RBAC failures into buildUnhealthyStatus(...) (code 500) will still be retried and incur backoff delays. Consider updating the Javadoc to describe the actual contract (implementations must preserve non-retryable 4xx/422 codes), and/or add a protected isRetryable(PipelineServiceClientResponse) hook so clients can explicitly mark permanent failures as non-retryable.

Copilot uses AI. Check for mistakes.
*/
public PipelineServiceClientResponse getServiceStatus() {
if (pipelineServiceClientEnabled) {
return getServiceStatusInternal();
if (!pipelineServiceClientEnabled) {
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
}
PipelineServiceClientResponse response =
retryForServiceStatus().executeSupplier(this::getServiceStatusInternal);
Comment on lines 216 to +221
Copy link

Copilot AI Apr 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description says getServiceStatus() now retries when the response code is “not 200”, but the implementation only retries on null or code >= 500. Please align the PR description with the actual behavior, or adjust the retry predicate if the intended behavior is to retry all non-200 responses.

Copilot uses AI. Check for mistakes.
return response != null
? response
: buildUnhealthyStatus("Pipeline service returned no response");
}

/** Returns the wait duration between retry attempts in milliseconds. */
protected long getRetryBackoffMillis() {
return DEFAULT_BACKOFF_MILLIS;
}

private Retry retryForServiceStatus() {
if (serviceStatusRetry == null) {
synchronized (this) {
if (serviceStatusRetry == null) {
var retryConfig =
RetryConfig.<PipelineServiceClientResponse>custom()
.maxAttempts(MAX_ATTEMPTS)
.waitDuration(Duration.ofMillis(getRetryBackoffMillis()))
.retryOnResult(response -> response == null || response.getCode() >= 500)
.failAfterMaxAttempts(false)
.build();
Comment on lines +236 to +242
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says getServiceStatus() retries when the response code is not 200, but the current predicate only retries on response == null or code >= 500 (and will pass through 4xx/429/etc without retry). Please either update the PR description / method docs to match the actual behavior, or broaden the retry predicate to include the intended retryable non-200 statuses (e.g., 429/408) if that was the goal.

Copilot uses AI. Check for mistakes.
serviceStatusRetry = Retry.of("getServiceStatus", retryConfig);
}
}
}
return buildHealthyStatus(DISABLED_STATUS).withPlatform(DISABLED_STATUS);
return serviceStatusRetry;
}

public List<PipelineStatus> getQueuedPipelineStatus(IngestionPipeline ingestionPipeline) {
Expand Down
Loading