Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
TopicDetails,
TopicFQN,
)
from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import (
OpenLineageOwnerResolver,
)
from metadata.ingestion.source.pipeline.openlineage.service_resolver import (
build_service_name,
extract_integration_type,
Expand Down Expand Up @@ -130,6 +133,11 @@
def prepare(self):
self._service_cache = {}
self._current_pipeline_service = None
self._owner_resolver = OpenLineageOwnerResolver(
self.metadata,
include_owners=self.source_config.includeOwners,
ownership_update_mode=self.source_config.ownershipUpdateMode,
)
self._entity_cache: LRUCache = LRUCache(maxsize=10000)
self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
self._resolution_cache: LRUCache = LRUCache(maxsize=RESOLUTION_CACHE_MAXSIZE)
Expand Down Expand Up @@ -562,7 +570,7 @@
return found[0]
raise FQNNotFoundException(f"Table FQN not found for {table_details}")

def _build_broker_to_service_map(self) -> Dict[str, str]: # noqa: UP006

Check failure on line 573 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5Q-dqXhtnCn4Wl_UlB&open=AZ5Q-dqXhtnCn4Wl_UlB&pullRequest=28381
"""
Build a cache mapping broker hostnames to messaging service FQNs.
Reads each messaging service's connection config to extract bootstrapServers.
Expand Down Expand Up @@ -718,7 +726,7 @@

return result

def _get_column_lineage(self, inputs: List, outputs: List) -> Dict[str, Dict[str, List[ColumnLineage]]]: # noqa: UP006

Check failure on line 729 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 30 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5yShKtePkx32pViTAa&open=AZ5yShKtePkx32pViTAa&pullRequest=28381
_result: List = [] # noqa: UP006

ol_name_to_fqn_map = self._build_ol_name_to_fqn_map(inputs + outputs)
Expand Down Expand Up @@ -797,13 +805,24 @@
pipeline_name = self.get_pipeline_name(pipeline_details)
self._current_pipeline_service = self._resolve_pipeline_service(pipeline_details)
try:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self._current_pipeline_service,
pipeline_name=pipeline_name,
)
owners = self._owner_resolver.get_pipeline_job_owners(
pipeline_details.job,
pipeline_fqn=pipeline_fqn,
)
description = f"""```json
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
request = CreatePipelineRequest(
request = CreatePipelineRequest( # pyright: ignore[reportCallIssue]
name=pipeline_name,
service=self._current_pipeline_service,
description=description,
tasks=[],
owners=owners,
)

yield Either(right=request)
Expand Down Expand Up @@ -914,7 +933,7 @@
f"Failed to cleanup pipeline-as-node edges for {pipeline_entity.fullyQualifiedName.root}: {exc}"
)

def yield_pipeline_lineage_details(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[AddLineageRequest]]: # noqa: C901

Check failure on line 936 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 50 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5Q-dqXhtnCn4Wl_UlC&open=AZ5Q-dqXhtnCn4Wl_UlC&pullRequest=28381
# Start every event with a fresh resolution cache so results never
# leak across events and the cache cannot grow without bound.
self._resolution_cache = LRUCache(maxsize=RESOLUTION_CACHE_MAXSIZE)
Expand Down Expand Up @@ -1176,7 +1195,7 @@
def get_pipeline_name(self, pipeline_details: OpenLineageEvent) -> str:
return OpenlineageSource._render_pipeline_name(pipeline_details)

def yield_pipeline_status(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[OMetaPipelineStatus]]:

Check failure on line 1198 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5Q-dqXhtnCn4Wl_UlD&open=AZ5Q-dqXhtnCn4Wl_UlD&pullRequest=28381
pass

def mark_pipelines_as_deleted(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List, Optional, Set, Tuple # noqa: UP035

from metadata.generated.schema.entity.teams.team import Team, TeamType
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
OwnershipUpdateMode,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

OWNER_CACHE_PAGE_SIZE = 1000
OWNER_PIPELINE_ENTITY = "pipeline"
OWNER_TEAM_ENTITY = "team"
OWNER_USER_ENTITY = "user"


class OpenLineageOwnerResolver:
"""
Resolve OpenLineage job ownership facet owners.

OpenLineage recommends owner identifiers such as ``team:data`` and
``user:jdoe``. Qualified identifiers are resolved against their matching
OpenMetadata entity type. Unqualified names are resolved as Group team
first, then user, with a warning when both exist.
"""

def __init__(
self,
metadata: OpenMetadata,
include_owners: bool | None,
ownership_update_mode: OwnershipUpdateMode | str | None,
):
self.metadata = metadata
self.include_owners = bool(include_owners)
self.ownership_update_mode = OwnershipUpdateMode(
ownership_update_mode.value
if isinstance(ownership_update_mode, OwnershipUpdateMode)
else ownership_update_mode or OwnershipUpdateMode.replace.value
)
self._owner_cache_loaded = False
self._team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
self._user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
self._owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006

def get_pipeline_job_owners(

Check failure on line 60 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5Q-dihhtnCn4Wl_Uk-&open=AZ5Q-dihhtnCn4Wl_Uk-&pullRequest=28381
self,
job: Dict[str, Any], # noqa: UP006
pipeline_fqn: Optional[str] = None, # noqa: UP045
) -> Optional[EntityReferenceList]: # noqa: UP045
"""
Resolve owners from ``job.facets.ownership.owners``.

``replace`` returns owners resolved from the current event. ``append``
returns active existing Pipeline owners plus newly resolved owners.
"""
if not self.include_owners:
return None

owners = (((job or {}).get("facets") or {}).get("ownership") or {}).get("owners") or []
if not isinstance(owners, list) or not owners:
return None

self._ensure_pipeline_owner_cache()

resolved: List[EntityReference] = [] # noqa: UP006
seen: Set[Tuple[str, str]] = set() # noqa: UP006

for owner in owners:
owner_ref = self._get_owner_ref(owner)
if not owner_ref:
continue

ref_key = (owner_ref.type, str(owner_ref.id))
if ref_key not in seen:
resolved.append(owner_ref)
seen.add(ref_key)

if not resolved:
return None

if self.ownership_update_mode != OwnershipUpdateMode.append or not pipeline_fqn:
return EntityReferenceList(root=resolved)

existing_owners = self._owner_refs_by_pipeline_fqn.get(pipeline_fqn, [])
merged_owners = list(existing_owners)
seen_refs = {(owner_ref.type, str(owner_ref.id)) for owner_ref in existing_owners}
for owner_ref in resolved:
ref_key = (owner_ref.type, str(owner_ref.id))
if ref_key not in seen_refs:
merged_owners.append(owner_ref)
seen_refs.add(ref_key)

self._owner_refs_by_pipeline_fqn[pipeline_fqn] = merged_owners
return EntityReferenceList(root=merged_owners)

def _get_owner_ref(self, owner: Any) -> Optional[EntityReference]: # noqa: UP045
"""
Resolve a single OpenLineage ownership owner object to an OpenMetadata owner reference.

Qualified names such as ``team:data-platform`` and ``user:jdoe`` are resolved against the
matching entity cache. Unqualified names are resolved as Group team first, then user.
"""
if not isinstance(owner, dict):
return None

raw_owner_name = owner.get("name")
if not isinstance(raw_owner_name, str) or not raw_owner_name:
return None

owner_type, separator, owner_name = raw_owner_name.partition(":")
if separator:
owner_type = owner_type.lower()
else:
owner_type = None
owner_name = raw_owner_name

owner_key = owner_name.strip().lower()
if not owner_key:
return None

if owner_type == OWNER_TEAM_ENTITY:
owner_ref = self._team_owner_ref_by_name.get(owner_key)
elif owner_type == OWNER_USER_ENTITY:
owner_ref = self._user_owner_ref_by_name.get(owner_key)
else:
team_ref = self._team_owner_ref_by_name.get(owner_key)
user_ref = self._user_owner_ref_by_name.get(owner_key)
if team_ref and user_ref:
logger.warning(
f"OpenLineage owner [{raw_owner_name}] matched both a team "
"and a user. Using the team for pipeline ownership."
)
owner_ref = team_ref or user_ref

if not owner_ref:
logger.warning(f"Unable to resolve OpenLineage owner [{raw_owner_name}] for pipeline ownership.")
return owner_ref

def _ensure_pipeline_owner_cache(self) -> None:

Check failure on line 154 in ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py

View check run for this annotation

SonarQubeCloud / [open-metadata-ingestion] SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 47 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=open-metadata-ingestion&issues=AZ5Q-dihhtnCn4Wl_Uk_&open=AZ5Q-dihhtnCn4Wl_Uk_&pullRequest=28381
"""
Load OpenMetadata Group teams, users, and pipeline ownership references.
"""
if self._owner_cache_loaded:
return

team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006

try:
for team in self.metadata.list_all_entities(
entity=Team,
fields=["teamType", "owns"],
limit=OWNER_CACHE_PAGE_SIZE,
params={"ownsEntityType": OWNER_PIPELINE_ENTITY},
skip_on_failure=True,
):
if team.teamType != TeamType.Group:
continue
team_name = model_str(team.name).strip().lower()
if team_name:
team_ref = team_owner_ref_by_name.setdefault(
team_name,
EntityReference( # pyright: ignore[reportCallIssue]
id=model_str(team.id),
Comment thread
gitar-bot[bot] marked this conversation as resolved.
type=OWNER_TEAM_ENTITY,
name=model_str(team.name),
displayName=team.displayName,
),
)
for owned_pipeline in team.owns.root if team.owns else []:
pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value:
pipeline_fqn = model_str(pipeline_fqn_value)
pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, [])
if not any(
owner.type == team_ref.type and str(owner.id) == str(team_ref.id)
for owner in pipeline_owner_refs
):
pipeline_owner_refs.append(team_ref)
except Exception as exc:
logger.warning(f"Unable to load OpenMetadata teams for owner cache: {exc}")
Comment thread
jsingh-yelp marked this conversation as resolved.

try:
for user in self.metadata.list_all_entities(
entity=User,
fields=["owns"],
limit=OWNER_CACHE_PAGE_SIZE,
params={
"ownsEntityType": OWNER_PIPELINE_ENTITY,
"directOwnsOnly": "true",
},
skip_on_failure=True,
):
user_name = model_str(user.name).strip().lower()
if user_name:
user_ref = user_owner_ref_by_name.setdefault(
user_name,
EntityReference( # pyright: ignore[reportCallIssue]
id=model_str(user.id),
type=OWNER_USER_ENTITY,
name=model_str(user.name),
displayName=user.displayName,
),
)
for owned_pipeline in user.owns.root if user.owns else []:
pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value:
pipeline_fqn = model_str(pipeline_fqn_value)
pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, [])
if not any(
owner.type == user_ref.type and str(owner.id) == str(user_ref.id)
for owner in pipeline_owner_refs
):
pipeline_owner_refs.append(user_ref)
except Exception as exc:
logger.warning(f"Unable to load OpenMetadata users for owner cache: {exc}")

self._team_owner_ref_by_name = team_owner_ref_by_name
self._user_owner_ref_by_name = user_owner_ref_by_name
self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn
self._owner_cache_loaded = True
67 changes: 66 additions & 1 deletion ingestion/tests/unit/topology/pipeline/test_openlineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import unittest
from pathlib import Path
from unittest.mock import MagicMock, Mock, patch
from uuid import UUID
from uuid import UUID, uuid4

from cachetools import LRUCache

Expand Down Expand Up @@ -35,6 +35,7 @@
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityLineage import ColumnLineage
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.source.pipeline.openlineage.metadata import (
RESOLUTION_CACHE_MAXSIZE,
OpenlineageSource,
Expand All @@ -48,6 +49,7 @@
from metadata.ingestion.source.pipeline.openlineage.utils import (
message_to_open_lineage_event,
)
from metadata.utils import fqn

MOCK_WORKFLOW_CONFIG = {
"openMetadataServerConfig": {
Expand Down Expand Up @@ -956,6 +958,69 @@ def test_get_pipelines_list(self):
self.assertIsInstance(ol_event, OpenLineageEvent)
self.assertEqual(ol_event, EXPECTED_OL_EVENT)

def test_yield_pipeline_sets_owners_from_job_ownership_facet(self):
"""Test pipeline owners are populated from OpenLineage job ownership facet."""
ol_event = copy.deepcopy(EXPECTED_OL_EVENT)
ol_event.job = {
**ol_event.job,
"facets": {"ownership": {"owners": [{"name": "team:data-platform", "type": "OWNER"}]}},
}
owners = EntityReferenceList(
root=[
EntityReference(
id=uuid4(),
type="team",
name="data-platform",
displayName="Data Platform",
)
]
)
owner_resolver = Mock()
owner_resolver.get_pipeline_job_owners.return_value = owners
self.open_lineage_source._owner_resolver = owner_resolver

with (
patch.object(
self.open_lineage_source,
"_resolve_pipeline_service",
return_value=MOCK_PIPELINE_SERVICE.name.root,
),
patch.object(self.open_lineage_source, "register_record"),
):
results = list(self.open_lineage_source.yield_pipeline(ol_event))

self.assertEqual(len(results), 1)
self.assertEqual(results[0].right.owners, owners)
owner_resolver.get_pipeline_job_owners.assert_called_once()
self.assertEqual(
owner_resolver.get_pipeline_job_owners.call_args.args,
(ol_event.job,),
)
self.assertEqual(
owner_resolver.get_pipeline_job_owners.call_args.kwargs["pipeline_fqn"],
fqn.build(
metadata=self.open_lineage_source.metadata,
entity_type=Pipeline,
service_name=MOCK_PIPELINE_SERVICE.name.root,
pipeline_name=self.open_lineage_source.get_pipeline_name(ol_event),
),
)

def test_prepare_passes_include_owners_to_owner_resolver(self):
self.open_lineage_source.source_config.includeOwners = False

with (
patch.object(self.open_lineage_source, "_build_db_service_type_map", return_value={}),
patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenLineageOwnerResolver") as resolver_cls,
):
self.open_lineage_source.prepare()

resolver_cls.assert_called_once_with(
self.open_lineage_source.metadata,
include_owners=False,
ownership_update_mode=self.open_lineage_source.source_config.ownershipUpdateMode,
)

@patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om")
def test_yield_pipeline_lineage_details(self, mock_get_table_from_om):
def t_fqn_build_side_effect(
Expand Down
Loading
Loading