diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index dc5cfbcdc366..51bdbaa78794 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -1228,7 +1228,16 @@ protected void setDefaultStatus(T entity, boolean update) { public final void initSeedDataFromResources() throws IOException { List entities = getEntitiesFromSeedData(); for (T entity : entities) { - initializeEntity(entity); + try { + initializeEntity(entity); + } catch (Exception e) { + LOG.warn( + "Failed to initialize {} '{}': {}", + entityType, + entity.getFullyQualifiedName(), + e.getMessage(), + e); + } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java index 5924a7b47907..dc58f4c212c8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/WorkflowDefinitionRepository.java @@ -347,6 +347,9 @@ private boolean hasCycleDFS( List neighbors = adjacencyList.get(node); if (neighbors != null) { for (String neighbor : neighbors) { + if (neighbor.equals(node)) { + continue; + } if (hasCycleDFS(neighbor, adjacencyList, visited, recursionStack)) { return true; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java index 21b03e88bbc2..ef9b989c8c8c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/mysql/v200/Migration.java @@ -1,9 +1,11 @@ package org.openmetadata.service.migration.mysql.v200; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity; +import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity; import lombok.SneakyThrows; import org.openmetadata.service.migration.api.MigrationProcessImpl; @@ -19,11 +21,9 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { addTableColumnSearchSettings(); - migrateSuggestionsToTaskEntity(handle); - // Causing issues with collate CI, needs to be fixed before enabling this migration - // @harshach - // migrateThreadTasksToTaskEntity(handle); - migrateLegacyActivityThreadsToActivityStream(handle); + migrateSuggestionsToTaskEntity(handle, MYSQL); + migrateThreadTasksToTaskEntity(handle, MYSQL); + migrateLegacyActivityThreadsToActivityStream(handle, MYSQL); backfillAnnouncementRelationships(handle); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java index 32281e424cc7..dccef4e9ff43 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/postgres/v200/Migration.java @@ -1,9 +1,11 @@ package org.openmetadata.service.migration.postgres.v200; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTableColumnSearchSettings; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream; import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity; +import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity; import lombok.SneakyThrows; import org.openmetadata.service.migration.api.MigrationProcessImpl; @@ -19,11 +21,9 @@ public Migration(MigrationFile migrationFile) { @SneakyThrows public void runDataMigration() { addTableColumnSearchSettings(); - migrateSuggestionsToTaskEntity(handle); - // Causing issues with collate CI, needs to be fixed before enabling this migration - // @harshach - // migrateThreadTasksToTaskEntity(handle); - migrateLegacyActivityThreadsToActivityStream(handle); + migrateSuggestionsToTaskEntity(handle, POSTGRES); + migrateThreadTasksToTaskEntity(handle, POSTGRES); + migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES); backfillAnnouncementRelationships(handle); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java index 38002f4a837c..41f92dbb8199 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/migration/utils/v200/MigrationUtil.java @@ -23,6 +23,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.jdbi3.AnnouncementRepository; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.migration.utils.SearchSettingsMergeUtil; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.util.EntityUtil; @@ -86,7 +87,7 @@ public static void addTableColumnSearchSettings() { * suggestion becomes a Task with type=Suggestion and category=MetadataUpdate. The about * EntityReference and aboutFqnHash are properly computed from the entityLink. */ - public static void migrateSuggestionsToTaskEntity(Handle handle) { + public static void migrateSuggestionsToTaskEntity(Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of suggestions to task_entity"); boolean tableExists; @@ -123,8 +124,23 @@ public static void migrateSuggestionsToTaskEntity(Handle handle) { JsonNode suggestionJson = JsonUtils.readTree(jsonStr); String suggestionId = suggestionJson.get("id").asText(); - - if (taskExists(handle, suggestionId)) { + boolean alreadyExists = taskExists(handle, suggestionId); + + if (alreadyExists) { + String createdByUserId = null; + if (suggestionJson.has("createdBy") + && suggestionJson.get("createdBy").has("id") + && !suggestionJson.get("createdBy").get("id").isNull()) { + createdByUserId = suggestionJson.get("createdBy").get("id").asText(); + } + ObjectNode aboutJson = JsonUtils.getObjectNode(); + String entityLinkStr = + suggestionJson.has("entityLink") ? suggestionJson.get("entityLink").asText() : null; + if (entityLinkStr != null) { + setAboutFromEntityLink(aboutJson, entityLinkStr, suggestionJson); + } + insertTaskLinkRelationships( + handle, suggestionId, null, null, null, createdByUserId, aboutJson, connectionType); skipped++; continue; } @@ -216,8 +232,10 @@ public static void migrateSuggestionsToTaskEntity(Handle handle) { taskJson.put("commentCount", 0); taskJson.set("tags", JsonUtils.getObjectNode().arrayNode()); - insertTask(handle, suggestionId, taskJson.toString(), fqnHash); + insertTask(handle, suggestionId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, suggestionId, inheritedDomains); + insertTaskLinkRelationships( + handle, suggestionId, null, null, null, createdByUserId, taskJson, connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating suggestion: {}", e.getMessage()); @@ -234,13 +252,23 @@ public static void migrateSuggestionsToTaskEntity(Handle handle) { * Migrate thread-based tasks from thread_entity to the new task_entity table. Each thread with * type='Task' becomes a proper Task entity with correct type mapping, payload, and aboutFqnHash. */ - public static void migrateThreadTasksToTaskEntity(Handle handle) { + public static void migrateThreadTasksToTaskEntity(Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of thread-based tasks to task_entity"); - + String threadTable; + if (tableExists(handle, "thread_entity")) { + threadTable = "thread_entity"; + } else if (tableExists(handle, "thread_entity_legacy")) { + threadTable = "thread_entity_legacy"; + } else { + LOG.info( + "Neither thread_entity nor thread_entity_legacy exists, skipping thread task migration"); + return; + } List> threads = handle .createQuery( - "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + String.format( + "SELECT json FROM %s WHERE type = 'Task' ORDER BY createdAt ASC", threadTable)) .mapToMap() .list(); @@ -261,11 +289,7 @@ public static void migrateThreadTasksToTaskEntity(Handle handle) { JsonNode threadJson = JsonUtils.readTree(jsonStr); String threadId = threadJson.get("id").asText(); - - if (taskExists(handle, threadId)) { - skipped++; - continue; - } + boolean alreadyExists = taskExists(handle, threadId); JsonNode taskDetails = threadJson.get("task"); if (taskDetails == null) { @@ -291,6 +315,24 @@ public static void migrateThreadTasksToTaskEntity(Handle handle) { continue; } + if (alreadyExists) { + String createdByName = threadJson.path("createdBy").asText("system"); + String createdByUserId = lookupUserId(handle, createdByName); + ObjectNode aboutJson = JsonUtils.getObjectNode(); + setAboutFromEntityLink(aboutJson, aboutLink, threadJson); + insertTaskLinkRelationships( + handle, + threadId, + taskDetails.has("assignees") ? taskDetails.get("assignees") : null, + taskDetails.has("reviewers") ? taskDetails.get("reviewers") : null, + taskDetails.has("watchers") ? taskDetails.get("watchers") : null, + createdByUserId, + aboutJson, + connectionType); + skipped++; + continue; + } + String entityType = entityLink.getEntityType(); String newType = mapThreadTaskType(oldType, entityType); String newCategory = mapThreadTaskCategory(oldType, entityType); @@ -373,8 +415,17 @@ public static void migrateThreadTasksToTaskEntity(Handle handle) { taskJson.set("resolution", resolution); } - insertTask(handle, threadId, taskJson.toString(), fqnHash); + insertTask(handle, threadId, taskJson.toString(), fqnHash, connectionType); insertTaskDomainRelationships(handle, threadId, inheritedDomains); + insertTaskLinkRelationships( + handle, + threadId, + taskDetails.has("assignees") ? taskDetails.get("assignees") : null, + taskDetails.has("reviewers") ? taskDetails.get("reviewers") : null, + taskDetails.has("watchers") ? taskDetails.get("watchers") : null, + createdByUserId, + taskJson, + connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating thread task: {}", e.getMessage()); @@ -475,7 +526,8 @@ public static void backfillAnnouncementRelationships(Handle handle) { * thread_entity. User conversations stay in thread_entity; only generated activity entries are * migrated. */ - public static void migrateLegacyActivityThreadsToActivityStream(Handle handle) { + public static void migrateLegacyActivityThreadsToActivityStream( + Handle handle, ConnectionType connectionType) { LOG.info("Starting migration of legacy thread activity to activity_stream"); if (!tableExists(handle, "thread_entity")) { @@ -511,7 +563,7 @@ public static void migrateLegacyActivityThreadsToActivityStream(Handle handle) { continue; } - insertActivityEvent(handle, event); + insertActivityEvent(handle, event, connectionType); migrated++; } catch (Exception e) { LOG.warn("Error migrating legacy activity thread to activity_stream: {}", e.getMessage()); @@ -533,6 +585,10 @@ private static void setAboutFromEntityLink( ObjectNode aboutRef = JsonUtils.getObjectNode(); if (sourceJson.has("entityId") && !sourceJson.get("entityId").isNull()) { aboutRef.put("id", sourceJson.get("entityId").asText()); + } else if (sourceJson.has("entityRef") + && sourceJson.get("entityRef").has("id") + && !sourceJson.get("entityRef").get("id").isNull()) { + aboutRef.put("id", sourceJson.get("entityRef").get("id").asText()); } aboutRef.put("type", entityType); aboutRef.put("fullyQualifiedName", entityFQN); @@ -911,7 +967,7 @@ private static boolean tableExists(Handle handle, String tableName) { handle .createQuery(String.format("SELECT 1 FROM %s LIMIT 1", tableName)) .mapTo(Integer.class) - .one(); + .findFirst(); return true; } catch (Exception e) { return false; @@ -960,7 +1016,8 @@ private static boolean activityEventExists(Handle handle, UUID activityId, long > 0; } - private static void insertActivityEvent(Handle handle, ActivityEvent event) { + private static void insertActivityEvent( + Handle handle, ActivityEvent event, ConnectionType connectionType) { String entityFqnHash = event.getEntity().getFullyQualifiedName() != null ? FullyQualifiedName.buildHash(event.getEntity().getFullyQualifiedName()) @@ -973,6 +1030,8 @@ private static void insertActivityEvent(Handle handle, ActivityEvent event) { : JsonUtils.pojoToJson( event.getDomains().stream().map(domain -> domain.getId().toString()).toList()); + String domainsBind = connectionType == ConnectionType.POSTGRES ? ":domains::jsonb" : ":domains"; + String jsonBind = connectionType == ConnectionType.POSTGRES ? ":json::jsonb" : ":json"; handle .createUpdate( "INSERT INTO activity_stream " @@ -980,7 +1039,11 @@ private static void insertActivityEvent(Handle handle, ActivityEvent event) { + "actorId, actorName, timestamp, summary, fieldName, oldValue, newValue, domains, json) " + "VALUES (:id, :eventType, :entityType, :entityId, :entityFqnHash, :about, " + ":aboutFqnHash, :actorId, :actorName, :timestamp, :summary, :fieldName, " - + ":oldValue, :newValue, :domains, :json)") + + ":oldValue, :newValue, " + + domainsBind + + ", " + + jsonBind + + ")") .bind("id", event.getId().toString()) .bind("eventType", event.getEventType().value()) .bind("entityType", event.getEntity().getType()) @@ -1021,13 +1084,13 @@ private static boolean taskExists(Handle handle, String taskId) { > 0; } - private static void insertTask(Handle handle, String id, String json, String fqnHash) { - handle - .createUpdate("INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json, :fqnHash)") - .bind("id", id) - .bind("json", json) - .bind("fqnHash", fqnHash) - .execute(); + private static void insertTask( + Handle handle, String id, String json, String fqnHash, ConnectionType connectionType) { + String sql = + connectionType == ConnectionType.POSTGRES + ? "INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json::jsonb, :fqnHash)" + : "INSERT INTO task_entity (id, json, fqnHash) VALUES (:id, :json, :fqnHash)"; + handle.createUpdate(sql).bind("id", id).bind("json", json).bind("fqnHash", fqnHash).execute(); } private static String lookupUserId(Handle handle, String userName) { @@ -1152,6 +1215,101 @@ private static void setDomainsInTaskJson(ObjectNode taskJson, List{} relation={}: {}", + fromId, + toId, + relation, + e.getMessage()); + } + } + + private static void insertTaskUserListRelationships( + Handle handle, + String taskId, + JsonNode users, + Relationship relation, + ConnectionType connectionType) { + if (users == null || !users.isArray()) { + return; + } + for (JsonNode u : users) { + String id = u.path("id").asText(null); + if (id == null || id.isEmpty()) { + continue; + } + String type = u.path("type").asText("user"); + insertEntityRelationship(handle, id, type, taskId, Entity.TASK, relation, connectionType); + } + } + + private static void insertTaskLinkRelationships( + Handle handle, + String taskId, + JsonNode assignees, + JsonNode reviewers, + JsonNode watchers, + String createdByUserId, + ObjectNode taskJson, + ConnectionType connectionType) { + insertTaskUserListRelationships( + handle, taskId, assignees, Relationship.ASSIGNED_TO, connectionType); + insertTaskUserListRelationships( + handle, taskId, reviewers, Relationship.REVIEWS, connectionType); + insertTaskUserListRelationships(handle, taskId, watchers, Relationship.FOLLOWS, connectionType); + if (createdByUserId != null) { + insertEntityRelationship( + handle, + createdByUserId, + Entity.USER, + taskId, + Entity.TASK, + Relationship.CREATED, + connectionType); + } + JsonNode about = taskJson.get("about"); + if (about != null && about.has("id") && !about.get("id").isNull() && about.has("type")) { + String aboutId = about.get("id").asText(); + String aboutType = about.get("type").asText(); + if (!aboutId.isEmpty() && !aboutType.isEmpty()) { + insertEntityRelationship( + handle, + aboutId, + aboutType, + taskId, + Entity.TASK, + Relationship.MENTIONED_IN, + connectionType); + } + } + } + /** * Insert DOMAIN --HAS--> task rows so {@code TaskRepository.getDomains()} returns * the inherited domains when the task is read. diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java index 8a4335eb7f46..c7dea2da01b3 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/migration/utils/v200/MigrationUtilTest.java @@ -17,24 +17,34 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.contains; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL; +import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.lang.reflect.Method; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.UUID; import org.jdbi.v3.core.Handle; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.openmetadata.schema.entity.activity.ActivityEvent; import org.openmetadata.schema.entity.feed.Thread; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.resources.feeds.MessageParser; class MigrationUtilTest { @@ -45,16 +55,234 @@ void setUp() { handle = mock(Handle.class, RETURNS_DEEP_STUBS); } + @Test + void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) + .thenThrow(new RuntimeException("missing table")); + when(handle + .createQuery("SELECT 1 FROM thread_entity_legacy LIMIT 1") + .mapTo(Integer.class) + .findFirst()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) + .thenThrow(new RuntimeException("missing table")); + when(handle + .createQuery("SELECT 1 FROM thread_entity_legacy LIMIT 1") + .mapTo(Integer.class) + .findFirst()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, POSTGRES)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateSuggestionsToTaskEntitySkipsWhenSuggestionsTableIsMissing() { + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapToMap().list()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, MYSQL)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void migrateSuggestionsToTaskEntitySkipsWhenSuggestionsTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapToMap().list()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, POSTGRES)); + + verify(handle, never()).createUpdate(anyString()); + } + @Test void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissing() { - when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).one()) + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) .thenThrow(new RuntimeException("missing table")); - assertDoesNotThrow(() -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle)); + assertDoesNotThrow( + () -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle, MYSQL)); verify(handle, never()).createUpdate(anyString()); } + @Test + void migrateLegacyActivityThreadsToActivityStreamSkipsWhenThreadTableIsMissingPostgres() { + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) + .thenThrow(new RuntimeException("missing table")); + + assertDoesNotThrow( + () -> MigrationUtil.migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES)); + + verify(handle, never()).createUpdate(anyString()); + } + + @Test + void insertTaskUsesJsonbCastForPostgres() throws Exception { + invokePrivateStatic( + "insertTask", + new Class[] {Handle.class, String.class, String.class, String.class, ConnectionType.class}, + handle, + "test-id", + "{}", + "test-hash", + POSTGRES); + + verify(handle).createUpdate(contains("::jsonb")); + } + + @Test + void insertTaskDoesNotUseJsonbCastForMysql() throws Exception { + invokePrivateStatic( + "insertTask", + new Class[] {Handle.class, String.class, String.class, String.class, ConnectionType.class}, + handle, + "test-id", + "{}", + "test-hash", + MYSQL); + + verify(handle, never()).createUpdate(contains("::jsonb")); + } + + @Test + void migrateThreadTaskInsertsEntityRelationshipRowsForAssigneesAndAbout() { + String assigneeId = "aaaa-bbbb-cccc-dddd"; + String entityRefId = "5555-6666-7777-8888"; + + when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst()) + .thenReturn(java.util.Optional.of(1)); + + String threadJson = + """ + { + "id": "dead-beef-0000-0001", + "type": "Task", + "about": "<#E::glossaryTerm::MyGlossary.MyTerm>", + "message": "Approval required", + "threadTs": 1700000000000, + "updatedAt": 1700000000000, + "createdBy": "system", + "updatedBy": "system", + "entityRef": { "id": "%s", "type": "glossaryTerm" }, + "task": { + "id": 1, + "type": "RequestApproval", + "status": "Open", + "assignees": [{ "id": "%s", "type": "user" }] + } + } + """ + .formatted(entityRefId, assigneeId); + + Map row = Map.of("json", threadJson); + when(handle + .createQuery( + "SELECT json FROM thread_entity WHERE type = 'Task' ORDER BY createdAt ASC") + .mapToMap() + .list()) + .thenReturn(List.of(row)); + + when(handle + .createQuery("SELECT COUNT(*) FROM task_entity WHERE id = :id") + .bind("id", "dead-beef-0000-0001") + .mapTo(Long.class) + .one()) + .thenReturn(0L); + + when(handle.createQuery(anyString()).mapTo(Long.class).findOne()) + .thenReturn(java.util.Optional.of(0L)); + + when(handle.createQuery(contains("entity_relationship")).mapToMap().list()) + .thenReturn(Collections.emptyList()); + + assertDoesNotThrow(() -> MigrationUtil.migrateThreadTasksToTaskEntity(handle, MYSQL)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(handle, atLeastOnce()).createUpdate(sqlCaptor.capture()); + + List allSql = sqlCaptor.getAllValues(); + long entityRelationshipInserts = + allSql.stream().filter(s -> s.contains("entity_relationship")).count(); + assertTrue( + entityRelationshipInserts >= 2, + "Expected at least 2 entity_relationship inserts (ASSIGNED_TO + MENTIONED_IN), got " + + entityRelationshipInserts); + } + + @Test + void migrateSuggestionInsertsEntityRelationshipRowsForCreatedBy() { + String createdById = "cccc-dddd-eeee-ffff"; + String entityId = "9999-8888-7777-6666"; + + // suggestions table exists + when(handle.createQuery("SELECT 1 FROM suggestions LIMIT 1").mapToMap().list()) + .thenReturn(List.of(Map.of("1", 1))); + + String suggestionJson = + """ + { + "id": "dead-beef-0000-0002", + "type": "SuggestDescription", + "status": "Open", + "entityLink": "<#E::table::sample.shop.orders>", + "entityId": "%s", + "description": "A good table", + "createdBy": { "id": "%s", "type": "user" }, + "createdAt": 1700000000000, + "updatedAt": 1700000000000, + "updatedBy": "system" + } + """ + .formatted(entityId, createdById); + + Map row = Map.of("json", suggestionJson); + when(handle + .createQuery("SELECT json FROM suggestions ORDER BY updatedAt ASC") + .mapToMap() + .list()) + .thenReturn(List.of(row)); + + // taskExists returns false + when(handle + .createQuery("SELECT COUNT(*) FROM task_entity WHERE id = :id") + .bind("id", "dead-beef-0000-0002") + .mapTo(Long.class) + .one()) + .thenReturn(0L); + + // sequence + when(handle.createQuery(anyString()).mapTo(Long.class).findOne()) + .thenReturn(java.util.Optional.of(0L)); + + // resolveDomainsForTaskAbout — empty + when(handle.createQuery(contains("entity_relationship")).mapToMap().list()) + .thenReturn(Collections.emptyList()); + + assertDoesNotThrow(() -> MigrationUtil.migrateSuggestionsToTaskEntity(handle, MYSQL)); + + ArgumentCaptor sqlCaptor = ArgumentCaptor.forClass(String.class); + verify(handle, atLeastOnce()).createUpdate(sqlCaptor.capture()); + + List allSql = sqlCaptor.getAllValues(); + long entityRelationshipInserts = + allSql.stream().filter(s -> s.contains("entity_relationship")).count(); + assertTrue( + entityRelationshipInserts >= 2, + "Expected at least 2 entity_relationship inserts (CREATED + MENTIONED_IN), got " + + entityRelationshipInserts); + } + @Test void backfillAnnouncementRelationshipsSkipsWhenAnnouncementTableIsMissing() { when(handle.createQuery("SELECT 1 FROM announcement_entity LIMIT 1").mapTo(Integer.class).one())