diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 16cf71978620..96ae232545bd 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -19,6 +19,7 @@ import pytest from pydantic import TypeAdapter +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects from metadata.data_quality.api.models import TestCaseDefinition @@ -223,14 +224,35 @@ def test_lineage(self) -> None: if hasattr(self, "get_service_type"): service_type = self.get_service_type() + # Metadata ingest runs once; the lineage ingest is retried separately + # because DB audit logs (BigQuery INFORMATION_SCHEMA.JOBS_BY_PROJECT, + # MSSQL sys.dm_exec_query_stats, Postgres pg_stat_statements) have + # eventual consistency — newly executed queries may not appear for + # 30-120s, causing the first lineage run to return 0 records. self.run_command() - self.build_config_file( - E2EType.LINEAGE, - {"source": f"{service_type}-lineage", **self.get_lineage_config_args()}, + + # Re-run the full lineage ingest + assert on each retry so that the + # ingestion re-reads the audit log (which may now be populated) and + # writes fresh lineage edges to OpenMetadata before asserting. + @retry( + retry=retry_if_exception_type((AssertionError, IndexError)), + wait=wait_fixed(30), + stop=stop_after_attempt(3), + reraise=True, ) - result = self.run_command() - sink_status, source_status = self.retrieve_statuses(result) - self.assert_for_test_lineage(source_status, sink_status) + def _run_lineage_and_assert() -> None: + self.build_config_file( + E2EType.LINEAGE, + { + "source": f"{service_type}-lineage", + **self.get_lineage_config_args(), + }, + ) + result = self.run_command() + sink_status, source_status = self.retrieve_statuses(result) + self.assert_for_test_lineage(source_status, sink_status) + + _run_lineage_and_assert() @pytest.mark.order(12) def test_profiler_with_time_partition(self) -> None: