Skip to content

Commit cb6474d

Browse files
committed
resolve static type checking errors
1 parent 601c9ba commit cb6474d

4 files changed

Lines changed: 52 additions & 19 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/openlineage/metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,7 @@ def yield_pipeline(self, pipeline_details: OpenLineageEvent) -> Iterable[Either[
817817
)
818818
description = f"""```json
819819
{json.dumps(pipeline_details.run_facet, indent=4).strip()}```"""
820-
request = CreatePipelineRequest(
820+
request = CreatePipelineRequest( # pyright: ignore[reportCallIssue]
821821
name=pipeline_name,
822822
service=self._current_pipeline_service,
823823
description=description,

ingestion/src/metadata/ingestion/source/pipeline/openlineage/ownership_resolver.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,20 @@ class OpenLineageOwnerResolver:
4242
def __init__(
4343
self,
4444
metadata: OpenMetadata,
45-
include_owners: bool = True,
46-
ownership_update_mode: OwnershipUpdateMode = OwnershipUpdateMode.replace,
45+
include_owners: bool | None,
46+
ownership_update_mode: OwnershipUpdateMode | str | None,
4747
):
4848
self.metadata = metadata
49-
self.include_owners = include_owners
50-
self.ownership_update_mode = (
51-
ownership_update_mode
49+
self.include_owners = bool(include_owners)
50+
self.ownership_update_mode = OwnershipUpdateMode(
51+
ownership_update_mode.value
5252
if isinstance(ownership_update_mode, OwnershipUpdateMode)
53-
else OwnershipUpdateMode(ownership_update_mode)
53+
else ownership_update_mode or OwnershipUpdateMode.replace.value
5454
)
55-
self._team_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None # noqa: UP006, UP045
56-
self._user_owner_ref_by_name: Optional[Dict[str, EntityReference]] = None # noqa: UP006, UP045
57-
self._owner_refs_by_pipeline_fqn: Optional[Dict[str, List[EntityReference]]] = None # noqa: UP006, UP045
55+
self._owner_cache_loaded = False
56+
self._team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
57+
self._user_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
58+
self._owner_refs_by_pipeline_fqn: Dict[str, List[EntityReference]] = {} # noqa: UP006
5859

5960
def get_pipeline_job_owners(
6061
self,
@@ -154,11 +155,7 @@ def _ensure_pipeline_owner_cache(self) -> None:
154155
"""
155156
Load OpenMetadata Group teams, users, and pipeline ownership references.
156157
"""
157-
if (
158-
self._team_owner_ref_by_name is not None
159-
and self._user_owner_ref_by_name is not None
160-
and self._owner_refs_by_pipeline_fqn is not None
161-
):
158+
if self._owner_cache_loaded:
162159
return
163160

164161
team_owner_ref_by_name: Dict[str, EntityReference] = {} # noqa: UP006
@@ -179,7 +176,7 @@ def _ensure_pipeline_owner_cache(self) -> None:
179176
if team_name:
180177
team_ref = team_owner_ref_by_name.setdefault(
181178
team_name,
182-
EntityReference(
179+
EntityReference( # pyright: ignore[reportCallIssue]
183180
id=model_str(team.id),
184181
type=OWNER_TEAM_ENTITY,
185182
name=model_str(team.name),
@@ -214,7 +211,7 @@ def _ensure_pipeline_owner_cache(self) -> None:
214211
if user_name:
215212
user_ref = user_owner_ref_by_name.setdefault(
216213
user_name,
217-
EntityReference(
214+
EntityReference( # pyright: ignore[reportCallIssue]
218215
id=model_str(user.id),
219216
type=OWNER_USER_ENTITY,
220217
name=model_str(user.name),
@@ -237,3 +234,4 @@ def _ensure_pipeline_owner_cache(self) -> None:
237234
self._team_owner_ref_by_name = team_owner_ref_by_name
238235
self._user_owner_ref_by_name = user_owner_ref_by_name
239236
self._owner_refs_by_pipeline_fqn = owner_refs_by_pipeline_fqn
237+
self._owner_cache_loaded = True

ingestion/tests/unit/topology/pipeline/test_openlineage.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
from metadata.generated.schema.type.entityLineage import ColumnLineage
3737
from metadata.generated.schema.type.entityReference import EntityReference
3838
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
39-
from metadata.ingestion.api.models import Either
40-
from metadata.ingestion.source.pipeline.openlineage.metadata import OpenlineageSource
4139
from metadata.ingestion.source.pipeline.openlineage.metadata import (
4240
RESOLUTION_CACHE_MAXSIZE,
4341
OpenlineageSource,

ingestion/tests/unit/topology/pipeline/test_openlineage_ownership.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,19 @@ def test_does_not_build_cache_when_include_owners_is_disabled():
136136
metadata.list_all_entities.assert_not_called()
137137

138138

139+
def test_does_not_build_cache_when_include_owners_is_none():
140+
resolver, metadata = build_resolver(
141+
users=[build_user("jdoe")],
142+
teams=[build_team("data-platform")],
143+
include_owners=None,
144+
)
145+
146+
owners = resolver.get_pipeline_job_owners(build_job(["user:jdoe", "team:data-platform"]))
147+
148+
assert owners is None
149+
metadata.list_all_entities.assert_not_called()
150+
151+
139152
def test_builds_pipeline_owner_cache_with_filtered_owns_api():
140153
pipeline_ref = EntityReference(
141154
id=uuid4(),
@@ -189,6 +202,30 @@ def test_replace_mode_does_not_carry_existing_pipeline_owners():
189202
assert [(owner.type, owner.name) for owner in owners.root] == [("user", "jdoe")]
190203

191204

205+
def test_none_ownership_update_mode_defaults_to_replace():
206+
pipeline_ref = EntityReference(
207+
id=uuid4(),
208+
type="pipeline",
209+
name="daily_orders",
210+
fullyQualifiedName="airflow.daily_orders",
211+
)
212+
existing_team = build_team("data-platform", owns=[pipeline_ref])
213+
new_user = build_user("jdoe")
214+
resolver, _ = build_resolver(
215+
users=[new_user],
216+
teams=[existing_team],
217+
ownership_update_mode=None,
218+
)
219+
220+
owners = resolver.get_pipeline_job_owners(
221+
build_job(["user:jdoe"]),
222+
pipeline_fqn="airflow.daily_orders",
223+
)
224+
225+
assert owners is not None
226+
assert [(owner.type, owner.name) for owner in owners.root] == [("user", "jdoe")]
227+
228+
192229
def test_append_mode_merges_existing_pipeline_owners_and_updates_cache():
193230
pipeline_ref = EntityReference(
194231
id=uuid4(),

0 commit comments

Comments
 (0)