Skip to content

Commit 12d0abd

Browse files
committed
add openlineage job ownership support in ingestion pipeline
1 parent c9a5bdb commit 12d0abd

12 files changed

Lines changed: 696 additions & 36 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,9 @@
7070
TopicDetails,
7171
TopicFQN,
7272
)
73+
from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import (
74+
OpenLineageOwnerResolver,
75+
)
7376
from metadata.ingestion.source.pipeline.openlineage.service_resolver import (
7477
build_service_name,
7578
extract_integration_type,
@@ -124,6 +127,11 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str
124127
def prepare(self):
125128
self._service_cache = {}
126129
self._current_pipeline_service = None
130+
self._owner_resolver = OpenLineageOwnerResolver(
131+
self.metadata,
132+
include_owners=self.source_config.includeOwners,
133+
ownership_update_mode=self.source_config.ownershipUpdateMode,
134+
)
127135
self._entity_cache: LRUCache = LRUCache(maxsize=10000)
128136
self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
129137
self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map() # noqa: UP006
@@ -730,13 +738,19 @@ def yield_pipeline(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[
730738
pipeline_name = self.get_pipeline_name(pipeline_details)
731739
self._current_pipeline_service = self._resolve_pipeline_service(pipeline_details)
732740
try:
741+
pipeline_fqn = f"{self._current_pipeline_service}.{pipeline_name}"
742+
owners = self._owner_resolver.get_pipeline_job_owners(
743+
pipeline_details.job,
744+
pipeline_fqn=pipeline_fqn,
745+
)
733746
description = f"""```json
734747
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
735748
request = CreatePipelineRequest(
736749
name=pipeline_name,
737750
service=self._current_pipeline_service,
738751
description=description,
739752
tasks=[],
753+
owners=owners,
740754
)
741755

742756
yield Either(right=request)
ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py

Lines changed: 236 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,236 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
from typing import Any, Dict, List, Optional, Set, Tuple # noqa: UP035
12+
13+
from metadata.generated.schema.entity.teams.team import Team, TeamType
14+
from metadata.generated.schema.entity.teams.user import User
15+
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
16+
OwnershipUpdateMode,
17+
)
18+
from metadata.generated.schema.type.entityReference import EntityReference
19+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
20+
from metadata.ingestion.ometa.ometa_api import OpenMetadata
21+
from metadata.ingestion.ometa.utils import model_str
22+
from metadata.utils.logger import ingestion_logger
23+
24+
logger = ingestion_logger()
25+
26+
OWNER_CACHE_PAGE_SIZE = 1000
27+
OWNER_PIPELINE_ENTITY = "pipeline"
28+
OWNER_TEAM_ENTITY = "team"
29+
OWNER_USER_ENTITY = "user"
30+
31+
32+
class OpenLineageOwnerResolver:
    """
    Resolve OpenLineage job ownership facet owners.

    OpenLineage recommends owner identifiers such as ``team:data`` and
    ``user:jdoe``. Identifiers qualified with ``team:`` or ``user:`` are
    resolved only against the matching OpenMetadata entity type. Any other
    identifier is resolved as a Group team first, then as a user, with a
    warning when both exist: unqualified names are looked up as-is, names
    carrying an unrecognised prefix are looked up by the part after the first
    colon.

    Name lookups are case-insensitive and are served from in-memory maps that
    are loaded from OpenMetadata the first time owners need to be resolved.
    """

    def __init__(
        self,
        metadata: OpenMetadata,
        include_owners: bool = True,
        ownership_update_mode: OwnershipUpdateMode = OwnershipUpdateMode.replace,
    ):
        """
        Args:
            metadata: OpenMetadata client used to list teams and users.
            include_owners: when False, ``get_pipeline_job_owners`` always
                returns None.
            ownership_update_mode: ``replace`` or ``append``. A raw value is
                coerced to ``OwnershipUpdateMode``.
        """
        self.metadata = metadata
        self.include_owners = include_owners
        self.ownership_update_mode = (
            ownership_update_mode
            if isinstance(ownership_update_mode, OwnershipUpdateMode)
            else OwnershipUpdateMode(ownership_update_mode)
        )
        # All three caches stay None until _ensure_pipeline_owner_cache() has run.
        self._team_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None  # noqa: UP006, UP045
        self._user_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None  # noqa: UP006, UP045
        self._owner_refs_by_pipeline_fqn: Optional[Dict[str, List[EntityReference]]] = None  # noqa: UP006, UP045

    def get_pipeline_job_owners(
        self,
        job: Dict[str, Any],  # noqa: UP006
        pipeline_fqn: Optional[str] = None,  # noqa: UP045
    ) -> Optional[EntityReferenceList]:  # noqa: UP045
        """
        Resolve owners from ``job.facets.ownership.owners``.

        ``replace`` returns owners resolved from the current event. ``append``
        returns the cached existing Pipeline owners plus newly resolved owners
        and stores the merged list back in the cache for ``pipeline_fqn``.

        Args:
            job: the OpenLineage ``job`` object of the event.
            pipeline_fqn: fully qualified name of the target Pipeline. Without
                it ``append`` mode behaves like ``replace``.

        Returns:
            The de-duplicated owner references, or None when owners are
            disabled, the facet is missing, malformed or empty, or no listed
            owner could be resolved.
        """
        if not self.include_owners:
            return None

        facet_owners = self._extract_facet_owners(job)
        if not facet_owners:
            return None

        self._ensure_pipeline_owner_cache()

        resolved: List[EntityReference] = []  # noqa: UP006
        seen: Set[Tuple[str, str]] = set()  # noqa: UP006
        for owner in facet_owners:
            raw_owner_name = owner.get("name") if isinstance(owner, dict) else None
            if not isinstance(raw_owner_name, str) or not raw_owner_name:
                continue

            owner_ref = self._resolve_owner_ref(raw_owner_name)
            if owner_ref is None:
                continue

            # The same entity may be listed several times (e.g. "data" and "team:data").
            ref_key = self._ref_key(owner_ref)
            if ref_key not in seen:
                seen.add(ref_key)
                resolved.append(owner_ref)

        if not resolved:
            return None

        if self.ownership_update_mode != OwnershipUpdateMode.append or not pipeline_fqn:
            return EntityReferenceList(root=resolved)

        # Append mode: keep the owners already known for this pipeline first.
        merged_owners = list(self._owner_refs_by_pipeline_fqn.get(pipeline_fqn, []))
        seen_refs = {self._ref_key(owner_ref) for owner_ref in merged_owners}
        for owner_ref in resolved:
            ref_key = self._ref_key(owner_ref)
            if ref_key not in seen_refs:
                seen_refs.add(ref_key)
                merged_owners.append(owner_ref)

        self._owner_refs_by_pipeline_fqn[pipeline_fqn] = merged_owners
        return EntityReferenceList(root=merged_owners)

    @staticmethod
    def _ref_key(owner_ref: EntityReference) -> Tuple[str, str]:  # noqa: UP006
        """Return the ``(type, id)`` pair used to de-duplicate owner references."""
        return owner_ref.type, str(owner_ref.id)

    @staticmethod
    def _extract_facet_owners(job: Any) -> List[Any]:  # noqa: UP006
        """
        Return the raw ``facets.ownership.owners`` list of ``job``.

        The event payload is external input, so every level is type-checked.
        An empty list is returned when any level is missing or is not of the
        expected type.
        """
        node: Any = job
        for key in ("facets", "ownership", "owners"):
            if not isinstance(node, dict):
                return []
            node = node.get(key)
        return node if isinstance(node, list) else []

    def _resolve_owner_ref(self, raw_owner_name: str) -> Optional[EntityReference]:  # noqa: UP045
        """
        Map one facet owner name to a cached team or user reference.

        The owner caches must already be loaded. A warning is logged and None
        is returned when no entity matches. None is returned without a warning
        when the name part is blank.
        """
        owner_type, separator, owner_name = raw_owner_name.partition(":")
        if separator:
            # Tolerate stray whitespace and casing around the qualifier.
            owner_type = owner_type.strip().lower()
        else:
            # Without a colon, partition() puts the whole string in the first slot.
            owner_type = None
            owner_name = raw_owner_name

        owner_key = owner_name.strip().lower()
        if not owner_key:
            return None

        if owner_type == OWNER_TEAM_ENTITY:
            owner_ref = self._team_owner_ref_by_name.get(owner_key)
        elif owner_type == OWNER_USER_ENTITY:
            owner_ref = self._user_owner_ref_by_name.get(owner_key)
        else:
            team_ref = self._team_owner_ref_by_name.get(owner_key)
            user_ref = self._user_owner_ref_by_name.get(owner_key)
            if team_ref and user_ref:
                logger.warning(
                    f"OpenLineage owner [{raw_owner_name}] matched both a team "
                    "and a user. Using the team for pipeline ownership."
                )
            owner_ref = team_ref or user_ref

        if not owner_ref:
            logger.warning(
                f"Unable to resolve OpenLineage owner [{raw_owner_name}] "
                "for pipeline ownership."
            )
            return None
        return owner_ref

    def _ensure_pipeline_owner_cache(self) -> None:
        """
        Load OpenMetadata Group teams, users, and pipeline ownership references.

        Does nothing once all three caches are populated. Loading is best
        effort: a failure while listing teams or users is logged as a warning
        and whatever was collected up to that point is kept.
        """
        if (
            self._team_owner_ref_by_name is not None
            and self._user_owner_ref_by_name is not None
            and self._owner_refs_by_pipeline_fqn is not None
        ):
            return

        team_owner_ref_by_name: Dict[str, EntityReference] = {}  # noqa: UP006
        user_owner_ref_by_name: Dict[str, EntityReference] = {}  # noqa: UP006
        owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {}  # noqa: UP006

        try:
            for team in self.metadata.list_all_entities(
                entity=Team,
                fields=["teamType", "owns"],
                limit=OWNER_CACHE_PAGE_SIZE,
                params={"ownsEntityType": OWNER_PIPELINE_ENTITY},
                skip_on_failure=True,
            ):
                # Only Group teams are indexed as owners.
                if team.teamType != TeamType.Group:
                    continue
                self._index_owner(
                    team,
                    OWNER_TEAM_ENTITY,
                    team_owner_ref_by_name,
                    owner_refs_by_pipeline_fqn,
                )
        except Exception as exc:
            logger.warning(f"Unable to load OpenMetadata teams for owner cache: {exc}")

        try:
            for user in self.metadata.list_all_entities(
                entity=User,
                fields=["owns"],
                limit=OWNER_CACHE_PAGE_SIZE,
                params={
                    "ownsEntityType": OWNER_PIPELINE_ENTITY,
                    "directOwnsOnly": "true",
                },
                skip_on_failure=True,
            ):
                self._index_owner(
                    user,
                    OWNER_USER_ENTITY,
                    user_owner_ref_by_name,
                    owner_refs_by_pipeline_fqn,
                )
        except Exception as exc:
            logger.warning(f"Unable to load OpenMetadata users for owner cache: {exc}")

        self._team_owner_ref_by_name = team_owner_ref_by_name
        self._user_owner_ref_by_name = user_owner_ref_by_name
        self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn

    @staticmethod
    def _index_owner(
        entity: Any,
        entity_type: str,
        ref_by_name: Dict[str, EntityReference],  # noqa: UP006
        refs_by_pipeline_fqn: Dict[str, List[EntityReference]],  # noqa: UP006
    ) -> None:
        """
        Register one team or user in the name map and the pipeline-owner index.

        Args:
            entity: a ``Team`` or ``User`` carrying ``id``, ``name``,
                ``displayName`` and ``owns``.
            entity_type: reference type to stamp on the entity
                (``OWNER_TEAM_ENTITY`` or ``OWNER_USER_ENTITY``).
            ref_by_name: lower-cased name -> reference map, updated in place.
                The first entity seen for a name wins.
            refs_by_pipeline_fqn: pipeline FQN -> owner references index,
                updated in place.
        """
        name_key = model_str(entity.name).strip().lower()
        if not name_key:
            return

        entity_ref = EntityReference(
            id=model_str(entity.id),
            type=entity_type,
            name=model_str(entity.name),
            displayName=entity.displayName,
        )
        ref_by_name.setdefault(name_key, entity_ref)

        # Index owned pipelines under this entity's own reference, not under
        # whichever reference won the name map: if two entities share a
        # case-insensitive name, each must still be recorded as owner of its
        # own pipelines.
        entity_key = (entity_ref.type, str(entity_ref.id))
        for owned_pipeline in entity.owns.root if entity.owns else []:
            pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
            if owned_pipeline.type != OWNER_PIPELINE_ENTITY or not pipeline_fqn_value:
                continue
            pipeline_owner_refs = refs_by_pipeline_fqn.setdefault(model_str(pipeline_fqn_value), [])
            if all((owner.type, str(owner.id)) != entity_key for owner in pipeline_owner_refs):
                pipeline_owner_refs.append(entity_ref)

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 62 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@
44
import unittest
55
from pathlib import Path
66
from unittest.mock import MagicMock, Mock, patch
7-
from uuid import UUID
7+
from uuid import UUID, uuid4
88

99
from cachetools import LRUCache
1010

@@ -36,6 +36,7 @@
3636
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
3737
from metadata.generated.schema.type.entityLineage import ColumnLineage
3838
from metadata.generated.schema.type.entityReference import EntityReference
39+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
3940
from metadata.ingestion.api.models import Either
4041
from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource
4142
from metadata.ingestion.source.pipeline.openlineage.models import (
@@ -572,6 +573,66 @@ def test_get_pipelines_list(self):
572573
self.assertIsInstance(ol_event, OpenLineageEvent)
573574
self.assertEqual(ol_event, EXPECTED_OL_EVENT)
574575

576+
def test_yield_pipeline_sets_owners_from_job_ownership_facet(self):
    """Owners returned by the resolver must be set on the yielded pipeline request."""
    source = self.open_lineage_source

    # Event whose job carries a single team owner in its ownership facet.
    event = copy.deepcopy(EXPECTED_OL_EVENT)
    ownership_facet = {"owners": [{"name": "team:data-platform", "type": "OWNER"}]}
    event.job = {**event.job, "facets": {"ownership": ownership_facet}}

    team_reference = EntityReference(
        id=uuid4(),
        type="team",
        name="data-platform",
        displayName="Data Platform",
    )
    expected_owners = EntityReferenceList(root=[team_reference])

    # Stub the resolver so the test only covers the wiring inside yield_pipeline.
    resolver = Mock()
    resolver.get_pipeline_job_owners.return_value = expected_owners
    source._owner_resolver = resolver

    service_name = MOCK_PIPELINE_SERVICE.name.root
    with patch.object(source, "_resolve_pipeline_service", return_value=service_name):
        with patch.object(source, "register_record"):
            results = list(source.yield_pipeline(event))

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0].right.owners, expected_owners)

    # The resolver receives the job positionally and the pipeline FQN by keyword.
    resolver.get_pipeline_job_owners.assert_called_once()
    resolver_call = resolver.get_pipeline_job_owners.call_args
    self.assertEqual(resolver_call.args, (event.job,))
    expected_fqn = f"{service_name}.{source.get_pipeline_name(event)}"
    self.assertEqual(resolver_call.kwargs["pipeline_fqn"], expected_fqn)
619+
620+
def test_prepare_passes_include_owners_to_owner_resolver(self):
    """prepare() must build the owner resolver from the source config ownership settings."""
    source = self.open_lineage_source
    source.source_config.includeOwners = False

    resolver_target = "metadata.ingestion.source.pipeline.openlineage.metadata.OpenLineageOwnerResolver"
    # Stub the DB service map so prepare() has nothing to build there.
    with patch.object(source, "_build_db_service_type_map", return_value={}):
        with patch(resolver_target) as resolver_cls:
            source.prepare()

    resolver_cls.assert_called_once_with(
        source.metadata,
        include_owners=False,
        ownership_update_mode=source.source_config.ownershipUpdateMode,
    )
635+
575636
@patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om")
576637
def test_yield_pipeline_lineage_details(self, mock_get_table_from_om):
577638
def t_fqn_build_side_effect(

0 commit comments

Comments
 (0)