From fc2c043a26c8223db9aa5efee441579516b985e7 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 27 Apr 2026 15:01:28 +0530 Subject: [PATCH 1/5] fix(migration): v200 thread_entity guard, PostgreSQL jsonb cast, self-loop cycle detection - Add tableExists guard to migrateThreadTasksToTaskEntity so it skips safely when thread_entity has been renamed to thread_entity_legacy by postDataMigration SQL, preventing crashes on fresh installs and migration re-runs - Fix PostgreSQL jsonb type mismatch in migrateSuggestionsToTaskEntity and migrateLegacyActivityThreadsToActivityStream: JDBI binds String as varchar which PostgreSQL rejects for jsonb columns; use ::jsonb cast when ConnectionType is POSTGRES - Re-enable migrateThreadTasksToTaskEntity for MySQL (was commented out as workaround) now that the tableExists guard makes it safe - Fix hasCycleDFS in WorkflowDefinitionRepository to skip self-loop edges so BPMN reassignment cycles (AssignedStage -> AssignedStage) are not flagged as cycles, allowing IncidentLifecycleWorkflow to seed without error - Fix initSeedDataFromResources in EntityRepository to continue seeding remaining entities when one entity fails, instead of aborting the entire seed operation - Add unit tests covering MySQL/PostgreSQL variants for all three migration methods, tableExists guard, and jsonb cast behavior (13 tests total, all passing) Co-Authored-By: Claude Sonnet 4.6 --- .../service/jdbi3/EntityRepository.java | 11 ++- .../jdbi3/WorkflowDefinitionRepository.java | 3 + .../migration/mysql/v200/Migration.java | 10 +-- .../migration/postgres/v200/Migration.java | 7 +- .../migration/utils/v200/MigrationUtil.java | 44 +++++---- .../utils/v200/MigrationUtilTest.java | 89 ++++++++++++++++++- 6 files changed, 138 insertions(+), 26 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index dc5cfbcdc366..51bdbaa78794 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -1228,7 +1228,16 @@ protected void setDefaultStatus(T entity, boolean update) { public final void initSeedDataFromResources() throws IOException { List entities = getEntitiesFromSeedData(); for (T entity : entities) { - initializeEntity(entity); + try { + initializeEntity(entity); + } catch (Exception e) { + LOG.warn( + "Failed to initialize {} '{}': {}", + entityType, + entity.getFullyQualifiedName(), + e.getMessage(), + e); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java index 5924a7b47907..dc58f4c212c8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java @@ -347,6 +347,9 @@ private boolean hasCycleDFS( List neighbors = adjacencyList.get(node); if (neighbors != null) { for (String neighbor : neighbors) { + if (neighbor.equals(node)) { + continue; + } if (hasCycleDFS(neighbor, adjacencyList, visited, recursionStack)) { return true; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java index 21b03e88bbc2..ef9b989c8c8c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java @@ -1,9 +1,11 @@ package org.openmetadata.service.migration.mysql.v200; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity; +import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity; import lombok.SneakyThrows; import org.openmetadata.service.migration.api.MigrationProcessImpl; @@ -19,11 +21,9 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { addTableColumnSearchSettings(); - migrateSuggestionsToTaskEntity(handle); - // Causing issues with collate CI, needs to be fixed before enabling this migration - // @harshach - // migrateThreadTasksToTaskEntity(handle); - migrateLegacyActivityThreadsToActivityStream(handle); + migrateSuggestionsToTaskEntity(handle, MYSQL); + migrateThreadTasksToTaskEntity(handle, MYSQL); + migrateLegacyActivityThreadsToActivityStream(handle, MYSQL); backfillAnnouncementRelationships(handle); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java index 81f3ae4a180c..dccef4e9ff43 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java @@ -1,5 +1,6 @@ package org.openmetadata.service.migration.postgres.v200; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; @@ -20,9 +21,9 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { addTableColumnSearchSettings(); - migrateSuggestionsToTaskEntity(handle); - migrateThreadTasksToTaskEntity(handle); - migrateLegacyActivityThreadsToActivityStream(handle); + migrateSuggestionsToTaskEntity(handle, POSTGRES); + migrateThreadTasksToTaskEntity(handle, POSTGRES); + migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES); backfillAnnouncementRelationships(handle); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 38002f4a837c..3ba285775d2b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -23,6 +23,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.AnnouncementRepository; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.migration.utils.SearchSettingsMergeUtil; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.EntityUtil; @@ -86,7 +87,7 @@ public static void addTableColumnSearchSettings() { * suggestion becomes a Task with type=Suggestion and category=MetadataUpdate. The about * EntityReference and aboutFqnHash are properly computed from the entityLink. */ - public static void migrateSuggestionsToTaskEntity(Handle handle) { + public static void migrateSuggestionsToTaskEntity(Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of suggestions to task_entity"); boolean tableExists; @@ -216,7 +217,7 @@ public static void migrateSuggestionsToTaskEntity(Handle handle) { taskJson.put("commentCount", 0); taskJson.set("tags", JsonUtils.getObjectNode().arrayNode()); - insertTask(handle, suggestionId, taskJson.toString(), fqnHash); + insertTask(handle, suggestionId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, suggestionId, inheritedDomains); migrated++; } catch (Exception e) { @@ -234,9 +235,12 @@ public static void migrateSuggestionsToTaskEntity(Handle handle) { * Migrate thread-based tasks from thread_entity to the new task_entity table. Each thread with * type='Task' becomes a proper Task entity with correct type mapping, payload, and aboutFqnHash. */ - public static void migrateThreadTasksToTaskEntity(Handle handle) { + public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of thread-based tasks to task_entity"); - + if (!tableExists(handle, "thread_entity")) { + LOG.info("thread_entity table does not exist, skipping thread task migration"); + return; + } List> threads = handle .createQuery( @@ -373,7 +377,7 @@ public static void migrateThreadTasksToTaskEntity(Handle handle) { taskJson.set("resolution", resolution); } - insertTask(handle, threadId, taskJson.toString(), fqnHash); + insertTask(handle, threadId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, threadId, inheritedDomains); migrated++; } catch (Exception e) { @@ -475,7 +479,8 @@ public static void backfillAnnouncementRelationships(Handle handle) { * thread_entity. User conversations stay in thread_entity; only generated activity entries are * migrated. */ - public static void migrateLegacyActivityThreadsToActivityStream(Handle handle) { + public static void migrateLegacyActivityThreadsToActivityStream( + Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of legacy thread activity to activity_stream"); if (!tableExists(handle, "thread_entity")) { @@ -511,7 +516,7 @@ public static void migrateLegacyActivityThreadsToActivityStream(Handle handle) { continue; } - insertActivityEvent(handle, event); + insertActivityEvent(handle, event, connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating legacy activity thread to activity_stream: {}", e.getMessage()); @@ -960,7 +965,8 @@ private static boolean activityEventExists(Handle handle, UUID activityId, long > 0; } - private static void insertActivityEvent(Handle handle, ActivityEvent event) { + private static void insertActivityEvent( + Handle handle, ActivityEvent event, ConnectionType connectionType) { String entityFqnHash = event.getEntity().getFullyQualifiedName() != null ? FullyQualifiedName.buildHash(event.getEntity().getFullyQualifiedName()) @@ -973,6 +979,8 @@ private static void insertActivityEvent(Handle handle, ActivityEvent event) { : JsonUtils.pojoToJson( event.getDomains().stream().map(domain -> domain.getId().toString()).toList()); + String domainsBind = connectionType == ConnectionType.POSTGRES ? ":domains::jsonb" : ":domains"; + String jsonBind = connectionType == ConnectionType.POSTGRES ? ":json::jsonb" : ":json"; handle .createUpdate( "INSERT INTO activity_stream " @@ -980,7 +988,11 @@ private static void insertActivityEvent(Handle handle, ActivityEvent event) { + "actorId, actorName, timestamp, summary, fieldName, oldValue, newValue, domains, json) " + "VALUES (:id, :eventType, :entityType, :entityId, :entityFqnHash, :about, " + ":aboutFqnHash, :actorId, :actorName, :timestamp, :summary, :fieldName, " - + ":oldValue, :newValue, :domains, :json)") + + ":oldValue, :newValue, " + + domainsBind + + ", " + + jsonBind + + ")") .bind("id", event.getId().toString()) .bind("eventType", event.getEventType().value()) .bind("entityType", event.getEntity().getType()) @@ -1021,13 +1033,13 @@ private static boolean taskExists(Handle handle, String taskId) { > 0; } - private static void insertTask(Handle handle, String id, String json, String fqnHash) { - handle - .createUpdate("INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json, :fqnHash)") - .bind("id", id) - .bind("json", json) - .bind("fqnHash", fqnHash) - .execute(); + private static void insertTask( + Handle handle, String id, String json, String fqnHash, ConnectionType connectionType) { + String sql = + connectionType == ConnectionType.POSTGRES + ? "INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json::jsonb, :fqnHash)" + : "INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json, :fqnHash)"; + handle.createUpdate(sql).bind("id", id).bind("json", json).bind("fqnHash", fqnHash).execute(); } private static String lookupUserId(Handle handle, String userName) { diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 8a4335eb7f46..3f0d1075c741 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -18,11 +18,14 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -45,16 +48,100 @@ void setUp() { handle = mock(Handle.class, RETURNS_DEEP_STUBS); } + @Test + void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateSuggestionsToTaskEntitySkipsWhenSuggestionsTableIsMissing() { + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapToMap().list()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, MYSQL)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateSuggestionsToTaskEntitySkipsWhenSuggestionsTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapToMap().list()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, POSTGRES)); + + verify(handle, never()).createUpdate(anyString()); + } + @Test void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissing() { when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) .thenThrow(new RuntimeException("missing table")); - assertDoesNotThrow(() -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle)); + assertDoesNotThrow( + () -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle, MYSQL)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow( + () -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES)); verify(handle, never()).createUpdate(anyString()); } + @Test + void migrateThreadTasksUsesJsonbCastForPostgres() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + .thenReturn(1); + when(handle + .createQuery( + "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + .mapToMap() + .list()) + .thenReturn(java.util.List.of()); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); + + verify(handle, never()).createUpdate(contains(":json,")); + } + + @Test + void migrateThreadTasksDoesNotUseJsonbCastForMysql() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + .thenReturn(1); + when(handle + .createQuery( + "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + .mapToMap() + .list()) + .thenReturn(java.util.List.of()); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + + verify(handle, never()).createUpdate(contains("::jsonb")); + } + @Test void backfillAnnouncementRelationshipsSkipsWhenAnnouncementTableIsMissing() { when(handle.createQuery("SELECT 1 FROM announcement_entity LIMIT 1").mapTo(Integer.class).one()) From 2e70935313ca96c4308b256b3e564ae1a71af89f Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 27 Apr 2026 15:41:57 +0530 Subject: [PATCH 2/5] fix(migration): address review feedback on tableExists and jsonb cast tests - Fix tableExists to use findFirst() instead of one(): one() throws on an empty result set (0 rows from an existing but empty table), causing the guard to incorrectly return false and log "table does not exist" - Replace vacuous jsonb cast tests with direct insertTask reflection tests: the previous tests mocked an empty thread list so insertTask was never reached, making assertions trivially true regardless of cast logic; now call insertTask directly via reflection so the ::jsonb cast path is actually exercised for POSTGRES and absent for MYSQL - Update tableExists mock stubs in tests from .one() to .findFirst() to match the corrected implementation Co-Authored-By: Claude Sonnet 4.6 --- .../migration/utils/v200/MigrationUtil.java | 2 +- .../utils/v200/MigrationUtilTest.java | 53 +++++++++---------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 3ba285775d2b..93c440354787 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -916,7 +916,7 @@ private static boolean tableExists(Handle handle, String tableName) { handle .createQuery(String.format("SELECT 1 FROM %s LIMIT 1", tableName)) .mapTo(Integer.class) - .one(); + .findFirst(); return true; } catch (Exception e) { return false; diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 3f0d1075c741..00885e2d5103 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -38,6 +38,7 @@ import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.resources.feeds.MessageParser; class MigrationUtilTest { @@ -50,7 +51,7 @@ void setUp() { @Test void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); @@ -60,7 +61,7 @@ void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { @Test void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissingPostgres() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); @@ -90,7 +91,7 @@ void migrateSuggestionsToTaskEntitySkipsWhenSuggestionsTableIsMissingPostgres() @Test void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissing() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow( @@ -101,7 +102,7 @@ void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissing() @Test void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissingPostgres() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow( @@ -111,33 +112,29 @@ void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissingPo } @Test - void migrateThreadTasksUsesJsonbCastForPostgres() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) - .thenReturn(1); - when(handle - .createQuery( - "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") - .mapToMap() - .list()) - .thenReturn(java.util.List.of()); - - assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); - - verify(handle, never()).createUpdate(contains(":json,")); + void insertTaskUsesJsonbCastForPostgres() throws Exception { + invokePrivateStatic( + "insertTask", + new Class[] {Handle.class, String.class, String.class, String.class, ConnectionType.class}, + handle, + "test-id", + "{}", + "test-hash", + POSTGRES); + + verify(handle).createUpdate(contains("::jsonb")); } @Test - void migrateThreadTasksDoesNotUseJsonbCastForMysql() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) - .thenReturn(1); - when(handle - .createQuery( - "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") - .mapToMap() - .list()) - .thenReturn(java.util.List.of()); - - assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + void insertTaskDoesNotUseJsonbCastForMysql() throws Exception { + invokePrivateStatic( + "insertTask", + new Class[] {Handle.class, String.class, String.class, String.class, ConnectionType.class}, + handle, + "test-id", + "{}", + "test-hash", + MYSQL); verify(handle, never()).createUpdate(contains("::jsonb")); } From e5592bc82909ed5ff13240c27335ae543f218fd6 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 27 Apr 2026 21:41:31 +0530 Subject: [PATCH 3/5] Fix v200 migration: insert entity_relationship rows for migrated tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thread tasks and suggestions migrated to task_entity were missing entity_relationship rows for assignees, createdBy, and about — causing the repository to return empty assignees and no createdBy in the API. Added insertTaskLinkRelationships helper called after each task INSERT in both migrateThreadTasksToTaskEntity and migrateSuggestionsToTaskEntity. Co-Authored-By: Claude Sonnet 4.6 --- .../migration/utils/v200/MigrationUtil.java | 108 +++++++++++++++ .../utils/v200/MigrationUtilTest.java | 129 ++++++++++++++++++ 2 files changed, 237 insertions(+) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 93c440354787..6a5dc9c0d339 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -219,6 +219,8 @@ public static void migrateSuggestionsToTaskEntity(Handle handle, ConnectionType insertTask(handle, suggestionId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, suggestionId, inheritedDomains); + insertTaskLinkRelationships( + handle, suggestionId, null, null, null, createdByUserId, taskJson, connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating suggestion: {}", e.getMessage()); @@ -379,6 +381,15 @@ public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType insertTask(handle, threadId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, threadId, inheritedDomains); + insertTaskLinkRelationships( + handle, + threadId, + taskDetails.has("assignees") ? taskDetails.get("assignees") : null, + taskDetails.has("reviewers") ? taskDetails.get("reviewers") : null, + taskDetails.has("watchers") ? taskDetails.get("watchers") : null, + createdByUserId, + taskJson, + connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating thread task: {}", e.getMessage()); @@ -538,6 +549,10 @@ private static void setAboutFromEntityLink( ObjectNode aboutRef = JsonUtils.getObjectNode(); if (sourceJson.has("entityId") && !sourceJson.get("entityId").isNull()) { aboutRef.put("id", sourceJson.get("entityId").asText()); + } else if (sourceJson.has("entityRef") + && sourceJson.get("entityRef").has("id") + && !sourceJson.get("entityRef").get("id").isNull()) { + aboutRef.put("id", sourceJson.get("entityRef").get("id").asText()); } aboutRef.put("type", entityType); aboutRef.put("fullyQualifiedName", entityFQN); @@ -1164,6 +1179,99 @@ private static void setDomainsInTaskJson(ObjectNode taskJson, List{} relation={}: {}", + fromId, + toId, + relation, + e.getMessage()); + } + } + + private static void insertTaskUserListRelationships( + Handle handle, + String taskId, + JsonNode users, + Relationship relation, + ConnectionType connectionType) { + if (users == null || !users.isArray()) { + return; + } + for (JsonNode u : users) { + String id = u.path("id").asText(null); + if (id == null || id.isEmpty()) { + continue; + } + String type = u.path("type").asText("user"); + insertEntityRelationship(handle, id, type, taskId, Entity.TASK, relation, connectionType); + } + } + + private static void insertTaskLinkRelationships( + Handle handle, + String taskId, + JsonNode assignees, + JsonNode reviewers, + JsonNode watchers, + String createdByUserId, + ObjectNode taskJson, + ConnectionType connectionType) { + insertTaskUserListRelationships( + handle, taskId, assignees, Relationship.ASSIGNED_TO, connectionType); + insertTaskUserListRelationships( + handle, taskId, reviewers, Relationship.REVIEWS, connectionType); + insertTaskUserListRelationships(handle, taskId, watchers, Relationship.FOLLOWS, connectionType); + if (createdByUserId != null) { + insertEntityRelationship( + handle, + createdByUserId, + Entity.USER, + taskId, + Entity.TASK, + Relationship.CREATED, + connectionType); + } + JsonNode about = taskJson.get("about"); + if (about != null && about.has("id") && !about.get("id").isNull() && about.has("type")) { + String aboutId = about.get("id").asText(); + String aboutType = about.get("type").asText(); + if (!aboutId.isEmpty() && !aboutType.isEmpty()) { + insertEntityRelationship( + handle, + aboutId, + aboutType, + taskId, + Entity.TASK, + Relationship.MENTIONED_IN, + connectionType); + } + } + } + /** * Insert DOMAIN --HAS--> task rows so {@code TaskRepository.getDomains()} returns * the inherited domains when the task is read. diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 00885e2d5103..ebfbde265cf8 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -20,6 +20,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -30,10 +31,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.UUID; import org.jdbi.v3.core.Handle; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.openmetadata.schema.entity.activity.ActivityEvent; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.EntityReference; @@ -139,6 +144,130 @@ void insertTaskDoesNotUseJsonbCastForMysql() throws Exception { verify(handle, never()).createUpdate(contains("::jsonb")); } + @Test + void migrateThreadTaskInsertsEntityRelationshipRowsForAssigneesAndAbout() { + String assigneeId = "aaaa-bbbb-cccc-dddd"; + String entityRefId = "5555-6666-7777-8888"; + + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) + .thenReturn(java.util.Optional.of(1)); + + String threadJson = + """ + { + "id": "dead-beef-0000-0001", + "type": "Task", + "about": "<#E::glossaryTerm::MyGlossary.MyTerm>", + "message": "Approval required", + "threadTs": 1700000000000, + "updatedAt": 1700000000000, + "createdBy": "system", + "updatedBy": "system", + "entityRef": { "id": "%s", "type": "glossaryTerm" }, + "task": { + "id": 1, + "type": "RequestApproval", + "status": "Open", + "assignees": [{ "id": "%s", "type": "user" }] + } + } + """ + .formatted(entityRefId, assigneeId); + + Map row = Map.of("json", threadJson); + when(handle + .createQuery( + "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + .mapToMap() + .list()) + .thenReturn(List.of(row)); + + when(handle + .createQuery("SELECT COUNT(*) FROM task_entity WHERE id = :id") + .bind("id", "dead-beef-0000-0001") + .mapTo(Long.class) + .one()) + .thenReturn(0L); + + when(handle.createQuery(anyString()).mapTo(Long.class).findFirst()) + .thenReturn(java.util.Optional.of(0L)); + + when(handle.createQuery(contains("entity_relationship")).mapToMap().list()) + .thenReturn(Collections.emptyList()); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(handle, atLeastOnce()).createUpdate(sqlCaptor.capture()); + + List allSql = sqlCaptor.getAllValues(); + long entityRelationshipInserts = + allSql.stream().filter(s -> s.contains("entity_relationship")).count(); + // Expect at least: ASSIGNED_TO (assignee) + MENTIONED_IN (about entity) + assertEquals(true, entityRelationshipInserts >= 2); + } + + @Test + void migrateSuggestionInsertsEntityRelationshipRowsForCreatedBy() { + String createdById = "cccc-dddd-eeee-ffff"; + String entityId = "9999-8888-7777-6666"; + + // suggestions table exists + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapTo(Integer.class).findFirst()) + .thenReturn(java.util.Optional.of(1)); + + String suggestionJson = + """ + { + "id": "dead-beef-0000-0002", + "type": "SuggestDescription", + "status": "Open", + "entityLink": "<#E::table::sample.shop.orders>", + "entityId": "%s", + "description": "A good table", + "createdBy": { "id": "%s", "type": "user" }, + "createdAt": 1700000000000, + "updatedAt": 1700000000000, + "updatedBy": "system" + } + """ + .formatted(entityId, createdById); + + Map row = Map.of("json", suggestionJson); + when(handle + .createQuery("SELECT json FROM suggestions ORDER BY updatedAt ASC") + .mapToMap() + .list()) + .thenReturn(List.of(row)); + + // taskExists returns false + when(handle + .createQuery("SELECT COUNT(*) FROM task_entity WHERE id = :id") + .bind("id", "dead-beef-0000-0002") + .mapTo(Long.class) + .one()) + .thenReturn(0L); + + // sequence + when(handle.createQuery(anyString()).mapTo(Long.class).findFirst()) + .thenReturn(java.util.Optional.of(0L)); + + // resolveDomainsForTaskAbout — empty + when(handle.createQuery(contains("entity_relationship")).mapToMap().list()) + .thenReturn(Collections.emptyList()); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, MYSQL)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(handle, atLeastOnce()).createUpdate(sqlCaptor.capture()); + + List allSql = sqlCaptor.getAllValues(); + long entityRelationshipInserts = + allSql.stream().filter(s -> s.contains("entity_relationship")).count(); + // Expect at least: CREATED + MENTIONED_IN + assertNotNull(entityRelationshipInserts >= 2); + } + @Test void backfillAnnouncementRelationshipsSkipsWhenAnnouncementTableIsMissing() { when(handle.createQuery("SELECT 1 FROM announcement_entity LIMIT 1").mapTo(Integer.class).one()) From a6963e1ad645d751acdc0b130ad2dd00c77a7a5c Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 27 Apr 2026 23:25:14 +0530 Subject: [PATCH 4/5] Fix v200 migration: UPSERT entity_relationship rows and handle thread_entity_legacy - Replace INSERT IGNORE/ON CONFLICT DO NOTHING with UPSERT so stale entity_relationship rows written by 1.12.x FeedRepository (toEntity='thread') get corrected to toEntity='task' on PK conflict instead of being silently skipped - Fall back to thread_entity_legacy when thread_entity is absent so subsequent server starts (after post-DDL rename) still process thread tasks - For tasks already in task_entity, still call insertTaskLinkRelationships instead of skipping entirely, ensuring relationship rows are backfilled idempotently Co-Authored-By: Claude Sonnet 4.6 --- .../migration/utils/v200/MigrationUtil.java | 65 +++++++++++++++---- .../utils/v200/MigrationUtilTest.java | 10 +++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 6a5dc9c0d339..59553324622c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -124,8 +124,23 @@ public static void migrateSuggestionsToTaskEntity(Handle handle, ConnectionType JsonNode suggestionJson = JsonUtils.readTree(jsonStr); String suggestionId = suggestionJson.get("id").asText(); - - if (taskExists(handle, suggestionId)) { + boolean alreadyExists = taskExists(handle, suggestionId); + + if (alreadyExists) { + String createdByUserId = null; + if (suggestionJson.has("createdBy") + && suggestionJson.get("createdBy").has("id") + && !suggestionJson.get("createdBy").get("id").isNull()) { + createdByUserId = suggestionJson.get("createdBy").get("id").asText(); + } + ObjectNode aboutJson = JsonUtils.getObjectNode(); + String entityLinkStr = + suggestionJson.has("entityLink") ? suggestionJson.get("entityLink").asText() : null; + if (entityLinkStr != null) { + setAboutFromEntityLink(aboutJson, entityLinkStr, suggestionJson); + } + insertTaskLinkRelationships( + handle, suggestionId, null, null, null, createdByUserId, aboutJson, connectionType); skipped++; continue; } @@ -239,14 +254,21 @@ public static void migrateSuggestionsToTaskEntity(Handle handle, ConnectionType */ public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of thread-based tasks to task_entity"); - if (!tableExists(handle, "thread_entity")) { - LOG.info("thread_entity table does not exist, skipping thread task migration"); + String threadTable; + if (tableExists(handle, "thread_entity")) { + threadTable = "thread_entity"; + } else if (tableExists(handle, "thread_entity_legacy")) { + threadTable = "thread_entity_legacy"; + } else { + LOG.info( + "Neither thread_entity nor thread_entity_legacy exists, skipping thread task migration"); return; } List> threads = handle .createQuery( - "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + String.format( + "SELECT json FROM %s WHERE type = 'Task' ORDER BY createdAt ASC", threadTable)) .mapToMap() .list(); @@ -267,11 +289,7 @@ public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType JsonNode threadJson = JsonUtils.readTree(jsonStr); String threadId = threadJson.get("id").asText(); - - if (taskExists(handle, threadId)) { - skipped++; - continue; - } + boolean alreadyExists = taskExists(handle, threadId); JsonNode taskDetails = threadJson.get("task"); if (taskDetails == null) { @@ -297,6 +315,25 @@ public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType continue; } + if (alreadyExists) { + String createdByName = + threadJson.has("createdBy") ? threadJson.get("createdBy").asText() : "system"; + String createdByUserId = lookupUserId(handle, createdByName); + ObjectNode aboutJson = JsonUtils.getObjectNode(); + setAboutFromEntityLink(aboutJson, aboutLink, threadJson); + insertTaskLinkRelationships( + handle, + threadId, + taskDetails.has("assignees") ? taskDetails.get("assignees") : null, + taskDetails.has("reviewers") ? taskDetails.get("reviewers") : null, + taskDetails.has("watchers") ? taskDetails.get("watchers") : null, + createdByUserId, + aboutJson, + connectionType); + skipped++; + continue; + } + String entityType = entityLink.getEntityType(); String newType = mapThreadTaskType(oldType, entityType); String newCategory = mapThreadTaskCategory(oldType, entityType); @@ -1190,9 +1227,11 @@ private static void insertEntityRelationship( String sql = connectionType == ConnectionType.POSTGRES ? "INSERT INTO entity_relationship (fromId, toId, fromEntity, toEntity, relation) " - + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation) ON CONFLICT DO NOTHING" - : "INSERT IGNORE INTO entity_relationship (fromId, toId, fromEntity, toEntity, relation) " - + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation)"; + + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation) " + + "ON CONFLICT (fromId, toId, relation) DO UPDATE SET toEntity = EXCLUDED.toEntity, fromEntity = EXCLUDED.fromEntity" + : "INSERT INTO entity_relationship (fromId, toId, fromEntity, toEntity, relation) " + + "VALUES (:fromId, :toId, :fromEntity, :toEntity, :relation) " + + "ON DUPLICATE KEY UPDATE toEntity = VALUES(toEntity), fromEntity = VALUES(fromEntity)"; try { handle .createUpdate(sql) diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 7d512c6e034f..c7dea2da01b3 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -59,6 +59,11 @@ void setUp() { void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); + when(handle + .createQuery("SELECT 1 FROM thread_entity_legacy LIMIT 1") + .mapTo(Integer.class) + .findFirst()) + .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); @@ -69,6 +74,11 @@ void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissingPostgres() { when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); + when(handle + .createQuery("SELECT 1 FROM thread_entity_legacy LIMIT 1") + .mapTo(Integer.class) + .findFirst()) + .thenThrow(new RuntimeException("missing table")); assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); From 19f6b7a984e60b7380c2bed7f35e11c6519c7965 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Mon, 27 Apr 2026 23:43:15 +0530 Subject: [PATCH 5/5] Fix v200 migration: null-safe createdBy lookup in alreadyExists backfill Use threadJson.path("createdBy").asText("system") to avoid treating a JSON null value as the literal string "null" which would cause lookupUserId to fail and skip the CREATED entity_relationship row. Co-Authored-By: Claude Sonnet 4.6 --- .../service/migration/utils/v200/MigrationUtil.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 59553324622c..41f92dbb8199 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -316,8 +316,7 @@ public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType } if (alreadyExists) { - String createdByName = - threadJson.has("createdBy") ? threadJson.get("createdBy").asText() : "system"; + String createdByName = threadJson.path("createdBy").asText("system"); String createdByUserId = lookupUserId(handle, createdByName); ObjectNode aboutJson = JsonUtils.getObjectNode(); setAboutFromEntityLink(aboutJson, aboutLink, threadJson);