Skip to content

Commit 348fe05

Browse files
authored
Add owner ingestion to Airflow REST API connector (#26928)
* Feat: Add support for dag owner ingestion in airflow rest conn. * address comments
1 parent 6c771e1 commit 348fe05

2 files changed

Lines changed: 172 additions & 0 deletions

File tree

ingestion/src/metadata/ingestion/source/pipeline/airflow/api/source.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
SourceUrl,
4343
Timestamp,
4444
)
45+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
4546
from metadata.ingestion.api.models import Either
4647
from metadata.ingestion.api.steps import InvalidSourceException
4748
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
@@ -126,6 +127,23 @@ def _get_dag_source_url(self, dag_id: str) -> str:
126127
return f"{host}/dags/{quote(dag_id)}"
127128
return f"{host}/dags/{quote(dag_id)}/grid"
128129

130+
def get_owners(self, owners: Optional[List[str]]) -> Optional[EntityReferenceList]:
131+
if not self.source_config.includeOwners or not owners:
132+
return None
133+
refs = EntityReferenceList(root=[])
134+
for owner_name in owners:
135+
try:
136+
ref = self.metadata.get_reference_by_name(
137+
name=owner_name, is_owner=True
138+
)
139+
if ref:
140+
refs.root.extend(ref.root)
141+
except Exception as exc:
142+
logger.warning(
143+
f"Error while getting details of user {owner_name} - {exc}"
144+
)
145+
return refs if refs.root else None
146+
129147
def _build_tasks(self, dag_details: AirflowApiDagDetails) -> List[Task]:
130148
return [
131149
Task(
@@ -164,6 +182,7 @@ def yield_pipeline(
164182
),
165183
tasks=self._build_tasks(pipeline_details),
166184
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
185+
owners=self.get_owners(pipeline_details.owners),
167186
scheduleInterval=pipeline_details.schedule_interval,
168187
tags=get_tag_labels(
169188
metadata=self.metadata,

ingestion/tests/unit/topology/pipeline/test_airflowapi.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
from metadata.generated.schema.entity.data.pipeline import PipelineState, StatusType
2323
from metadata.generated.schema.entity.utils.common.accessTokenConfig import AccessToken
24+
from metadata.generated.schema.type.entityReference import EntityReference
25+
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
2426
from metadata.ingestion.source.pipeline.airflow.api.client import AirflowApiClient
2527
from metadata.ingestion.source.pipeline.airflow.api.models import (
2628
AirflowApiDagDetails,
@@ -680,6 +682,157 @@ def test_yields_error_on_exception(self, _mock_tags):
680682
assert "test_dag" in results[0].left.name
681683

682684

685+
# ── Owner Resolution ─────────────────────────────────────────────────────
686+
687+
688+
def _make_entity_ref(name, ref_type="user"):
689+
"""Create a real EntityReference for testing."""
690+
import uuid
691+
692+
return EntityReference(
693+
id=str(uuid.uuid4()),
694+
type=ref_type,
695+
name=name,
696+
)
697+
698+
699+
class TestGetOwners:
700+
def _make_source(self):
701+
source = MagicMock()
702+
source.metadata = MagicMock()
703+
return source
704+
705+
def test_returns_none_when_include_owners_disabled(self):
706+
source = self._make_source()
707+
source.source_config.includeOwners = False
708+
result = AirflowApiSource.get_owners(source, ["admin"])
709+
assert result is None
710+
source.metadata.get_reference_by_name.assert_not_called()
711+
712+
def test_returns_none_for_none_owners(self):
713+
source = self._make_source()
714+
result = AirflowApiSource.get_owners(source, None)
715+
assert result is None
716+
717+
def test_returns_none_for_empty_list(self):
718+
source = self._make_source()
719+
result = AirflowApiSource.get_owners(source, [])
720+
assert result is None
721+
722+
def test_resolves_single_owner(self):
723+
source = self._make_source()
724+
admin_ref = _make_entity_ref("admin")
725+
source.metadata.get_reference_by_name.return_value = EntityReferenceList(
726+
root=[admin_ref]
727+
)
728+
729+
result = AirflowApiSource.get_owners(source, ["admin"])
730+
assert result is not None
731+
assert len(result.root) == 1
732+
assert result.root[0].name == "admin"
733+
source.metadata.get_reference_by_name.assert_called_once_with(
734+
name="admin", is_owner=True
735+
)
736+
737+
def test_resolves_multiple_owners(self):
738+
source = self._make_source()
739+
admin_ref = _make_entity_ref("admin")
740+
analyst_ref = _make_entity_ref("analyst")
741+
source.metadata.get_reference_by_name.side_effect = [
742+
EntityReferenceList(root=[admin_ref]),
743+
EntityReferenceList(root=[analyst_ref]),
744+
]
745+
746+
result = AirflowApiSource.get_owners(source, ["admin", "analyst"])
747+
assert result is not None
748+
assert len(result.root) == 2
749+
names = {r.name for r in result.root}
750+
assert names == {"admin", "analyst"}
751+
752+
def test_skips_unresolved_owner(self):
753+
source = self._make_source()
754+
admin_ref = _make_entity_ref("admin")
755+
source.metadata.get_reference_by_name.side_effect = [
756+
EntityReferenceList(root=[admin_ref]),
757+
None,
758+
]
759+
760+
result = AirflowApiSource.get_owners(source, ["admin", "unknown_user"])
761+
assert result is not None
762+
assert len(result.root) == 1
763+
assert result.root[0].name == "admin"
764+
765+
def test_returns_none_when_all_lookups_fail(self):
766+
source = self._make_source()
767+
source.metadata.get_reference_by_name.side_effect = Exception("ES down")
768+
769+
result = AirflowApiSource.get_owners(source, ["admin"])
770+
assert result is None
771+
772+
def test_partial_failure_returns_resolved_owners(self):
773+
source = self._make_source()
774+
admin_ref = _make_entity_ref("admin")
775+
776+
def side_effect(name, is_owner):
777+
if name == "admin":
778+
return EntityReferenceList(root=[admin_ref])
779+
raise Exception(f"User {name} not found")
780+
781+
source.metadata.get_reference_by_name.side_effect = side_effect
782+
783+
result = AirflowApiSource.get_owners(source, ["admin", "bad_user"])
784+
assert result is not None
785+
assert len(result.root) == 1
786+
assert result.root[0].name == "admin"
787+
788+
789+
# ── Yield Pipeline with Owners ───────────────────────────────────────────
790+
791+
792+
class TestYieldPipelineOwners:
793+
@patch(
794+
"metadata.ingestion.source.pipeline.airflow.api.source.get_tag_labels",
795+
return_value=[],
796+
)
797+
def test_owners_propagated_to_request(self, _mock_tags):
798+
source, dag = _make_source_and_dag()
799+
dag.owners = ["airflow_admin"]
800+
admin_ref = _make_entity_ref("airflow_admin")
801+
owner_list = EntityReferenceList(root=[admin_ref])
802+
source.get_owners = lambda owners: owner_list if owners else None
803+
804+
results = list(AirflowApiSource.yield_pipeline(source, dag))
805+
assert len(results) == 1
806+
assert results[0].right.owners is not None
807+
assert len(results[0].right.owners.root) == 1
808+
809+
@patch(
810+
"metadata.ingestion.source.pipeline.airflow.api.source.get_tag_labels",
811+
return_value=[],
812+
)
813+
def test_no_owners_sets_none(self, _mock_tags):
814+
source, dag = _make_source_and_dag()
815+
dag.owners = None
816+
source.get_owners = lambda owners: None
817+
818+
results = list(AirflowApiSource.yield_pipeline(source, dag))
819+
assert len(results) == 1
820+
assert results[0].right.owners is None
821+
822+
@patch(
823+
"metadata.ingestion.source.pipeline.airflow.api.source.get_tag_labels",
824+
return_value=[],
825+
)
826+
def test_empty_owners_sets_none(self, _mock_tags):
827+
source, dag = _make_source_and_dag()
828+
dag.owners = []
829+
source.get_owners = lambda owners: None
830+
831+
results = list(AirflowApiSource.yield_pipeline(source, dag))
832+
assert len(results) == 1
833+
assert results[0].right.owners is None
834+
835+
683836
# ── Client: DAG Runs Parsing ─────────────────────────────────────────────
684837

685838

0 commit comments

Comments
 (0)