fix(it): Stabilize flaky integration tests #27546
Conversation
- TagResourceIT.test_searchTagByClassificationDisplayName: raise Awaitility
timeout from 30s to 90s — under full-suite concurrent load the tag search
index can lag well past 30s before the tag is discoverable by classification
display name
- GlossaryOntologyExportIT.testExportGlossaryAsRdfXml: replace legacy
model.write("RDF/XML") with RDFDataMgr.write(RDFXML_PLAIN) — the legacy
Jena API attempts external DTD/entity resolution from w3.org, hanging ~104s
in network-isolated CI before the client times out at 60s; RIOT writes
purely in-memory with no network I/O
- SearchResourceIT.testExportWithFromAndSizeForPagination: add _id as a
final tiebreaker sort on export requests in both ElasticSearch and
OpenSearch managers; from/size pagination without a unique tiebreaker
produces duplicate rows across pages when concurrent CI tests mutate the
same index between requests; also deduplicate the redundant name.keyword
secondary sort when the caller already sorts by name.keyword
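The pagination failure mode in the last bullet can be reproduced without a search cluster. Below is a minimal pure-Java sketch — the Hit record, page helper, and the two "index states" are hypothetical stand-ins for search hits and a from/size request, not OpenMetadata code. A stable sort over a non-unique key leaves tied rows in arrival order, so two requests observing different index states can hand out the same row on different pages; a unique-id tiebreaker fixes the total order.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StablePagination {
  // Hypothetical stand-in for a search hit: name is the (non-unique) sort key,
  // id is unique per document.
  record Hit(String name, String id) {}

  // Models a from/size search request: sort the index contents, then slice
  // out the single row at offset `from`.
  static Hit page(List<Hit> indexState, Comparator<Hit> order, int from) {
    return indexState.stream().sorted(order).skip(from).findFirst().orElseThrow();
  }

  // Two requests observing the index with different internal document order,
  // as happens when concurrent CI tests mutate the index between requests.
  static final List<Hit> STATE_1 =
      List.of(new Hit("users", "a1"), new Hit("users", "b2"), new Hit("orders", "c3"));
  static final List<Hit> STATE_2 =
      List.of(new Hit("users", "b2"), new Hit("users", "a1"), new Hit("orders", "c3"));

  // Sorting by name alone: the two "users" rows tie, keep arrival order, and
  // page offset 1 of request 1 equals page offset 2 of request 2 -> duplicate.
  static String[] demoWithoutTiebreaker() {
    Comparator<Hit> byNameOnly = Comparator.comparing(Hit::name);
    return new String[] {page(STATE_1, byNameOnly, 1).id(), page(STATE_2, byNameOnly, 2).id()};
  }

  // Adding the unique id as a tiebreaker makes the total order identical for
  // both requests, so consecutive pages can never overlap.
  static String[] demoWithTiebreaker() {
    Comparator<Hit> stable = Comparator.comparing(Hit::name).thenComparing(Hit::id);
    return new String[] {page(STATE_1, stable, 1).id(), page(STATE_2, stable, 2).id()};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(demoWithoutTiebreaker())); // [a1, a1] — duplicate row
    System.out.println(Arrays.toString(demoWithTiebreaker()));    // [a1, b2] — disjoint pages
  }
}
```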
Pull request overview
This PR targets flaky integration tests by hardening two eventual-consistency/pagination behaviors in search and by switching RDF/XML serialization to a non-networking Jena writer.
Changes:
- Add a deterministic tiebreaker sort for export searches and avoid a redundant name.keyword secondary sort when already sorting by name.keyword.
- Switch glossary ontology export serialization to Jena RIOT (RDFDataMgr) with explicit RDFFormat selection.
- Increase the Awaitility timeout for the tag search-by-classification-display-name test to tolerate slower indexing in CI.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java | Adds export sort stabilization (secondary sort + tiebreaker). |
| openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java | Mirrors export sort stabilization for Elasticsearch. |
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java | Uses RIOT writer + RDFFormat to avoid RDF/XML hangs from legacy writer behavior. |
| openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TagResourceIT.java | Extends Awaitility window for search indexing lag. |
```java
if (!sortField.equalsIgnoreCase("name.keyword")) {
  requestBuilder.sort("name.keyword", SortOrder.Asc, SORT_TYPE_KEYWORD);
}
requestBuilder.sort("_id", SortOrder.Asc, null);
```
_id is an Elasticsearch meta-field that requires fielddata to sort on, disabled by default. Use the indexed id.keyword sub-field instead, which is a proper keyword field with doc values and is sortable without any cluster setting changes.
Pull request overview
Stabilizes several flaky integration tests by addressing eventual-consistency timing, RDF/XML export serialization hangs in network-isolated CI, and non-deterministic export pagination caused by non-unique sorting.
Changes:
- Add a deterministic id.keyword tiebreaker sort (and avoid duplicating name.keyword) for export/score-based searches in both Elasticsearch and OpenSearch managers.
- Switch glossary ontology export serialization to Jena RIOT (RDFDataMgr.write) and map requested formats to RDFFormat constants.
- Increase the Awaitility timeout in TagResourceIT to accommodate slower indexing under parallel CI load.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java | Stabilizes export pagination by adding deterministic secondary sorts and avoiding redundant name.keyword sort. |
| openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java | Mirrors the same deterministic export sorting behavior for Elasticsearch. |
| openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java | Uses RIOT serialization to prevent RDF/XML export hangs caused by external resolution. |
| openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TagResourceIT.java | Raises Awaitility timeout to reduce flakiness from search indexing lag. |
```diff
 if (sortField.equalsIgnoreCase(SORT_FIELD_SCORE) || isExport) {
-  requestBuilder.sort("name.keyword", SortOrder.Asc, SORT_TYPE_KEYWORD);
+  if (!sortField.equalsIgnoreCase("name.keyword")) {
+    requestBuilder.sort("name.keyword", SortOrder.Asc, SORT_TYPE_KEYWORD);
+  }
+  requestBuilder.sort("id.keyword", SortOrder.Asc, SORT_TYPE_KEYWORD);
 }
```
```diff
 RDFFormat rdfFormat =
     switch (format.toLowerCase()) {
-      case "rdfxml", "xml" -> "RDF/XML";
-      case "ntriples", "nt" -> "N-TRIPLES";
-      case "jsonld", "json-ld" -> "JSON-LD";
-      default -> "TURTLE";
+      case "rdfxml", "xml" -> RDFFormat.RDFXML_PLAIN;
+      case "ntriples", "nt" -> RDFFormat.NTRIPLES;
+      case "jsonld", "json-ld" -> RDFFormat.JSONLD;
+      default -> RDFFormat.TURTLE_FLAT;
     };

-model.write(writer, rdfFormat);
+RDFDataMgr.write(writer, model, rdfFormat);
 return writer.toString();
```
```java
} finally {
  deleteIndexIfExists(searchClient, canonicalIndex);
  Request recreateRequest = new Request("PUT", "/" + canonicalIndex);
  searchClient.performRequest(recreateRequest);
  Entity.getSearchRepository()
      .createIndex(Entity.getSearchRepository().getEntityIndexMap().get("tag"));
}
```
```java
Awaitility.await("Page 1 and page 2 must return different rows")
    .atMost(15, TimeUnit.SECONDS)
    .pollInterval(500, TimeUnit.MILLISECONDS)
    .untilAsserted(
        () -> {
```
…search index testDocUpdateOnDeletedIndexUsesTemplateNotAutoInference was deleting the production openmetadata_tag_search_index backing index, racing with TagResourceIT.test_searchTagByClassificationDisplayName which polls that index for 90s. Use a test-scoped index name matching the template pattern instead, consistent with the other tests in this class.
```java
void testDocUpdateOnDeletedIndexUsesTemplateNotAutoInference(TestNamespace ns) throws Exception {
  Rest5Client searchClient = TestSuiteBootstrap.createSearchClient();
  String canonicalIndex = CLUSTER_ALIAS + "_tag_search_index";
  String testIndexName = CLUSTER_ALIAS + "_tag_search_index_rebuild_it_doc_update";

  assertNotNull(
      getMappingsForIndex(searchClient, canonicalIndex), "Original index should have mappings");

  String realIndexName = resolveActualIndexName(searchClient, canonicalIndex);
  deleteIndexIfExists(searchClient, realIndexName);

  try {
    Request existsRequest = new Request("HEAD", "/" + canonicalIndex);
    Response existsResponse = searchClient.performRequest(existsRequest);
    assertEquals(404, existsResponse.getStatusCode(), "Index should not exist after deletion");
  } catch (Exception e) {
    assertTrue(e.getMessage().contains("404"), "Index/alias should not exist after deletion");
  }
  deleteIndexIfExists(searchClient, testIndexName);
```
Fixed — the PR description now reflects the actual approach: the test was rewritten to use a test-scoped index name (openmetadata_tag_search_index_rebuild_it_doc_update) so the production tag index is never touched. No SearchRepository.createIndex() call is involved.
The production SearchIndexRetryWorker (4 daemon threads, 5s poll) races the test by calling the same global claimPending SQL. Replace the brittle size-based assertion with an Awaitility loop that checks claimedAt != null for each inserted record — proving claimPending's SQL filter accepted the record's status regardless of which thread won the race.
```diff
 void testClaimPendingIncludesRetryStatuses(TestNamespace ns) {
   String id1 = UUID.randomUUID().toString();
   String id2 = UUID.randomUUID().toString();
   String id3 = UUID.randomUUID().toString();
   String fqn1 = ns.prefix("rq") + ".a";
   String fqn2 = ns.prefix("rq") + ".b";
   String fqn3 = ns.prefix("rq") + ".c";

   retryQueueDAO.upsert(id1, fqn1, "f", SearchIndexRetryQueue.STATUS_PENDING, "");
   retryQueueDAO.upsert(id2, fqn2, "f", SearchIndexRetryQueue.STATUS_PENDING_RETRY_1, "");
   retryQueueDAO.upsert(id3, fqn3, "f", SearchIndexRetryQueue.STATUS_PENDING_RETRY_2, "");

-  List<SearchIndexRetryRecord> claimed = retryQueueDAO.claimPending(10);
-  assertTrue(claimed.size() >= 3);
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id1)));
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id2)));
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id3)));
+  Set<String> ourIds = Set.of(id1, id2, id3);
+
+  Awaitility.await()
+      .atMost(Duration.ofSeconds(15))
+      .pollInterval(Duration.ofMillis(200))
+      .untilAsserted(
+          () -> {
+            retryQueueDAO.claimPending(50);
+            Set<String> claimed = new HashSet<>();
+            for (String status :
+                List.of(
+                    SearchIndexRetryQueue.STATUS_PENDING,
+                    SearchIndexRetryQueue.STATUS_PENDING_RETRY_1,
+                    SearchIndexRetryQueue.STATUS_PENDING_RETRY_2,
+                    SearchIndexRetryQueue.STATUS_IN_PROGRESS,
+                    SearchIndexRetryQueue.STATUS_FAILED)) {
+              retryQueueDAO.findByStatus(status, 5000).stream()
+                  .filter(r -> ourIds.contains(r.getEntityId()) && r.getClaimedAt() != null)
+                  .map(SearchIndexRetryRecord::getEntityId)
+                  .forEach(claimed::add);
+            }
+            assertTrue(claimed.contains(id1), "id1 (PENDING) was never claimed");
+            assertTrue(claimed.contains(id2), "id2 (PENDING_RETRY_1) was never claimed");
+            assertTrue(claimed.contains(id3), "id3 (PENDING_RETRY_2) was never claimed");
+          });
```
The GlossaryTermApprovalWorkflow fires asynchronously when reviewers are added, setting entityStatus=IN_REVIEW. The final patch sent the stale entityStatus=APPROVED from the previous response, causing a spurious IN_REVIEW→APPROVED transition in the diff which requires the caller to be a reviewer — admin is not. Re-fetch the entity before the reviewer removal so the diff contains only the reviewer change.
updateFailureAndRetryCount sets claimedAt=NULL after the worker processes a record. Add retryCount > 0 as a secondary proof-of-claim signal so records that were claimed, processed, and had claimedAt reset are still counted — covers the FAILED exhaustion path and intermediate PENDING_RETRY_* states where claimedAt is temporarily null.
Fixed in 929f2b2 — added retryCount > 0 as a secondary proof-of-claim signal.
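The two-signal check discussed above can be expressed as a single predicate. This is a hedged pure-Java sketch — the RetryRecord shape is hypothetical and mirrors only the fields the discussion names (claimedAt and retryCount), not the real DAO model:

```java
public class ProofOfClaim {
  // Hypothetical minimal shape of a retry-queue row: claimedAt is null when the
  // record is unclaimed, or after updateFailureAndRetryCount resets it post-processing.
  record RetryRecord(String entityId, Long claimedAt, int retryCount) {}

  // A record proves it passed claimPending's SQL filter if it is currently
  // claimed (claimedAt set) OR was claimed at least once and then processed
  // (retryCount bumped, claimedAt reset to null by the worker).
  static boolean wasClaimed(RetryRecord r) {
    return r.claimedAt() != null || r.retryCount() > 0;
  }

  public static void main(String[] args) {
    System.out.println(wasClaimed(new RetryRecord("a", 1700000000L, 0))); // true: claim in flight
    System.out.println(wasClaimed(new RetryRecord("b", null, 2)));        // true: already processed
    System.out.println(wasClaimed(new RetryRecord("c", null, 0)));        // false: never claimed
  }
}
```

Either signal alone misses a window — claimedAt is transiently null between worker passes, and retryCount stays 0 until the first failure — so the disjunction covers both the in-flight and already-processed states.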
```diff
 void testClaimPendingIncludesRetryStatuses(TestNamespace ns) {
   String id1 = UUID.randomUUID().toString();
   String id2 = UUID.randomUUID().toString();
   String id3 = UUID.randomUUID().toString();
   String fqn1 = ns.prefix("rq") + ".a";
   String fqn2 = ns.prefix("rq") + ".b";
   String fqn3 = ns.prefix("rq") + ".c";

   retryQueueDAO.upsert(id1, fqn1, "f", SearchIndexRetryQueue.STATUS_PENDING, "");
   retryQueueDAO.upsert(id2, fqn2, "f", SearchIndexRetryQueue.STATUS_PENDING_RETRY_1, "");
   retryQueueDAO.upsert(id3, fqn3, "f", SearchIndexRetryQueue.STATUS_PENDING_RETRY_2, "");

-  List<SearchIndexRetryRecord> claimed = retryQueueDAO.claimPending(10);
-  assertTrue(claimed.size() >= 3);
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id1)));
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id2)));
-  assertTrue(claimed.stream().anyMatch(r -> r.getEntityId().equals(id3)));
+  Set<String> ourIds = Set.of(id1, id2, id3);
+
+  Awaitility.await()
+      .atMost(Duration.ofSeconds(15))
+      .pollInterval(Duration.ofMillis(200))
+      .untilAsserted(
+          () -> {
+            retryQueueDAO.claimPending(50);
+            Set<String> claimed = new HashSet<>();
+            for (String status :
+                List.of(
+                    SearchIndexRetryQueue.STATUS_PENDING,
+                    SearchIndexRetryQueue.STATUS_PENDING_RETRY_1,
+                    SearchIndexRetryQueue.STATUS_PENDING_RETRY_2,
+                    SearchIndexRetryQueue.STATUS_IN_PROGRESS,
+                    SearchIndexRetryQueue.STATUS_FAILED)) {
+              retryQueueDAO.findByStatus(status, 5000).stream()
+                  .filter(
+                      r ->
+                          ourIds.contains(r.getEntityId())
+                              && (r.getClaimedAt() != null || r.getRetryCount() > 0))
+                  .map(SearchIndexRetryRecord::getEntityId)
+                  .forEach(claimed::add);
+            }
+            assertTrue(claimed.contains(id1), "id1 (PENDING) was never claimed");
+            assertTrue(claimed.contains(id2), "id2 (PENDING_RETRY_1) was never claimed");
+            assertTrue(claimed.contains(id3), "id3 (PENDING_RETRY_2) was never claimed");
+          });
```
…gnizerMetadata repository.create() publishes a ChangeEvent that triggers ApplyRecognizerFeedbackImpl asynchronously. That workflow call races with the direct applyFeedback below: by the time the workflow runs, the GENERATED tag is already removed by the direct call, so getRecognizerIdFromTagLabel returns null and the workflow falls back to ALL recognizers, contaminating recognizer2. Fix: insert directly to DAO (bypassing publishChangeEvent) so the governance workflow is never triggered for this unit-level test.
Fixed in 7a4ec4c. Now tracking which IDs are still visible in any status after each poll. An ID absent from all statuses was deleted by the worker via deleteByEntity — that path is only reached after claimPending accepted the record, so absence is equally valid proof that the SQL filter worked. The absent IDs are added to claimed before the final assertions.
…sRetryStatuses The worker's processRecord takes the delete path (removeStaleEntityById + deleteByEntity) when resolveEntityReference returns null but entityId is non-empty — which applies to our fake UUID test records. If the worker wins and deletes a record, findByStatus finds nothing and the assertion fails. Fix: track which IDs are still visible in any status. An ID absent from all statuses was deleted by the worker after a successful claim — deleteByEntity is only reached after claimPending accepted the record, so absence is equally valid proof that claimPending's SQL filter worked.
…rsMultipleUpdates Same root cause as patch_addDeleteReviewers: GlossaryTermApprovalWorkflow fires asynchronously after reviewers are added, setting entityStatus=IN_REVIEW. Subsequent patches using the stale APPROVED status from the previous response trigger a spurious IN_REVIEW→APPROVED transition, rejected because admin is not a reviewer. Re-fetch before each subsequent patch to avoid the stale status.
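The stale-snapshot failure described in these two commits can be modeled without the workflow engine. Below is a hedged pure-Java sketch — entities are reduced to field maps and diff is a stand-in for JSON-patch generation; all names are hypothetical. A patch built from the previous response carries a spurious entityStatus transition, while re-fetching first yields a diff containing only the intended reviewer change.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StalePatchDemo {
  // Stand-in for JSON-patch generation: every field in `desired` that differs
  // from the server's current `base` becomes part of the patch.
  static Map<String, String> diff(Map<String, String> base, Map<String, String> desired) {
    Map<String, String> changes = new LinkedHashMap<>();
    desired.forEach((k, v) -> { if (!v.equals(base.get(k))) changes.put(k, v); });
    return changes;
  }

  // Server state after the async approval workflow ran and set IN_REVIEW.
  static final Map<String, String> SERVER =
      Map.of("entityStatus", "IN_REVIEW", "reviewers", "alice");
  // Stale client snapshot from the previous response, taken before the workflow fired.
  static final Map<String, String> STALE =
      Map.of("entityStatus", "APPROVED", "reviewers", "alice");

  // Build a patch from a snapshot in which the caller only removes the reviewer.
  static Map<String, String> patchFrom(Map<String, String> snapshot) {
    Map<String, String> desired = new LinkedHashMap<>(snapshot);
    desired.put("reviewers", "");  // the only change the caller intends
    return diff(SERVER, desired);
  }

  public static void main(String[] args) {
    // From the stale snapshot: the patch also carries a spurious
    // IN_REVIEW -> APPROVED transition, which only a reviewer may make.
    System.out.println(patchFrom(STALE));
    // After re-fetching (snapshot == server state): only the reviewer change.
    System.out.println(patchFrom(SERVER));
  }
}
```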
```java
Awaitility.await("Page 1 and page 2 must return different rows")
    .atMost(15, TimeUnit.SECONDS)
    .pollInterval(500, TimeUnit.MILLISECONDS)
    .untilAsserted(
        () -> {
          HttpResponse<String> page1 =
              httpGetExport(
                  "/v1/search/export?q=export_page_test&index=table_search_index"
                      + "&sort_field=name.keyword&sort_order=asc&from=0&size=1");
          assertEquals(200, page1.statusCode());
          String[] page1Lines = page1.body().split("\n");
          assertEquals(2, page1Lines.length, "from=0&size=1 should return header + 1 data row");

          HttpResponse<String> page2 =
              httpGetExport(
                  "/v1/search/export?q=export_page_test&index=table_search_index"
                      + "&sort_field=name.keyword&sort_order=asc&from=1&size=1");
          assertEquals(200, page2.statusCode());
          String[] page2Lines = page2.body().split("\n");
          assertEquals(2, page2Lines.length, "from=1&size=1 should return header + 1 data row");
```
Gitar Code Review — ✅ Approved, 1 resolved / 1 finding

Stabilizes seven flaky integration tests by addressing race conditions, increasing timeouts for slow CI operations, and ensuring proper test isolation. No issues found.

✅ 1 resolved — Edge Case: claimedAt check may miss records already processed by worker
Summary
Fixes seven flaky integration tests:
Changes
Test plan