Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions ingestion/tests/cli_e2e/base/test_cli_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import pytest
from pydantic import TypeAdapter
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
Expand Down Expand Up @@ -223,14 +224,35 @@ def test_lineage(self) -> None:
if hasattr(self, "get_service_type"):
service_type = self.get_service_type()

# Metadata ingest runs once; the lineage ingest is retried separately
# because DB audit logs (BigQuery INFORMATION_SCHEMA.JOBS_BY_PROJECT,
# MSSQL sys.dm_exec_query_stats, Postgres pg_stat_statements) have
# eventual consistency — newly executed queries may not appear for
# 30-120s, causing the first lineage run to return 0 records.
Comment on lines +227 to +231
Copy link

Copilot AI Apr 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says audit logs can take 30–120s to become consistent, but the retry policy (wait_fixed(30) + stop_after_attempt(3)) only waits ~60s between the first and last attempt. Either adjust the retry window to actually cover the stated 120s, or update the comment so it matches the behavior (otherwise this can still be flaky and the rationale is misleading).

Copilot uses AI. Check for mistakes.
self.run_command()
self.build_config_file(
E2EType.LINEAGE,
{"source": f"{service_type}-lineage", **self.get_lineage_config_args()},

# Re-run the full lineage ingest + assert on each retry so that the
# ingestion re-reads the audit log (which may now be populated) and
# writes fresh lineage edges to OpenMetadata before asserting.
@retry(
retry=retry_if_exception_type((AssertionError, IndexError)),
wait=wait_fixed(30),
stop=stop_after_attempt(3),
reraise=True,
)
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_test_lineage(source_status, sink_status)
def _run_lineage_and_assert() -> None:
    """Perform one lineage ingestion pass and check its statuses.

    Writes the lineage config file, runs the command, extracts the sink
    and source statuses from the command result, and hands them to
    ``assert_for_test_lineage``.
    """
    workflow_type = E2EType.LINEAGE
    # Entries from get_lineage_config_args() are merged after "source",
    # so they take precedence on any key collision.
    lineage_config = {
        "source": f"{service_type}-lineage",
        **self.get_lineage_config_args(),
    }
    self.build_config_file(workflow_type, lineage_config)

    statuses = self.retrieve_statuses(self.run_command())
    sink_status, source_status = statuses
    # The assertion helper takes the source status first, then the sink.
    self.assert_for_test_lineage(source_status, sink_status)

_run_lineage_and_assert()

@pytest.mark.order(12)
def test_profiler_with_time_partition(self) -> None:
Expand Down
Loading