Skip to content

Commit 8574a04

Browse files
committed
add openlineage job ownership support in ingestion pipeline
1 parent dbb0737 commit 8574a04

12 files changed

Lines changed: 717 additions & 36 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@
7070
TopicDetails,
7171
TopicFQN,
7272
)
73+
from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import (
74+
OpenLineageOwnerResolver,
75+
)
7376
from metadata.ingestion.source.pipeline.openlineage.service_resolver import (
7477
build_service_name,
7578
extract_integration_type,
@@ -124,6 +127,11 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str
124127
def prepare(self):
125128
self._service_cache = {}
126129
self._current_pipeline_service = None
130+
self._owner_resolver = OpenLineageOwnerResolver(
131+
self.metadata,
132+
include_owners=self.source_config.includeOwners,
133+
ownership_update_mode=self.source_config.ownershipUpdateMode,
134+
)
127135
self._entity_cache: LRUCache = LRUCache(maxsize=10000)
128136
self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
129137
self._db_service_type_map: Dict[str, str] = self._build_db_service_type_map() # noqa: UP006
@@ -730,13 +738,24 @@ def yield_pipeline(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[
730738
pipeline_name = self.get_pipeline_name(pipeline_details)
731739
self._current_pipeline_service = self._resolve_pipeline_service(pipeline_details)
732740
try:
741+
pipeline_fqn = fqn.build(
742+
metadata=self.metadata,
743+
entity_type=Pipeline,
744+
service_name=self._current_pipeline_service,
745+
pipeline_name=pipeline_name,
746+
)
747+
owners = self._owner_resolver.get_pipeline_job_owners(
748+
pipeline_details.job,
749+
pipeline_fqn=pipeline_fqn,
750+
)
733751
description = f"""```json
734752
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
735753
request = CreatePipelineRequest(
736754
name=pipeline_name,
737755
service=self._current_pipeline_service,
738756
description=description,
739757
tasks=[],
758+
owners=owners,
740759
)
741760

742761
yield Either(right=request)
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
from typing import Any, Dict, List, Optional, Set, Tuple # noqa: UP035
12+
13+
from metadata.generated.schema.entity.teams.team import Team, TeamType
14+
from metadata.generated.schema.entity.teams.user import User
15+
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
16+
OwnershipUpdateMode,
17+
)
18+
from metadata.generated.schema.type.entityReference import EntityReference
19+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
20+
from metadata.ingestion.ometa.ometa_api import OpenMetadata
21+
from metadata.ingestion.ometa.utils import model_str
22+
from metadata.utils.logger import ingestion_logger
23+
24+
logger = ingestion_logger()
25+
26+
OWNER_CACHE_PAGE_SIZE = 1000
27+
OWNER_PIPELINE_ENTITY = "pipeline"
28+
OWNER_TEAM_ENTITY = "team"
29+
OWNER_USER_ENTITY = "user"
30+
31+
32+
class OpenLineageOwnerResolver:
    """
    Resolve OpenLineage job ownership facet owners.

    OpenLineage recommends owner identifiers such as ``team:data`` and
    ``user:jdoe``. Qualified identifiers are resolved against their matching
    OpenMetadata entity type. Unqualified names are resolved as Group team
    first, then user, with a warning when both exist.

    Name lookups are case-insensitive and ignore surrounding whitespace. The
    team, user and per-pipeline owner caches are loaded from OpenMetadata on
    the first resolution that needs them and reused afterwards.
    """

    def __init__(
        self,
        metadata: OpenMetadata,
        include_owners: bool = True,
        ownership_update_mode: OwnershipUpdateMode = OwnershipUpdateMode.replace,
    ):
        """
        Args:
            metadata: OpenMetadata client used to list teams and users.
            include_owners: When falsy, no owners are ever resolved.
            ownership_update_mode: ``replace`` or ``append``; a raw enum value
                is coerced to ``OwnershipUpdateMode`` and ``None`` falls back
                to ``replace``.
        """
        self.metadata = metadata
        self.include_owners = include_owners

        # A config that leaves the mode unset must not crash the resolver:
        # OwnershipUpdateMode(None) would raise ValueError.
        if ownership_update_mode is None:
            ownership_update_mode = OwnershipUpdateMode.replace
        elif not isinstance(ownership_update_mode, OwnershipUpdateMode):
            ownership_update_mode = OwnershipUpdateMode(ownership_update_mode)
        self.ownership_update_mode = ownership_update_mode

        # All three caches stay None until _ensure_pipeline_owner_cache() runs.
        self._team_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None  # noqa: UP006, UP045
        self._user_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None  # noqa: UP006, UP045
        self._owner_refs_by_pipeline_fqn: Optional[Dict[str, List[EntityReference]]] = None  # noqa: UP006, UP045

    def get_pipeline_job_owners(
        self,
        job: Dict[str, Any],  # noqa: UP006
        pipeline_fqn: Optional[str] = None,  # noqa: UP045
    ) -> Optional[EntityReferenceList]:  # noqa: UP045
        """
        Resolve owners from ``job.facets.ownership.owners``.

        ``replace`` returns owners resolved from the current event. ``append``
        returns the cached existing Pipeline owners plus newly resolved owners
        and stores the merged list back in the cache.

        Args:
            job: OpenLineage job payload; malformed or missing facets are
                treated as "no owners".
            pipeline_fqn: FQN of the pipeline, required for ``append`` merging.

        Returns:
            The de-duplicated owner references, or ``None`` when owners are
            disabled, the facet carries no owners, or none could be resolved.
        """
        if not self.include_owners:
            return None

        raw_owners = self._extract_raw_owners(job)
        if not raw_owners:
            return None

        self._ensure_pipeline_owner_cache()

        resolved: List[EntityReference] = []  # noqa: UP006
        seen: Set[Tuple[str, str]] = set()  # noqa: UP006
        for owner in raw_owners:
            if not isinstance(owner, dict):
                continue
            raw_owner_name = owner.get("name")
            if not isinstance(raw_owner_name, str) or not raw_owner_name:
                continue

            owner_ref = self._resolve_owner(raw_owner_name)
            if owner_ref is None:
                continue

            ref_key = self._ref_key(owner_ref)
            if ref_key not in seen:
                resolved.append(owner_ref)
                seen.add(ref_key)

        if not resolved:
            return None

        if self.ownership_update_mode != OwnershipUpdateMode.append or not pipeline_fqn:
            return EntityReferenceList(root=resolved)

        # Append mode: keep the owners already known for this pipeline first,
        # then add the ones from this event that are not present yet.
        existing_owners = self._owner_refs_by_pipeline_fqn.get(pipeline_fqn, [])
        merged_owners = list(existing_owners)
        seen_refs = {self._ref_key(owner_ref) for owner_ref in existing_owners}
        for owner_ref in resolved:
            ref_key = self._ref_key(owner_ref)
            if ref_key not in seen_refs:
                merged_owners.append(owner_ref)
                seen_refs.add(ref_key)

        self._owner_refs_by_pipeline_fqn[pipeline_fqn] = merged_owners
        return EntityReferenceList(root=merged_owners)

    @staticmethod
    def _ref_key(owner_ref: EntityReference) -> Tuple[str, str]:  # noqa: UP006
        """Return the ``(type, id)`` pair used to de-duplicate owner references."""
        return (owner_ref.type, str(owner_ref.id))

    @staticmethod
    def _extract_raw_owners(job: Any) -> List[Any]:  # noqa: UP006
        """
        Return ``job["facets"]["ownership"]["owners"]`` when it is a list.

        Any missing key or unexpected container type along the path yields an
        empty list instead of raising, so a malformed facet is simply ignored.
        """
        node = job
        for key in ("facets", "ownership", "owners"):
            if not isinstance(node, dict):
                return []
            node = node.get(key)
        return node if isinstance(node, list) else []

    def _resolve_owner(self, raw_owner_name: str) -> Optional[EntityReference]:  # noqa: UP045
        """
        Map one facet owner name to a cached team or user reference.

        Returns ``None`` silently when the name part is blank, and with a
        warning when no matching team or user is cached. The caches must have
        been loaded before calling this.
        """
        qualifier, separator, remainder = raw_owner_name.partition(":")
        if separator:
            # Strip the qualifier as well as the name so "user :jdoe" is still
            # treated as a user-qualified identifier.
            owner_type = qualifier.strip().lower()
            owner_name = remainder
        else:
            owner_type = None
            owner_name = raw_owner_name

        owner_key = owner_name.strip().lower()
        if not owner_key:
            return None

        if owner_type == OWNER_TEAM_ENTITY:
            owner_ref = self._team_owner_ref_by_name.get(owner_key)
        elif owner_type == OWNER_USER_ENTITY:
            owner_ref = self._user_owner_ref_by_name.get(owner_key)
        else:
            # Unqualified (or unrecognised qualifier): team wins over user.
            team_ref = self._team_owner_ref_by_name.get(owner_key)
            user_ref = self._user_owner_ref_by_name.get(owner_key)
            if team_ref and user_ref:
                logger.warning(
                    f"OpenLineage owner [{raw_owner_name}] matched both a team "
                    "and a user. Using the team for pipeline ownership."
                )
            owner_ref = team_ref or user_ref

        if not owner_ref:
            logger.warning(
                f"Unable to resolve OpenLineage owner [{raw_owner_name}] "
                "for pipeline ownership."
            )
            return None
        return owner_ref

    def _ensure_pipeline_owner_cache(self) -> None:
        """
        Load OpenMetadata Group teams, users, and pipeline ownership references.

        Does nothing once all three caches are populated. A failure while
        listing teams or users is logged and whatever was collected up to that
        point is kept.
        """
        if (
            self._team_owner_ref_by_name is not None
            and self._user_owner_ref_by_name is not None
            and self._owner_refs_by_pipeline_fqn is not None
        ):
            return

        owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {}  # noqa: UP006

        team_owner_ref_by_name = self._load_owner_refs(
            entity=Team,
            owner_type=OWNER_TEAM_ENTITY,
            label="teams",
            fields=["teamType", "owns"],
            params={"ownsEntityType": OWNER_PIPELINE_ENTITY},
            owner_refs_by_pipeline_fqn=owner_refs_by_pipeline_fqn,
            group_teams_only=True,
        )
        user_owner_ref_by_name = self._load_owner_refs(
            entity=User,
            owner_type=OWNER_USER_ENTITY,
            label="users",
            fields=["owns"],
            params={
                "ownsEntityType": OWNER_PIPELINE_ENTITY,
                "directOwnsOnly": "true",
            },
            owner_refs_by_pipeline_fqn=owner_refs_by_pipeline_fqn,
        )

        self._team_owner_ref_by_name = team_owner_ref_by_name
        self._user_owner_ref_by_name = user_owner_ref_by_name
        self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn

    def _load_owner_refs(
        self,
        entity: Any,
        owner_type: str,
        label: str,
        fields: List[str],  # noqa: UP006
        params: Dict[str, str],  # noqa: UP006
        owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]],  # noqa: UP006
        group_teams_only: bool = False,
    ) -> Dict[str, EntityReference]:  # noqa: UP006
        """
        List ``entity`` records and index them as potential pipeline owners.

        Args:
            entity: Entity class to list (``Team`` or ``User``).
            owner_type: ``type`` written on every produced ``EntityReference``.
            label: Plural noun used in the warning logged on failure.
            fields: Extra fields requested from the API.
            params: Query parameters forwarded to the list call.
            owner_refs_by_pipeline_fqn: Shared pipeline-FQN -> owners map,
                updated in place with the pipelines each record owns.
            group_teams_only: Skip records whose ``teamType`` is not ``Group``.

        Returns:
            Lower-cased name -> owner reference; the first record seen for a
            given name wins.
        """
        refs_by_name: Dict[str, EntityReference] = {}  # noqa: UP006
        try:
            for candidate in self.metadata.list_all_entities(
                entity=entity,
                fields=fields,
                limit=OWNER_CACHE_PAGE_SIZE,
                params=params,
                skip_on_failure=True,
            ):
                if group_teams_only and candidate.teamType != TeamType.Group:
                    continue

                name_key = model_str(candidate.name).strip().lower()
                if not name_key:
                    continue

                owner_ref = refs_by_name.get(name_key)
                if owner_ref is None:
                    owner_ref = EntityReference(
                        id=model_str(candidate.id),
                        type=owner_type,
                        name=model_str(candidate.name),
                        displayName=candidate.displayName,
                    )
                    refs_by_name[name_key] = owner_ref

                owner_key = self._ref_key(owner_ref)
                for owned_pipeline in candidate.owns.root if candidate.owns else []:
                    pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
                    if owned_pipeline.type != OWNER_PIPELINE_ENTITY or not pipeline_fqn_value:
                        continue
                    pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(
                        model_str(pipeline_fqn_value), []
                    )
                    if all(self._ref_key(existing) != owner_key for existing in pipeline_owner_refs):
                        pipeline_owner_refs.append(owner_ref)
        except Exception as exc:
            # Best effort: ingestion continues with whatever was cached so far.
            logger.warning(f"Unable to load OpenMetadata {label} for owner cache: {exc}")
        return refs_by_name

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import unittest
55
from pathlib import Path
66
from unittest.mock import MagicMock, Mock, patch
7-
from uuid import UUID
7+
from uuid import UUID, uuid4
88

99
from cachetools import LRUCache
1010

@@ -36,6 +36,7 @@
3636
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
3737
from metadata.generated.schema.type.entityLineage import ColumnLineage
3838
from metadata.generated.schema.type.entityReference import EntityReference
39+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
3940
from metadata.ingestion.api.models import Either
4041
from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource
4142
from metadata.ingestion.source.pipeline.openlineage.models import (
@@ -47,6 +48,7 @@
4748
FQNNotFoundException,
4849
message_to_open_lineage_event,
4950
)
51+
from metadata.utils import fqn
5052

5153
MOCK_WORKFLOW_CONFIG = {
5254
"openMetadataServerConfig": {
@@ -572,6 +574,71 @@ def test_get_pipelines_list(self):
572574
self.assertIsInstance(ol_event, OpenLineageEvent)
573575
self.assertEqual(ol_event, EXPECTED_OL_EVENT)
574576

577+
def test_yield_pipeline_sets_owners_from_job_ownership_facet(self):
    """Test pipeline owners are populated from OpenLineage job ownership facet."""
    # Event whose job carries a single team-qualified owner.
    ol_event = copy.deepcopy(EXPECTED_OL_EVENT)
    ownership_facet = {
        "ownership": {
            "owners": [{"name": "team:data-platform", "type": "OWNER"}]
        }
    }
    ol_event.job = {**ol_event.job, "facets": ownership_facet}

    team_owner = EntityReference(
        id=uuid4(),
        type="team",
        name="data-platform",
        displayName="Data Platform",
    )
    owners = EntityReferenceList(root=[team_owner])

    # Stub the resolver so only the wiring inside yield_pipeline is exercised.
    owner_resolver = Mock()
    owner_resolver.get_pipeline_job_owners.return_value = owners
    self.open_lineage_source._owner_resolver = owner_resolver

    with patch.object(
        self.open_lineage_source,
        "_resolve_pipeline_service",
        return_value=MOCK_PIPELINE_SERVICE.name.root,
    ):
        with patch.object(self.open_lineage_source, "register_record"):
            results = list(self.open_lineage_source.yield_pipeline(ol_event))

    self.assertEqual(len(results), 1)
    self.assertEqual(results[0].right.owners, owners)

    # The resolver receives the job positionally and the pipeline FQN by keyword.
    resolver_call = owner_resolver.get_pipeline_job_owners
    resolver_call.assert_called_once()
    self.assertEqual(resolver_call.call_args.args, (ol_event.job,))

    expected_pipeline_fqn = fqn.build(
        metadata=self.open_lineage_source.metadata,
        entity_type=Pipeline,
        service_name=MOCK_PIPELINE_SERVICE.name.root,
        pipeline_name=self.open_lineage_source.get_pipeline_name(ol_event),
    )
    self.assertEqual(
        resolver_call.call_args.kwargs["pipeline_fqn"],
        expected_pipeline_fqn,
    )
625+
626+
def test_prepare_passes_include_owners_to_owner_resolver(self):
    """Test prepare() builds the owner resolver from the source config owner settings."""
    source = self.open_lineage_source
    source.source_config.includeOwners = False

    resolver_target = "metadata.ingestion.source.pipeline.openlineage.metadata.OpenLineageOwnerResolver"
    with patch.object(source, "_build_db_service_type_map", return_value={}):
        with patch(resolver_target) as resolver_cls:
            source.prepare()

    # Exactly one resolver is created, with the flag that was set above.
    resolver_cls.assert_called_once_with(
        source.metadata,
        include_owners=False,
        ownership_update_mode=source.source_config.ownershipUpdateMode,
    )
641+
575642
@patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om")
576643
def test_yield_pipeline_lineage_details(self, mock_get_table_from_om):
577644
def t_fqn_build_side_effect(

0 commit comments

Comments
 (0)