Skip to content
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
7116659
fix(restore): bulk + async restore for large entity hierarchies
harshach May 8, 2026
8940b22
perf(restore,delete): batched per-level findTo + bulk soft-delete cas…
harshach May 8, 2026
241dec3
style: formatter reflows after bulk restore changes
harshach May 9, 2026
b3172d4
review: address PR feedback on bulk restore + delete
harshach May 9, 2026
1e00abb
style: ruff reflow on AsyncJobResponse jobId guard
harshach May 9, 2026
6235b86
fix(restore,delete): walk children even when parent already at target…
harshach May 9, 2026
6137d0d
review: pre-validate async restore + batch chart restore
harshach May 9, 2026
8d2b9e6
fix(restore,delete): pre-validate async restore + batch chart restore
harshach May 9, 2026
8788d20
Merge branch 'main' into harshach/review-issue-4004
harshach May 11, 2026
6eb28d3
Merge remote-tracking branch 'origin/main' into harshach/review-issue…
harshach May 12, 2026
b6357c2
fix(restore,delete): pre-validate async restore + batch chart restore
harshach May 12, 2026
ce9167a
Merge remote-tracking branch 'origin/harshach/review-issue-4004' into…
harshach May 12, 2026
96c5228
review: dispatch bulk lifecycle event + style cleanup
harshach May 12, 2026
d0d507c
feat(ingestion): server-side async delete + restore from legacy client
harshach May 12, 2026
a8f2583
fix(async): use tryAcquire so HTTP threads never block on permit
harshach May 12, 2026
3394279
fix(restore,delete): NPE guard + universal async + SDK rejection mess…
harshach May 13, 2026
4351709
simplify: drop semaphore-based bounding from AsyncService
harshach May 13, 2026
09250ff
review: address remaining Copilot threads on async restore
harshach May 13, 2026
4b59bb7
review: address remaining Copilot threads on async restore
harshach May 13, 2026
3e2c4d2
Merge branch 'main' into harshach/review-issue-4004
harshach May 13, 2026
c9841b1
perf(delete): bulk hard-delete cascade across all entity hierarchies
harshach May 14, 2026
6794541
review: address Copilot threads on restoreEntity guard + soft-delete …
harshach May 14, 2026
4d42d56
fix(restore): load HAS-related children with Include.ALL during restore
harshach May 15, 2026
b07804e
fix(ci): invoke postDelete for cascade-deleted descendants + Python t…
harshach May 15, 2026
b6e8cb1
Merge branch 'main' into harshach/review-issue-4004
harshach May 16, 2026
2dde513
Merge branch 'main' into harshach/review-issue-4004
harshach May 17, 2026
86d5129
fix(restore/delete): address PR review feedback on bulk + async hooks
harshach May 17, 2026
46fc43f
fix(delete): populate relation fields before postDelete in bulk hard …
harshach May 17, 2026
2755b8f
fix(delete): mark NotFoundCache for bulk-hard-deleted entities
harshach May 17, 2026
15b8b70
fix(testCase): drop unreachable null check, wrap getEntity in if-block
harshach May 17, 2026
67f21d2
test(conftest): treat 404 as success in _safe_delete teardown helper
harshach May 18, 2026
9a27caa
test(conftest): route datalake + sdk teardown through _safe_delete
harshach May 18, 2026
a4ab2b2
fix(restore): walk CONTAINS + PARENT_OF in restoreChildren for cascad…
harshach May 18, 2026
e6af95b
perf(feed): batch deleteByAbout thread cleanup through IN + deleteThr…
harshach May 19, 2026
37bff3c
fix(feed): chunk deleteByAbout IN-list expansions to stay under DB pa…
harshach May 19, 2026
d99e9ec
fix(feed): restore best-effort semantics around batched thread cleanup
harshach May 19, 2026
6baa6a9
fix(entity-dao): chunk deleteByIds IN-list to match findEntitiesByIds
harshach May 19, 2026
333e87e
Merge branch 'main' into harshach/review-issue-4004
harshach May 19, 2026
8766a90
fix(dao): hoist IN-list chunk size + escape backslashes in updateFqn
harshach May 19, 2026
fe7dfbc
fix(bulk-update): hydrate HAS-relationship fields with Include.ALL be…
harshach May 19, 2026
a0f28d7
Merge branch 'main' into harshach/review-issue-4004
harshach May 19, 2026
6eaae5b
review: address remaining open review threads
harshach May 19, 2026
b46b929
Merge branch 'main' into harshach/review-issue-4004
harshach May 19, 2026
4b78c7e
fix(bulk-delete): fire deleteFromSearch per cascade-deleted descendant
harshach May 20, 2026
613b237
Merge branch 'main' into harshach/review-issue-4004
harshach May 20, 2026
f06b219
fix(escape): two-layer escape for MySQL REGEXP_REPLACE replacement, p…
harshach May 20, 2026
1df217d
feat(sdk-java): extend fluent restore() to all data-asset fluents via…
harshach May 20, 2026
fc52438
fix(feed): defensive de-dup of thread ids in legacy feed cleanup
harshach May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions ingestion/src/metadata/ingestion/api/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Delete methods
"""

import os
import traceback
from typing import Dict, Iterable, List, Optional, Type # noqa: UP035

Expand All @@ -25,13 +26,24 @@

logger = utils_logger()

# Env var that opts every connector into the server-side async delete cascade. When set
# to a truthy value, mark-deletion calls fire DELETE /<entity>/async/{id}?recursive=true
# and return 202 + a jobId immediately, so ingestion does not block on the server-side
# cascade (issue #4003). Explicit dispatch_async= passed to the generators overrides
# this default.
DELETE_DISPATCH_ASYNC_ENV = "OM_INGESTION_DELETE_ASYNC"


def _default_dispatch_async() -> bool:
    """Return the environment-driven default for async delete dispatch.

    Reads ``DELETE_DISPATCH_ASYNC_ENV`` and reports ``True`` when its value, compared
    case-insensitively and ignoring surrounding whitespace, is one of ``true``, ``1``,
    ``yes`` or ``on``. An unset variable or any other value yields ``False``.
    """
    raw_value = os.getenv(DELETE_DISPATCH_ASYNC_ENV, "")
    # Strip before comparing: values coming from .env files or YAML often carry
    # stray whitespace, which would otherwise silently disable the feature.
    return raw_value.strip().lower() in {"true", "1", "yes", "on"}


def delete_entity_from_source(
metadata: OpenMetadata,
entity_type: Type[T], # noqa: UP006
entity_source_state,
mark_deleted_entity: bool = True,
params: Optional[Dict[str, str]] = None, # noqa: UP006, UP045
dispatch_async: Optional[bool] = None, # noqa: UP045
) -> Iterable[Either[DeleteEntity]]:
"""
Method to delete the entities
Expand All @@ -40,16 +52,22 @@ def delete_entity_from_source(
:param entity_source_state: Current state of the service
:param mark_deleted_entity: Option to mark the entity as deleted or not
:param params: param to fetch the entity state
:param dispatch_async: Route the sink delete through the server-side async endpoint
(returns 202 + jobId, runs cascade on the server's executor) so ingestion does
not block on large hierarchies — see issue #4003.
"""
use_async = dispatch_async if dispatch_async is not None else _default_dispatch_async()
try:
entity_state = metadata.list_all_entities(entity=entity_type, params=params)
for entity in entity_state:
if str(entity.fullyQualifiedName.root) not in entity_source_state:
yield Either(
left=None,
right=DeleteEntity(
entity=entity,
mark_deleted_entities=mark_deleted_entity,
)
dispatch_async=use_async,
),
)
except Exception as exc:
yield Either(
Expand All @@ -66,19 +84,29 @@ def delete_entity_by_name(
entity_type: Type[T], # noqa: UP006
entity_names: List[str], # noqa: UP006
mark_deleted_entity: bool = True,
dispatch_async: Optional[bool] = None, # noqa: UP045
) -> Iterable[Either[DeleteEntity]]:
"""
Method to delete the entites contained on a given list
Method to delete the entities contained on a given list
:param metadata: OMeta client
:param entity_type: Pydantic Entity model
:param entity_names: List of FullyQualifiedNames of the entities to be deleted
:param mark_deleted_entity: Option to mark the entity as deleted or not
:param dispatch_async: see :func:`delete_entity_from_source`
"""
use_async = dispatch_async if dispatch_async is not None else _default_dispatch_async()
try:
for entity_name in entity_names:
entity = metadata.get_by_name(entity=entity_type, fqn=entity_name)
if entity:
yield Either(right=DeleteEntity(entity=entity, mark_deleted_entities=mark_deleted_entity))
yield Either(
left=None,
right=DeleteEntity(
entity=entity,
mark_deleted_entities=mark_deleted_entity,
dispatch_async=use_async,
),
)
except Exception as exc:
yield Either(
left=StackTraceError(
Expand Down
8 changes: 6 additions & 2 deletions ingestion/src/metadata/ingestion/models/delete_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@


class DeleteEntity(BaseModel):
    """Entity reference for a deletion candidate emitted by the ingestion flow.

    ``dispatch_async`` flips the sink to the server-side async delete endpoint
    (``DELETE /<entity>/async/{id}``) instead of the synchronous one, so ingestion
    isn't blocked on the cascade for large hierarchies (issue #4003).
    """

    # The entity that should be removed.
    entity: Entity
    # Forwarded by the sink as the ``recursive`` flag of the delete call.
    mark_deleted_entities: Optional[bool] = False  # noqa: UP045
    # When truthy, the sink uses the async delete endpoint instead of the sync one.
    dispatch_async: Optional[bool] = False  # noqa: UP045
38 changes: 38 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,27 @@ def delete(
url += f"&hardDelete={str(hard_delete).lower()}"
self.client.delete(url)

def delete_async(
    self,
    entity: Type[T],  # noqa: UP006
    entity_id: Union[str, basic.Uuid],  # noqa: UP007
    recursive: bool = False,
    hard_delete: bool = False,
) -> Optional[dict]:  # noqa: UP045
    """Request a server-side asynchronous delete of one entity.

    Sends ``DELETE /<entity>/async/{id}?recursive=...&hardDelete=...`` (the dedicated
    async-delete endpoint defined by ``EntityResource.deleteByIdAsync``). The server
    answers with a 202 payload ``{"jobId": ..., "message": ...}`` and runs the cascade
    on its own executor, so ingestion does not wait on large hierarchies. Tracking the
    returned ``jobId`` for completion is left to the caller.

    :param entity: Entity class used to resolve the endpoint suffix
    :param entity_id: Identifier of the entity to delete
    :param recursive: Sent as the ``recursive`` query parameter
    :param hard_delete: Sent as the ``hardDelete`` query parameter
    :return: The response payload when it is a dict, otherwise ``None``
    """
    base_path = f"{self.get_suffix(entity)}/async/{model_str(entity_id)}"
    # The server expects lowercase ``true`` / ``false`` literals in the query string.
    query = f"?recursive={str(recursive).lower()}&hardDelete={str(hard_delete).lower()}"
    payload = self.client.delete(base_path + query)
    if isinstance(payload, dict):
        return payload
    return None

def restore(
self,
entity: Type[T], # noqa: UP006
Expand Down Expand Up @@ -794,6 +815,23 @@ def restore(
)
return None

def restore_async(
    self,
    entity: Type[T],  # noqa: UP006
    entity_id: Union[str, basic.Uuid],  # noqa: UP007
) -> Optional[dict]:  # noqa: UP045
    """Request a server-side asynchronous restore of one entity.

    Sends ``PUT /<entity>/restore?async=true`` with the entity id in the JSON body.
    The server answers with a 202 payload ``{"jobId": ..., "message": ...}``. Intended
    for entities with large subtrees so ingestion doesn't block on the cascade
    (issue #4003). Tracking the returned ``jobId`` for completion is left to the caller.

    :param entity: Entity class used to resolve the endpoint suffix
    :param entity_id: Identifier of the entity to restore
    :return: The response payload when it is a dict, otherwise ``None``
    """
    payload = self.client.put(
        f"{self.get_suffix(entity)}/restore?async=true",
        json={"id": model_str(entity_id)},
    )
    if not isinstance(payload, dict):
        return None
    return payload

def compute_percentile(self, entity: Union[Type[T], str], date: str) -> None: # noqa: UP006, UP007
"""
Compute an entity usage percentile
Expand Down
36 changes: 30 additions & 6 deletions ingestion/src/metadata/ingestion/sink/metadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,12 +584,36 @@ def write_users(self, record: OMetaUserProfile) -> Either[User]:

@_run_dispatch.register
def delete_entity(self, record: DeleteEntity) -> Either[Entity]:
    """Delete the entity carried by ``record`` through the sync or async endpoint.

    ``record.mark_deleted_entities`` is forwarded as the ``recursive`` flag. When
    ``record.dispatch_async`` is truthy the delete goes through
    ``self.metadata.delete_async`` and the returned ``jobId`` is logged; otherwise
    the blocking ``self.metadata.delete`` is used.

    :param record: Deletion request emitted by the ingestion flow
    :return: ``Either`` carrying the original record on the right side
    """
    # record.entity is declared as a bare pydantic BaseModel; the runtime value is a
    # generated entity that exposes `id` and `fullyQualifiedName`, but basedpyright can't
    # see those attributes through the BaseModel alias. Widen the reference to Any so the
    # type checker stays quiet without changing the runtime behavior.
    entity_obj: Any = record.entity
    entity_id = entity_obj.id
    recursive = bool(record.mark_deleted_entities)
    if record.dispatch_async:
        # Server-side async cascade — returns 202 + jobId immediately so ingestion
        # doesn't block on large subtrees (issue #4003). The actual work runs on the
        # server's executor; we surface the jobId in the log for operator correlation.
        response = self.metadata.delete_async(
            entity=type(record.entity),
            entity_id=entity_id,
            recursive=recursive,
        )
        job_id = (response or {}).get("jobId")
        # The FQN is only needed for this log line, so it is resolved here rather than
        # up front; the synchronous path never has to dereference it.
        logger.debug(
            "Dispatched async delete for %s (jobId=%s)",
            entity_obj.fullyQualifiedName.root,
            job_id,
        )
    else:
        self.metadata.delete(
            entity=type(record.entity),
            entity_id=entity_id,
            recursive=recursive,
        )
    return Either(left=None, right=record)

@_run_dispatch.register
def write_pipeline_status(self, record: OMetaPipelineStatus) -> Either[PipelineStatus]:
Expand Down
98 changes: 97 additions & 1 deletion ingestion/src/metadata/sdk/entities/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,54 @@ def execute_async(self) -> Any:
return export_async(entity=self.entity, name=self.name)


@dataclass
class AsyncJobResponse:
    """Payload returned by server-side async operations.

    Sent with HTTP 202 Accepted by endpoints such as ``PUT /restore?async=true``
    (issue #4003). The ``job_id`` correlates with WebSocket notifications on the
    ``restoreEntityChannel`` channel emitted when the work completes.
    """

    job_id: str
    message: Optional[str] = None  # noqa: UP045

    @classmethod
    def from_response(cls, payload: Any) -> "AsyncJobResponse":  # noqa: UP037
        """Coerce a raw response into an :class:`AsyncJobResponse`.

        :param payload: An existing ``AsyncJobResponse`` (returned unchanged) or a
            dict holding ``jobId`` and optionally ``message``
        :return: The coerced response; ``jobId`` is converted with ``str()``
        :raises ValueError: if the dict has no truthy ``jobId``
        :raises TypeError: if ``payload`` is neither of the accepted types
        """
        if isinstance(payload, AsyncJobResponse):
            return payload
        if not isinstance(payload, dict):
            raise TypeError(f"Cannot coerce {type(payload).__name__} into AsyncJobResponse")
        raw_job_id = payload.get("jobId")
        if raw_job_id:
            return cls(job_id=str(raw_job_id), message=payload.get("message"))
        raise ValueError(f"Async response is missing a non-empty jobId: {payload!r}")


@dataclass
class RestoreOperation(Generic[TEntity]):
    """Fluent builder for restoring an entity, synchronously or via the server.

    Mirrors the Java SDK's ``Tables.find(id).restore().async().execute()`` style.
    By default ``execute()`` performs the synchronous restore and returns whatever
    the owning class's ``_restore_sync`` returns. Calling ``with_async()`` first
    routes ``execute()`` to ``_restore_server_async`` instead, which yields an
    :class:`AsyncJobResponse` carrying a job id (issue #4003).
    """

    entity_cls: Any  # the BaseEntity subclass that owns this operation
    entity_id: str
    # Not an init argument: only ``with_async()`` turns this on.
    async_enabled: bool = field(default=False, init=False)

    def with_async(self) -> "RestoreOperation[TEntity]":  # noqa: UP037
        """Switch this operation to the server-side async path and return ``self``."""
        self.async_enabled = True
        return self

    def execute(self) -> Any:
        """Run the restore through the path selected on this builder."""
        owner = self.entity_cls
        handler = owner._restore_server_async if self.async_enabled else owner._restore_sync
        return handler(self.entity_id)


@dataclass
class CsvImportOperation(Generic[TEntity]):
"""Stateful helper for CSV import operations."""
Expand Down Expand Up @@ -388,8 +436,12 @@ def remove_followers(cls, entity_id: UuidLike, follower_ids: Sequence[UuidLike])

@classmethod
def restore(cls, entity_id: UuidLike) -> TEntity:
    """Restore a soft-deleted entity (synchronous).

    Thin public wrapper that hands ``entity_id`` to ``_restore_sync`` and returns
    its result.
    """
    restored = cls._restore_sync(entity_id)
    return restored

@classmethod
def _restore_sync(cls, entity_id: UuidLike) -> TEntity:
client = cls._get_client()
rest_client = cls._get_rest_client(client)
endpoint = cls._get_endpoint_path(client)
Expand All @@ -399,6 +451,50 @@ def restore(cls, entity_id: UuidLike) -> TEntity:
)
return cls._coerce_entity(response)

@classmethod
def restore_async(cls, entity_id: UuidLike) -> "AsyncJobResponse":  # noqa: UP037
    """Start a server-side async restore and return the accepted job.

    Issues ``PUT /restore?async=true`` (through ``_restore_server_async``) and
    returns the 202 Accepted payload containing the job id. Intended for
    hierarchies large enough that the synchronous response would exceed
    proxy / ALB idle timeouts (issue #4003).
    """
    accepted_job = cls._restore_server_async(entity_id)
    return accepted_job

@classmethod
def _restore_server_async(cls, entity_id: UuidLike) -> "AsyncJobResponse":  # noqa: UP037
    """Send the async restore request and coerce the reply into a job response.

    Issues ``PUT <endpoint>/restore?async=true`` with the stringified entity id in
    the JSON body and passes the reply to ``AsyncJobResponse.from_response``.

    :param entity_id: Identifier of the entity to restore
    :return: The job response parsed from the server reply
    :raises ValueError: if the reply carries no job id; the original parsing
        error is chained as the cause
    """
    client = cls._get_client()
    rest_client = cls._get_rest_client(client)
    endpoint = cls._get_endpoint_path(client)
    response = rest_client.put(
        f"{endpoint}/restore?async=true",
        json={"id": cls._stringify_identifier(entity_id)},
    )
    try:
        return AsyncJobResponse.from_response(response)
    except ValueError as missing_job_id:
        # Defensive guard for older servers that don't honor ?async=true (or any
        # future case where the resource short-circuits with a 200 + entity payload).
        # Without this, the generic AsyncJobResponse jobId-missing error would be
        # confusing.
        raise ValueError(
            f"Server did not return an async job for {endpoint}/restore. "
            "The server may be older than the async-restore release."
        ) from missing_job_id

@classmethod
def restore_request(cls, entity_id: UuidLike) -> "RestoreOperation[TEntity]":  # noqa: UP037
    """Build a fluent restore operation bound to this entity class.

    The identifier is stringified once here, so the builder always holds a
    plain ``str`` id.

    Examples::

        restored = Table.restore_request(table_id).execute()
        job = Table.restore_request(table_id).with_async().execute()
    """
    normalized_id = cls._stringify_identifier(entity_id)
    return RestoreOperation(entity_cls=cls, entity_id=normalized_id)

@classmethod
def update_custom_properties(cls, identifier: UuidLike):
"""Convenience accessor for custom property updates by entity id."""
Expand Down
18 changes: 16 additions & 2 deletions ingestion/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,19 @@ def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True)


def _safe_delete(metadata, entity, entity_id, retries=3, **kwargs):
"""Delete with retry logic to handle transient server errors during parallel teardown."""
"""Delete with retry logic to handle transient server errors during parallel teardown.

A 404 here means the entity is already gone (e.g., wiped as part of an earlier
cascade or another worker's teardown); treat it as success rather than retrying.
"""
for attempt in range(retries):
try:
metadata.delete(entity=entity, entity_id=entity_id, **kwargs)
return # noqa: TRY300
except Exception:
except Exception as exc:
if _is_not_found(exc):
logger.debug("Skipping %s %s delete — already gone", entity.__name__, entity_id)
return
if attempt < retries - 1:
logger.warning(
"Retry %d/%d: delete %s %s",
Expand All @@ -185,6 +192,13 @@ def _safe_delete(metadata, entity, entity_id, retries=3, **kwargs):
raise


def _is_not_found(exc: BaseException) -> bool:
    """Return ``True`` when ``exc`` represents an HTTP 404 response.

    The status is looked up first on ``exc.response.status_code`` and then on
    ``exc.status_code``. If neither is present, the exception message is searched
    for a standalone ``404`` token.

    :param exc: Exception raised by a delete call
    :return: ``True`` if the failure looks like a 404, ``False`` otherwise
    """
    import re  # local import: this helper must not depend on the module's import block

    # NOTE(review): some client exceptions presumably expose ``status_code`` on the
    # exception itself rather than on a ``response`` attribute — confirm against the
    # OMeta client's error type.
    for carrier in (getattr(exc, "response", None), exc):
        if getattr(carrier, "status_code", None) == 404:
            return True
    # Match "404" only as a standalone token. A plain substring test also fires on
    # entity UUIDs or ports that merely contain those digits, which would make the
    # caller treat an unrelated failure as "already deleted".
    return re.search(r"\b404\b", str(exc)) is not None


@pytest.fixture(scope="module")
def db_service(metadata, create_service_request, unmask_password):
service_entity = metadata.create_or_update(data=create_service_request)
Expand Down
9 changes: 8 additions & 1 deletion ingestion/tests/integration/datalake/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow

from ..conftest import _safe_delete # noqa: TID252
from ..containers import MinioContainerConfigs, get_minio_container # noqa: TID252
from ..integration_base import generate_name # noqa: TID252

Expand Down Expand Up @@ -207,7 +208,13 @@ def run_ingestion(metadata, ingestion_config, datalake_service_name):
yield
db_service = metadata.get_by_name(entity=DatabaseService, fqn=datalake_service_name)
if db_service:
metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
_safe_delete(
metadata,
entity=DatabaseService,
entity_id=db_service.id,
recursive=True,
hard_delete=True,
)


@pytest.fixture(scope="class")
Expand Down
Loading
Loading