Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
import org.openmetadata.service.exception.BadRequestException;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.logstorage.LogStorageInterface;
import org.openmetadata.service.logstorage.S3LogStorage.LogStreamListener;
Expand Down Expand Up @@ -291,6 +292,33 @@ public void clearFields(IngestionPipeline ingestionPipeline, Fields fields) {
public void prepare(IngestionPipeline ingestionPipeline, boolean update) {
var service = getCachedParentOrLoad(ingestionPipeline.getService(), "", Include.NON_DELETED);
ingestionPipeline.setService(service.getEntityReference());
validateSourceConfigHasType(ingestionPipeline);
}

static void validateSourceConfigHasType(IngestionPipeline ingestionPipeline) {
if (ingestionPipeline.getSourceConfig() == null
|| ingestionPipeline.getSourceConfig().getConfig() == null) {
throw new BadRequestException("sourceConfig.config.type is required");
}

Object config = ingestionPipeline.getSourceConfig().getConfig();
Object type;
try {
type =
config instanceof Map<?, ?> configMap
? configMap.get("type")
: JsonUtils.getMap(config).get("type");
} catch (IllegalArgumentException e) {
throw new BadRequestException("sourceConfig.config must be an object with type");
}

if (type instanceof String typeValue && !typeValue.isBlank()) {
return;
}
if (type instanceof Enum<?>) {
return;
}
throw new BadRequestException("sourceConfig.config.type is required");
}

protected boolean requiresRedeployment(IngestionPipeline original, IngestionPipeline updated) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addCreateTaskRuleToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTaskAuthorPolicyToDataConsumerRole;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillMetadataSourceConfigTypes;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity;
Expand Down Expand Up @@ -32,6 +33,7 @@ public void runDataMigration() {
// runDataMigration() per PR #26571, so this dual-invoke is required to
// close that path. The helper is idempotent — safe on every run.
addTableColumnSearchSettings();
backfillMetadataSourceConfigTypes(handle);
migrateSuggestionsToTaskEntity(handle, MYSQL);
migrateThreadTasksToTaskEntity(handle, MYSQL);
migrateLegacyActivityThreadsToActivityStream(handle, MYSQL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addCreateTaskRuleToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.addTaskAuthorPolicyToDataConsumerRole;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillAnnouncementRelationships;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.backfillMetadataSourceConfigTypes;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateLegacyActivityThreadsToActivityStream;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateSuggestionsToTaskEntity;
import static org.openmetadata.service.migration.utils.v200.MigrationUtil.migrateThreadTasksToTaskEntity;
Expand Down Expand Up @@ -32,6 +33,7 @@ public void runDataMigration() {
// runDataMigration() per PR #26571, so this dual-invoke is required to
// close that path. The helper is idempotent — safe on every run.
addTableColumnSearchSettings();
backfillMetadataSourceConfigTypes(handle);
migrateSuggestionsToTaskEntity(handle, POSTGRES);
migrateThreadTasksToTaskEntity(handle, POSTGRES);
migrateLegacyActivityThreadsToActivityStream(handle, POSTGRES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,62 @@ protected boolean removeEldestEntry(Map.Entry<String, List<EntityReference>> eld

private MigrationUtil() {}

public static void backfillMetadataSourceConfigTypes(Handle handle) {
boolean isMySQL = Boolean.TRUE.equals(DatasourceConfig.getInstance().isMySQL());
String sql =
isMySQL
? "UPDATE ingestion_pipeline_entity i "
+ "JOIN entity_relationship er ON er.toId = i.id "
+ "AND er.toEntity = 'ingestionPipeline' "
+ "AND er.relation = 0 "
+ "AND er.deleted = false "
+ "JOIN ("
+ "SELECT 'apiService' AS fromEntity, 'ApiMetadata' AS configType "
+ "UNION ALL SELECT 'dashboardService', 'DashboardMetadata' "
+ "UNION ALL SELECT 'databaseService', 'DatabaseMetadata' "
+ "UNION ALL SELECT 'driveService', 'DriveMetadata' "
+ "UNION ALL SELECT 'mcpService', 'McpMetadata' "
+ "UNION ALL SELECT 'messagingService', 'MessagingMetadata' "
+ "UNION ALL SELECT 'mlmodelService', 'MlModelMetadata' "
+ "UNION ALL SELECT 'pipelineService', 'PipelineMetadata' "
+ "UNION ALL SELECT 'searchService', 'SearchMetadata' "
+ "UNION ALL SELECT 'securityService', 'SecurityMetadata' "
+ "UNION ALL SELECT 'storageService', 'StorageMetadata'"
+ ") metadata_config_type ON metadata_config_type.fromEntity = er.fromEntity "
+ "SET i.json = JSON_SET("
+ "i.json, '$.sourceConfig.config.type', metadata_config_type.configType) "
+ "WHERE i.json ->> '$.pipelineType' = 'metadata' "
+ "AND i.json ->> '$.sourceConfig.config.type' IS NULL "
+ "AND JSON_TYPE(JSON_EXTRACT(i.json, '$.sourceConfig.config')) = 'OBJECT'"
: "UPDATE ingestion_pipeline_entity i "
+ "SET json = jsonb_set(i.json::jsonb, '{sourceConfig,config,type}', "
+ "to_jsonb(metadata_config_type.config_type::text), true)::json "
+ "FROM entity_relationship er "
+ "JOIN (VALUES "
+ "('apiService', 'ApiMetadata'), "
+ "('dashboardService', 'DashboardMetadata'), "
+ "('databaseService', 'DatabaseMetadata'), "
+ "('driveService', 'DriveMetadata'), "
+ "('mcpService', 'McpMetadata'), "
+ "('messagingService', 'MessagingMetadata'), "
+ "('mlmodelService', 'MlModelMetadata'), "
+ "('pipelineService', 'PipelineMetadata'), "
+ "('searchService', 'SearchMetadata'), "
+ "('securityService', 'SecurityMetadata'), "
+ "('storageService', 'StorageMetadata')"
+ ") AS metadata_config_type(from_entity, config_type) "
+ "ON metadata_config_type.from_entity = er.fromentity "
+ "WHERE er.toid = i.id "
+ "AND er.toentity = 'ingestionPipeline' "
+ "AND er.relation = 0 "
+ "AND er.deleted = false "
+ "AND i.json ->> 'pipelineType' = 'metadata' "
+ "AND i.json #>> '{sourceConfig,config,type}' IS NULL "
+ "AND json_typeof(i.json #> '{sourceConfig,config}') = 'object'";
int count = handle.execute(sql);
LOG.info("Backfilled metadata source config types for {} ingestion pipelines", count);
}

/**
* Ensure {@code TaskAuthorPolicy} is seeded and attached to the {@code DataConsumer} role on
* upgrades. Role→Policy attachments are modelled as {@code Relationship.HAS} edges in {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -23,6 +25,7 @@
import org.openmetadata.schema.security.secrets.SecretsManagerConfiguration;
import org.openmetadata.schema.security.secrets.SecretsManagerProvider;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.exception.BadRequestException;
import org.openmetadata.service.secrets.SecretsManagerFactory;

class IngestionPipelineRepositoryTest {
Expand Down Expand Up @@ -268,6 +271,58 @@ void testBuildIngestionPipelineDecrypted_ServicePreserved() {
assertEquals("OpenMetadata", decrypted.getService().getName());
}

@Test
void validateSourceConfigHasTypeRejectsEmptyConfig() {
IngestionPipeline pipeline = pipelineWithConfig(Map.of());

BadRequestException ex =
assertThrows(
BadRequestException.class,
() -> IngestionPipelineRepository.validateSourceConfigHasType(pipeline));

assertTrue(
ex.getMessage().contains("sourceConfig.config.type"),
"diagnostic should name sourceConfig.config.type, got: " + ex.getMessage());
}

@Test
void validateSourceConfigHasTypeRejectsMissingConfig() {
IngestionPipeline pipeline = new IngestionPipeline().withSourceConfig(new SourceConfig());

BadRequestException ex =
assertThrows(
BadRequestException.class,
() -> IngestionPipelineRepository.validateSourceConfigHasType(pipeline));

assertTrue(ex.getMessage().contains("sourceConfig.config.type"));
}

@Test
void validateSourceConfigHasTypeRejectsNonObjectConfig() {
IngestionPipeline pipeline = pipelineWithConfig("DatabaseMetadata");

BadRequestException ex =
assertThrows(
BadRequestException.class,
() -> IngestionPipelineRepository.validateSourceConfigHasType(pipeline));

assertTrue(ex.getMessage().contains("sourceConfig.config"));
}

@Test
void validateSourceConfigHasTypeAcceptsRawConfigWithType() {
IngestionPipeline pipeline = pipelineWithConfig(Map.of("type", "DatabaseMetadata"));

assertDoesNotThrow(() -> IngestionPipelineRepository.validateSourceConfigHasType(pipeline));
}

@Test
void validateSourceConfigHasTypeAcceptsTypedConfigWithDefaultType() {
IngestionPipeline pipeline = pipelineWithConfig(new DatabaseServiceMetadataPipeline());

assertDoesNotThrow(() -> IngestionPipelineRepository.validateSourceConfigHasType(pipeline));
}

private static IngestionPipeline createPipelineWithSchedule(String schedule) {
IngestionPipeline pipeline = createBasicPipeline();
AirflowConfig airflowConfig = new AirflowConfig();
Expand Down Expand Up @@ -312,4 +367,8 @@ private static IngestionPipeline createBasicPipeline() {

return pipeline;
}

private static IngestionPipeline pipelineWithConfig(Object config) {
return new IngestionPipeline().withSourceConfig(new SourceConfig().withConfig(config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -40,11 +41,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.openmetadata.schema.entity.activity.ActivityEvent;
import org.openmetadata.schema.entity.feed.Thread;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.resources.databases.DatasourceConfig;
import org.openmetadata.service.resources.feeds.MessageParser;

class MigrationUtilTest {
Expand All @@ -55,6 +58,56 @@ void setUp() {
handle = mock(Handle.class, RETURNS_DEEP_STUBS);
}

@Test
void backfillsMetadataSourceConfigTypesWithMySqlJsonSet() {
when(handle.execute(anyString())).thenReturn(2);

try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
DatasourceConfig cfg = mock(DatasourceConfig.class);
ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
when(cfg.isMySQL()).thenReturn(true);

MigrationUtil.backfillMetadataSourceConfigTypes(handle);
}

ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
verify(handle).execute(sqlCaptor.capture());
String sql = sqlCaptor.getValue();
assertTrue(
sql.contains("JOIN (SELECT 'apiService' AS fromEntity, 'ApiMetadata' AS configType"));
assertTrue(sql.contains("UNION ALL SELECT 'databaseService', 'DatabaseMetadata'"));
assertTrue(sql.contains("metadata_config_type.fromEntity = er.fromEntity"));
assertTrue(sql.contains("metadata_config_type.configType"));
assertTrue(sql.contains("i.json ->> '$.pipelineType' = 'metadata'"));
assertTrue(sql.contains("JSON_TYPE(JSON_EXTRACT(i.json, '$.sourceConfig.config')) = 'OBJECT'"));
assertTrue(!sql.contains("CASE er.fromEntity"));
}

@Test
void backfillsMetadataSourceConfigTypesWithPostgresJsonbSet() {
when(handle.execute(anyString())).thenReturn(2);

try (MockedStatic<DatasourceConfig> ds = mockStatic(DatasourceConfig.class)) {
DatasourceConfig cfg = mock(DatasourceConfig.class);
ds.when(DatasourceConfig::getInstance).thenReturn(cfg);
when(cfg.isMySQL()).thenReturn(false);

MigrationUtil.backfillMetadataSourceConfigTypes(handle);
}

ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
verify(handle).execute(sqlCaptor.capture());
String sql = sqlCaptor.getValue();
assertTrue(sql.contains("JOIN (VALUES ('apiService', 'ApiMetadata')"));
assertTrue(sql.contains("('databaseService', 'DatabaseMetadata')"));
assertTrue(sql.contains("metadata_config_type.from_entity = er.fromentity"));
assertTrue(sql.contains("jsonb_set(i.json::jsonb"));
assertTrue(sql.contains("to_jsonb(metadata_config_type.config_type::text), true)::json"));
assertTrue(sql.contains("i.json ->> 'pipelineType' = 'metadata'"));
assertTrue(sql.contains("json_typeof(i.json #> '{sourceConfig,config}') = 'object'"));
assertTrue(!sql.contains("CASE er.fromentity"));
}

@Test
void migrateThreadTasksToTaskEntitySkipsWhenThreadTableIsMissing() {
when(handle.createQuery("SELECT 1 FROM thread_entity LIMIT 1").mapTo(Integer.class).findFirst())
Expand Down
Loading