
Commit e23dff8

fix(it): stop flaky integration tests (#27649) (#27651)
* fix(it): stop two flaky integration tests (#27649)

  GlossaryOntologyExportIT: mark @Isolated. @BeforeAll flips RdfUpdater (a JVM-wide singleton) on, which makes every concurrent test class start doing synchronous Fuseki writes on entity create, saturating the Dropwizard thread pool and causing 60s request timeouts. @Execution(SAME_THREAD) alone only serialises within this class.

  WorkflowDefinitionResourceIT#triggerWorkflow_SDK: drop the redundant waitForWorkflowDeployment call — the create path already waits. Add descriptive aliases to the two await() polls so the next flake tells us which FQN or workflow name actually timed out instead of an anonymous lambda.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): never skip live indexing during reindex (#27649)

  Live search indexing was silently skipped whenever a reindex job was in RUNNING/READY/STOPPING state. SearchRepository.createEntityIndex() and six sibling methods consulted SearchIndexRetryQueue.isEntityTypeSuspended() and returned early with nothing written, nothing enqueued — entities vanished from search until a future reindex happened to cover them.

  The retry worker doubled down: when the scope refresh observed an active job, it purged the retry queue; and processRecord() deleted records whose type was suspended. So even manually enqueued retries were wiped.

  This is how the #27649 flake surfaced: AppsResourceIT triggers SearchIndexingApplication runs and its best-effort 30s wait silently swallows timeouts. If a run was still RUNNING when AppsResourceIT finished, the next class in the sequential fork (WorkflowDefinitionResourceIT) inherited the suspension and its freshly-created tables were never indexed — waitForEntityIndexedInSearch then timed out at 120s. Same mechanism bites real users mid-reindex in production.

  Remove the suspension mechanism entirely:

  * SearchRepository — drop the 8 isEntityTypeSuspended() early-returns; the client-availability path already enqueues for retry on its own.
  * SearchIndexRetryWorker — drop refreshReindexSuspensionScopeIfNeeded() and the suspension branches in processRecord(); remove the retry-queue purge on suspendAll.
  * SearchIndexRetryQueue — delete the updateSuspension / clearSuspension / isEntityTypeSuspended / isStreamingSuspended / isSuspendAllStreaming / getSuspendedEntityTypes API and the static AtomicBoolean / AtomicReference they backed.
  * Drop the two IT cases that asserted the removed behaviour.

  Live writes now always reach the search client; reindex and live writes both target the same indices as before. Version conflicts between the two paths (stale reindex batch overwriting a newer live write) remain possible as they did before suspension was introduced — that is the race suspension was meant to dodge, but dropping writes altogether was worse than the race.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): route live writes to staged index during reindex (#27649)

  The distributed reindex has a TOCTOU: partitions read from a DB snapshot at T0 and write to a staged index, then at T1 (seconds later) the alias is atomically swapped from the old index to the staged one and the old index is deleted. Any entity that live-writers create between T0 and T1 goes via the alias → old index, and is destroyed when that old index is deleted post-swap.

  The CI log for #27649 shows this directly:

    10:13:35  staged table_search_index_rebuild_…_215646 built from snapshot
    10:13:40  POST /v1/tables table1_gold → written to alias target (old index _179670)
    10:13:40  table2_silver, table3_bronze, table4_brass all written to old index _179670
    10:13:42  Atomically swapped aliases from [_179670] to _215646
    10:13:42  Successfully deleted index _179670
    10:13:43+ waitForEntityIndexedInSearch polls, finds nothing, times out at 2 min

  Removing the silent-skip suspension mechanism in the previous commit exposed this race (it had been hidden by dropping the writes outright, which was strictly worse).

  Route live writes to the staged index during the reindex window:

  * SearchRepository gains an activeStagedIndices map (entityType → stagedIndex) plus register/unregister/resolveWriteIndex. Writes resolve to the staged index when one is registered for the type, otherwise to the canonical alias — the existing behaviour.
  * DefaultRecreateHandler.recreateIndexFromMapping registers the staged index as soon as it is created; finalizeReindex and promoteEntityIndex unregister it on every exit path (successful swap, swap failure, failed-reindex delete, exception).
  * Every live-write path in SearchRepository — createEntityIndex, createEntitiesIndex, indexTableColumns, indexColumnsForTables, updateEntityIndex, createTimeSeriesEntity, updateTimeSeriesEntity, deleteEntityIndex, deleteEntityByFQNPrefix, deleteTimeSeriesEntityById — goes through resolveWriteIndex instead of reading the canonical alias directly.

  During a reindex, live writes land in the index that the alias will promote to; after the swap the alias points to that same index and subsequent writes continue to reach the same place. Old-index deletion no longer discards fresh data.

  Note: searches through the alias during the brief reindex window (< seconds in the CI log) can miss a write until the swap lands — an acceptable trade compared to silently dropping the write or losing it on deletion. The #27649 test tolerates this because its 120s poll spans many swap cycles.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): re-register SearchIndexHandler on every SearchRepository init (#27649)

  The previous commit routed live writes through resolveWriteIndex so they land in the staged index during reindex. The CI log for the next run showed the register/unregister fire correctly, but the live writes to tables still went to the canonical alias — as if activeStagedIndices was empty for the entity type.

  Root cause: stale handler pointing at a stale SearchRepository. TestSuiteBootstrap creates SearchRepository three times (migration, createIndices, and finally the embedded OpenMetadataApplication). Each constructor calls registerSearchIndexHandler → new SearchIndexHandler(this) → dispatcher.registerHandler(…). EntityLifecycleEventDispatcher.registerHandler silently SKIPS if a handler with the same name already exists (see EntityLifecycleEventDispatcher.java:80-86), so the dispatcher keeps the FIRST SearchIndexHandler forever — bound to the migration-time SearchRepository.

  Meanwhile DefaultRecreateHandler.registerStagedIndex writes into Entity.getSearchRepository(), which by then is the third (current) instance. Live writes flowing through the stale handler never see that entry; resolveWriteIndex falls through to the canonical alias; the alias swap at the end of the reindex drops the writes, same as before.

  Fix: unregister any existing SearchIndexHandler by name before registering the new one. The latest-constructed SearchRepository always owns the handler delivered through the dispatcher, so its activeStagedIndices is the one consulted on every live write.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): centralize staged-index routing on canonical name (#27649)

  Re-key activeStagedIndices by canonical index name (e.g. openmetadata_table_search_index) instead of entity type, and route every live-write site through a single getWriteIndexName(IndexMapping) helper.

  Why

  - Previous routing went through resolveWriteIndex(entityType, mapping) but only at hand-picked call sites. Several write paths still resolved indexMapping.getIndexName(clusterAlias) directly and bypassed routing — bulkIndexPipelineExecutions, deleteByScript, softDeleteOrRestoreEntity, propagateToDomainChildren, updateEntityCertificationInSearch, propagateToRelatedEntities (PAGE), deleteTableColumns, updateTableColumnsInheritedFields. Any reindex in flight could lose those writes on the alias swap.
  - Keying by canonical index name lets any write site resolve correctly even without entity type in scope (FQN-prefix deletes, child propagation, script updates).

  What

  - activeStagedIndices: Map<canonicalIndexName, stagedIndexName>.
  - registerStagedIndex(entityType, stagedIndex) now resolves the canonical name from the IndexMapping before storing.
  - New getWriteIndexName(IndexMapping) is the single point of resolution; routeToStagedIfActive(String) handles raw alias names (e.g. pipeline_status_search_index resolved via getIndexOrAliasName).
  - Replaced every direct indexMapping.getIndexName(clusterAlias) for writes with getWriteIndexName(indexMapping). Admin/setup paths (createIndex/updateIndex/deleteIndex/createOrUpdateIndexTemplate) intentionally keep canonical names — they manage the alias itself.
  - Cascade ops on shared aliases (GLOBAL_SEARCH_ALIAS, DATA_ASSET_SEARCH_ALIAS, child aliases) are not entity-scoped and cannot route to a single staged index; left untouched.
  - resolveWriteIndex(entityType, mapping) preserved as a thin wrapper for binary compatibility.

  Also runs spotless:apply on the file.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(it): bump DB sort memory + search-wait ceilings (#27649)

  Two CI failures observed under the parallel-tests fork load on the post-centralization run:

  1. TagResourceIT line 161 (listEntities) — the server returned 500 wrapping "java.sql.SQLException: Out of sort memory, consider increasing server sort buffer size" from TagDAO.listAfter. The query joins tag → entity_relationship → classification and orders by tag.name,tag.id; with the tag table accumulating across many parallel test classes (reuseForks=true), MySQL's default 256KB sort_buffer_size overflows. Bump it to 8MB. Add a parallel work_mem=32MB bump to the postgres command for the same query.
  2. TagResourceIT line 1 — Awaitility timeout at 1m30s waiting for a freshly created tag to appear in search index. Five inherited waits in BaseEntityIT had a 90s ceiling while the sibling checkCreatedEntity already used 180s. Standardise on 180s — under tag-scale data the alias swap that the staged-index routing depends on can take longer than 90s in slow CI workers.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): address Copilot/Gitar review on staged-index routing (#27649)

  * DefaultRecreateHandler.finalizeReindex / promoteEntityIndex — wrap the entire promote block in try/finally so unregisterStagedIndex always runs, including on swap failure, empty aliasesToAttach, and exceptions. Without this the routing map could be left pointing at a staged index nobody reads from, silently diverging live writes from search results until the next reindex (Copilot, multiple comments).
  * SearchRepository.resolveWriteIndex — deprecate. The entityType argument is unused; getWriteIndexName(IndexMapping) is the single resolution point now (Copilot + Gitar).
  * SearchRepository.routeToStagedIfActive — tighten the Javadoc to state explicitly that it expects a canonical index name and that short/parent aliases are passed through unchanged (Copilot).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): fan out cross-alias update-by-query to staged indices (#27649)

  The four bulk update-by-query operations rooted on shared aliases — updateAssetDomainsForDataProduct, updateAssetDomainsByIds, updateDomainFqnByPrefix, updateAssetDomainFqnByPrefix — hardcoded their target to GLOBAL_SEARCH_ALIAS / Entity.DOMAIN. During an in-flight reindex those updates landed on the about-to-be-discarded active index only; on alias swap, the new staged index (built from a DB snapshot taken before the script ran) replaced it and the script's effect was lost. Copilot called this out four times.

  Add SearchRepository.getWriteFanoutTargets(aliasOrIndex) — returns the caller's alias plus every currently-staged index. Pass that list to req.index(...) on all four methods in both OpenSearchEntityManager and ElasticSearchEntityManager. The OS/ES update-by-query API natively takes a list, so the fan-out is one request per call.

  The scripts these methods run are idempotent (UPDATE_ASSET_DOMAIN_SCRIPT checks `exists` before adding a domain; UPDATE_DOMAIN_FQN_BY_PREFIX_SCRIPT walks the array and rewrites in place), so applying them again to the staged index — even if the staged copy of the document already reflects the latest DB state — converges to the same result.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(search): scope fan-out to canonical input vs multi-entity alias (#27649)

  Previous getWriteFanoutTargets always appended every staged index, which made entity-scoped update-by-query calls (e.g. updateDomainFqnByPrefix targeting only the domain canonical index) fan out onto unrelated staged indices. Adds avoidable load on every currently-reindexing entity type for an update that should touch one index.

  Branch the implementation on whether the input is a known canonical entity index name. If yes, only the matching staged index is added. If no — i.e. the caller is hitting a multi-entity alias such as GLOBAL_SEARCH_ALIAS — every staged index is added because the update's match query can hit documents from any reindexing type.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
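
The fan-out scoping described in the last two commits can be sketched as follows. This is a minimal illustration, not the code from SearchRepository: the class name `FanoutTargets`, the explicit `canonicalIndexNames` parameter, and the index names used with it are assumptions made so the example is self-contained. Only the branching rule (canonical input adds its own staged index, a multi-entity alias adds every staged index) comes from the commit message.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper illustrating the scoping rule for update-by-query targets. */
final class FanoutTargets {
  private FanoutTargets() {}

  /**
   * Returns the caller's alias or index plus the staged indices the update should also reach.
   *
   * @param aliasOrIndex the target the caller would have used before the change
   * @param canonicalIndexNames assumed set of known per-entity canonical index names
   * @param activeStagedIndices canonical index name to staged index name, for reindexes in flight
   */
  static List<String> getWriteFanoutTargets(
      String aliasOrIndex,
      Set<String> canonicalIndexNames,
      Map<String, String> activeStagedIndices) {
    // LinkedHashSet keeps the caller's target first and drops duplicates.
    Set<String> targets = new LinkedHashSet<>();
    targets.add(aliasOrIndex);
    if (canonicalIndexNames.contains(aliasOrIndex)) {
      // Entity-scoped update: add only the staged index for this entity type, if any.
      String staged = activeStagedIndices.get(aliasOrIndex);
      if (staged != null) {
        targets.add(staged);
      }
    } else {
      // Multi-entity alias: the match query may hit documents of any reindexing type.
      targets.addAll(activeStagedIndices.values());
    }
    return new ArrayList<>(targets);
  }
}
```

Because the scripts are described as idempotent, applying the same update to both the active and the staged index should be safe under this scheme.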
1 parent 84ed278 commit e23dff8

11 files changed

Lines changed: 244 additions & 322 deletions


openmetadata-integration-tests/src/test/java/org/openmetadata/it/bootstrap/TestSuiteBootstrap.java

Lines changed: 14 additions & 2 deletions
@@ -230,7 +230,14 @@ private void startDatabase() {
       mysql.withDatabaseName("openmetadata");
       mysql.withUsername("test");
       mysql.withPassword("test");
-      mysql.withCommand("mysqld", "--max_allowed_packet=" + mysqlMaxAllowedPacket);
+      mysql.withCommand(
+          "mysqld",
+          "--max_allowed_packet=" + mysqlMaxAllowedPacket,
+          // The tag list query (TagDAO.listAfter) joins three tables and sorts by tag.name,
+          // tag.id; under the parallel-tests fork the tag table grows large and the default
+          // 256KB sort_buffer_size overflows with "Out of sort memory" (#27649). 8MB is plenty
+          // for an integration-test workload and well under the 4GB overall limit.
+          "--sort_buffer_size=8M");
       mysql.withStartupTimeoutSeconds(240);
       mysql.withConnectTimeoutSeconds(240);
       mysql.withTmpFs(java.util.Map.of("/var/lib/mysql", "rw,size=2g"));
@@ -278,7 +285,12 @@ private void startDatabase() {
           "-c",
           "synchronous_commit=off",
           "-c",
-          "full_page_writes=off");
+          "full_page_writes=off",
+          // Bump work_mem for the same reason MySQL gets a larger sort_buffer above:
+          // TagDAO.listAfter joins three tables and sorts; default 4MB spills to temp files
+          // under load.
+          "-c",
+          "work_mem=32MB");
       postgres.withTmpFs(java.util.Map.of("/var/lib/postgresql/data", "rw,size=2g"));
       postgres.withCreateContainerCmdModifier(
           cmd ->
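
The commit message traces one failure to this bootstrap constructing SearchRepository three times while the event dispatcher kept the first registered handler. The sketch below illustrates that failure mode and the unregister-then-register fix using a hypothetical `HandlerRegistry`; it is not the EntityLifecycleEventDispatcher API, and the method names and the use of plain `Object` handlers are assumptions for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry that, like the dispatcher described above, skips duplicate names. */
final class HandlerRegistry {
  private final Map<String, Object> handlers = new ConcurrentHashMap<>();

  /** A second registration under the same name is silently ignored, so the first one wins. */
  void registerHandler(String name, Object handler) {
    handlers.putIfAbsent(name, handler);
  }

  void unregisterHandler(String name) {
    handlers.remove(name);
  }

  /** The shape of the fix: remove any existing handler by name, then register the new one. */
  void replaceHandler(String name, Object handler) {
    unregisterHandler(name);
    registerHandler(name, handler);
  }

  Object get(String name) {
    return handlers.get(name);
  }
}
```

With plain `registerHandler`, a handler bound to the first repository instance would keep receiving events even after newer instances are constructed; `replaceHandler` makes the most recently constructed owner the one that is consulted.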

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/BaseEntityIT.java

Lines changed: 6 additions & 6 deletions
@@ -4378,7 +4378,7 @@ void test_bulkCreateOrUpdate_searchIndexed(TestNamespace ns) {
     OpenMetadataClient client = SdkClients.adminClient();
 
     Awaitility.await()
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(2))
         .ignoreExceptions()
@@ -4440,7 +4440,7 @@ void test_bulkUpdate_searchIndexUpdated(TestNamespace ns) {
     OpenMetadataClient client = SdkClients.adminClient();
 
     Awaitility.await()
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(3))
         .ignoreExceptions()
@@ -5130,7 +5130,7 @@ void checkDeletedEntity(TestNamespace ns) throws Exception {
     Awaitility.await("Wait for entity to appear in search index")
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(1))
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .ignoreExceptions()
         .untilAsserted(
             () -> {
@@ -5166,7 +5166,7 @@ void checkIndexCreated(TestNamespace ns) throws Exception {
     Awaitility.await("Wait for entity to appear in search index")
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(1))
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .ignoreExceptions()
         .untilAsserted(
             () -> {
@@ -5194,7 +5194,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
     Awaitility.await("Wait for entity to appear in search index")
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(1))
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .ignoreExceptions()
         .untilAsserted(
             () -> {
@@ -5213,7 +5213,7 @@ void updateDescriptionAndCheckInSearch(TestNamespace ns) throws Exception {
     Awaitility.await("Wait for search to reflect update")
         .pollDelay(Duration.ofMillis(500))
         .pollInterval(Duration.ofSeconds(1))
-        .atMost(Duration.ofSeconds(90))
+        .atMost(Duration.ofSeconds(180))
         .ignoreExceptions()
         .untilAsserted(
             () -> {

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/GlossaryOntologyExportIT.java

Lines changed: 7 additions & 3 deletions
@@ -17,6 +17,7 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.api.parallel.Isolated;
 import org.openmetadata.it.bootstrap.TestSuiteBootstrap;
 import org.openmetadata.it.factories.GlossaryTermTestFactory;
 import org.openmetadata.it.factories.GlossaryTestFactory;
@@ -41,10 +42,13 @@
  *
  * <p>Test isolation: Uses TestNamespace for unique entity naming.
  *
- * <p>Parallelization: Runs with @Execution(ExecutionMode.SAME_THREAD) because each test
- * blocks a server thread on synchronous Fuseki writes; concurrent execution can exhaust the
- * server thread pool and cause request timeouts.
+ * <p>Parallelization: Annotated {@code @Isolated} because {@link RdfUpdater} is a JVM-wide
+ * singleton. {@code @BeforeAll} flips it on, so any test class running concurrently starts doing
+ * synchronous Fuseki writes on every entity create — saturating the Dropwizard thread pool and
+ * causing 60s request timeouts (see issue #27649). {@code @Execution(SAME_THREAD)} alone only
+ * serializes within this class and does not prevent that cross-class leakage.
  */
+@Isolated
 @Execution(ExecutionMode.SAME_THREAD)
 @ExtendWith(TestNamespaceExtension.class)
 public class GlossaryOntologyExportIT {

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/SearchIndexRetryQueueIT.java

Lines changed: 0 additions & 58 deletions
@@ -793,64 +793,6 @@ void testEnqueuePreservesErrorDetailInFailureReason(TestNamespace ns) {
     retryQueueDAO.deleteByEntity(entityId, entityFqn);
   }
 
-  // ---------------------------------------------------------------------------
-  // Suspension tests
-  // ---------------------------------------------------------------------------
-
-  @Test
-  void testSuspensionPreventsEnqueue(TestNamespace ns) {
-    String entityId = UUID.randomUUID().toString();
-    String entityFqn = ns.prefix("rq") + ".suspended.entity";
-    try {
-      SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);
-      assertTrue(SearchIndexRetryQueue.isSuspendAllStreaming());
-
-      // Enqueue should still insert (suspension affects worker processing, not enqueueing)
-      SearchIndexRetryQueue.enqueue(entityId, entityFqn, "during suspension");
-      List<SearchIndexRetryRecord> records =
-          retryQueueDAO.findByStatus(SearchIndexRetryQueue.STATUS_PENDING, 1000);
-      assertTrue(records.stream().anyMatch(r -> r.getEntityId().equals(entityId)));
-    } finally {
-      SearchIndexRetryQueue.clearSuspension();
-      retryQueueDAO.deleteByEntity(entityId, entityFqn);
-    }
-  }
-
-  @Test
-  void testWorkerDeletesRecordsDuringSuspendAll(TestNamespace ns) throws Exception {
-    String entityId = UUID.randomUUID().toString();
-    String entityFqn = ns.prefix("rq") + ".suspended.entity";
-
-    retryQueueDAO.upsert(
-        entityId, entityFqn, "will be suspended", SearchIndexRetryQueue.STATUS_PENDING, "table");
-
-    try {
-      SearchIndexRetryQueue.updateSuspension(java.util.Set.of(), true);
-
-      SearchIndexRetryWorker worker = new SearchIndexRetryWorker(collectionDAO, searchRepository);
-      worker.start();
-      try {
-        Awaitility.await("Worker should delete record during full suspension")
-            .atMost(Duration.ofSeconds(30))
-            .pollInterval(Duration.ofSeconds(1))
-            .until(
-                () -> {
-                  List<SearchIndexRetryRecord> remaining =
-                      retryQueueDAO.findByStatuses(
-                          List.of(
-                              SearchIndexRetryQueue.STATUS_PENDING,
-                              SearchIndexRetryQueue.STATUS_IN_PROGRESS),
-                          1000);
-                  return remaining.stream().noneMatch(r -> r.getEntityId().equals(entityId));
-                });
-      } finally {
-        worker.stop();
-      }
-    } finally {
-      SearchIndexRetryQueue.clearSuspension();
-    }
-  }
-
   // ---------------------------------------------------------------------------
   // Helpers
   // ---------------------------------------------------------------------------

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java

Lines changed: 2 additions & 3 deletions
@@ -6882,7 +6882,6 @@ private void triggerWorkflow_SDK(TestNamespace ns, List<Table> localTestTables)
     String workflowName = "DataCompletenessWorkflow";
     OpenMetadataClient client = SdkClients.adminClient();
 
-    waitForWorkflowDeployment(client, workflowName);
     for (Table table : localTestTables) {
       waitForEntityIndexedInSearch(client, "table_search_index", table.getFullyQualifiedName());
     }
@@ -7000,7 +6999,7 @@ private Domain getOrCreateDomain(TestNamespace ns) {
   }
 
   private void waitForWorkflowDeployment(OpenMetadataClient client, String workflowName) {
-    await()
+    await("workflow '" + workflowName + "' to finish Flowable deployment")
         .atMost(Duration.ofSeconds(120))
         .pollDelay(Duration.ofSeconds(1))
         .pollInterval(Duration.ofSeconds(2))
@@ -7015,7 +7014,7 @@ private void waitForWorkflowDeployment(OpenMetadataClient client, String workflo
 
   private void waitForEntityIndexedInSearch(
       OpenMetadataClient client, String indexName, String entityFqn) {
-    await()
+    await("entity '" + entityFqn + "' to appear in " + indexName)
         .atMost(Duration.ofSeconds(120))
         .pollDelay(Duration.ofSeconds(1))
         .pollInterval(Duration.ofSeconds(2))

openmetadata-service/src/main/java/org/openmetadata/service/search/DefaultRecreateHandler.java

Lines changed: 19 additions & 0 deletions
@@ -99,6 +99,13 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
     }
 
     if (shouldPromote) {
+      // Always clear staged-index routing on the way out, regardless of outcome:
+      // - swap success → alias now points at staged; canonical and staged resolve to the
+      //   same index, so unregistering keeps reads/writes consistent.
+      // - swap failure / empty aliases / exception → leaving routing active would silently
+      //   send live writes to a staged index nothing reads from, which
+      //   is strictly worse than the writes going back to the canonical
+      //   alias target. Operators need to retry the reindex either way.
       try {
         Set<String> aliasesToAttach = new HashSet<>();
 
@@ -148,6 +155,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
                 entityType);
             return;
           }
+        } else {
+          LOG.warn("Entity '{}': aliasesToAttach is empty, skipping alias swap", entityType);
         }
 
         LOG.info(
@@ -180,6 +189,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
         if (metrics != null) {
           metrics.recordPromotionFailure(entityType);
         }
+      } finally {
+        searchRepository.unregisterStagedIndex(entityType, stagedIndex);
       }
     } else {
       try {
@@ -196,6 +207,8 @@ public void finalizeReindex(EntityReindexContext context, boolean reindexSuccess
             stagedIndex,
             entityType,
             ex);
+      } finally {
+        searchRepository.unregisterStagedIndex(entityType, stagedIndex);
       }
     }
   }
@@ -262,10 +275,13 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
             stagedIndex,
             entityType,
             ex);
+      } finally {
+        searchRepository.unregisterStagedIndex(entityType, stagedIndex);
       }
       return;
     }
 
+    // Always clear staged-index routing on the way out — see the rationale in finalizeReindex.
     try {
       Set<String> aliasesToAttach =
           getAliasesFromMapping(indexMapping, searchRepository.getClusterAlias());
@@ -340,6 +356,8 @@ public void promoteEntityIndex(EntityReindexContext context, boolean reindexSucc
       if (promoteMetrics != null) {
         promoteMetrics.recordPromotionFailure(entityType);
       }
+    } finally {
+      searchRepository.unregisterStagedIndex(entityType, stagedIndex);
     }
   }
 
@@ -422,6 +440,7 @@ protected void recreateIndexFromMapping(
 
     String stagedIndexName = buildStagedIndexName(canonicalIndexName);
     searchClient.createIndex(stagedIndexName, mappingContent);
+    searchRepository.registerStagedIndex(entityType, stagedIndexName);
 
     Set<String> existingAliases =
         activeIndexName != null ? searchClient.getAliases(activeIndexName) : new HashSet<>();
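
The register and unregister calls in this file feed a routing map that lives in SearchRepository, whose diff is not shown here. A minimal sketch of how such a map could behave is given below. `StagedIndexRouter`, its `promote` method, and the decision to key directly by a canonical index name string are simplifications assumed for illustration; the commit message says the real `registerStagedIndex` takes an entity type and resolves the canonical name from an IndexMapping.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the staged-index routing state described in the commit message. */
final class StagedIndexRouter {
  // Canonical index name -> staged index name, present only while a reindex is in flight.
  private final Map<String, String> activeStagedIndices = new ConcurrentHashMap<>();

  void registerStagedIndex(String canonicalIndex, String stagedIndex) {
    activeStagedIndices.put(canonicalIndex, stagedIndex);
  }

  void unregisterStagedIndex(String canonicalIndex, String stagedIndex) {
    // Two-argument remove clears the entry only if it still maps to this staged index.
    activeStagedIndices.remove(canonicalIndex, stagedIndex);
  }

  /** Live writes go to the staged index when one is registered, else to the canonical name. */
  String getWriteIndexName(String canonicalIndex) {
    return activeStagedIndices.getOrDefault(canonicalIndex, canonicalIndex);
  }

  /** Runs the alias swap and always clears routing afterwards, as the try/finally above does. */
  boolean promote(String canonicalIndex, String stagedIndex, Runnable aliasSwap) {
    try {
      aliasSwap.run();
      return true;
    } catch (RuntimeException ex) {
      // A failed swap leaves the alias on the old index; the caller is expected to retry.
      return false;
    } finally {
      unregisterStagedIndex(canonicalIndex, stagedIndex);
    }
  }
}
```

Clearing the entry in `finally` means a failed swap sends subsequent writes back to the canonical alias target instead of leaving them pointed at a staged index that nothing reads from.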

openmetadata-service/src/main/java/org/openmetadata/service/search/SearchIndexRetryQueue.java

Lines changed: 0 additions & 49 deletions
@@ -1,12 +1,7 @@
 package org.openmetadata.service.search;
 
 import io.micrometer.core.instrument.Metrics;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.schema.EntityInterface;
 import org.openmetadata.service.Entity;
@@ -24,10 +19,6 @@ public final class SearchIndexRetryQueue {
 
   private static final int MAX_REASON_LENGTH = 8192;
 
-  private static final AtomicReference<Set<String>> SUSPENDED_ENTITY_TYPES =
-      new AtomicReference<>(Collections.emptySet());
-  private static final AtomicBoolean SUSPEND_ALL_STREAMING = new AtomicBoolean(false);
-
   private SearchIndexRetryQueue() {}
 
   public static void enqueue(EntityInterface entity, String operation, Throwable failure) {
@@ -117,46 +108,6 @@ public static boolean isRetryableStatusCode(int status) {
     return status < 400;
   }
 
-  public static void updateSuspension(Set<String> entityTypes, boolean suspendAll) {
-    Set<String> normalized = new HashSet<>();
-    for (String entityType : entityTypes == null ? Collections.<String>emptySet() : entityTypes) {
-      String normalizedType = normalize(entityType);
-      if (!normalizedType.isEmpty()) {
-        normalized.add(normalizedType);
-      }
-    }
-
-    // Set entity types before the boolean so that isEntityTypeSuspended never
-    // sees suspendAll=false with an outdated (empty) entity-types set.
-    SUSPENDED_ENTITY_TYPES.set(Collections.unmodifiableSet(normalized));
-    SUSPEND_ALL_STREAMING.set(suspendAll);
-  }
-
-  public static void clearSuspension() {
-    SUSPEND_ALL_STREAMING.set(false);
-    SUSPENDED_ENTITY_TYPES.set(Collections.emptySet());
-  }
-
-  public static boolean isEntityTypeSuspended(String entityType) {
-    if (SUSPEND_ALL_STREAMING.get()) {
-      return true;
-    }
-    String normalized = normalize(entityType);
-    return !normalized.isEmpty() && SUSPENDED_ENTITY_TYPES.get().contains(normalized);
-  }
-
-  public static boolean isStreamingSuspended() {
-    return SUSPEND_ALL_STREAMING.get() || !SUSPENDED_ENTITY_TYPES.get().isEmpty();
-  }
-
-  public static boolean isSuspendAllStreaming() {
-    return SUSPEND_ALL_STREAMING.get();
-  }
-
-  public static Set<String> getSuspendedEntityTypes() {
-    return SUSPENDED_ENTITY_TYPES.get();
-  }
-
   private static String truncate(String value) {
     if (value == null) {
       return null;
