Skip to content

Commit b60fbb4

Browse files
Fixes #28065: [openlinege] Add job ownership support in ingestion pipeline (#28381)
* add openlineage job ownership support in ingestion pipeline * fix pre-commit issues * resolve static type checking errors * fix: preserve relationship-bulk fields (e.g. Team domains) when ownsEntityType filter is set On the filtered users/teams list path (ownsEntityType / directOwnsOnly), fetchAndSetFieldsExcept iterated only fieldFetchers and never called fetchAndSetRelationshipFieldsInBulk. Any field handled solely by the batched relationship fetch was therefore dropped (Team.domains came back null), and the relationship fields that the bulk layer normally batches fell back to per-field N+1 fetches. Run the batched relationship fetch inside fetchAndSetFieldsExcept and skip both the excluded fields and the relationship-handled fields. fetchAndSetFields now delegates to it with an empty exclusion set, so the default path is unchanged. Add a TeamResourceIT regression test: listing teams with fields=owns,domains&ownsEntityType=pipeline must keep domains populated and restrict owns to pipelines. --------- Co-authored-by: sonika-shah <58761340+sonika-shah@users.noreply.github.com>
1 parent 63017e5 commit b60fbb4

21 files changed

Lines changed: 912 additions & 51 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
TopicDetails,
7070
TopicFQN,
7171
)
72+
from metadata.ingestion.source.pipeline.openlineage.ownership_resolver import (
73+
OpenLineageOwnerResolver,
74+
)
7275
from metadata.ingestion.source.pipeline.openlineage.service_resolver import (
7376
build_service_name,
7477
extract_integration_type,
@@ -130,6 +133,11 @@ def create(cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str
130133
def prepare(self):
131134
self._service_cache = {}
132135
self._current_pipeline_service = None
136+
self._owner_resolver = OpenLineageOwnerResolver(
137+
self.metadata,
138+
include_owners=self.source_config.includeOwners,
139+
ownership_update_mode=self.source_config.ownershipUpdateMode,
140+
)
133141
self._entity_cache: LRUCache = LRUCache(maxsize=10000)
134142
self._namespace_to_service_cache: LRUCache = LRUCache(maxsize=10000)
135143
self._resolution_cache: LRUCache = LRUCache(maxsize=RESOLUTION_CACHE_MAXSIZE)
@@ -797,13 +805,24 @@ def yield_pipeline(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[
797805
pipeline_name = self.get_pipeline_name(pipeline_details)
798806
self._current_pipeline_service = self._resolve_pipeline_service(pipeline_details)
799807
try:
808+
pipeline_fqn = fqn.build(
809+
metadata=self.metadata,
810+
entity_type=Pipeline,
811+
service_name=self._current_pipeline_service,
812+
pipeline_name=pipeline_name,
813+
)
814+
owners = self._owner_resolver.get_pipeline_job_owners(
815+
pipeline_details.job,
816+
pipeline_fqn=pipeline_fqn,
817+
)
800818
description = f"""```json
801819
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
802-
request = CreatePipelineRequest(
820+
request = CreatePipelineRequest( # pyright: ignore[reportCallIssue]
803821
name=pipeline_name,
804822
service=self._current_pipeline_service,
805823
description=description,
806824
tasks=[],
825+
owners=owners,
807826
)
808827

809828
yield Either(right=request)
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
from typing import Any, Dict, List, Optional, Set, Tuple # noqa: UP035
12+
13+
from metadata.generated.schema.entity.teams.team import Team, TeamType
14+
from metadata.generated.schema.entity.teams.user import User
15+
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
16+
OwnershipUpdateMode,
17+
)
18+
from metadata.generated.schema.type.entityReference import EntityReference
19+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
20+
from metadata.ingestion.ometa.ometa_api import OpenMetadata
21+
from metadata.ingestion.ometa.utils import model_str
22+
from metadata.utils.logger import ingestion_logger
23+
24+
logger = ingestion_logger()
25+
26+
OWNER_CACHE_PAGE_SIZE = 1000
27+
OWNER_PIPELINE_ENTITY = "pipeline"
28+
OWNER_TEAM_ENTITY = "team"
29+
OWNER_USER_ENTITY = "user"
30+
31+
32+
class OpenLineageOwnerResolver:
33+
"""
34+
Resolve OpenLineage job ownership facet owners.
35+
36+
OpenLineage recommends owner identifiers such as ``team:data`` and
37+
``user:jdoe``. Qualified identifiers are resolved against their matching
38+
OpenMetadata entity type. Unqualified names are resolved as Group team
39+
first, then user, with a warning when both exist.
40+
"""
41+
42+
def __init__(
43+
self,
44+
metadata: OpenMetadata,
45+
include_owners: bool | None,
46+
ownership_update_mode: OwnershipUpdateMode | str | None,
47+
):
48+
self.metadata = metadata
49+
self.include_owners = bool(include_owners)
50+
self.ownership_update_mode = OwnershipUpdateMode(
51+
ownership_update_mode.value
52+
if isinstance(ownership_update_mode, OwnershipUpdateMode)
53+
else ownership_update_mode or OwnershipUpdateMode.replace.value
54+
)
55+
self._owner_cache_loaded = False
56+
self._team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
57+
self._user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
58+
self._owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006
59+
60+
def get_pipeline_job_owners(
61+
self,
62+
job: Dict[str, Any], # noqa: UP006
63+
pipeline_fqn: Optional[str] = None, # noqa: UP045
64+
) -> Optional[EntityReferenceList]: # noqa: UP045
65+
"""
66+
Resolve owners from ``job.facets.ownership.owners``.
67+
68+
``replace`` returns owners resolved from the current event. ``append``
69+
returns active existing Pipeline owners plus newly resolved owners.
70+
"""
71+
if not self.include_owners:
72+
return None
73+
74+
owners = (((job or {}).get("facets") or {}).get("ownership") or {}).get("owners") or []
75+
if not isinstance(owners, list) or not owners:
76+
return None
77+
78+
self._ensure_pipeline_owner_cache()
79+
80+
resolved: List[EntityReference] = [] # noqa: UP006
81+
seen: Set[Tuple[str, str]] = set() # noqa: UP006
82+
83+
for owner in owners:
84+
owner_ref = self._get_owner_ref(owner)
85+
if not owner_ref:
86+
continue
87+
88+
ref_key = (owner_ref.type, str(owner_ref.id))
89+
if ref_key not in seen:
90+
resolved.append(owner_ref)
91+
seen.add(ref_key)
92+
93+
if not resolved:
94+
return None
95+
96+
if self.ownership_update_mode != OwnershipUpdateMode.append or not pipeline_fqn:
97+
return EntityReferenceList(root=resolved)
98+
99+
existing_owners = self._owner_refs_by_pipeline_fqn.get(pipeline_fqn, [])
100+
merged_owners = list(existing_owners)
101+
seen_refs = {(owner_ref.type, str(owner_ref.id)) for owner_ref in existing_owners}
102+
for owner_ref in resolved:
103+
ref_key = (owner_ref.type, str(owner_ref.id))
104+
if ref_key not in seen_refs:
105+
merged_owners.append(owner_ref)
106+
seen_refs.add(ref_key)
107+
108+
self._owner_refs_by_pipeline_fqn[pipeline_fqn] = merged_owners
109+
return EntityReferenceList(root=merged_owners)
110+
111+
def _get_owner_ref(self, owner: Any) -> Optional[EntityReference]: # noqa: UP045
112+
"""
113+
Resolve a single OpenLineage ownership owner object to an OpenMetadata owner reference.
114+
115+
Qualified names such as ``team:data-platform`` and ``user:jdoe`` are resolved against the
116+
matching entity cache. Unqualified names are resolved as Group team first, then user.
117+
"""
118+
if not isinstance(owner, dict):
119+
return None
120+
121+
raw_owner_name = owner.get("name")
122+
if not isinstance(raw_owner_name, str) or not raw_owner_name:
123+
return None
124+
125+
owner_type, separator, owner_name = raw_owner_name.partition(":")
126+
if separator:
127+
owner_type = owner_type.lower()
128+
else:
129+
owner_type = None
130+
owner_name = raw_owner_name
131+
132+
owner_key = owner_name.strip().lower()
133+
if not owner_key:
134+
return None
135+
136+
if owner_type == OWNER_TEAM_ENTITY:
137+
owner_ref = self._team_owner_ref_by_name.get(owner_key)
138+
elif owner_type == OWNER_USER_ENTITY:
139+
owner_ref = self._user_owner_ref_by_name.get(owner_key)
140+
else:
141+
team_ref = self._team_owner_ref_by_name.get(owner_key)
142+
user_ref = self._user_owner_ref_by_name.get(owner_key)
143+
if team_ref and user_ref:
144+
logger.warning(
145+
f"OpenLineage owner [{raw_owner_name}] matched both a team "
146+
"and a user. Using the team for pipeline ownership."
147+
)
148+
owner_ref = team_ref or user_ref
149+
150+
if not owner_ref:
151+
logger.warning(f"Unable to resolve OpenLineage owner [{raw_owner_name}] for pipeline ownership.")
152+
return owner_ref
153+
154+
def _ensure_pipeline_owner_cache(self) -> None:
155+
"""
156+
Load OpenMetadata Group teams, users, and pipeline ownership references.
157+
"""
158+
if self._owner_cache_loaded:
159+
return
160+
161+
team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
162+
user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
163+
owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006
164+
165+
try:
166+
for team in self.metadata.list_all_entities(
167+
entity=Team,
168+
fields=["teamType", "owns"],
169+
limit=OWNER_CACHE_PAGE_SIZE,
170+
params={"ownsEntityType": OWNER_PIPELINE_ENTITY},
171+
skip_on_failure=True,
172+
):
173+
if team.teamType != TeamType.Group:
174+
continue
175+
team_name = model_str(team.name).strip().lower()
176+
if team_name:
177+
team_ref = team_owner_ref_by_name.setdefault(
178+
team_name,
179+
EntityReference( # pyright: ignore[reportCallIssue]
180+
id=model_str(team.id),
181+
type=OWNER_TEAM_ENTITY,
182+
name=model_str(team.name),
183+
displayName=team.displayName,
184+
),
185+
)
186+
for owned_pipeline in team.owns.root if team.owns else []:
187+
pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
188+
if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value:
189+
pipeline_fqn = model_str(pipeline_fqn_value)
190+
pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, [])
191+
if not any(
192+
owner.type == team_ref.type and str(owner.id) == str(team_ref.id)
193+
for owner in pipeline_owner_refs
194+
):
195+
pipeline_owner_refs.append(team_ref)
196+
except Exception as exc:
197+
logger.warning(f"Unable to load OpenMetadata teams for owner cache: {exc}")
198+
199+
try:
200+
for user in self.metadata.list_all_entities(
201+
entity=User,
202+
fields=["owns"],
203+
limit=OWNER_CACHE_PAGE_SIZE,
204+
params={
205+
"ownsEntityType": OWNER_PIPELINE_ENTITY,
206+
"directOwnsOnly": "true",
207+
},
208+
skip_on_failure=True,
209+
):
210+
user_name = model_str(user.name).strip().lower()
211+
if user_name:
212+
user_ref = user_owner_ref_by_name.setdefault(
213+
user_name,
214+
EntityReference( # pyright: ignore[reportCallIssue]
215+
id=model_str(user.id),
216+
type=OWNER_USER_ENTITY,
217+
name=model_str(user.name),
218+
displayName=user.displayName,
219+
),
220+
)
221+
for owned_pipeline in user.owns.root if user.owns else []:
222+
pipeline_fqn_value = owned_pipeline.fullyQualifiedName or owned_pipeline.name
223+
if owned_pipeline.type == OWNER_PIPELINE_ENTITY and pipeline_fqn_value:
224+
pipeline_fqn = model_str(pipeline_fqn_value)
225+
pipeline_owner_refs = owner_refs_by_pipeline_fqn.setdefault(pipeline_fqn, [])
226+
if not any(
227+
owner.type == user_ref.type and str(owner.id) == str(user_ref.id)
228+
for owner in pipeline_owner_refs
229+
):
230+
pipeline_owner_refs.append(user_ref)
231+
except Exception as exc:
232+
logger.warning(f"Unable to load OpenMetadata users for owner cache: {exc}")
233+
234+
self._team_owner_ref_by_name = team_owner_ref_by_name
235+
self._user_owner_ref_by_name = user_owner_ref_by_name
236+
self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn
237+
self._owner_cache_loaded = True

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import unittest
55
from pathlib import Path
66
from unittest.mock import MagicMock, Mock, patch
7-
from uuid import UUID
7+
from uuid import UUID, uuid4
88

99
from cachetools import LRUCache
1010

@@ -35,6 +35,7 @@
3535
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
3636
from metadata.generated.schema.type.entityLineage import ColumnLineage
3737
from metadata.generated.schema.type.entityReference import EntityReference
38+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
3839
from metadata.ingestion.source.pipeline.openlineage.metadata import (
3940
RESOLUTION_CACHE_MAXSIZE,
4041
OpenlineageSource,
@@ -48,6 +49,7 @@
4849
from metadata.ingestion.source.pipeline.openlineage.utils import (
4950
message_to_open_lineage_event,
5051
)
52+
from metadata.utils import fqn
5153

5254
MOCK_WORKFLOW_CONFIG = {
5355
"openMetadataServerConfig": {
@@ -956,6 +958,69 @@ def test_get_pipelines_list(self):
956958
self.assertIsInstance(ol_event, OpenLineageEvent)
957959
self.assertEqual(ol_event, EXPECTED_OL_EVENT)
958960

961+
def test_yield_pipeline_sets_owners_from_job_ownership_facet(self):
962+
"""Test pipeline owners are populated from OpenLineage job ownership facet."""
963+
ol_event = copy.deepcopy(EXPECTED_OL_EVENT)
964+
ol_event.job = {
965+
**ol_event.job,
966+
"facets": {"ownership": {"owners": [{"name": "team:data-platform", "type": "OWNER"}]}},
967+
}
968+
owners = EntityReferenceList(
969+
root=[
970+
EntityReference(
971+
id=uuid4(),
972+
type="team",
973+
name="data-platform",
974+
displayName="Data Platform",
975+
)
976+
]
977+
)
978+
owner_resolver = Mock()
979+
owner_resolver.get_pipeline_job_owners.return_value = owners
980+
self.open_lineage_source._owner_resolver = owner_resolver
981+
982+
with (
983+
patch.object(
984+
self.open_lineage_source,
985+
"_resolve_pipeline_service",
986+
return_value=MOCK_PIPELINE_SERVICE.name.root,
987+
),
988+
patch.object(self.open_lineage_source, "register_record"),
989+
):
990+
results = list(self.open_lineage_source.yield_pipeline(ol_event))
991+
992+
self.assertEqual(len(results), 1)
993+
self.assertEqual(results[0].right.owners, owners)
994+
owner_resolver.get_pipeline_job_owners.assert_called_once()
995+
self.assertEqual(
996+
owner_resolver.get_pipeline_job_owners.call_args.args,
997+
(ol_event.job,),
998+
)
999+
self.assertEqual(
1000+
owner_resolver.get_pipeline_job_owners.call_args.kwargs["pipeline_fqn"],
1001+
fqn.build(
1002+
metadata=self.open_lineage_source.metadata,
1003+
entity_type=Pipeline,
1004+
service_name=MOCK_PIPELINE_SERVICE.name.root,
1005+
pipeline_name=self.open_lineage_source.get_pipeline_name(ol_event),
1006+
),
1007+
)
1008+
1009+
def test_prepare_passes_include_owners_to_owner_resolver(self):
1010+
self.open_lineage_source.source_config.includeOwners = False
1011+
1012+
with (
1013+
patch.object(self.open_lineage_source, "_build_db_service_type_map", return_value={}),
1014+
patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenLineageOwnerResolver") as resolver_cls,
1015+
):
1016+
self.open_lineage_source.prepare()
1017+
1018+
resolver_cls.assert_called_once_with(
1019+
self.open_lineage_source.metadata,
1020+
include_owners=False,
1021+
ownership_update_mode=self.open_lineage_source.source_config.ownershipUpdateMode,
1022+
)
1023+
9591024
@patch("metadata.ingestion.source.pipeline.openlineage.metadata.OpenlineageSource._get_table_fqn_from_om")
9601025
def test_yield_pipeline_lineage_details(self, mock_get_table_from_om):
9611026
def t_fqn_build_side_effect(

0 commit comments

Comments
 (0)