diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index bed61447b2eb..716809bd4858 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -462,6 +462,11 @@ def yield_and_update_context( stage=stage, entity_name=entity_name ) + if stage.source_state: + source_state_set = getattr(self, stage.source_state, None) + if source_state_set is not None: + source_state_set.add(entity_fqn) + # If we don't want to write data in OM, we'll return what we fetch from the API. # This will be applicable for service entities since we do not want to overwrite the data same_fingerprint = False diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index 2609fdfae01e..aff5da02ea7a 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -92,6 +92,13 @@ class NodeStage(BaseModel, Generic[T]): description="Enable this to get the entity from cached state in the context", ) + # Source state tracking for mark-as-deleted + source_state: Optional[str] = Field( + None, + description="Attribute name on the source for tracking entity FQNs seen during ingestion. " + "Used by mark_*_as_deleted post-process to identify stale entities.", + ) + class TopologyNode(BaseModel): """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index 8edb0a6fc22c..4fb55e3bce61 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -142,6 +142,7 @@ class DashboardServiceTopology(ServiceTopology): consumer=["dashboard_service"], nullable=True, use_cache=True, + source_state="datamodel_source_state", ) ], ) @@ -174,6 +175,7 @@ class DashboardServiceTopology(ServiceTopology): store_all_in_context=True, clear_context=True, use_cache=True, + source_state="datamodel_source_state", ), NodeStage( type_=Dashboard, @@ -181,6 +183,7 @@ class DashboardServiceTopology(ServiceTopology): processor="yield_dashboard", consumer=["dashboard_service"], use_cache=True, + source_state="dashboard_source_state", ), NodeStage( type_=AddLineageRequest,