diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py index 5a97395ac6cd..6ffedde4ac78 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py @@ -69,6 +69,9 @@ TopicDetails, TopicFQN, ) +from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import ( + OpenLineageOwnerResolver, +) from metadata.ingestion.source.pipeline.openlineage.service_resolver import ( build_service_name, extract_integration_type, @@ -130,6 +133,11 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str def prepare(self): self._service_cache = {} self._current_pipeline_service = None + self._owner_resolver = OpenLineageOwnerResolver( + self.metadata, + include_owners=self.source_config.includeOwners, + ownership_update_mode=self.source_config.ownershipUpdateMode, + ) self._entity_cache: LRUCache = LRUCache(maxsize=10000) self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000) self._resolution_cache: LRUCache = LRUCache(maxsize=RESOLUTION_CACHE_MAXSIZE) @@ -797,13 +805,24 @@ def yield_pipeline(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[ pipeline_name = self.get_pipeline_name(pipeline_details) self._current_pipeline_service = self._resolve_pipeline_service(pipeline_details) try: + pipeline_fqn = fqn.build( + metadata=self.metadata, + entity_type=Pipeline, + service_name=self._current_pipeline_service, + pipeline_name=pipeline_name, + ) + owners = self._owner_resolver.get_pipeline_job_owners( + pipeline_details.job, + pipeline_fqn=pipeline_fqn, + ) description = f"""```json {json.dumps(pipeline_details.run_facet, indent=4).strip()}```""" - request = CreatePipelineRequest( + request = CreatePipelineRequest( # pyright: ignore[reportCallIssue] name=pipeline_name, service=self._current_pipeline_service, description=description, tasks=[], + owners=owners, ) yield Either(right=request) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py new file mode 100644 index 000000000000..d1ebea862fea --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py @@ -0,0 +1,237 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, Dict, List, Optional, Set, Tuple # noqa: UP035 + +from metadata.generated.schema.entity.teams.team import Team, TeamType +from metadata.generated.schema.entity.teams.user import User +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + OwnershipUpdateMode, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.entityReferenceList import EntityReferenceList +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.utils import model_str +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +OWNER_CACHE_PAGE_SIZE = 1000 +OWNER_PIPELINE_ENTITY = "pipeline" +OWNER_TEAM_ENTITY = "team" +OWNER_USER_ENTITY = "user" + + +class OpenLineageOwnerResolver: + """ + Resolve OpenLineage job ownership facet owners. + + OpenLineage recommends owner identifiers such as ``team:data`` and + ``user:jdoe``. Qualified identifiers are resolved against their matching + OpenMetadata entity type. Unqualified names are resolved as Group team + first, then user, with a warning when both exist. + """ + + def __init__( + self, + metadata: OpenMetadata, + include_owners: bool | None, + ownership_update_mode: OwnershipUpdateMode | str | None, + ): + self.metadata = metadata + self.include_owners = bool(include_owners) + self.ownership_update_mode = OwnershipUpdateMode( + ownership_update_mode.value + if isinstance(ownership_update_mode, OwnershipUpdateMode) + else ownership_update_mode or OwnershipUpdateMode.replace.value + ) + self._owner_cache_loaded = False + self._team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006 + self._user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006 + self._owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006 + + def get_pipeline_job_owners( + self, + job: Dict[str, Any], # noqa: UP006 + pipeline_fqn: Optional[str] = None, # noqa: UP045 + ) -> Optional[EntityReferenceList]: # noqa: UP045 + """ + Resolve owners from ``job.facets.ownership.owners``. + + ``replace`` returns owners resolved from the current event. ``append`` + returns active existing Pipeline owners plus newly resolved owners. + """ + if not self.include_owners: + return None + + owners = (((job or {}).get("facets") or {}).get("ownership") or {}).get("owners") or [] + if not isinstance(owners, list) or not owners: + return None + + self._ensure_pipeline_owner_cache() + + resolved: List[EntityReference] = [] # noqa: UP006 + seen: Set[Tuple[str, str]] = set() # noqa: UP006 + + for owner in owners: + owner_ref = self._get_owner_ref(owner) + if not owner_ref: + continue + + ref_key = (owner_ref.type, str(owner_ref.id)) + if ref_key not in seen: + resolved.append(owner_ref) + seen.add(ref_key) + + if not resolved: + return None + + if self.ownership_update_mode != OwnershipUpdateMode.append or not pipeline_fqn: + return EntityReferenceList(root=resolved) + + existing_owners = self._owner_refs_by_pipeline_fqn.get(pipeline_fqn, []) + merged_owners = list(existing_owners) + seen_refs = {(owner_ref.type, str(owner_ref.id)) for owner_ref in existing_owners} + for owner_ref in resolved: + ref_key = (owner_ref.type, str(owner_ref.id)) + if ref_key not in seen_refs: + merged_owners.append(owner_ref) + seen_refs.add(ref_key) + + self._owner_refs_by_pipeline_fqn[pipeline_fqn] = merged_owners + return EntityReferenceList(root=merged_owners) + + def _get_owner_ref(self, owner: Any) -> Optional[EntityReference]: # noqa: UP045 + """ + Resolve a single OpenLineage ownership owner object to an OpenMetadata owner reference. + + Qualified names such as ``team:data-platform`` and ``user:jdoe`` are resolved against the + matching entity cache. Unqualified names are resolved as Group team first, then user. + """ + if not isinstance(owner, dict): + return None + + raw_owner_name = owner.get("name") + if not isinstance(raw_owner_name, str) or not raw_owner_name: + return None + + owner_type, separator, owner_name = raw_owner_name.partition(":") + if separator: + owner_type = owner_type.lower() + else: + owner_type = None + owner_name = raw_owner_name + + owner_key = owner_name.strip().lower() + if not owner_key: + return None + + if owner_type == OWNER_TEAM_ENTITY: + owner_ref = self._team_owner_ref_by_name.get(owner_key) + elif owner_type == OWNER_USER_ENTITY: + owner_ref = self._user_owner_ref_by_name.get(owner_key) + else: + team_ref = self._team_owner_ref_by_name.get(owner_key) + user_ref = self._user_owner_ref_by_name.get(owner_key) + if team_ref and user_ref: + logger.warning( + f"OpenLineage owner [{raw_owner_name}] matched both a team " + "and a user. Using the team for pipeline ownership." + ) + owner_ref = team_ref or user_ref + + if not owner_ref: + logger.warning(f"Unable to resolve OpenLineage owner [{raw_owner_name}] for pipeline ownership.") + return owner_ref + + def _ensure_pipeline_owner_cache(self) -> None: + """ + Load OpenMetadata Group teams, users, and pipeline ownership references. + """ + if self._owner_cache_loaded: + return + + team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006 + user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006 + owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006 + + try: + for team in self.metadata.list_all_entities( + entity=Team, + fields=["teamType", "owns"], + limit=OWNER_CACHE_PAGE_SIZE, + params={"ownsEntityType": OWNER_PIPELINE_ENTITY}, + skip_on_failure=True, + ): + if team.teamType != TeamType.Group: + continue + team_name = model_str(team.name).strip().lower() + if team_name: + team_ref = team_owner_ref_by_name.setdefault( + team_name, + EntityReference( # pyright: ignore[reportCallIssue] + id=model_str(team.id), + type=OWNER_TEAM_ENTITY, + name=model_str(team.name), + displayName=team.displayName, + ), + ) + for owned_pipeline in team.owns.root if team.owns else []: + pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name + if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value: + pipeline_fqn = model_str(pipeline_fqn_value) + pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, []) + if not any( + owner.type == team_ref.type and str(owner.id) == str(team_ref.id) + for owner in pipeline_owner_refs + ): + pipeline_owner_refs.append(team_ref) + except Exception as exc: + logger.warning(f"Unable to load OpenMetadata teams for owner cache: {exc}") + + try: + for user in self.metadata.list_all_entities( + entity=User, + fields=["owns"], + limit=OWNER_CACHE_PAGE_SIZE, + params={ + "ownsEntityType": OWNER_PIPELINE_ENTITY, + "directOwnsOnly": "true", + }, + skip_on_failure=True, + ): + user_name = model_str(user.name).strip().lower() + if user_name: + user_ref = user_owner_ref_by_name.setdefault( + user_name, + EntityReference( # pyright: ignore[reportCallIssue] + id=model_str(user.id), + type=OWNER_USER_ENTITY, + name=model_str(user.name), + displayName=user.displayName, + ), + ) + for owned_pipeline in user.owns.root if user.owns else []: + pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name + if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value: + pipeline_fqn = model_str(pipeline_fqn_value) + pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, []) + if not any( + owner.type == user_ref.type and str(owner.id) == str(user_ref.id) + for owner in pipeline_owner_refs + ): + pipeline_owner_refs.append(user_ref) + except Exception as exc: + logger.warning(f"Unable to load OpenMetadata users for owner cache: {exc}") + + self._team_owner_ref_by_name = team_owner_ref_by_name + self._user_owner_ref_by_name = user_owner_ref_by_name + self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn + self._owner_cache_loaded = True diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage.py b/ingestion/tests/unit/topology/pipeline/test_openlineage.py index 6ac39f52859b..12e03ebc55ba 100644 --- a/ingestion/tests/unit/topology/pipeline/test_openlineage.py +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage.py @@ -4,7 +4,7 @@ import unittest from pathlib import Path from unittest.mock import MagicMock, Mock, patch -from uuid import UUID +from uuid import UUID, uuid4 from cachetools import LRUCache @@ -35,6 +35,7 @@ from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.entityLineage import ColumnLineage from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.entityReferenceList import EntityReferenceList from metadata.ingestion.source.pipeline.openlineage.metadata import ( RESOLUTION_CACHE_MAXSIZE, OpenlineageSource, @@ -48,6 +49,7 @@ from metadata.ingestion.source.pipeline.openlineage.utils import ( message_to_open_lineage_event, ) +from metadata.utils import fqn MOCK_WORKFLOW_CONFIG = { "openMetadataServerConfig": { @@ -956,6 +958,69 @@ def test_get_pipelines_list(self): self.assertIsInstance(ol_event, OpenLineageEvent) self.assertEqual(ol_event, EXPECTED_OL_EVENT) + def test_yield_pipeline_sets_owners_from_job_ownership_facet(self): + """Test pipeline owners are populated from OpenLineage job ownership facet.""" + ol_event = copy.deepcopy(EXPECTED_OL_EVENT) + ol_event.job = { + **ol_event.job, + "facets": {"ownership": {"owners": [{"name": "team:data-platform", "type": "OWNER"}]}}, + } + owners = EntityReferenceList( + root=[ + EntityReference( + id=uuid4(), + type="team", + name="data-platform", + displayName="Data Platform", + ) + ] + ) + owner_resolver = Mock() + owner_resolver.get_pipeline_job_owners.return_value = owners + self.open_lineage_source._owner_resolver = owner_resolver + + with ( + patch.object( + self.open_lineage_source, + "_resolve_pipeline_service", + return_value=MOCK_PIPELINE_SERVICE.name.root, + ), + patch.object(self.open_lineage_source, "register_record"), + ): + results = list(self.open_lineage_source.yield_pipeline(ol_event)) + + self.assertEqual(len(results), 1) + self.assertEqual(results[0].right.owners, owners) + owner_resolver.get_pipeline_job_owners.assert_called_once() + self.assertEqual( + owner_resolver.get_pipeline_job_owners.call_args.args, + (ol_event.job,), + ) + self.assertEqual( + owner_resolver.get_pipeline_job_owners.call_args.kwargs["pipeline_fqn"], + fqn.build( + metadata=self.open_lineage_source.metadata, + entity_type=Pipeline, + service_name=MOCK_PIPELINE_SERVICE.name.root, + pipeline_name=self.open_lineage_source.get_pipeline_name(ol_event), + ), + ) + + def test_prepare_passes_include_owners_to_owner_resolver(self): + self.open_lineage_source.source_config.includeOwners = False + + with ( + patch.object(self.open_lineage_source, "_build_db_service_type_map", return_value={}), + patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenLineageOwnerResolver") as resolver_cls, + ): + self.open_lineage_source.prepare() + + resolver_cls.assert_called_once_with( + self.open_lineage_source.metadata, + include_owners=False, + ownership_update_mode=self.open_lineage_source.source_config.ownershipUpdateMode, + ) + @patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om") def test_yield_pipeline_lineage_details(self, mock_get_table_from_om): def t_fqn_build_side_effect( diff --git a/ingestion/tests/unit/topology/pipeline/test_openlineage_ownership.py b/ingestion/tests/unit/topology/pipeline/test_openlineage_ownership.py new file mode 100644 index 000000000000..a8e06b31480b --- /dev/null +++ b/ingestion/tests/unit/topology/pipeline/test_openlineage_ownership.py @@ -0,0 +1,257 @@ +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +from metadata.generated.schema.entity.teams.team import Team, TeamType +from metadata.generated.schema.entity.teams.user import User +from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import ( + OwnershipUpdateMode, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.entityReferenceList import EntityReferenceList +from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import ( + OpenLineageOwnerResolver, +) + + +def build_user( + name: str, + display_name: str | None = None, + owns: list[EntityReference] | None = None, +) -> User: + return User( + id=uuid4(), + name=name, + displayName=display_name, + email=f"{name}@example.com", + owns=EntityReferenceList(root=owns) if owns else None, + ) + + +def build_team( + name: str, + display_name: str | None = None, + team_type: TeamType = TeamType.Group, + owns: list[EntityReference] | None = None, +) -> Team: + return Team( + id=uuid4(), + name=name, + teamType=team_type, + displayName=display_name, + owns=EntityReferenceList(root=owns) if owns else None, + ) + + +def build_resolver( + users=None, + teams=None, + include_owners=True, + ownership_update_mode=OwnershipUpdateMode.replace, +): + metadata = MagicMock() + + def list_all_entities(entity, **_): + if entity is User: + return users or [] + if entity is Team: + return teams or [] + return [] + + metadata.list_all_entities.side_effect = list_all_entities + return ( + OpenLineageOwnerResolver( + metadata, + include_owners=include_owners, + ownership_update_mode=ownership_update_mode, + ), + metadata, + ) + + +def build_job(owner_names): + return {"facets": {"ownership": {"owners": [{"name": owner_name} for owner_name in owner_names]}}} + + +def test_resolves_qualified_user_and_team_owner_names(): + user = build_user("jdoe", "John Doe") + team = build_team("data-platform", "Data Platform") + resolver, _ = build_resolver(users=[user], teams=[team]) + + owners = resolver.get_pipeline_job_owners(build_job(["user:jdoe", "team:data-platform"])) + + assert owners is not None + assert [(owner.type, owner.name) for owner in owners.root] == [ + ("user", "jdoe"), + ("team", "data-platform"), + ] + + +def test_unqualified_owner_prefers_group_team_over_user(): + user = build_user("analytics") + team = build_team("analytics") + resolver, _ = build_resolver(users=[user], teams=[team]) + + with patch("metadata.ingestion.source.pipeline.openlineage.ownership_resolver.logger.warning") as warning: + owners = resolver.get_pipeline_job_owners(build_job(["analytics"])) + + assert owners is not None + assert len(owners.root) == 1 + assert owners.root[0].type == "team" + assert owners.root[0].name == "analytics" + warning.assert_called_once_with( + "OpenLineage owner [analytics] matched both a team and a user. Using the team for pipeline ownership." + ) + + +def test_only_caches_group_teams(): + department = build_team("finance", team_type=TeamType.Department) + resolver, _ = build_resolver(teams=[department]) + + with patch("metadata.ingestion.source.pipeline.openlineage.ownership_resolver.logger.warning") as warning: + owners = resolver.get_pipeline_job_owners(build_job(["team:finance"])) + + assert owners is None + warning.assert_called_once_with("Unable to resolve OpenLineage owner [team:finance] for pipeline ownership.") + + +def test_does_not_build_cache_without_ownership_facet(): + resolver, metadata = build_resolver() + + owners = resolver.get_pipeline_job_owners({"name": "job"}) + + assert owners is None + metadata.list_all_entities.assert_not_called() + + +def test_does_not_build_cache_when_include_owners_is_disabled(): + resolver, metadata = build_resolver( + users=[build_user("jdoe")], + teams=[build_team("data-platform")], + include_owners=False, + ) + + owners = resolver.get_pipeline_job_owners(build_job(["user:jdoe", "team:data-platform"])) + + assert owners is None + metadata.list_all_entities.assert_not_called() + + +def test_does_not_build_cache_when_include_owners_is_none(): + resolver, metadata = build_resolver( + users=[build_user("jdoe")], + teams=[build_team("data-platform")], + include_owners=None, + ) + + owners = resolver.get_pipeline_job_owners(build_job(["user:jdoe", "team:data-platform"])) + + assert owners is None + metadata.list_all_entities.assert_not_called() + + +def test_builds_pipeline_owner_cache_with_filtered_owns_api(): + pipeline_ref = EntityReference( + id=uuid4(), + type="pipeline", + name="daily_orders", + fullyQualifiedName="airflow.daily_orders", + ) + user = build_user("jdoe", owns=[pipeline_ref]) + team = build_team("data-platform", owns=[pipeline_ref]) + resolver, metadata = build_resolver(users=[user], teams=[team]) + + resolver.get_pipeline_job_owners(build_job(["team:data-platform"])) + + metadata.list_all_entities.assert_any_call( + entity=Team, + fields=["teamType", "owns"], + limit=1000, + params={"ownsEntityType": "pipeline"}, + skip_on_failure=True, + ) + metadata.list_all_entities.assert_any_call( + entity=User, + fields=["owns"], + limit=1000, + params={"ownsEntityType": "pipeline", "directOwnsOnly": "true"}, + skip_on_failure=True, + ) + + +def test_replace_mode_does_not_carry_existing_pipeline_owners(): + pipeline_ref = EntityReference( + id=uuid4(), + type="pipeline", + name="daily_orders", + fullyQualifiedName="airflow.daily_orders", + ) + existing_team = build_team("data-platform", owns=[pipeline_ref]) + new_user = build_user("jdoe") + resolver, _ = build_resolver( + users=[new_user], + teams=[existing_team], + ownership_update_mode="replace", + ) + + owners = resolver.get_pipeline_job_owners( + build_job(["user:jdoe"]), + pipeline_fqn="airflow.daily_orders", + ) + + assert owners is not None + assert [(owner.type, owner.name) for owner in owners.root] == [("user", "jdoe")] + + +def test_none_ownership_update_mode_defaults_to_replace(): + pipeline_ref = EntityReference( + id=uuid4(), + type="pipeline", + name="daily_orders", + fullyQualifiedName="airflow.daily_orders", + ) + existing_team = build_team("data-platform", owns=[pipeline_ref]) + new_user = build_user("jdoe") + resolver, _ = build_resolver( + users=[new_user], + teams=[existing_team], + ownership_update_mode=None, + ) + + owners = resolver.get_pipeline_job_owners( + build_job(["user:jdoe"]), + pipeline_fqn="airflow.daily_orders", + ) + + assert owners is not None + assert [(owner.type, owner.name) for owner in owners.root] == [("user", "jdoe")] + + +def test_append_mode_merges_existing_pipeline_owners_and_updates_cache(): + pipeline_ref = EntityReference( + id=uuid4(), + type="pipeline", + name="daily_orders", + fullyQualifiedName="airflow.daily_orders", + ) + user = build_user("jdoe") + team = build_team("data-platform", owns=[pipeline_ref]) + resolver, _ = build_resolver( + users=[user], + teams=[team], + ownership_update_mode="append", + ) + + owners = resolver.get_pipeline_job_owners( + build_job(["user:jdoe"]), + pipeline_fqn="airflow.daily_orders", + ) + + assert owners is not None + assert [(owner.type, owner.name) for owner in owners.root] == [ + ("team", "data-platform"), + ("user", "jdoe"), + ] + assert [(owner.type, owner.name) for owner in resolver._owner_refs_by_pipeline_fqn["airflow.daily_orders"]] == [ + ("team", "data-platform"), + ("user", "jdoe"), + ] diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TeamResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TeamResourceIT.java index 6cb5237ae9c3..eca15e67e1f7 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TeamResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/TeamResourceIT.java @@ -27,17 +27,21 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.openmetadata.it.factories.DatabaseServiceTestFactory; +import org.openmetadata.it.factories.PipelineServiceTestFactory; import org.openmetadata.it.util.SdkClients; import org.openmetadata.it.util.TestNamespace; import org.openmetadata.schema.api.data.CreateDatabase; import org.openmetadata.schema.api.data.CreateDatabaseSchema; +import org.openmetadata.schema.api.data.CreatePipeline; import org.openmetadata.schema.api.data.CreateTable; import org.openmetadata.schema.api.teams.CreateTeam; import org.openmetadata.schema.api.teams.CreateTeam.TeamType; import org.openmetadata.schema.api.teams.CreateUser; import org.openmetadata.schema.entity.data.Database; import org.openmetadata.schema.entity.data.DatabaseSchema; +import org.openmetadata.schema.entity.data.Pipeline; import org.openmetadata.schema.entity.services.DatabaseService; +import org.openmetadata.schema.entity.services.PipelineService; import org.openmetadata.schema.entity.teams.Team; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.type.ApiStatus; @@ -872,6 +876,57 @@ void test_updateTeamParent(TestNamespace ns) { assertFalse(verifyDiv.getParents().stream().anyMatch(p -> p.getId().equals(bu1.getId()))); } + @Test + void test_listTeams_ownsEntityTypeFilter_preservesDomainsAndFiltersOwns(TestNamespace ns) { + OpenMetadataClient client = SdkClients.adminClient(); + + String domainFqn = testDomain().getFullyQualifiedName(); + CreateTeam createTeam = + new CreateTeam() + .withName(ns.prefix("ownsFilterTeam")) + .withTeamType(TeamType.GROUP) + .withDomains(List.of(domainFqn)) + .withDescription("Team for ownsEntityType filter regression test (#28381)"); + Team team = createEntity(createTeam); + + PipelineService service = PipelineServiceTestFactory.createAirflow(ns); + CreatePipeline createPipeline = + new CreatePipeline() + .withName(ns.prefix("ownedPipeline")) + .withService(service.getFullyQualifiedName()) + .withOwners(List.of(team.getEntityReference())); + Pipeline pipeline = client.pipelines().create(createPipeline); + + ListParams params = new ListParams(); + params.setLimit(1000000); + params.setFields("owns,domains"); + params.addFilter("ownsEntityType", "pipeline"); + ListResponse response = listEntities(params); + + Team listed = + response.getData().stream() + .filter(t -> t.getId().equals(team.getId())) + .findFirst() + .orElse(null); + assertNotNull(listed, "Team must be present in the list response"); + + // Regression guard (#28381): the ownsEntityType filter must not drop relationship-bulk + // fields like domains, which Team populates only via the batched relationship fetch. + assertNotNull( + listed.getDomains(), "domains must not be dropped when the ownsEntityType filter is set"); + assertTrue( + listed.getDomains().stream().anyMatch(d -> d.getFullyQualifiedName().equals(domainFqn)), + "Team domains should be preserved alongside the ownsEntityType filter"); + + assertNotNull(listed.getOwns(), "owns should be populated"); + assertTrue( + listed.getOwns().stream().anyMatch(o -> o.getId().equals(pipeline.getId())), + "owns should include the owned pipeline"); + assertTrue( + listed.getOwns().stream().allMatch(o -> "pipeline".equals(o.getType())), + "owns should contain only pipelines when ownsEntityType=pipeline"); + } + @Test void test_teamWithOwner(TestNamespace ns) { OpenMetadataClient client = SdkClients.adminClient(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index e6f3d23834b7..85089f20a0cf 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -2150,7 +2150,7 @@ public final List listAll(Fields fields, ListFilter filter) { T entity = JsonUtils.readValue(json, entityClass); entities.add(entity); } - setFieldsInBulk(fields, entities); + setFieldsInBulk(fields, entities, filter); return entities; } @@ -2181,6 +2181,10 @@ public final List listAllForCSV(Fields fields, String parentFqn) { *

* Example implementation can be found in {@link GlossaryTermRepository#setFieldsInBulk}. */ + public void setFieldsInBulk(Fields fields, List entities, ListFilter filter) { + setFieldsInBulk(fields, entities); + } + public void setFieldsInBulk(Fields fields, List entities) { if (entities == null || entities.isEmpty()) { return; @@ -2223,7 +2227,7 @@ public ResultList listAfter( String afterId = cursorMap.get("id"); List jsons = dao.listAfter(filter, limitParam + 1, afterName, afterId); - entities = listInternal(jsons, fields, uriInfo); + entities = listInternal(jsons, fields, uriInfo, filter); String beforeCursor; String afterCursor = null; @@ -2245,18 +2249,19 @@ public ResultList listAfterWithOffset( int total = ListCountCache.getOrCompute(entityType, filter, () -> dao.listCount(filter)); List jsons = dao.listAfter(filter, limit, offset); - List entities = listInternal(jsons, fields, uriInfo); + List entities = listInternal(jsons, fields, uriInfo, filter); return new ResultList<>(entities, offset, limit, total); } - private List listInternal(List jsons, Fields fields, UriInfo uriInfo) { + private List listInternal( + List jsons, Fields fields, UriInfo uriInfo, ListFilter filter) { List entities; try (var ignored = phase("jsonDeserialize")) { entities = JsonUtils.readObjects(jsons, entityClass); } try (var ignored = phase("setFieldsBulk")) { - setFieldsInBulk(fields, entities); + setFieldsInBulk(fields, entities, filter); } entities.forEach(entity -> withHref(uriInfo, entity)); return entities; @@ -2280,7 +2285,8 @@ public ResultList listAfterKeyset( boolean hasMoreData = jsons.size() > limitParam; List jsonsToProcess = hasMoreData ? jsons.subList(0, limitParam) : jsons; - Iterator> iterator = serializeJsons(jsonsToProcess, fields, null); + Iterator> iterator = + serializeJsons(jsonsToProcess, fields, null, filter); while (iterator.hasNext()) { Either either = iterator.next(); if (either.right().isPresent()) { @@ -2339,7 +2345,7 @@ public ResultList listBefore( List jsons = dao.listBefore(filter, limitParam + 1, beforeName, beforeId); List entities = JsonUtils.readObjects(jsons, entityClass); - setFieldsInBulk(fields, entities); + setFieldsInBulk(fields, entities, filter); entities.forEach(entity -> withHref(uriInfo, entity)); String beforeCursor = null; @@ -2439,7 +2445,7 @@ public final ResultList listWithOffset( String beforeOffset = getBeforeOffset(offsetInt, limitParam); if (limitParam > 0) { List jsons = callable.apply(filter, limitParam, offsetInt); - Iterator> iterator = serializeJsons(jsons, fields, uriInfo); + Iterator> iterator = serializeJsons(jsons, fields, uriInfo, filter); while (iterator.hasNext()) { Either either = iterator.next(); if (either.right().isPresent()) { @@ -9970,9 +9976,21 @@ public static void validateColumn(Table table, String columnName, Boolean caseSe } protected void fetchAndSetFields(List entities, Fields fields) { + fetchAndSetFieldsExcept(entities, fields, Collections.emptySet()); + } + + /** + * Same as {@link #fetchAndSetFields} but skips the fetchers for {@code excludedFields}, letting + * the caller populate those fields itself (e.g. with a filtered query). Excluded fields are + * expected to be per-field fetchers, not relationship-bulk fields, so the batched relationship + * fetch still runs to keep fields like {@code domains}/{@code owners} populated and to avoid N+1. + */ + protected void fetchAndSetFieldsExcept( + List entities, Fields fields, Set excludedFields) { Set relationshipFieldsHandled = fetchAndSetRelationshipFieldsInBulk(entities, fields); for (Entry, Fields>> entry : fieldFetchers.entrySet()) { - if (relationshipFieldsHandled.contains(entry.getKey())) { + if (excludedFields.contains(entry.getKey()) + || relationshipFieldsHandled.contains(entry.getKey())) { continue; } entry.getValue().accept(entities, fields); @@ -10912,7 +10930,7 @@ List entityListToStrings(List entities) { } private Iterator> serializeJsons( - List jsons, Fields fields, UriInfo uriInfo) { + List jsons, Fields fields, UriInfo uriInfo, ListFilter filter) { List> results = new ArrayList<>(); List entities = new ArrayList<>(); @@ -10931,7 +10949,7 @@ private Iterator> serializeJsons( if (!entities.isEmpty()) { try { - setFieldsInBulk(fields, entities); + setFieldsInBulk(fields, entities, filter); if (!nullOrEmpty(uriInfo)) { entities.forEach(entity -> withHref(uriInfo, entity)); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TeamRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TeamRepository.java index a23862635616..baaa98508dad 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TeamRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TeamRepository.java @@ -110,6 +110,7 @@ public class TeamRepository extends EntityRepository { static final String PARENTS_FIELD = "parents"; static final String USERS_FIELD = "users"; + private static final String OWNS_ENTITY_TYPE_PARAM = "ownsEntityType"; static final String TEAM_UPDATE_FIELDS = "profile,users,defaultRoles,defaultPersona,parents,children,policies,teamType,email,domains"; static final String TEAM_PATCH_FIELDS = @@ -356,16 +357,25 @@ private void fetchAndSetUserCount(List teams, Fields fields) { } private void fetchAndSetOwns(List teams, Fields fields) { + fetchAndSetOwns(teams, fields, null); + } + + private void fetchAndSetOwns(List teams, Fields fields, ListFilter filter) { if (!fields.contains("owns") || teams == null || teams.isEmpty()) { return; } List teamIds = teams.stream().map(Team::getId).map(UUID::toString).distinct().toList(); + String ownsEntityType = filter == null ? null : filter.getQueryParam(OWNS_ENTITY_TYPE_PARAM); List ownsRecords = - daoCollection - .relationshipDAO() - .findToBatchAllTypes(teamIds, Relationship.OWNS.ordinal(), Include.ALL); + nullOrEmpty(ownsEntityType) + ? daoCollection + .relationshipDAO() + .findToBatchAllTypes(teamIds, Relationship.OWNS.ordinal(), Include.ALL) + : daoCollection + .relationshipDAO() + .findToBatch(teamIds, Relationship.OWNS.ordinal(), ownsEntityType, Include.ALL); Map> teamToOwns = new HashMap<>(); for (CollectionDAO.EntityRelationshipObject record : ownsRecords) { @@ -386,6 +396,22 @@ private void fetchAndSetOwns(List teams, Fields fields) { } } + @Override + public void setFieldsInBulk(Fields fields, List teams, ListFilter filter) { + if (fields.contains("owns") + && filter != null + && !nullOrEmpty(filter.getQueryParam(OWNS_ENTITY_TYPE_PARAM))) { + fetchAndSetFieldsExcept(teams, fields, Set.of("owns")); + fetchAndSetOwns(teams, fields, filter); + setInheritedFields(teams, fields); + for (Team team : teams) { + clearFieldsInternal(team, fields); + } + return; + } + super.setFieldsInBulk(fields, teams); + } + private List getDomains(UUID teamId) { // Team does not have domain. 'domains' is the field for user as team can belong to multiple // domains diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UserRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UserRepository.java index 3e4b2b1766a5..df1be8c224bb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UserRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/UserRepository.java @@ -131,6 +131,8 @@ public class UserRepository extends EntityRepository { "profile,roles,teams,authenticationMechanism,isEmailVerified,personas,defaultPersona,domains,personaPreferences"; static final String USER_UPDATE_FIELDS = "profile,roles,teams,authenticationMechanism,isEmailVerified,personas,defaultPersona,domains,personaPreferences"; + private static final String OWNS_ENTITY_TYPE_PARAM = "ownsEntityType"; + private static final String DIRECT_OWNS_ONLY_PARAM = "directOwnsOnly"; private volatile EntityReference organization; private InheritedFieldEntitySearch inheritedFieldEntitySearch; @@ -844,58 +846,81 @@ private void fetchAndSetRoles(List users, Fields fields) { } private void fetchAndSetOwns(List users, Fields fields) { + fetchAndSetOwns(users, fields, null); + } + + private void fetchAndSetOwns(List users, Fields fields, ListFilter filter) { if (!fields.contains("owns") || users == null || users.isEmpty()) { return; } List userIds = users.stream().map(User::getId).map(UUID::toString).distinct().toList(); + String ownsEntityType = filter == null ? null : filter.getQueryParam(OWNS_ENTITY_TYPE_PARAM); + boolean directOwnsOnly = + filter != null && Boolean.parseBoolean(filter.getQueryParam(DIRECT_OWNS_ONLY_PARAM)); // Get entities owned by users List ownsRecords = - daoCollection - .relationshipDAO() - .findToBatchAllTypes(userIds, Relationship.OWNS.ordinal(), Include.ALL); + nullOrEmpty(ownsEntityType) + ? daoCollection + .relationshipDAO() + .findToBatchAllTypes(userIds, Relationship.OWNS.ordinal(), Include.ALL) + : daoCollection + .relationshipDAO() + .findToBatch(userIds, Relationship.OWNS.ordinal(), ownsEntityType, Include.ALL); // Also get entities owned by teams that users belong to // First get all teams for all users Map> userTeams = new HashMap<>(); - if (!fields.contains(TEAMS_FIELD)) { - // If teams weren't already fetched, we need to get them - List teamRecords = - daoCollection - .relationshipDAO() - .findFromBatch(userIds, Relationship.HAS.ordinal(), Entity.TEAM, USER); - for (CollectionDAO.EntityRelationshipObject record : teamRecords) { - UUID userId = UUID.fromString(record.getToId()); - EntityReference teamRef = - Entity.getEntityReferenceById( - Entity.TEAM, UUID.fromString(record.getFromId()), Include.ALL); - userTeams.computeIfAbsent(userId, k -> new ArrayList<>()).add(teamRef); - } - } else { - // Use already fetched teams - for (User user : users) { - if (user.getTeams() != null) { - userTeams.put(user.getId(), user.getTeams()); + if (!directOwnsOnly) { + if (!fields.contains(TEAMS_FIELD)) { + // If teams weren't already fetched, we need to get them + List teamRecords = + daoCollection + .relationshipDAO() + .findFromBatch(userIds, Relationship.HAS.ordinal(), Entity.TEAM, USER); + for (CollectionDAO.EntityRelationshipObject record : teamRecords) { + UUID userId = UUID.fromString(record.getToId()); + EntityReference teamRef = + Entity.getEntityReferenceById( + Entity.TEAM, UUID.fromString(record.getFromId()), Include.ALL); + userTeams.computeIfAbsent(userId, k -> new ArrayList<>()).add(teamRef); + } + } else { + // Use already fetched teams + for (User user : users) { + if (user.getTeams() != null) { + userTeams.put(user.getId(), user.getTeams()); + } } } } // Get entities owned by teams Set allTeamIds = - userTeams.values().stream() - .flatMap(List::stream) - .map(EntityReference::getId) - .map(UUID::toString) - .collect(Collectors.toSet()); + directOwnsOnly + ? Collections.emptySet() + : userTeams.values().stream() + .flatMap(List::stream) + .map(EntityReference::getId) + .map(UUID::toString) + .collect(Collectors.toSet()); List teamOwnsRecords = new ArrayList<>(); if (!allTeamIds.isEmpty()) { teamOwnsRecords = - daoCollection - .relationshipDAO() - .findToBatchAllTypes( - new ArrayList<>(allTeamIds), Relationship.OWNS.ordinal(), Include.ALL); + nullOrEmpty(ownsEntityType) + ? daoCollection + .relationshipDAO() + .findToBatchAllTypes( + new ArrayList<>(allTeamIds), Relationship.OWNS.ordinal(), Include.ALL) + : daoCollection + .relationshipDAO() + .findToBatch( + new ArrayList<>(allTeamIds), + Relationship.OWNS.ordinal(), + ownsEntityType, + Include.ALL); } // Map user to owned entities @@ -945,6 +970,23 @@ private void fetchAndSetOwns(List users, Fields fields) { } } + @Override + public void setFieldsInBulk(Fields fields, List users, ListFilter filter) { + if (fields.contains("owns") + && filter != null + && (!nullOrEmpty(filter.getQueryParam(OWNS_ENTITY_TYPE_PARAM)) + || Boolean.parseBoolean(filter.getQueryParam(DIRECT_OWNS_ONLY_PARAM)))) { + fetchAndSetFieldsExcept(users, fields, Set.of("owns")); + fetchAndSetOwns(users, fields, filter); + setInheritedFields(users, fields); + for (User user : users) { + clearFieldsInternal(user, fields); + } + return; + } + super.setFieldsInBulk(fields, users); + } + private void fetchAndSetFollows(List users, Fields fields) { if (!fields.contains("follows") || users == null || users.isEmpty()) { return; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/TeamResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/TeamResource.java index c5143f25fa32..5ae11d1210f7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/TeamResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/TeamResource.java @@ -219,6 +219,12 @@ public ResultList list( schema = @Schema(type = "boolean")) @QueryParam("isJoinable") Boolean isJoinable, + @Parameter( + description = + "When fields contains owns, only include owned entities of this entity type.", + schema = @Schema(type = "string", example = "pipeline")) + @QueryParam("ownsEntityType") + String ownsEntityType, @Parameter( description = "Include all, deleted, or non-deleted entities.", schema = @Schema(implementation = Include.class)) @@ -229,6 +235,9 @@ public ResultList list( if (isJoinable != null) { filter.addQueryParam("isJoinable", String.valueOf(isJoinable)); } + if (ownsEntityType != null) { + filter.addQueryParam("ownsEntityType", ownsEntityType); + } return super.listInternal( uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/UserResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/UserResource.java index 552efc93e372..fd65d0f11ee8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/UserResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/teams/UserResource.java @@ -291,6 +291,18 @@ public ResultList list( schema = @Schema(type = "boolean")) @QueryParam("isBot") Boolean isBot, + @Parameter( + description = + "When fields contains owns, only include owned entities of this entity type.", + schema = @Schema(type = "string", example = "pipeline")) + @QueryParam("ownsEntityType") + String ownsEntityType, + @Parameter( + description = + "When fields contains owns, only include entities directly owned by the user.", + schema = @Schema(type = "boolean")) + @QueryParam("directOwnsOnly") + Boolean directOwnsOnly, @Parameter( description = "Include all, deleted, or non-deleted entities.", schema = @Schema(implementation = Include.class)) @@ -304,6 +316,12 @@ public ResultList list( if (isBot != null) { filter.addQueryParam("isBot", String.valueOf(isBot)); } + if (ownsEntityType != null) { + filter.addQueryParam("ownsEntityType", ownsEntityType); + } + if (directOwnsOnly != null) { + filter.addQueryParam("directOwnsOnly", String.valueOf(directOwnsOnly)); + } ResultList users = listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after); users.getData().forEach(user -> decryptOrNullify(securityContext, user)); diff --git a/openmetadata-spec/src/main/resources/json/schema/api/lineage/openlineage/openLineageFacets.json b/openmetadata-spec/src/main/resources/json/schema/api/lineage/openlineage/openLineageFacets.json index 2eea849afc28..c5961f8a1136 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/lineage/openlineage/openLineageFacets.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/lineage/openlineage/openLineageFacets.json @@ -249,7 +249,7 @@ } }, "ownershipFacet": { - "description": "Ownership facet providing owner information for the dataset.", + "description": "Ownership facet providing owner information for the dataset or job.", "type": "object", "allOf": [ { "$ref": "#/definitions/baseFacet" } @@ -386,6 +386,9 @@ "properties": { "sql": { "$ref": "#/definitions/sqlJobFacet" + }, + "ownership": { + "$ref": "#/definitions/ownershipFacet" } }, "additionalProperties": true diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/openLineageSettings.json b/openmetadata-spec/src/main/resources/json/schema/configuration/openLineageSettings.json index 44882a9d863c..de46cde86a6d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/configuration/openLineageSettings.json +++ b/openmetadata-spec/src/main/resources/json/schema/configuration/openLineageSettings.json @@ -25,6 +25,13 @@ "enum": ["START", "RUNNING", "COMPLETE", "ABORT", "FAIL", "OTHER"] } }, + "ownershipUpdateMode": { + "description": "Set how owners from OpenLineage job ownership facets update Pipeline owners. In replace mode, resolved owners from the current event replace existing owners. In append mode, resolved owners are appended to active existing Pipeline owners.", + "type": "string", + "enum": ["replace", "append"], + "default": "replace", + "title": "Ownership Update Mode" + }, "defaultPipelineService": { "description": "Name of the Pipeline Service to use when auto-creating Pipeline entities from OpenLineage jobs. This service must exist in OpenMetadata.", "type": "string", diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json index e8d7503113fb..c5a8d0bc39f9 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json @@ -30,6 +30,13 @@ "default": true, "title": "Include Owners" }, + "ownershipUpdateMode": { + "description": "Set how owners from source metadata update Pipeline owners. In replace mode, resolved owners from the current source replace existing owners. In append mode, resolved owners are appended to active existing Pipeline owners.", + "type": "string", + "enum": ["replace", "append"], + "default": "replace", + "title": "Ownership Update Mode" + }, "pipelineFilterPattern": { "description": "Regex exclude pipelines.", "$ref": "../type/filterPattern.json#/definitions/filterPattern", diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageBatchRequest.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageBatchRequest.ts index a66110a2c264..8053641674da 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageBatchRequest.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageBatchRequest.ts @@ -218,7 +218,7 @@ export interface DocumentationFacet { } /** - * Ownership facet providing owner information for the dataset. + * Ownership facet providing owner information for the dataset or job. * * Base facet that all facets extend from. */ @@ -357,7 +357,8 @@ export interface OpenLineageJob { * Facets that can be attached to a job. */ export interface JobFacets { - sql?: SQLJobFacet; + ownership?: OwnershipFacet; + sql?: SQLJobFacet; [property: string]: any; } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageRunEvent.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageRunEvent.ts index f153a1e48a00..783dd76664bd 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageRunEvent.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/lineage/openlineage/openLineageRunEvent.ts @@ -208,7 +208,7 @@ export interface DocumentationFacet { } /** - * Ownership facet providing owner information for the dataset. + * Ownership facet providing owner information for the dataset or job. * * Base facet that all facets extend from. */ @@ -347,7 +347,8 @@ export interface OpenLineageJob { * Facets that can be attached to a job. */ export interface JobFacets { - sql?: SQLJobFacet; + ownership?: OwnershipFacet; + sql?: SQLJobFacet; [property: string]: any; } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index 77997747ec73..49ff47839dc8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -656,6 +656,12 @@ export interface Pipeline { * etc., with that Pipeline will be deleted */ markDeletedPipelines?: boolean; + /** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; /** * Regex exclude pipelines. */ @@ -2884,6 +2890,16 @@ export interface OwnerConfiguration { table?: { [key: string]: string[] | string } | string; } +/** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} + /** * A single access grant entry. The per-service shape lives under `config`. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/openLineageSettings.ts b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/openLineageSettings.ts index 0e614ae76ffb..c5ed46744396 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/openLineageSettings.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/openLineageSettings.ts @@ -40,6 +40,12 @@ export interface OpenLineageSettings { * 'prod-postgres' */ namespaceToServiceMapping?: { [key: string]: string }; + /** + * Set how owners from OpenLineage job ownership facets update Pipeline owners. In replace + * mode, resolved owners from the current event replace existing owners. In append mode, + * resolved owners are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; } export enum EventTypeFilter { @@ -50,3 +56,13 @@ export enum EventTypeFilter { Running = "RUNNING", Start = "START", } + +/** + * Set how owners from OpenLineage job ownership facets update Pipeline owners. In replace + * mode, resolved owners from the current event replace existing owners. In append mode, + * resolved owners are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 023a53415a34..a9b4c35b8a90 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -1355,6 +1355,12 @@ export interface Pipeline { * etc., with that Pipeline will be deleted */ markDeletedPipelines?: boolean; + /** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; /** * Regex exclude pipelines. */ @@ -3416,6 +3422,16 @@ export interface OwnerConfiguration { table?: { [key: string]: string[] | string } | string; } +/** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} + /** * A single access grant entry. The per-service shape lives under `config`. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts index 127acfa0dfaa..9b2ecd0e7f9f 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts @@ -57,6 +57,12 @@ export interface PipelineServiceMetadataPipeline { * applicable for fields like description, tags, owner and displayName */ overrideMetadata?: boolean; + /** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; /** * Regex exclude pipelines. */ @@ -91,6 +97,16 @@ export interface LineageInformation { [property: string]: any; } +/** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} + /** * Regex exclude pipelines. * diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index ab4e11e361b7..fc619f11431b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -5704,6 +5704,12 @@ export interface Pipeline { * etc., with that Pipeline will be deleted */ markDeletedPipelines?: boolean; + /** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; /** * Regex exclude pipelines. */ @@ -7590,6 +7596,16 @@ export interface OwnerConfiguration { table?: { [key: string]: string[] | string } | string; } +/** + * Set how owners from source metadata update Pipeline owners. In replace mode, resolved + * owners from the current source replace existing owners. In append mode, resolved owners + * are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} + /** * A single access grant entry. The per-service shape lives under `config`. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts b/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts index 15be99b3be11..3d48975f6f2a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts @@ -568,6 +568,12 @@ export interface PipelineServiceClientConfiguration { * 'prod-postgres' */ namespaceToServiceMapping?: { [key: string]: string }; + /** + * Set how owners from OpenLineage job ownership facets update Pipeline owners. In replace + * mode, resolved owners from the current event replace existing owners. In append mode, + * resolved owners are appended to active existing Pipeline owners. + */ + ownershipUpdateMode?: OwnershipUpdateMode; /** * List of allowed origins for CORS on OAuth endpoints. Use specific origins for production * security. Wildcard (*) is NOT recommended. @@ -2579,6 +2585,16 @@ export interface TitleSection { [property: string]: any; } +/** + * Set how owners from OpenLineage job ownership facets update Pipeline owners. In replace + * mode, resolved owners from the current event replace existing owners. In append mode, + * resolved owners are appended to active existing Pipeline owners. + */ +export enum OwnershipUpdateMode { + Append = "append", + Replace = "replace", +} + /** * Pipeline View Mode for Lineage. *