-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(mcp): add workflow automation MCP tools for ingestion pipeline management #27741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fb7c32f
f2e4bf4
6b33d43
8136e9c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package org.openmetadata.mcp; | ||
|
|
||
| import org.openmetadata.service.OpenMetadataApplicationConfig; | ||
|
|
||
| /** Holds application-level singletons needed by MCP tools at runtime. */ | ||
| public class McpApplicationContext { | ||
| private static volatile OpenMetadataApplicationConfig config; | ||
|
|
||
| private McpApplicationContext() {} | ||
|
|
||
| public static void setConfig(OpenMetadataApplicationConfig applicationConfig) { | ||
| config = applicationConfig; | ||
| } | ||
|
|
||
| public static OpenMetadataApplicationConfig getConfig() { | ||
| return config; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||
| import org.openmetadata.schema.type.MetadataOperation; | ||
| import org.openmetadata.schema.utils.JsonUtils; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.IngestionPipelineRepository; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||
|
|
||
| @Slf4j | ||
| public class GetPipelineStatusTool implements McpTool { | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||
| throws IOException { | ||
| String fqn = requireFqn(params); | ||
| int limit = CommonUtils.parseLimit(params, "limit", 5); | ||
| authorize(authorizer, securityContext); | ||
| LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); | ||
| return buildStatusResponse(fqn, limit); | ||
| } | ||
|
|
||
| private static String requireFqn(Map<String, Object> params) { | ||
| String fqn = (String) params.get("fqn"); | ||
| if (fqn == null || fqn.isBlank()) { | ||
| throw new IllegalArgumentException("Parameter 'fqn' is required"); | ||
| } | ||
| return fqn; | ||
| } | ||
|
|
||
| private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { | ||
| authorizer.authorize( | ||
| securityContext, | ||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), | ||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||
| } | ||
|
|
||
| private static Map<String, Object> buildStatusResponse(String fqn, int limit) throws IOException { | ||
| IngestionPipelineRepository repo = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
| IngestionPipeline pipeline = | ||
| (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null); | ||
| var latestStatus = repo.getLatestPipelineStatus(pipeline); | ||
| var recentRuns = repo.listPipelineStatus(fqn, null, null, limit); | ||
| Map<String, Object> response = new HashMap<>(); | ||
| response.put("fqn", fqn); | ||
| response.put("pipelineName", pipeline.getName()); | ||
| response.put("pipelineType", pipeline.getPipelineType()); | ||
| response.put("enabled", pipeline.getEnabled()); | ||
| response.put("deployed", pipeline.getDeployed()); | ||
| response.put("latestStatus", latestStatus != null ? JsonUtils.getMap(latestStatus) : null); | ||
| response.put("recentRuns", JsonUtils.getMap(recentRuns)); | ||
| return response; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) | ||
| throws IOException { | ||
| throw new UnsupportedOperationException( | ||
| "GetPipelineStatusTool does not require limit validation."); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.type.Include; | ||
| import org.openmetadata.schema.type.MetadataOperation; | ||
| import org.openmetadata.schema.utils.JsonUtils; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.IngestionPipelineRepository; | ||
| import org.openmetadata.service.jdbi3.ListFilter; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||
|
|
||
| @Slf4j | ||
| public class ListIngestionPipelinesTool implements McpTool { | ||
|
|
||
| private static final List<String> EXCLUDE_FIELDS = | ||
| List.of( | ||
| "version", | ||
| "updatedAt", | ||
| "updatedBy", | ||
| "changeDescription", | ||
| "sourceHash", | ||
| "openMetadataServerConnection", | ||
| "airflowConfig"); | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||
| throws IOException { | ||
| authorize(authorizer, securityContext); | ||
| int limit = CommonUtils.parseLimit(params, "limit", 10); | ||
| LOG.info( | ||
| "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", | ||
| params.get("service"), | ||
| params.get("pipelineType"), | ||
| limit); | ||
| return fetchAndClean(params, limit); | ||
| } | ||
|
|
||
| private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { | ||
| authorizer.authorize( | ||
| securityContext, | ||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), | ||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||
| } | ||
|
|
||
| private static Map<String, Object> fetchAndClean(Map<String, Object> params, int limit) | ||
| throws IOException { | ||
| IngestionPipelineRepository repo = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
| ListFilter filter = buildFilter(params); | ||
| String after = (String) params.get("after"); | ||
| var resultList = | ||
| repo.listAfter(null, repo.getFields("sourceConfig,pipelineType"), filter, limit, after); | ||
| Map<String, Object> response = JsonUtils.getMap(resultList); | ||
| stripVerboseFields(response); | ||
| return response; | ||
|
Comment on lines
+55
to
+63
|
||
| } | ||
|
|
||
| private static ListFilter buildFilter(Map<String, Object> params) { | ||
| ListFilter filter = new ListFilter(Include.NON_DELETED); | ||
| String service = (String) params.get("service"); | ||
| String pipelineType = (String) params.get("pipelineType"); | ||
| if (service != null && !service.isBlank()) filter.addQueryParam("service", service); | ||
| if (pipelineType != null && !pipelineType.isBlank()) | ||
| filter.addQueryParam("pipelineType", pipelineType); | ||
| return filter; | ||
| } | ||
|
|
||
| private static void stripVerboseFields(Map<String, Object> response) { | ||
| if (response.get("data") instanceof List<?> pipelines) { | ||
| pipelines.forEach( | ||
| p -> { | ||
| if (p instanceof Map<?, ?> pipeline) { | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, Object> m = (Map<String, Object>) pipeline; | ||
| EXCLUDE_FIELDS.forEach(m::remove); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) | ||
| throws IOException { | ||
| throw new UnsupportedOperationException( | ||
| "ListIngestionPipelinesTool does not require limit validation."); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,100 @@ | ||||||||||||||||
| package org.openmetadata.mcp.tools; | ||||||||||||||||
|
|
||||||||||||||||
| import java.io.IOException; | ||||||||||||||||
| import java.util.Map; | ||||||||||||||||
| import lombok.extern.slf4j.Slf4j; | ||||||||||||||||
| import org.openmetadata.mcp.McpApplicationContext; | ||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||||||||||||||||
| import org.openmetadata.schema.type.MetadataOperation; | ||||||||||||||||
| import org.openmetadata.schema.utils.JsonUtils; | ||||||||||||||||
| import org.openmetadata.sdk.PipelineServiceClientInterface; | ||||||||||||||||
| import org.openmetadata.service.Entity; | ||||||||||||||||
| import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; | ||||||||||||||||
| import org.openmetadata.service.limits.Limits; | ||||||||||||||||
| import org.openmetadata.service.security.Authorizer; | ||||||||||||||||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||||||||||||||||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||||||||||||||||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||||||||||||||||
| import org.openmetadata.service.util.OpenMetadataConnectionBuilder; | ||||||||||||||||
|
|
||||||||||||||||
| @Slf4j | ||||||||||||||||
| public class TriggerIngestionPipelineTool implements McpTool { | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public Map<String, Object> execute( | ||||||||||||||||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||||||||||||||||
| throws IOException { | ||||||||||||||||
| String fqn = requireFqn(params); | ||||||||||||||||
| authorize(authorizer, securityContext); | ||||||||||||||||
| PipelineServiceClientInterface client = resolveClient(); | ||||||||||||||||
| if (client == null) return clientNotConfiguredError(fqn); | ||||||||||||||||
| IngestionPipeline pipeline = fetchPipeline(fqn); | ||||||||||||||||
| if (!Boolean.TRUE.equals(pipeline.getDeployed())) return notDeployedError(fqn); | ||||||||||||||||
| LOG.info("Triggering ingestion pipeline: {}", fqn); | ||||||||||||||||
| setupServerConnection(pipeline); | ||||||||||||||||
| Object service = Entity.getEntity(pipeline.getService(), "ingestionRunner", null); | ||||||||||||||||
| var response = client.runPipeline(pipeline, service); | ||||||||||||||||
| LOG.info("Trigger response for pipeline {}: {}", fqn, response); | ||||||||||||||||
|
Comment on lines
+34
to
+37
|
||||||||||||||||
| return JsonUtils.getMap(response); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static String requireFqn(Map<String, Object> params) { | ||||||||||||||||
| String fqn = (String) params.get("fqn"); | ||||||||||||||||
| if (fqn == null || fqn.isBlank()) { | ||||||||||||||||
| throw new IllegalArgumentException("Parameter 'fqn' is required"); | ||||||||||||||||
| } | ||||||||||||||||
| return fqn; | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) { | ||||||||||||||||
| authorizer.authorize( | ||||||||||||||||
| securityContext, | ||||||||||||||||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL), | ||||||||||||||||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||||||||||||||||
| } | ||||||||||||||||
|
Comment on lines
+49
to
+54
|
||||||||||||||||
|
|
||||||||||||||||
| private static PipelineServiceClientInterface resolveClient() { | ||||||||||||||||
| return PipelineServiceClientFactory.createPipelineServiceClient(null); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static Map<String, Object> clientNotConfiguredError(String fqn) { | ||||||||||||||||
| return Map.of( | ||||||||||||||||
| "error", | ||||||||||||||||
| "Pipeline service client is not configured." | ||||||||||||||||
| + " Ensure the ingestion infrastructure is set up.", | ||||||||||||||||
| "fqn", | ||||||||||||||||
| fqn); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static Map<String, Object> notDeployedError(String fqn) { | ||||||||||||||||
| return Map.of( | ||||||||||||||||
| "error", | ||||||||||||||||
| "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", | ||||||||||||||||
| "fqn", | ||||||||||||||||
| fqn, | ||||||||||||||||
| "deployed", | ||||||||||||||||
| false); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static IngestionPipeline fetchPipeline(String fqn) throws IOException { | ||||||||||||||||
| return (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| private static void setupServerConnection(IngestionPipeline pipeline) { | ||||||||||||||||
| if (McpApplicationContext.getConfig() != null) { | ||||||||||||||||
| pipeline.setOpenMetadataServerConnection( | ||||||||||||||||
| new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); | ||||||||||||||||
|
Comment on lines
+84
to
+86
|
||||||||||||||||
| if (McpApplicationContext.getConfig() != null) { | |
| pipeline.setOpenMetadataServerConnection( | |
| new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); | |
| var config = McpApplicationContext.getConfig(); | |
| if (config != null) { | |
| pipeline.setOpenMetadataServerConnection( | |
| new OpenMetadataConnectionBuilder(config, pipeline).build()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trigger_ingestion_pipelineis routed to the 3-argexecute(authorizer, securityContext, params)overload, which meansLimitsenforcement cannot run even if the tool is updated to support it. Since triggering pipelines is an operationally sensitive action (and the REST path enforces limits), route this tool through the limits-aware overload (execute(authorizer, limits, ...)) and let the tool decide whether/what limits to enforce.