Skip to content

Commit 78457cc

Browse files
committed
fix(ingestion-pipeline): inherit owners from service and authorize trigger (#27962)
1 parent 17c3b8b commit 78457cc

3 files changed

Lines changed: 266 additions & 1 deletion

File tree

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package org.openmetadata.it.tests;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNotNull;
5+
import static org.junit.jupiter.api.Assertions.assertThrows;
6+
import static org.junit.jupiter.api.Assertions.assertTrue;
7+
8+
import java.util.Date;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.UUID;
12+
import org.joda.time.DateTime;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.extension.ExtendWith;
15+
import org.junit.jupiter.api.parallel.Execution;
16+
import org.junit.jupiter.api.parallel.ExecutionMode;
17+
import org.openmetadata.it.factories.DashboardServiceTestFactory;
18+
import org.openmetadata.it.util.SdkClients;
19+
import org.openmetadata.it.util.TestNamespace;
20+
import org.openmetadata.it.util.TestNamespaceExtension;
21+
import org.openmetadata.schema.api.policies.CreatePolicy;
22+
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
23+
import org.openmetadata.schema.api.teams.CreateRole;
24+
import org.openmetadata.schema.api.teams.CreateUser;
25+
import org.openmetadata.schema.entity.policies.Policy;
26+
import org.openmetadata.schema.entity.policies.accessControl.Rule;
27+
import org.openmetadata.schema.entity.services.DashboardService;
28+
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
29+
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
30+
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
31+
import org.openmetadata.schema.entity.teams.Role;
32+
import org.openmetadata.schema.entity.teams.User;
33+
import org.openmetadata.schema.metadataIngestion.DashboardServiceMetadataPipeline;
34+
import org.openmetadata.schema.metadataIngestion.SourceConfig;
35+
import org.openmetadata.schema.type.EntityReference;
36+
import org.openmetadata.schema.type.MetadataOperation;
37+
import org.openmetadata.sdk.client.OpenMetadataClient;
38+
import org.openmetadata.sdk.network.HttpMethod;
39+
40+
/**
41+
* Integration tests for IngestionPipeline owner inheritance and trigger authorization.
42+
*
43+
* <p>Covers two coordinated changes that fix GH-27962 (Pylon-19838):
44+
*
45+
* <ul>
46+
* <li>{@code IngestionPipelineRepository.setInheritedFields} now inherits owners from the
47+
* referenced service / TestSuite / App, so {@code isOwner()} conditions on pipeline policies
48+
* evaluate correctly.
49+
* <li>{@code POST /v1/services/ingestionPipelines/trigger/{id}} now authorizes against {@code
50+
* MetadataOperation.TRIGGER}.
51+
* </ul>
52+
*/
53+
@Execution(ExecutionMode.CONCURRENT)
54+
@ExtendWith(TestNamespaceExtension.class)
55+
public class IngestionPipelineOwnerInheritanceIT {
56+
57+
private static final Date START_DATE = new DateTime("2022-06-10T15:06:47+00:00").toDate();
58+
59+
@Test
60+
void test_inheritedOwners_fromService(TestNamespace ns) {
61+
OpenMetadataClient adminClient = SdkClients.adminClient();
62+
String unique = UUID.randomUUID().toString().substring(0, 8);
63+
String userName = "ipinhowner_" + unique;
64+
User serviceOwner =
65+
adminClient
66+
.users()
67+
.create(
68+
new CreateUser().withName(userName).withEmail(userName + "@test.openmetadata.org"));
69+
70+
try {
71+
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
72+
DashboardService fetchedService =
73+
adminClient.dashboardServices().get(service.getId().toString());
74+
fetchedService.setOwners(List.of(serviceOwner.getEntityReference()));
75+
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);
76+
77+
try {
78+
IngestionPipeline pipeline =
79+
adminClient
80+
.ingestionPipelines()
81+
.create(
82+
new CreateIngestionPipeline()
83+
.withName(ns.prefix("ipinhPipeline"))
84+
.withPipelineType(PipelineType.METADATA)
85+
.withService(service.getEntityReference())
86+
.withSourceConfig(
87+
new SourceConfig().withConfig(new DashboardServiceMetadataPipeline()))
88+
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));
89+
90+
try {
91+
IngestionPipeline withOwners =
92+
adminClient.ingestionPipelines().get(pipeline.getId().toString(), "owners");
93+
assertNotNull(withOwners.getOwners(), "Inherited owners should be populated");
94+
assertEquals(1, withOwners.getOwners().size(), "Pipeline should inherit one owner");
95+
EntityReference inherited = withOwners.getOwners().get(0);
96+
assertEquals(
97+
serviceOwner.getId(),
98+
inherited.getId(),
99+
"Inherited owner should match service owner");
100+
assertTrue(
101+
Boolean.TRUE.equals(inherited.getInherited()),
102+
"Owner inherited from the parent service must be marked inherited=true");
103+
} finally {
104+
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
105+
}
106+
} finally {
107+
adminClient
108+
.dashboardServices()
109+
.delete(service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
110+
}
111+
} finally {
112+
adminClient.users().delete(serviceOwner.getId());
113+
}
114+
}
115+
116+
@Test
117+
void test_isOwnerPolicy_appliesToEditAndTrigger(TestNamespace ns) {
118+
OpenMetadataClient adminClient = SdkClients.adminClient();
119+
String unique = UUID.randomUUID().toString().substring(0, 8);
120+
121+
Rule ownerRule =
122+
new Rule()
123+
.withName("pipelineOwnerEditAndTrigger")
124+
.withDescription("Allow owners to edit and trigger ingestion pipelines")
125+
.withEffect(Rule.Effect.ALLOW)
126+
.withOperations(List.of(MetadataOperation.EDIT_ALL, MetadataOperation.TRIGGER))
127+
.withResources(List.of("ingestionPipeline"))
128+
.withCondition("isOwner()");
129+
Policy ownerPolicy =
130+
adminClient
131+
.policies()
132+
.create(
133+
new CreatePolicy()
134+
.withName("ipauthPolicy_" + unique)
135+
.withDescription("Owner-only policy for ingestion pipelines")
136+
.withRules(List.of(ownerRule)));
137+
138+
try {
139+
Role ownerRole =
140+
adminClient
141+
.roles()
142+
.create(
143+
new CreateRole()
144+
.withName("ipauthRole_" + unique)
145+
.withPolicies(List.of(ownerPolicy.getFullyQualifiedName())));
146+
147+
try {
148+
String ownerName = "ipauthowner_" + unique;
149+
User pipelineOwner =
150+
adminClient
151+
.users()
152+
.create(
153+
new CreateUser()
154+
.withName(ownerName)
155+
.withEmail(ownerName + "@test.openmetadata.org")
156+
.withRoles(List.of(ownerRole.getId())));
157+
158+
String otherName = "ipauthother_" + unique;
159+
User otherUser =
160+
adminClient
161+
.users()
162+
.create(
163+
new CreateUser()
164+
.withName(otherName)
165+
.withEmail(otherName + "@test.openmetadata.org"));
166+
167+
try {
168+
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
169+
DashboardService fetchedService =
170+
adminClient.dashboardServices().get(service.getId().toString());
171+
fetchedService.setOwners(List.of(pipelineOwner.getEntityReference()));
172+
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);
173+
174+
try {
175+
IngestionPipeline pipeline =
176+
adminClient
177+
.ingestionPipelines()
178+
.create(
179+
new CreateIngestionPipeline()
180+
.withName(ns.prefix("ipauthPipeline_" + unique))
181+
.withPipelineType(PipelineType.METADATA)
182+
.withService(service.getEntityReference())
183+
.withSourceConfig(
184+
new SourceConfig()
185+
.withConfig(new DashboardServiceMetadataPipeline()))
186+
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));
187+
188+
try {
189+
OpenMetadataClient ownerClient =
190+
SdkClients.createClient(ownerName, ownerName, new String[] {});
191+
OpenMetadataClient otherClient =
192+
SdkClients.createClient(otherName, otherName, new String[] {});
193+
194+
// Owner can PATCH displayName.
195+
IngestionPipeline ownerEdit =
196+
adminClient.ingestionPipelines().get(pipeline.getId().toString());
197+
ownerEdit.setDisplayName("owner-updated-display-name");
198+
ownerClient.ingestionPipelines().update(pipeline.getId().toString(), ownerEdit);
199+
200+
// Non-owner cannot PATCH displayName.
201+
IngestionPipeline otherEdit =
202+
adminClient.ingestionPipelines().get(pipeline.getId().toString());
203+
otherEdit.setDisplayName("non-owner-attempt");
204+
assertThrows(
205+
Exception.class,
206+
() ->
207+
otherClient
208+
.ingestionPipelines()
209+
.update(pipeline.getId().toString(), otherEdit),
210+
"Non-owner PATCH should be forbidden");
211+
212+
// Owner can trigger.
213+
String triggerPath = "/v1/services/ingestionPipelines/trigger/" + pipeline.getId();
214+
ownerClient.getHttpClient().execute(HttpMethod.POST, triggerPath, null, Void.class);
215+
216+
// Non-owner cannot trigger.
217+
assertThrows(
218+
Exception.class,
219+
() ->
220+
otherClient
221+
.getHttpClient()
222+
.execute(HttpMethod.POST, triggerPath, null, Void.class),
223+
"Non-owner trigger should be forbidden");
224+
} finally {
225+
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
226+
}
227+
} finally {
228+
adminClient
229+
.dashboardServices()
230+
.delete(
231+
service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
232+
}
233+
} finally {
234+
adminClient.users().delete(otherUser.getId());
235+
adminClient.users().delete(pipelineOwner.getId());
236+
}
237+
} finally {
238+
adminClient.roles().delete(ownerRole.getId());
239+
}
240+
} finally {
241+
adminClient.policies().delete(ownerPolicy.getId());
242+
}
243+
}
244+
}

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
1717
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
18+
import static org.openmetadata.schema.type.Include.ALL;
1819
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;
1920

2021
import jakarta.ws.rs.core.Response;
@@ -67,6 +68,7 @@
6768
import org.openmetadata.service.Entity;
6869
import org.openmetadata.service.OpenMetadataApplicationConfig;
6970
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
71+
import org.openmetadata.service.exception.EntityNotFoundException;
7072
import org.openmetadata.service.logstorage.LogStorageInterface;
7173
import org.openmetadata.service.logstorage.S3LogStorage.LogStreamListener;
7274
import org.openmetadata.service.monitoring.IngestionProgressTracker;
@@ -150,6 +152,24 @@ public void setFields(
150152
}
151153
}
152154

155+
@Override
156+
public void setInheritedFields(IngestionPipeline ingestionPipeline, Fields fields) {
157+
EntityReference serviceRef = ingestionPipeline.getService();
158+
if (serviceRef == null) {
159+
return;
160+
}
161+
try {
162+
EntityInterface parent = Entity.getEntity(serviceRef, "owners,domains", ALL);
163+
inheritOwners(ingestionPipeline, fields, parent);
164+
inheritDomains(ingestionPipeline, fields, parent);
165+
} catch (EntityNotFoundException e) {
166+
LOG.debug(
167+
"Parent service {} not found for ingestion pipeline {}; skipping owner/domain inheritance",
168+
serviceRef.getFullyQualifiedName(),
169+
ingestionPipeline.getFullyQualifiedName());
170+
}
171+
}
172+
153173
@Override
154174
public void setFieldsInBulk(Fields fields, List<IngestionPipeline> entities) {
155175
if (entities == null || entities.isEmpty()) {

openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,8 @@ private PipelineServiceClientResponse deployPipelineInternal(
13371337

13381338
public PipelineServiceClientResponse triggerPipelineInternal(
13391339
UUID id, UriInfo uriInfo, SecurityContext securityContext, String botName) {
1340+
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER);
1341+
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
13401342
if (pipelineServiceClient == null) {
13411343
return new PipelineServiceClientResponse()
13421344
.withCode(200)
@@ -1346,7 +1348,6 @@ public PipelineServiceClientResponse triggerPipelineInternal(
13461348
IngestionPipeline ingestionPipeline = repository.get(uriInfo, id, fields);
13471349
CreateResourceContext<IngestionPipeline> createResourceContext =
13481350
new CreateResourceContext<>(entityType, ingestionPipeline);
1349-
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.TRIGGER);
13501351
limits.enforceLimits(securityContext, createResourceContext, operationContext);
13511352
if (CommonUtil.nullOrEmpty(botName)) {
13521353
// Use Default Ingestion Bot

0 commit comments

Comments
 (0)