Skip to content

Commit 26953b0

Browse files
fix(e2e): improve lineage ingestion reliability with retry mechanism (#27058)
1 parent 9e17f90 commit 26953b0

1 file changed

Lines changed: 28 additions & 6 deletions

File tree

ingestion/tests/cli_e2e/base/test_cli_db.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import pytest
2121
from pydantic import TypeAdapter
22+
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
2223

2324
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
2425
from metadata.data_quality.api.models import TestCaseDefinition
@@ -223,14 +224,35 @@ def test_lineage(self) -> None:
223224
if hasattr(self, "get_service_type"):
224225
service_type = self.get_service_type()
225226

227+
# Metadata ingest runs once; the lineage ingest is retried separately
228+
# because DB audit logs (BigQuery INFORMATION_SCHEMA.JOBS_BY_PROJECT,
229+
# MSSQL sys.dm_exec_query_stats, Postgres pg_stat_statements) have
230+
# eventual consistency — newly executed queries may not appear for
231+
# 30-120s, causing the first lineage run to return 0 records.
226232
self.run_command()
227-
self.build_config_file(
228-
E2EType.LINEAGE,
229-
{"source": f"{service_type}-lineage", **self.get_lineage_config_args()},
233+
234+
# Re-run the full lineage ingest + assert on each retry so that the
235+
# ingestion re-reads the audit log (which may now be populated) and
236+
# writes fresh lineage edges to OpenMetadata before asserting.
237+
@retry(
238+
retry=retry_if_exception_type((AssertionError, IndexError)),
239+
wait=wait_fixed(30),
240+
stop=stop_after_attempt(3),
241+
reraise=True,
230242
)
231-
result = self.run_command()
232-
sink_status, source_status = self.retrieve_statuses(result)
233-
self.assert_for_test_lineage(source_status, sink_status)
243+
def _run_lineage_and_assert() -> None:
244+
self.build_config_file(
245+
E2EType.LINEAGE,
246+
{
247+
"source": f"{service_type}-lineage",
248+
**self.get_lineage_config_args(),
249+
},
250+
)
251+
result = self.run_command()
252+
sink_status, source_status = self.retrieve_statuses(result)
253+
self.assert_for_test_lineage(source_status, sink_status)
254+
255+
_run_lineage_and_assert()
234256

235257
@pytest.mark.order(12)
236258
def test_profiler_with_time_partition(self) -> None:

0 commit comments

Comments
 (0)