Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package org.openmetadata.it.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.openmetadata.it.factories.DashboardServiceTestFactory;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.it.util.TestNamespace;
import org.openmetadata.it.util.TestNamespaceExtension;
import org.openmetadata.schema.api.policies.CreatePolicy;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.api.teams.CreateRole;
import org.openmetadata.schema.api.teams.CreateUser;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.entity.teams.Role;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.metadataIngestion.DashboardServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.sdk.client.OpenMetadataClient;
import org.openmetadata.sdk.network.HttpMethod;

/**
* Integration tests for IngestionPipeline owner inheritance and trigger authorization.
*
* <p>Covers two coordinated changes that fix GH-27962 (Pylon-19838):
*
* <ul>
* <li>{@code IngestionPipelineRepository.setInheritedFields} now inherits owners from the
* referenced service / TestSuite / App, so {@code isOwner()} conditions on pipeline policies
* evaluate correctly.
* <li>{@code POST /v1/services/ingestionPipelines/trigger/{id}} now authorizes against {@code
* MetadataOperation.TRIGGER}.
* </ul>
*/
@Execution(ExecutionMode.CONCURRENT)
@ExtendWith(TestNamespaceExtension.class)
public class IngestionPipelineOwnerInheritanceIT {

private static final Date START_DATE = Date.from(Instant.parse("2022-06-10T15:06:47Z"));

@Test
void test_inheritedOwners_fromService(TestNamespace ns) {
OpenMetadataClient adminClient = SdkClients.adminClient();
String unique = UUID.randomUUID().toString().substring(0, 8);
String userName = "ipinhowner_" + unique;
User serviceOwner =
adminClient
.users()
.create(
new CreateUser().withName(userName).withEmail(userName + "@test.openmetadata.org"));

try {
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
DashboardService fetchedService =
adminClient.dashboardServices().get(service.getId().toString());
fetchedService.setOwners(List.of(serviceOwner.getEntityReference()));
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);

try {
IngestionPipeline pipeline =
adminClient
.ingestionPipelines()
.create(
new CreateIngestionPipeline()
.withName(ns.prefix("ipinhPipeline"))
.withPipelineType(PipelineType.METADATA)
.withService(service.getEntityReference())
.withSourceConfig(
new SourceConfig().withConfig(new DashboardServiceMetadataPipeline()))
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));

try {
IngestionPipeline withOwners =
adminClient.ingestionPipelines().get(pipeline.getId().toString(), "owners");
assertNotNull(withOwners.getOwners(), "Inherited owners should be populated");
assertEquals(1, withOwners.getOwners().size(), "Pipeline should inherit one owner");
EntityReference inherited = withOwners.getOwners().get(0);
assertEquals(
serviceOwner.getId(),
inherited.getId(),
"Inherited owner should match service owner");
assertTrue(
Boolean.TRUE.equals(inherited.getInherited()),
"Owner inherited from the parent service must be marked inherited=true");
} finally {
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
}
} finally {
adminClient
.dashboardServices()
.delete(service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
}
} finally {
adminClient.users().delete(serviceOwner.getId());
}
}

@Test
void test_isOwnerPolicy_appliesToEditAndTrigger(TestNamespace ns) {
OpenMetadataClient adminClient = SdkClients.adminClient();
String unique = UUID.randomUUID().toString().substring(0, 8);

Rule ownerRule =
new Rule()
.withName("pipelineOwnerEditAndTrigger")
.withDescription("Allow owners to edit and trigger ingestion pipelines")
.withEffect(Rule.Effect.ALLOW)
.withOperations(List.of(MetadataOperation.EDIT_ALL, MetadataOperation.TRIGGER))
.withResources(List.of("ingestionPipeline"))
.withCondition("isOwner()");
Policy ownerPolicy =
adminClient
.policies()
.create(
new CreatePolicy()
.withName("ipauthPolicy_" + unique)
.withDescription("Owner-only policy for ingestion pipelines")
.withRules(List.of(ownerRule)));

try {
Role ownerRole =
adminClient
.roles()
.create(
new CreateRole()
.withName("ipauthRole_" + unique)
.withPolicies(List.of(ownerPolicy.getFullyQualifiedName())));

try {
String ownerName = "ipauthowner_" + unique;
User pipelineOwner =
adminClient
.users()
.create(
new CreateUser()
.withName(ownerName)
.withEmail(ownerName + "@test.openmetadata.org")
.withRoles(List.of(ownerRole.getId())));

String otherName = "ipauthother_" + unique;
User otherUser =
adminClient
.users()
.create(
new CreateUser()
.withName(otherName)
.withEmail(otherName + "@test.openmetadata.org"));

try {
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
DashboardService fetchedService =
adminClient.dashboardServices().get(service.getId().toString());
fetchedService.setOwners(List.of(pipelineOwner.getEntityReference()));
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);

try {
IngestionPipeline pipeline =
adminClient
.ingestionPipelines()
.create(
new CreateIngestionPipeline()
.withName(ns.prefix("ipauthPipeline_" + unique))
.withPipelineType(PipelineType.METADATA)
.withService(service.getEntityReference())
.withSourceConfig(
new SourceConfig()
.withConfig(new DashboardServiceMetadataPipeline()))
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));

try {
OpenMetadataClient ownerClient =
SdkClients.createClient(ownerName, ownerName, new String[] {});
OpenMetadataClient otherClient =
SdkClients.createClient(otherName, otherName, new String[] {});

// Owner can PATCH displayName.
IngestionPipeline ownerEdit =
adminClient.ingestionPipelines().get(pipeline.getId().toString());
ownerEdit.setDisplayName("owner-updated-display-name");
ownerClient.ingestionPipelines().update(pipeline.getId().toString(), ownerEdit);

// Non-owner cannot PATCH displayName.
IngestionPipeline otherEdit =
adminClient.ingestionPipelines().get(pipeline.getId().toString());
otherEdit.setDisplayName("non-owner-attempt");
assertThrows(
Exception.class,
() ->
otherClient
.ingestionPipelines()
.update(pipeline.getId().toString(), otherEdit),
"Non-owner PATCH should be forbidden");

// Owner can trigger.
String triggerPath = "/v1/services/ingestionPipelines/trigger/" + pipeline.getId();
ownerClient.getHttpClient().execute(HttpMethod.POST, triggerPath, null, Void.class);

// Non-owner cannot trigger.
assertThrows(
Exception.class,
() ->
otherClient
.getHttpClient()
.execute(HttpMethod.POST, triggerPath, null, Void.class),
"Non-owner trigger should be forbidden");
} finally {
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
}
} finally {
adminClient
.dashboardServices()
.delete(
service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
}
} finally {
adminClient.users().delete(otherUser.getId());
adminClient.users().delete(pipelineOwner.getId());
}
} finally {
adminClient.roles().delete(ownerRole.getId());
}
} finally {
adminClient.policies().delete(ownerPolicy.getId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;

import jakarta.ws.rs.core.Response;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.logstorage.LogStorageInterface;
import org.openmetadata.service.logstorage.S3LogStorage.LogStreamListener;
import org.openmetadata.service.monitoring.IngestionProgressTracker;
Expand Down Expand Up @@ -150,6 +152,24 @@ public void setFields(
}
}

@Override
public void setInheritedFields(IngestionPipeline ingestionPipeline, Fields fields) {
EntityReference serviceRef = ingestionPipeline.getService();
if (serviceRef == null) {
return;
}
try {
EntityInterface parent = Entity.getEntity(serviceRef, "owners,domains", ALL);
inheritOwners(ingestionPipeline, fields, parent);
inheritDomains(ingestionPipeline, fields, parent);
} catch (EntityNotFoundException e) {
LOG.debug(
"Parent service {} not found for ingestion pipeline {}; skipping owner/domain inheritance",
serviceRef.getFullyQualifiedName(),
ingestionPipeline.getFullyQualifiedName());
}
}

@Override
public void setFieldsInBulk(Fields fields, List<IngestionPipeline> entities) {
if (entities == null || entities.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.openmetadata.service.migration.mysql.v1129;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;

public class Migration extends MigrationProcessImpl {

public Migration(MigrationFile migrationFile) {
super(migrationFile);
}

@Override
@SneakyThrows
public void runDataMigration() {
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.openmetadata.service.migration.mysql.v1130;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
Expand Down Expand Up @@ -31,5 +34,7 @@ public void runDataMigration() {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
MigrationUtil.addTableColumnSearchSettings();
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.openmetadata.service.migration.postgres.v1129;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;

public class Migration extends MigrationProcessImpl {

public Migration(MigrationFile migrationFile) {
super(migrationFile);
}

@Override
@SneakyThrows
public void runDataMigration() {
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.openmetadata.service.migration.postgres.v1130;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
Expand Down Expand Up @@ -31,5 +34,7 @@ public void runDataMigration() {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
MigrationUtil.addTableColumnSearchSettings();
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Loading
Loading