-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathTriggerIngestionPipelineTool.java
More file actions
87 lines (75 loc) · 3.19 KB
/
TriggerIngestionPipelineTool.java
File metadata and controls
87 lines (75 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package org.openmetadata.mcp.tools;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.mcp.McpApplicationContext;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.Entity;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
import org.openmetadata.service.limits.Limits;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.auth.CatalogSecurityContext;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
@Slf4j
public class TriggerIngestionPipelineTool implements McpTool {
@Override
public Map<String, Object> execute(
Authorizer authorizer, CatalogSecurityContext securityContext, Map<String, Object> params)
throws IOException {
String fqn = (String) params.get("fqn");
if (fqn == null || fqn.isBlank()) {
throw new IllegalArgumentException("Parameter 'fqn' is required");
}
authorizer.authorize(
securityContext,
new OperationContext(Entity.INGESTION_PIPELINE, MetadataOperation.EDIT_ALL),
new ResourceContext<>(Entity.INGESTION_PIPELINE));
PipelineServiceClientInterface pipelineServiceClient =
PipelineServiceClientFactory.createPipelineServiceClient(null);
if (pipelineServiceClient == null) {
return Map.of(
"error",
"Pipeline service client is not configured. Ensure the ingestion infrastructure is"
+ " set up.",
"fqn",
fqn);
}
IngestionPipeline pipeline =
(IngestionPipeline) Entity.getEntityByName(Entity.INGESTION_PIPELINE, fqn, "*", null);
if (!Boolean.TRUE.equals(pipeline.getDeployed())) {
return Map.of(
"error",
"Pipeline '" + fqn + "' is not deployed. Deploy it first before triggering a run.",
"fqn",
fqn,
"deployed",
false);
}
LOG.info("Triggering ingestion pipeline: {}", fqn);
if (McpApplicationContext.getConfig() != null) {
pipeline.setOpenMetadataServerConnection(
new OpenMetadataConnectionBuilder(McpApplicationContext.getConfig()).build());
}
EntityReference serviceRef = pipeline.getService();
Object service = Entity.getEntity(serviceRef, "ingestionRunner", null);
var response = pipelineServiceClient.runPipeline(pipeline, service);
LOG.info("Trigger response for pipeline {}: {}", fqn, response);
return JsonUtils.getMap(response);
}
@Override
public Map<String, Object> execute(
Authorizer authorizer,
Limits limits,
CatalogSecurityContext securityContext,
Map<String, Object> params)
throws IOException {
throw new UnsupportedOperationException(
"TriggerIngestionPipelineTool does not require limit validation.");
}
}