Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.openmetadata.mcp;

import org.openmetadata.service.OpenMetadataApplicationConfig;

/**
 * Static holder for application-level singletons that MCP tools need at runtime.
 *
 * <p>The configuration is published once during server start-up via {@link #setConfig} and later
 * read by tools through {@link #getConfig}. The backing field is {@code volatile} so a value
 * written by the bootstrap thread is immediately visible to tool-executing threads.
 */
public class McpApplicationContext {

  // Published at start-up; remains null until setConfig() has run.
  private static volatile OpenMetadataApplicationConfig config;

  /** Static utility holder — not instantiable. */
  private McpApplicationContext() {}

  /** Returns the published application config, or {@code null} if it has not been set yet. */
  public static OpenMetadataApplicationConfig getConfig() {
    return config;
  }

  /** Publishes the application config for later retrieval by MCP tools. */
  public static void setConfig(OpenMetadataApplicationConfig appConfig) {
    config = appConfig;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void initializeMcpServer(
SecurityConfigurationManager.getCurrentAuthzConfig());
this.authorizer = authorizer;
this.limits = limits;
McpApplicationContext.setConfig(config);
this.environment = environment;
MutableServletContextHandler contextHandler = environment.getApplicationContext();
List<McpSchema.Tool> tools = getTools();
Expand Down Expand Up @@ -286,9 +287,7 @@ private List<String> getAllowedOriginsFromConfig() {
return List.of();
}

/**
* Get base URL from system settings, with fallback to localhost for development.
*/
/** Get base URL from system settings, with fallback to localhost for development. */
private String getBaseUrlFromSettings() {
try {
org.openmetadata.service.jdbi3.SystemRepository systemRepository =
Expand All @@ -313,9 +312,9 @@ private String getBaseUrlFromSettings() {
LOG.warn("Could not get instance URL from SystemSettings, using fallback", e);
}
LOG.error(
"No base URL configured in MCP settings or system settings. "
+ "Falling back to http://localhost:8585 — this is only suitable for local development. "
+ "Configure a proper base URL for production deployments.");
"No base URL configured in MCP settings or system settings. Falling back to"
+ " http://localhost:8585 — this is only suitable for local development. Configure a"
+ " proper base URL for production deployments.");
return "http://localhost:8585";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ public static List<EntityReference> getTeamsOrUsers(Object teamsOrUsersParam) {
}
return teamsOrUsers;
}

/**
 * Reads an integer limit from a tool parameter map.
 *
 * <p>Accepts either a {@link Number} (truncated to {@code int}) or a numeric {@link String};
 * surrounding whitespace in string values is tolerated. Missing, null, or unparseable values
 * fall back to {@code defaultValue}; an unparseable string is logged at WARN level.
 *
 * @param params tool invocation parameters
 * @param key name of the parameter to read
 * @param defaultValue value returned when the parameter is absent or invalid
 * @return the parsed limit, or {@code defaultValue}
 */
public static int parseLimit(Map<String, Object> params, String key, int defaultValue) {
  if (!params.containsKey(key)) {
    return defaultValue;
  }
  Object v = params.get(key);
  if (v instanceof Number n) {
    return n.intValue();
  }
  if (v instanceof String s) {
    try {
      // trim() so values like " 10 " (common from LLM-generated tool calls) still parse.
      return Integer.parseInt(s.trim());
    } catch (NumberFormatException e) {
      LOG.warn("Invalid value for '{}': '{}', using default {}", key, s, defaultValue);
    }
  }
  // Null value or unsupported type (e.g. boolean/map): fall back silently.
  return defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@

@Slf4j
public class DefaultToolContext {
static final String TOOL_LIST_INGESTION_PIPELINES = "list_ingestion_pipelines";
static final String TOOL_GET_PIPELINE_STATUS = "get_pipeline_status";
static final String TOOL_TRIGGER_INGESTION_PIPELINE = "trigger_ingestion_pipeline";

public DefaultToolContext() {}

/**
* Loads tool definitions from a JSON file located at the specified path.
* The JSON file should contain an array of tool definitions under the "tools" key.
* Loads tool definitions from a JSON file located at the specified path. The JSON file should
* contain an array of tool definitions under the "tools" key.
*
* @return List of McpSchema.Tool objects loaded from the JSON file.
*/
Expand Down Expand Up @@ -83,6 +87,15 @@ public McpSchema.CallToolResult callTool(
case "create_metric":
result = new CreateMetricTool().execute(authorizer, limits, securityContext, params);
break;
case TOOL_LIST_INGESTION_PIPELINES:
result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params);
break;
case TOOL_GET_PIPELINE_STATUS:
result = new GetPipelineStatusTool().execute(authorizer, securityContext, params);
break;
case TOOL_TRIGGER_INGESTION_PIPELINE:
result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params);
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trigger_ingestion_pipeline is routed to the 3-arg execute(authorizer, securityContext, params) overload, which means Limits enforcement cannot run even if the tool is updated to support it. Since triggering pipelines is an operationally sensitive action (and the REST path enforces limits), route this tool through the limits-aware overload (execute(authorizer, limits, ...)) and let the tool decide whether/what limits to enforce.

Suggested change
result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params);
result =
new TriggerIngestionPipelineTool()
.execute(authorizer, limits, securityContext, params);

Copilot uses AI. Check for mistakes.
break;
default:
return McpSchema.CallToolResult.builder()
.content(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;

@Slf4j
public class GetPipelineStatusTool implements McpTool {

  /**
   * Returns the latest run status and recent run history of an ingestion pipeline.
   *
   * <p>Required parameter: {@code fqn} — the pipeline's fully qualified name. Optional parameter:
   * {@code limit} — number of recent runs to include (defaults to 5).
   *
   * @throws IllegalArgumentException if {@code fqn} is missing or blank
   * @throws IOException if the pipeline or its status cannot be read
   */
  @Override
  public Map<String, Object> execute(
      Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
      throws IOException {
    String fqn = (String) params.get("fqn");
    if (fqn == null || fqn.isBlank()) {
      throw new IllegalArgumentException("Parameter 'fqn' is required");
    }
    int limit = CommonUtils.parseLimit(params, "limit", 5);
    // Reading run status only needs basic view access on ingestion pipelines.
    authorizer.authorize(
        securityContext,
        new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC),
        new ResourceContext<>(Entity.INGESTION_PIPELINE));
    LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit);

    IngestionPipelineRepository repository =
        (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
    IngestionPipeline pipeline =
        (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null);

    Map<String, Object> response = new HashMap<>();
    response.put("fqn", fqn);
    response.put("pipelineName", pipeline.getName());
    response.put("pipelineType", pipeline.getPipelineType());
    response.put("enabled", pipeline.getEnabled());
    response.put("deployed", pipeline.getDeployed());
    var latestStatus = repository.getLatestPipelineStatus(pipeline);
    // latestStatus is null when the pipeline has never run; keep the explicit null in the map.
    response.put("latestStatus", latestStatus == null ? null : JsonUtils.getMap(latestStatus));
    response.put("recentRuns", JsonUtils.getMap(repository.listPipelineStatus(fqn, null, null, limit)));
    return response;
  }

  /**
   * Limits-aware overload — unsupported: reading status consumes no quota, so callers must use
   * the three-argument {@code execute}.
   */
  @Override
  public Map<String, Object> execute(
      Authorizer authorizer,
      Limits limits,
      CatalogSecurityContext securityContext,
      Map<String, Object> params)
      throws IOException {
    throw new UnsupportedOperationException(
        "GetPipelineStatusTool does not require limit validation.");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;

@Slf4j
public class ListIngestionPipelinesTool implements McpTool {

private static final List<String> EXCLUDE_FIELDS =
List.of(
"version",
"updatedAt",
"updatedBy",
"changeDescription",
"sourceHash",
"openMetadataServerConnection",
"airflowConfig");

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
authorize(authorizer, securityContext);
int limit = CommonUtils.parseLimit(params, "limit", 10);
LOG.info(
"Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}",
params.get("service"),
params.get("pipelineType"),
limit);
return fetchAndClean(params, limit);
}

private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) {
authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC),
new ResourceContext<>(Entity.INGESTION_PIPELINE));
}

private static Map<String, Object> fetchAndClean(Map<String, Object> params, int limit)
throws IOException {
IngestionPipelineRepository repo =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
ListFilter filter = buildFilter(params);
String after = (String) params.get("after");
var resultList =
repo.listAfter(null, repo.getFields("sourceConfig,pipelineType"), filter, limit, after);
Map<String, Object> response = JsonUtils.getMap(resultList);
stripVerboseFields(response);
return response;
Comment on lines +55 to +63
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetchAndClean() requests sourceConfig via repo.getFields("sourceConfig,pipelineType") and then returns the serialized entities without removing sourceConfig. sourceConfig can include connector/serviceConnection details (potentially secrets), and the REST resource explicitly decrypts/masks/nullifies this field for list responses. Please avoid returning sourceConfig entirely (don’t request it in the fields mask and/or explicitly remove it), or apply the same decrypt/mask/nullify logic used by IngestionPipelineResource.list before serializing the result.

Copilot uses AI. Check for mistakes.
}

private static ListFilter buildFilter(Map<String, Object> params) {
ListFilter filter = new ListFilter(Include.NON_DELETED);
String service = (String) params.get("service");
String pipelineType = (String) params.get("pipelineType");
if (service != null && !service.isBlank()) filter.addQueryParam("service", service);
if (pipelineType != null && !pipelineType.isBlank())
filter.addQueryParam("pipelineType", pipelineType);
return filter;
}

private static void stripVerboseFields(Map<String, Object> response) {
if (response.get("data") instanceof List<?> pipelines) {
pipelines.forEach(
p -> {
if (p instanceof Map<?, ?> pipeline) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) pipeline;
EXCLUDE_FIELDS.forEach(m::remove);
}
});
}
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"ListIngestionPipelinesTool does not require limit validation.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.mcp.McpApplicationContext;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;

@Slf4j
public class TriggerIngestionPipelineTool implements McpTool {

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
String fqn = requireFqn(params);
authorize(authorizer, securityContext);
PipelineServiceClientInterface client = resolveClient();
if (client == null) return clientNotConfiguredError(fqn);
IngestionPipeline pipeline = fetchPipeline(fqn);
if (!Boolean.TRUE.equals(pipeline.getDeployed())) return notDeployedError(fqn);
LOG.info("Triggering ingestion pipeline: {}", fqn);
setupServerConnection(pipeline);
Object service = Entity.getEntity(pipeline.getService(), "ingestionRunner", null);
var response = client.runPipeline(pipeline, service);
LOG.info("Trigger response for pipeline {}: {}", fqn, response);
Comment on lines +34 to +37
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entity.getEntity(...) is assigned to an Object, but PipelineServiceClientInterface.runPipeline(...) requires a ServiceEntityInterface argument. As written, this should not compile. Change the variable type to ServiceEntityInterface (and import it), and consider using Include.NON_DELETED for the include parameter to match the server’s trigger logic.

Copilot uses AI. Check for mistakes.
return JsonUtils.getMap(response);
}

private static String requireFqn(Map<String, Object> params) {
String fqn = (String) params.get("fqn");
if (fqn == null || fqn.isBlank()) {
throw new IllegalArgumentException("Parameter 'fqn' is required");
}
return fqn;
}

private static void authorize(Authorizer authorizer, CatalogSecurityContext securityContext) {
authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL),
new ResourceContext<>(Entity.INGESTION_PIPELINE));
}
Comment on lines +49 to +54
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trigger authorization/limits logic here diverges from the server’s ingestion pipeline trigger implementation: the REST resource uses MetadataOperation.TRIGGER (not EDIT_ALL) and enforces limits via limits.enforceLimits(...) with a CreateResourceContext built from the specific pipeline. This tool currently (a) uses EDIT_ALL, and (b) bypasses limits entirely by only implementing the 3-arg execute path. Please align with MetadataOperation.TRIGGER and enforce limits (implement the execute(authorizer, limits, ...) variant and route to it), ideally authorizing against the specific pipeline resource context rather than the collection.

Copilot uses AI. Check for mistakes.

private static PipelineServiceClientInterface resolveClient() {
return PipelineServiceClientFactory.createPipelineServiceClient(null);
}

private static Map<String, Object> clientNotConfiguredError(String fqn) {
return Map.of(
"error",
"Pipeline service client is not configured."
+ " Ensure the ingestion infrastructure is set up.",
"fqn",
fqn);
}

private static Map<String, Object> notDeployedError(String fqn) {
return Map.of(
"error",
"Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.",
"fqn",
fqn,
"deployed",
false);
}

private static IngestionPipeline fetchPipeline(String fqn) throws IOException {
return (IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null);
}

private static void setupServerConnection(IngestionPipeline pipeline) {
if (McpApplicationContext.getConfig() != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build());
Comment on lines +84 to +86
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupServerConnection() sets a raw OpenMetadataServerConnection built from config only. The server trigger path uses OpenMetadataConnectionBuilder(openMetadataApplicationConfig, ingestionPipeline) to pick the pipeline bot when configured, and then encrypts/nullifies fields via SecretsManager before handing the pipeline off to the pipeline client. For parity (and to avoid leaking/incorrectly formatting JWTs for k8s clients that embed the workflow config), consider building the connection with the pipeline-aware constructor and applying the same encrypt/nullify approach used in IngestionPipelineResource.decryptOrNullify(...) for trigger operations.

Suggested change
if (McpApplicationContext.getConfig() != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build());
var config = McpApplicationContext.getConfig();
if (config != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(config, pipeline).build());

Copilot uses AI. Check for mistakes.
}
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"TriggerIngestionPipelineTool does not require limit validation.");
}
}
Loading
Loading