diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java new file mode 100644 index 000000000000..58782ff0d8cf --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpApplicationContext.java @@ -0,0 +1,18 @@ +package org.openmetadata.mcp; + +import org.openmetadata.service.OpenMetadataApplicationConfig; + +/** Holds application-level singletons needed by MCP tools at runtime. */ +public class McpApplicationContext { + private static volatile OpenMetadataApplicationConfig config; + + private McpApplicationContext() {} + + public static void setConfig(OpenMetadataApplicationConfig applicationConfig) { + config = applicationConfig; + } + + public static OpenMetadataApplicationConfig getConfig() { + return config; + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java index 9a6711715485..d02fd20edbb8 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/McpServer.java @@ -61,6 +61,7 @@ public void initializeMcpServer( SecurityConfigurationManager.getCurrentAuthzConfig()); this.authorizer = authorizer; this.limits = limits; + McpApplicationContext.setConfig(config); this.environment = environment; MutableServletContextHandler contextHandler = environment.getApplicationContext(); List tools = getTools(); @@ -286,9 +287,7 @@ private List getAllowedOriginsFromConfig() { return List.of(); } - /** - * Get base URL from system settings, with fallback to localhost for development. - */ + /** Get base URL from system settings, with fallback to localhost for development. */ private String getBaseUrlFromSettings() { try { org.openmetadata.service.jdbi3.SystemRepository systemRepository = @@ -313,9 +312,9 @@ private String getBaseUrlFromSettings() { LOG.warn("Could not get instance URL from SystemSettings, using fallback", e); } LOG.error( - "No base URL configured in MCP settings or system settings. " - + "Falling back to http://localhost:8585 — this is only suitable for local development. " - + "Configure a proper base URL for production deployments."); + "No base URL configured in MCP settings or system settings. Falling back to" + + " http://localhost:8585 — this is only suitable for local development. Configure a" + + " proper base URL for production deployments."); return "http://localhost:8585"; } } diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java index a2ee6aed95d0..c37db8ff5620 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/CommonUtils.java @@ -53,4 +53,22 @@ public static List getTeamsOrUsers(Object teamsOrUsersParam) { } return teamsOrUsers; } + + public static int parseLimit(Map params, String key, int defaultValue) { + if (!params.containsKey(key)) { + return defaultValue; + } + Object v = params.get(key); + if (v instanceof Number n) { + return n.intValue(); + } + if (v instanceof String s) { + try { + return Integer.parseInt(s); + } catch (NumberFormatException e) { + LOG.warn("Invalid value for '{}': '{}', using default {}", key, s, defaultValue); + } + } + return defaultValue; + } } diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java index ad9d5d2448dd..0ee02aea6b72 100644 --- a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/DefaultToolContext.java @@ -14,11 +14,15 @@ @Slf4j public class DefaultToolContext { + static final String TOOL_LIST_INGESTION_PIPELINES = "list_ingestion_pipelines"; + static final String TOOL_GET_PIPELINE_STATUS = "get_pipeline_status"; + static final String TOOL_TRIGGER_INGESTION_PIPELINE = "trigger_ingestion_pipeline"; + public DefaultToolContext() {} /** - * Loads tool definitions from a JSON file located at the specified path. - * The JSON file should contain an array of tool definitions under the "tools" key. + * Loads tool definitions from a JSON file located at the specified path. The JSON file should + * contain an array of tool definitions under the "tools" key. * * @return List of McpSchema.Tool objects loaded from the JSON file. */ @@ -83,6 +87,15 @@ public McpSchema.CallToolResult callTool( case "create_metric": result = new CreateMetricTool().execute(authorizer, limits, securityContext, params); break; + case TOOL_LIST_INGESTION_PIPELINES: + result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params); + break; + case TOOL_GET_PIPELINE_STATUS: + result = new GetPipelineStatusTool().execute(authorizer, securityContext, params); + break; + case TOOL_TRIGGER_INGESTION_PIPELINE: + result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params); + break; default: return McpSchema.CallToolResult.builder() .content( diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java new file mode 100644 index 000000000000..999934b9d763 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/GetPipelineStatusTool.java @@ -0,0 +1,75 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +@Slf4j +public class GetPipelineStatusTool implements McpTool { + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + String fqn = requireFqn(params); + int limit = CommonUtils.parseLimit(params, "limit", 5); + authorize(authorizer, securityContext); + LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); + return buildStatusResponse(fqn, limit); + } + + private static String requireFqn(Map params) { + String fqn = (String) params.get("fqn"); + if (fqn == null || fqn.isBlank()) { + throw new IllegalArgumentException("Parameter 'fqn' is required"); + } + return fqn; + } + + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } + + private static Map buildStatusResponse(String fqn, int limit) throws IOException { + IngestionPipelineRepository repo = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + IngestionPipeline pipeline = + (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null); + var latestStatus = repo.getLatestPipelineStatus(pipeline); + var recentRuns = repo.listPipelineStatus(fqn, null, null, limit); + Map response = new HashMap<>(); + response.put("fqn", fqn); + response.put("pipelineName", pipeline.getName()); + response.put("pipelineType", pipeline.getPipelineType()); + response.put("enabled", pipeline.getEnabled()); + response.put("deployed", pipeline.getDeployed()); + response.put("latestStatus", latestStatus != null ? JsonUtils.getMap(latestStatus) : null); + response.put("recentRuns", JsonUtils.getMap(recentRuns)); + return response; + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "GetPipelineStatusTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java new file mode 100644 index 000000000000..0f7fbe95f302 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/ListIngestionPipelinesTool.java @@ -0,0 +1,99 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.jdbi3.IngestionPipelineRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; + +@Slf4j +public class ListIngestionPipelinesTool implements McpTool { + + private static final List EXCLUDE_FIELDS = + List.of( + "version", + "updatedAt", + "updatedBy", + "changeDescription", + "sourceHash", + "openMetadataServerConnection", + "airflowConfig"); + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + authorize(authorizer, securityContext); + int limit = CommonUtils.parseLimit(params, "limit", 10); + LOG.info( + "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", + params.get("service"), + params.get("pipelineType"), + limit); + return fetchAndClean(params, limit); + } + + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } + + private static Map fetchAndClean(Map params, int limit) + throws IOException { + IngestionPipelineRepository repo = + (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); + ListFilter filter = buildFilter(params); + String after = (String) params.get("after"); + var resultList = + repo.listAfter(null, repo.getFields("sourceConfig,pipelineType"), filter, limit, after); + Map response = JsonUtils.getMap(resultList); + stripVerboseFields(response); + return response; + } + + private static ListFilter buildFilter(Map params) { + ListFilter filter = new ListFilter(Include.NON_DELETED); + String service = (String) params.get("service"); + String pipelineType = (String) params.get("pipelineType"); + if (service != null && !service.isBlank()) filter.addQueryParam("service", service); + if (pipelineType != null && !pipelineType.isBlank()) + filter.addQueryParam("pipelineType", pipelineType); + return filter; + } + + private static void stripVerboseFields(Map response) { + if (response.get("data") instanceof List pipelines) { + pipelines.forEach( + p -> { + if (p instanceof Map pipeline) { + @SuppressWarnings("unchecked") + Map m = (Map) pipeline; + EXCLUDE_FIELDS.forEach(m::remove); + } + }); + } + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "ListIngestionPipelinesTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java new file mode 100644 index 000000000000..897297232781 --- /dev/null +++ b/openmetadata-mcp/src/main/java/org/openmetadata/mcp/tools/TriggerIngestionPipelineTool.java @@ -0,0 +1,100 @@ +package org.openmetadata.mcp.tools; + +import java.io.IOException; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.mcp.McpApplicationContext; +import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; +import org.openmetadata.schema.type.MetadataOperation; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.sdk.PipelineServiceClientInterface; +import org.openmetadata.service.Entity; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.limits.Limits; +import org.openmetadata.service.security.Authorizer; +import org.openmetadata.service.security.auth.CatalogSecurityContext; +import org.openmetadata.service.security.policyevaluator.OperationContext; +import org.openmetadata.service.security.policyevaluator.ResourceContext; +import org.openmetadata.service.util.OpenMetadataConnectionBuilder; + +@Slf4j +public class TriggerIngestionPipelineTool implements McpTool { + + @Override + public Map execute( + Authorizer authorizer, CatalogSecurityContext securityContext, Map params) + throws IOException { + String fqn = requireFqn(params); + authorize(authorizer, securityContext); + PipelineServiceClientInterface client = resolveClient(); + if (client == null) return clientNotConfiguredError(fqn); + IngestionPipeline pipeline = fetchPipeline(fqn); + if (!Boolean.TRUE.equals(pipeline.getDeployed())) return notDeployedError(fqn); + LOG.info("Triggering ingestion pipeline: {}", fqn); + setupServerConnection(pipeline); + Object service = Entity.getEntity(pipeline.getService(), "ingestionRunner", null); + var response = client.runPipeline(pipeline, service); + LOG.info("Trigger response for pipeline {}: {}", fqn, response); + return JsonUtils.getMap(response); + } + + private static String requireFqn(Map params) { + String fqn = (String) params.get("fqn"); + if (fqn == null || fqn.isBlank()) { + throw new IllegalArgumentException("Parameter 'fqn' is required"); + } + return fqn; + } + + private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { + authorizer.authorize( + securityContext, + new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL), + new ResourceContext<>(Entity.INGESTION_PIPELINE)); + } + + private static PipelineServiceClientInterface resolveClient() { + return PipelineServiceClientFactory.createPipelineServiceClient(null); + } + + private static Map clientNotConfiguredError(String fqn) { + return Map.of( + "error", + "Pipeline service client is not configured." + + " Ensure the ingestion infrastructure is set up.", + "fqn", + fqn); + } + + private static Map notDeployedError(String fqn) { + return Map.of( + "error", + "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", + "fqn", + fqn, + "deployed", + false); + } + + private static IngestionPipeline fetchPipeline(String fqn) throws IOException { + return (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); + } + + private static void setupServerConnection(IngestionPipeline pipeline) { + if (McpApplicationContext.getConfig() != null) { + pipeline.setOpenMetadataServerConnection( + new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); + } + } + + @Override + public Map execute( + Authorizer authorizer, + Limits limits, + CatalogSecurityContext securityContext, + Map params) + throws IOException { + throw new UnsupportedOperationException( + "TriggerIngestionPipelineTool does not require limit validation."); + } +} diff --git a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json index d88b67d37ac8..ed10b6799ffb 100644 --- a/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json +++ b/openmetadata-mcp/src/main/resources/json/data/mcp/tools.json @@ -584,6 +584,65 @@ "entityType" ] } + }, + { + "name": "list_ingestion_pipelines", + "description": "List ingestion pipelines configured in OpenMetadata. Optionally filter by service name or pipeline type (metadata, usage, lineage, profiler, dataInsight, etc.). Returns pipeline names, FQNs, types, and enabled/deployed status. Use 'get_pipeline_status' to check the latest run status of a specific pipeline.", + "parameters": { + "type": "object", + "properties": { + "service": { + "type": "string", + "description": "Filter pipelines by service name (e.g., 'bigquery_prod', 'snowflake_dw'). Returns only pipelines belonging to this service." + }, + "pipelineType": { + "type": "string", + "description": "Filter by pipeline type. Values: metadata, usage, lineage, profiler, dataInsight, elasticSearchReindex, dbt, application. Leave empty to return all types." + }, + "limit": { + "type": "integer", + "description": "Maximum number of pipelines to return. Default: 10. Max recommended: 50.", + "default": 10 + }, + "after": { + "type": "string", + "description": "Pagination cursor. Pass the 'after' value from a previous response to retrieve the next page." + } + } + } + }, + { + "name": "get_pipeline_status", + "description": "Get execution status of a specific ingestion pipeline. Returns the latest run status (success, failed, running, queued) and recent run history. Use 'list_ingestion_pipelines' to find a pipeline's FQN first.", + "parameters": { + "type": "object", + "properties": { + "fqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline (e.g., 'bigquery_prod.metadata_ingestion'). Use 'list_ingestion_pipelines' or 'search_metadata' to find it." + }, + "limit": { + "type": "integer", + "description": "Number of recent pipeline runs to return. Default: 5.", + "default": 5 + } + }, + "required": ["fqn"] + } + }, + { + "name": "trigger_ingestion_pipeline", + "description": "Trigger an immediate run of an ingestion pipeline. Use this to kick off metadata, lineage, or profiling ingestion on demand without waiting for the scheduled run. Requires the pipeline to be deployed. Check the pipeline status with 'get_pipeline_status' after triggering to monitor progress.", + "parameters": { + "type": "object", + "properties": { + "fqn": { + "type": "string", + "description": "Fully qualified name of the ingestion pipeline to trigger (e.g., 'bigquery_prod.metadata_ingestion'). Use 'list_ingestion_pipelines' to find the FQN." + } + }, + "required": ["fqn"] + } } ] }