Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ingestion/src/metadata/ingestion/api/topology_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ def yield_and_update_context(
stage=stage, entity_name=entity_name
)

if stage.source_state:
source_state_set = getattr(self, stage.source_state, None)
if source_state_set is not None:
source_state_set.add(entity_fqn)

# If we don't want to write data in OM, we'll return what we fetch from the API.
# This will be applicable for service entities since we do not want to overwrite the data
same_fingerprint = False
Expand Down
7 changes: 7 additions & 0 deletions ingestion/src/metadata/ingestion/models/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ class NodeStage(BaseModel, Generic[T]):
description="Enable this to get the entity from cached state in the context",
)

# Source state tracking for mark-as-deleted
source_state: Optional[str] = Field(
None,
description="Attribute name on the source for tracking entity FQNs seen during ingestion. "
"Used by mark_*_as_deleted post-process to identify stale entities.",
)


class TopologyNode(BaseModel):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class DashboardServiceTopology(ServiceTopology):
consumer=["dashboard_service"],
nullable=True,
use_cache=True,
source_state="datamodel_source_state",
)
],
)
Expand Down Expand Up @@ -174,13 +175,15 @@ class DashboardServiceTopology(ServiceTopology):
store_all_in_context=True,
clear_context=True,
use_cache=True,
source_state="datamodel_source_state",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should not even need this, just build it on the fly based on the context/type_

),
NodeStage(
type_=Dashboard,
context="dashboard",
processor="yield_dashboard",
consumer=["dashboard_service"],
use_cache=True,
source_state="dashboard_source_state",
),
NodeStage(
type_=AddLineageRequest,
Expand Down
Loading