Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.openmetadata.mcp;

import org.openmetadata.service.OpenMetadataApplicationConfig;

/** Holds application-level singletons needed by MCP tools at runtime. */
public class McpApplicationContext {
private static volatile OpenMetadataApplicationConfig config;

private McpApplicationContext() {}

public static void setConfig(OpenMetadataApplicationConfig applicationConfig) {
config = applicationConfig;
}

public static OpenMetadataApplicationConfig getConfig() {
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void initializeMcpServer(
SecurityConfigurationManager.getCurrentAuthzConfig());
this.authorizer = authorizer;
this.limits = limits;
McpApplicationContext.setConfig(config);
this.environment = environment;
MutableServletContextHandler contextHandler = environment.getApplicationContext();
List<McpSchema.Tool> tools = getTools();
Expand Down Expand Up @@ -286,9 +287,7 @@ private List<String> getAllowedOriginsFromConfig() {
return List.of();
}

/**
* Get base URL from system settings, with fallback to localhost for development.
*/
/** Get base URL from system settings, with fallback to localhost for development. */
private String getBaseUrlFromSettings() {
try {
org.openmetadata.service.jdbi3.SystemRepository systemRepository =
Expand All @@ -313,9 +312,9 @@ private String getBaseUrlFromSettings() {
LOG.warn("Could not get instance URL from SystemSettings, using fallback", e);
}
LOG.error(
"No base URL configured in MCP settings or system settings. "
+ "Falling back to http://localhost:8585 — this is only suitable for local development. "
+ "Configure a proper base URL for production deployments.");
"No base URL configured in MCP settings or system settings. Falling back to"
+ " http://localhost:8585 — this is only suitable for local development. Configure a"
+ " proper base URL for production deployments.");
return "http://localhost:8585";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ public static List<EntityReference> getTeamsOrUsers(Object teamsOrUsersParam) {
}
return teamsOrUsers;
}

public static int parseLimit(Map<String, Object> params, String key, int defaultValue) {
if (!params.containsKey(key)) {
return defaultValue;
}
Object v = params.get(key);
if (v instanceof Number n) {
return n.intValue();
}
if (v instanceof String s) {
try {
return Integer.parseInt(s);
} catch (NumberFormatException e) {
LOG.warn("Invalid value for '{}': '{}', using default {}", key, s, defaultValue);
}
}
return defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public class DefaultToolContext {
public DefaultToolContext() {}

/**
* Loads tool definitions from a JSON file located at the specified path.
* The JSON file should contain an array of tool definitions under the "tools" key.
* Loads tool definitions from a JSON file located at the specified path. The JSON file should
* contain an array of tool definitions under the "tools" key.
*
* @return List of McpSchema.Tool objects loaded from the JSON file.
*/
Expand Down Expand Up @@ -83,6 +83,15 @@ public McpSchema.CallToolResult callTool(
case "create_metric":
result = new CreateMetricTool().execute(authorizer, limits, securityContext, params);
break;
case "list_ingestion_pipelines":
Comment thread
gitar-bot[bot] marked this conversation as resolved.
Outdated
result = new ListIngestionPipelinesTool().execute(authorizer, securityContext, params);
break;
case "get_pipeline_status":
result = new GetPipelineStatusTool().execute(authorizer, securityContext, params);
break;
case "trigger_ingestion_pipeline":
result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params);
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trigger_ingestion_pipeline is routed to the 3-arg execute(authorizer, securityContext, params) overload, which means Limits enforcement cannot run even if the tool is updated to support it. Since triggering pipelines is an operationally sensitive action (and the REST path enforces limits), route this tool through the limits-aware overload (execute(authorizer, limits, ...)) and let the tool decide whether/what limits to enforce.

Suggested change
result = new TriggerIngestionPipelineTool().execute(authorizer, securityContext, params);
result =
new TriggerIngestionPipelineTool()
.execute(authorizer, limits, securityContext, params);

Copilot uses AI. Check for mistakes.
break;
default:
return McpSchema.CallToolResult.builder()
.content(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;

@Slf4j
public class GetPipelineStatusTool implements McpTool {

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
String fqn = (String) params.get("fqn");
if (fqn == null || fqn.isBlank()) {
throw new IllegalArgumentException("Parameter 'fqn' is required");
}
int limit = CommonUtils.parseLimit(params, "limit", 5);

authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC),
new ResourceContext<>(Entity.INGESTION_PIPELINE));

LOG.info("Getting pipeline status for FQN: {}, limit: {}", fqn, limit);

IngestionPipelineRepository repository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

IngestionPipeline pipeline =
(IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "", null);

var latestStatus = repository.getLatestPipelineStatus(pipeline);
var recentRuns = repository.listPipelineStatus(fqn, null, null, limit);

Map<String, Object> response = new HashMap<>();
response.put("fqn", fqn);
response.put("pipelineName", pipeline.getName());
response.put("pipelineType", pipeline.getPipelineType());
response.put("enabled", pipeline.getEnabled());
response.put("deployed", pipeline.getDeployed());
response.put("latestStatus", latestStatus != null ? JsonUtils.getMap(latestStatus) : null);
response.put("recentRuns", JsonUtils.getMap(recentRuns));
return response;
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"GetPipelineStatusTool does not require limit validation.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.IngestionPipelineRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;

@Slf4j
public class ListIngestionPipelinesTool implements McpTool {

private static final List<String> EXCLUDE_FIELDS =
List.of(
"version",
"updatedAt",
"updatedBy",
"changeDescription",
"sourceHash",
"openMetadataServerConnection",
"airflowConfig");

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
String service = (String) params.get("service");
String pipelineType = (String) params.get("pipelineType");
String after = (String) params.get("after");
int limit = CommonUtils.parseLimit(params, "limit", 10);

authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.VIEW_BASIC),
new ResourceContext<>(Entity.INGESTION_PIPELINE));

LOG.info(
"Listing ingestion pipelines — service: {}, pipelineType: {}, limit: {}",
service,
pipelineType,
limit);

IngestionPipelineRepository repository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);

ListFilter filter = new ListFilter(Include.NON_DELETED);
if (service != null && !service.isBlank()) {
filter.addQueryParam("service", service);
}
if (pipelineType != null && !pipelineType.isBlank()) {
filter.addQueryParam("pipelineType", pipelineType);
}

var resultList =
repository.listAfter(
null, repository.getFields("sourceConfig,pipelineType"), filter, limit, after);

Map<String, Object> response = JsonUtils.getMap(resultList);
if (response.get("data") instanceof List<?> pipelines) {
pipelines.forEach(
p -> {
if (p instanceof Map<?, ?> pipeline) {
@SuppressWarnings("unchecked")
Map<String, Object> m = (Map<String, Object>) pipeline;
EXCLUDE_FIELDS.forEach(m::remove);
}
});
}
return response;
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"ListIngestionPipelinesTool does not require limit validation.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.openmetadata.mcp.tools;

import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.mcp.McpApplicationContext;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;

@Slf4j
public class TriggerIngestionPipelineTool implements McpTool {

@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
String fqn = (String) params.get("fqn");
if (fqn == null || fqn.isBlank()) {
throw new IllegalArgumentException("Parameter 'fqn' is required");
}

authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL),
new ResourceContext<>(Entity.INGESTION_PIPELINE));

PipelineServiceClientInterface pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(null);
if (pipelineServiceClient == null) {
return Map.of(
"error",
"Pipeline service client is not configured. Ensure the ingestion infrastructure is"
+ " set up.",
"fqn",
fqn);
}

IngestionPipeline pipeline =
(IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null);

if (!Boolean.TRUE.equals(pipeline.getDeployed())) {
return Map.of(
"error",
"Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.",
"fqn",
fqn,
"deployed",
false);
}

LOG.info("Triggering ingestion pipeline: {}", fqn);

if (McpApplicationContext.getConfig() != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build());
Comment on lines +84 to +86
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupServerConnection() sets a raw OpenMetadataServerConnection built from config only. The server trigger path uses OpenMetadataConnectionBuilder(openMetadataApplicationConfig, ingestionPipeline) to pick the pipeline bot when configured, and then encrypts/nullifies fields via SecretsManager before handing the pipeline off to the pipeline client. For parity (and to avoid leaking/incorrectly formatting JWTs for k8s clients that embed the workflow config), consider building the connection with the pipeline-aware constructor and applying the same encrypt/nullify approach used in IngestionPipelineResource.decryptOrNullify(...) for trigger operations.

Suggested change
if (McpApplicationContext.getConfig() != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build());
var config = McpApplicationContext.getConfig();
if (config != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(config, pipeline).build());

Copilot uses AI. Check for mistakes.
}

EntityReference serviceRef = pipeline.getService();
Object service = Entity.getEntity(serviceRef, "ingestionRunner", null);

var response = pipelineServiceClient.runPipeline(pipeline, service);
LOG.info("Trigger response for pipeline {}: {}", fqn, response);
return JsonUtils.getMap(response);
}

@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"TriggerIngestionPipelineTool does not require limit validation.");
}
}
Loading
Loading