-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(mcp): add workflow automation MCP tools for ingestion pipeline management #27741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
fb7c32f
f2e4bf4
6b33d43
8136e9c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package org.openmetadata.mcp; | ||
|
|
||
| import org.openmetadata.service.OpenMetadataApplicationConfig; | ||
|
|
||
| /** Holds application-level singletons needed by MCP tools at runtime. */ | ||
| public class McpApplicationContext { | ||
| private static volatile OpenMetadataApplicationConfig config; | ||
|
|
||
| private McpApplicationContext() {} | ||
|
|
||
| public static void setConfig(OpenMetadataApplicationConfig applicationConfig) { | ||
| config = applicationConfig; | ||
| } | ||
|
|
||
| public static OpenMetadataApplicationConfig getConfig() { | ||
| return config; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,8 +17,8 @@ public class DefaultToolContext { | |||||||||
| public DefaultToolContext() {} | ||||||||||
|
|
||||||||||
| /** | ||||||||||
| * Loads tool definitions from a JSON file located at the specified path. | ||||||||||
| * The JSON file should contain an array of tool definitions under the "tools" key. | ||||||||||
| * Loads tool definitions from a JSON file located at the specified path. The JSON file should | ||||||||||
| * contain an array of tool definitions under the "tools" key. | ||||||||||
| * | ||||||||||
| * @return List of McpSchema.Tool objects loaded from the JSON file. | ||||||||||
| */ | ||||||||||
|
|
@@ -83,6 +83,15 @@ public McpSchema.CallToolResult callTool( | |||||||||
| case "create_metric": | ||||||||||
| result = new CreateMetricTool().execute(authorizer, limits, securityContext, params); | ||||||||||
| break; | ||||||||||
| case "list_ingestion_pipelines": | ||||||||||
| result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params); | ||||||||||
| break; | ||||||||||
| case "get_pipeline_status": | ||||||||||
| result = new GetPipelineStatusTool().execute(authorizer, securityContext, params); | ||||||||||
| break; | ||||||||||
| case "trigger_ingestion_pipeline": | ||||||||||
| result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params); | ||||||||||
|
||||||||||
| result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params); | |
| result = | |
| new TriggerIngestionPipelineTool() | |
| .execute(authorizer, limits, securityContext, params); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||
| import org.openmetadata.schema.type.MetadataOperation; | ||
| import org.openmetadata.schema.utils.JsonUtils; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.IngestionPipelineRepository; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||
|
|
||
| @Slf4j | ||
| public class GetPipelineStatusTool implements McpTool { | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||
| throws IOException { | ||
| String fqn = (String) params.get("fqn"); | ||
| if (fqn == null || fqn.isBlank()) { | ||
| throw new IllegalArgumentException("Parameter 'fqn' is required"); | ||
| } | ||
| int limit = CommonUtils.parseLimit(params, "limit", 5); | ||
|
|
||
| authorizer.authorize( | ||
| securityContext, | ||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), | ||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||
|
|
||
| LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit); | ||
|
|
||
| IngestionPipelineRepository repository = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
|
|
||
| IngestionPipeline pipeline = | ||
| (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null); | ||
|
|
||
| var latestStatus = repository.getLatestPipelineStatus(pipeline); | ||
| var recentRuns = repository.listPipelineStatus(fqn, null, null, limit); | ||
|
|
||
| Map<String, Object> response = new HashMap<>(); | ||
| response.put("fqn", fqn); | ||
| response.put("pipelineName", pipeline.getName()); | ||
| response.put("pipelineType", pipeline.getPipelineType()); | ||
| response.put("enabled", pipeline.getEnabled()); | ||
| response.put("deployed", pipeline.getDeployed()); | ||
| response.put("latestStatus", latestStatus != null ? JsonUtils.getMap(latestStatus) : null); | ||
| response.put("recentRuns", JsonUtils.getMap(recentRuns)); | ||
| return response; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) | ||
| throws IOException { | ||
| throw new UnsupportedOperationException( | ||
| "GetPipelineStatusTool does not require limit validation."); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| package org.openmetadata.mcp.tools; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.openmetadata.schema.type.Include; | ||
| import org.openmetadata.schema.type.MetadataOperation; | ||
| import org.openmetadata.schema.utils.JsonUtils; | ||
| import org.openmetadata.service.Entity; | ||
| import org.openmetadata.service.jdbi3.IngestionPipelineRepository; | ||
| import org.openmetadata.service.jdbi3.ListFilter; | ||
| import org.openmetadata.service.limits.Limits; | ||
| import org.openmetadata.service.security.Authorizer; | ||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||
|
|
||
| @Slf4j | ||
| public class ListIngestionPipelinesTool implements McpTool { | ||
|
|
||
| private static final List<String> EXCLUDE_FIELDS = | ||
| List.of( | ||
| "version", | ||
| "updatedAt", | ||
| "updatedBy", | ||
| "changeDescription", | ||
| "sourceHash", | ||
| "openMetadataServerConnection", | ||
| "airflowConfig"); | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||
| throws IOException { | ||
| String service = (String) params.get("service"); | ||
| String pipelineType = (String) params.get("pipelineType"); | ||
| String after = (String) params.get("after"); | ||
| int limit = CommonUtils.parseLimit(params, "limit", 10); | ||
|
|
||
| authorizer.authorize( | ||
| securityContext, | ||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC), | ||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||
|
|
||
| LOG.info( | ||
| "Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}", | ||
| service, | ||
| pipelineType, | ||
| limit); | ||
|
|
||
| IngestionPipelineRepository repository = | ||
| (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); | ||
|
|
||
| ListFilter filter = new ListFilter(Include.NON_DELETED); | ||
| if (service != null && !service.isBlank()) { | ||
| filter.addQueryParam("service", service); | ||
| } | ||
| if (pipelineType != null && !pipelineType.isBlank()) { | ||
| filter.addQueryParam("pipelineType", pipelineType); | ||
| } | ||
|
|
||
| var resultList = | ||
| repository.listAfter( | ||
| null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after); | ||
|
|
||
| Map<String, Object> response = JsonUtils.getMap(resultList); | ||
| if (response.get("data") instanceof List<?> pipelines) { | ||
| pipelines.forEach( | ||
| p -> { | ||
| if (p instanceof Map<?, ?> pipeline) { | ||
| @SuppressWarnings("unchecked") | ||
| Map<String, Object> m = (Map<String, Object>) pipeline; | ||
| EXCLUDE_FIELDS.forEach(m::remove); | ||
| } | ||
| }); | ||
| } | ||
| return response; | ||
| } | ||
|
|
||
| @Override | ||
| public Map<String, Object> execute( | ||
| Authorizer authorizer, | ||
| Limits limits, | ||
| CatalogSecurityContext securityContext, | ||
| Map<String, Object> params) | ||
| throws IOException { | ||
| throw new UnsupportedOperationException( | ||
| "ListIngestionPipelinesTool does not require limit validation."); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,87 @@ | ||||||||||||||||
| package org.openmetadata.mcp.tools; | ||||||||||||||||
|
|
||||||||||||||||
| import java.io.IOException; | ||||||||||||||||
| import java.util.Map; | ||||||||||||||||
| import lombok.extern.slf4j.Slf4j; | ||||||||||||||||
| import org.openmetadata.mcp.McpApplicationContext; | ||||||||||||||||
| import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline; | ||||||||||||||||
| import org.openmetadata.schema.type.EntityReference; | ||||||||||||||||
| import org.openmetadata.schema.type.MetadataOperation; | ||||||||||||||||
| import org.openmetadata.schema.utils.JsonUtils; | ||||||||||||||||
| import org.openmetadata.sdk.PipelineServiceClientInterface; | ||||||||||||||||
| import org.openmetadata.service.Entity; | ||||||||||||||||
| import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; | ||||||||||||||||
| import org.openmetadata.service.limits.Limits; | ||||||||||||||||
| import org.openmetadata.service.security.Authorizer; | ||||||||||||||||
| import org.openmetadata.service.security.auth.CatalogSecurityContext; | ||||||||||||||||
| import org.openmetadata.service.security.policyevaluator.OperationContext; | ||||||||||||||||
| import org.openmetadata.service.security.policyevaluator.ResourceContext; | ||||||||||||||||
| import org.openmetadata.service.util.OpenMetadataConnectionBuilder; | ||||||||||||||||
|
|
||||||||||||||||
| @Slf4j | ||||||||||||||||
| public class TriggerIngestionPipelineTool implements McpTool { | ||||||||||||||||
|
|
||||||||||||||||
| @Override | ||||||||||||||||
| public Map<String, Object> execute( | ||||||||||||||||
| Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params) | ||||||||||||||||
| throws IOException { | ||||||||||||||||
| String fqn = (String) params.get("fqn"); | ||||||||||||||||
| if (fqn == null || fqn.isBlank()) { | ||||||||||||||||
| throw new IllegalArgumentException("Parameter 'fqn' is required"); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| authorizer.authorize( | ||||||||||||||||
| securityContext, | ||||||||||||||||
| new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL), | ||||||||||||||||
| new ResourceContext<>(Entity.INGESTION_PIPELINE)); | ||||||||||||||||
|
|
||||||||||||||||
| PipelineServiceClientInterface pipelineServiceClient = | ||||||||||||||||
| PipelineServiceClientFactory.createPipelineServiceClient(null); | ||||||||||||||||
| if (pipelineServiceClient == null) { | ||||||||||||||||
| return Map.of( | ||||||||||||||||
| "error", | ||||||||||||||||
| "Pipeline service client is not configured. Ensure the ingestion infrastructure is" | ||||||||||||||||
| + " set up.", | ||||||||||||||||
| "fqn", | ||||||||||||||||
| fqn); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| IngestionPipeline pipeline = | ||||||||||||||||
| (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null); | ||||||||||||||||
|
|
||||||||||||||||
| if (!Boolean.TRUE.equals(pipeline.getDeployed())) { | ||||||||||||||||
| return Map.of( | ||||||||||||||||
| "error", | ||||||||||||||||
| "Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.", | ||||||||||||||||
| "fqn", | ||||||||||||||||
| fqn, | ||||||||||||||||
| "deployed", | ||||||||||||||||
| false); | ||||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
| LOG.info("Triggering ingestion pipeline: {}", fqn); | ||||||||||||||||
|
|
||||||||||||||||
| if (McpApplicationContext.getConfig() != null) { | ||||||||||||||||
| pipeline.setOpenMetadataServerConnection( | ||||||||||||||||
| new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); | ||||||||||||||||
|
Comment on lines
+84
to
+86
|
||||||||||||||||
| if (McpApplicationContext.getConfig() != null) { | |
| pipeline.setOpenMetadataServerConnection( | |
| new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build()); | |
| var config = McpApplicationContext.getConfig(); | |
| if (config != null) { | |
| pipeline.setOpenMetadataServerConnection( | |
| new OpenMetadataConnectionBuilder(config, pipeline).build()); |
Uh oh!
There was an error while loading. Please reload this page.