Skip to content

Commit 3dae659

Browse files
committed
Merge branch 'fix/cascading-testcase-status-cleanup-27418' of https://github.com/Megh-Shah-08/OpenMetadata into fix/cascading-testcase-status-cleanup-27418
2 parents 9ffc85f + ec7c322 commit 3dae659

225 files changed

Lines changed: 4715 additions & 3532 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

.github/workflows/py-cli-e2e-tests.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,12 @@ jobs:
182182
echo "import os" >> $SITE_CUSTOMIZE_PATH
183183
echo "try:" >> $SITE_CUSTOMIZE_PATH
184184
echo " import coverage" >> $SITE_CUSTOMIZE_PATH
185-
echo " os.environ['COVERAGE_PROCESS_START'] = 'ingestion/pyproject.toml'" >> $SITE_CUSTOMIZE_PATH
185+
echo " os.environ['COVERAGE_PROCESS_START'] = os.path.join(os.environ.get('GITHUB_WORKSPACE', os.getcwd()), 'ingestion', 'pyproject.toml')" >> $SITE_CUSTOMIZE_PATH
186186
echo " coverage.process_startup()" >> $SITE_CUSTOMIZE_PATH
187187
echo "except ImportError:" >> $SITE_CUSTOMIZE_PATH
188188
echo " pass" >> $SITE_CUSTOMIZE_PATH
189-
coverage run --rcfile ingestion/pyproject.toml -a --branch -m pytest -c ingestion/pyproject.toml --junitxml=ingestion/junit/test-results-$E2E_TEST.xml --ignore=ingestion/tests/unit/source ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py
190-
coverage combine --data-file=.coverage.$E2E_TEST --rcfile=ingestion/pyproject.toml --keep -a .coverage*
189+
coverage run --rcfile ingestion/pyproject.toml --branch -m pytest -c ingestion/pyproject.toml --junitxml=ingestion/junit/test-results-$E2E_TEST.xml --ignore=ingestion/tests/unit/source ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py
190+
coverage combine --data-file=.coverage.$E2E_TEST --rcfile=ingestion/pyproject.toml --keep .coverage*
191191
coverage report --rcfile ingestion/pyproject.toml --data-file .coverage.$E2E_TEST || true
192192
193193
- name: Upload coverage artifact for Python unit tests
@@ -293,7 +293,7 @@ jobs:
293293
done
294294
source env/bin/activate
295295
cd ingestion
296-
coverage combine --rcfile=pyproject.toml --keep -a .coverage*
296+
coverage combine --rcfile=pyproject.toml --keep .coverage*
297297
coverage xml --rcfile=pyproject.toml --data-file=.coverage
298298
shell: bash
299299

ingestion/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ provider_info = "airflow_provider_openmetadata:get_provider_config"
4343
source = ["metadata"]
4444
relative_files = true
4545
branch = true
46+
parallel = true
4647

4748
# Remap installed-package paths back to the source tree so that
4849
# ``coverage combine`` (in a lightweight CI job without the package

ingestion/src/metadata/ingestion/source/dashboard/ssrs/client.py

Lines changed: 65 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
"""
1212
SSRS REST client
1313
"""
14-
import traceback
15-
from typing import List, Optional, Union
14+
from typing import Iterable, Iterator, Optional, Union
1615

1716
import requests
17+
from requests.adapters import HTTPAdapter
1818
from requests_ntlm import HttpNtlmAuth
19+
from urllib3.util.retry import Retry
1920

2021
from metadata.generated.schema.entity.services.connections.dashboard.ssrsConnection import (
2122
SsrsConnection,
@@ -33,8 +34,14 @@
3334
logger = ingestion_logger()
3435

3536
API_VERSION = "api/v2.0"
36-
DEFAULT_TIMEOUT = 30
37+
CONNECT_TIMEOUT = 10
38+
READ_TIMEOUT = 120
3739
PAGE_SIZE = 100
40+
MAX_RETRIES = 2
41+
BACKOFF_FACTOR = 1
42+
RETRY_STATUS_CODES = (500, 502, 503, 504)
43+
REPORT_SELECT_FIELDS = "Id,Name,Path,Description,Type,Hidden,HasDataSources"
44+
FOLDER_SELECT_FIELDS = "Id,Name,Path"
3845

3946

4047
class SsrsClient:
@@ -53,17 +60,51 @@ def __init__(
5360
self.session.headers.update({"Accept": "application/json"})
5461
if verify_ssl is not None:
5562
self.session.verify = verify_ssl
63+
retry = Retry(
64+
total=MAX_RETRIES,
65+
connect=MAX_RETRIES,
66+
read=MAX_RETRIES,
67+
status=MAX_RETRIES,
68+
backoff_factor=BACKOFF_FACTOR,
69+
status_forcelist=RETRY_STATUS_CODES,
70+
allowed_methods=frozenset(["GET"]),
71+
raise_on_status=False,
72+
)
73+
adapter = HTTPAdapter(max_retries=retry)
74+
self.session.mount("http://", adapter)
75+
self.session.mount("https://", adapter)
5676

5777
def close(self) -> None:
5878
if self.session:
5979
self.session.close()
6080

6181
def _get(self, path: str, params: Optional[dict] = None) -> dict:
6282
url = f"{self.base_url}{path}"
63-
resp = self.session.get(url, timeout=DEFAULT_TIMEOUT, params=params)
83+
resp = self.session.get(
84+
url, timeout=(CONNECT_TIMEOUT, READ_TIMEOUT), params=params
85+
)
6486
resp.raise_for_status()
6587
return resp.json()
6688

89+
def _paginate(self, path: str, params: dict, resource_label: str) -> Iterable[dict]:
90+
"""Yield pages from an OData endpoint. Any per-page failure raises
91+
``SourceConnectionException`` so callers can surface it instead of
92+
producing a silently truncated result set."""
93+
skip = 0
94+
while True:
95+
page_params = {**params, "$top": str(PAGE_SIZE), "$skip": str(skip)}
96+
try:
97+
data = self._get(path, params=page_params)
98+
except Exception as exc:
99+
raise SourceConnectionException(
100+
f"Failed to fetch SSRS {resource_label} at skip={skip}: {exc}"
101+
) from exc
102+
yield data
103+
value = data.get("value") or []
104+
if len(value) < PAGE_SIZE:
105+
return
106+
skip += PAGE_SIZE
107+
67108
def test_access(self) -> None:
68109
try:
69110
self._get("/Folders", params={"$top": "1"})
@@ -72,40 +113,26 @@ def test_access(self) -> None:
72113
f"Failed to connect to SSRS: {exc}"
73114
) from exc
74115

75-
def get_folders(self) -> List[SsrsFolder]:
116+
def test_get_reports(self) -> None:
76117
try:
77-
results: List[SsrsFolder] = []
78-
skip = 0
79-
while True:
80-
data = self._get(
81-
"/Folders", params={"$top": str(PAGE_SIZE), "$skip": str(skip)}
82-
)
83-
response = SsrsFolderListResponse(**data)
84-
results.extend(response.value)
85-
if len(response.value) < PAGE_SIZE:
86-
break
87-
skip += PAGE_SIZE
88-
return results
118+
self._get("/Reports", params={"$top": "1"})
89119
except Exception as exc:
90-
logger.debug(traceback.format_exc())
91-
logger.warning("Failed to fetch SSRS folders: %s", exc)
92-
return []
120+
raise SourceConnectionException(
121+
f"Failed to fetch SSRS reports: {exc}"
122+
) from exc
93123

94-
def get_reports(self) -> List[SsrsReport]:
95-
try:
96-
results: List[SsrsReport] = []
97-
skip = 0
98-
while True:
99-
data = self._get(
100-
"/Reports", params={"$top": str(PAGE_SIZE), "$skip": str(skip)}
101-
)
102-
response = SsrsReportListResponse(**data)
103-
results.extend(response.value)
104-
if len(response.value) < PAGE_SIZE:
105-
break
106-
skip += PAGE_SIZE
107-
return results
108-
except Exception as exc:
109-
logger.debug(traceback.format_exc())
110-
logger.warning("Failed to fetch SSRS reports: %s", exc)
111-
return []
124+
def get_folders(self) -> Iterator[SsrsFolder]:
125+
params = {
126+
"$orderby": "Id",
127+
"$select": FOLDER_SELECT_FIELDS,
128+
}
129+
for data in self._paginate("/Folders", params, "folders"):
130+
yield from SsrsFolderListResponse(**data).value
131+
132+
def get_reports(self) -> Iterator[SsrsReport]:
133+
params = {
134+
"$orderby": "Id",
135+
"$select": REPORT_SELECT_FIELDS,
136+
}
137+
for data in self._paginate("/Reports", params, "reports"):
138+
yield from SsrsReportListResponse(**data).value

ingestion/src/metadata/ingestion/source/dashboard/ssrs/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def test_connection(
4646
) -> TestConnectionResult:
4747
test_fn = {
4848
"CheckAccess": client.test_access,
49-
"GetDashboards": client.get_reports,
49+
"GetDashboards": client.test_get_reports,
5050
}
5151

5252
return test_connection_steps(

ingestion/src/metadata/ingestion/source/dashboard/ssrs/metadata.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
SSRS source module
1313
"""
1414
import traceback
15-
from typing import Any, Dict, Iterable, List, Optional
15+
from typing import Any, Dict, Iterable, Optional
1616

1717
from metadata.generated.schema.api.data.createChart import CreateChartRequest
1818
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
@@ -77,13 +77,15 @@ def __init__(
7777
self.folder_path_map: Dict[str, str] = {}
7878

7979
def prepare(self):
80-
folders = self.client.get_folders()
81-
self.folder_path_map = {folder.path: folder.name for folder in folders}
80+
self.folder_path_map = {
81+
folder.path: folder.name for folder in self.client.get_folders()
82+
}
8283
return super().prepare()
8384

84-
def get_dashboards_list(self) -> Optional[List[SsrsReport]]:
85-
reports = self.client.get_reports()
86-
return [r for r in reports if not r.hidden]
85+
def get_dashboards_list(self) -> Iterable[SsrsReport]:
86+
for report in self.client.get_reports():
87+
if not report.hidden:
88+
yield report
8789

8890
def get_dashboard_name(self, dashboard: SsrsReport) -> str:
8991
return dashboard.name

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
6363
from metadata.ingestion.ometa.ometa_api import OpenMetadata
6464
from metadata.ingestion.source.connections import get_connection
65-
from metadata.ingestion.source.connections_utils import kill_active_connections
6665
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
6766
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
6867
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
@@ -152,15 +151,42 @@ def set_inspector(self, database_name: str) -> None:
152151
:param database_name: new database to set
153152
"""
154153

155-
kill_active_connections(self.engine)
154+
self._release_engine()
156155
logger.info(f"Ingesting from database: {database_name}")
157156

158157
new_service_connection = deepcopy(self.service_connection)
159158
new_service_connection.database = database_name
160159
self.engine = get_connection(new_service_connection)
160+
self.session = create_and_bind_thread_safe_session(self.engine)
161+
self.connection_obj = self.engine
161162

162-
self._connection_map = {} # Lazy init as well
163+
def _release_engine(self) -> None:
164+
# Close fairies first so _ConnectionRecord drops its pool reference;
165+
# dispose alone leaves them orphaned and causes _finalize_fairy
166+
# RecursionErrors at GC time. Clearing _inspector_map is what
167+
# actually frees Inspector.info_cache — dispose() does not.
168+
if getattr(self, "engine", None) is None:
169+
return
170+
for conn in self._connection_map.values():
171+
try:
172+
conn.close()
173+
except Exception: # pylint: disable=broad-except
174+
logger.debug("Connection already closed", exc_info=True)
175+
self._connection_map = {}
163176
self._inspector_map = {}
177+
session = getattr(self, "session", None)
178+
if session is not None:
179+
try:
180+
session.remove()
181+
except Exception: # pylint: disable=broad-except
182+
logger.debug("Session cleanup failed", exc_info=True)
183+
self.session = None
184+
try:
185+
self.engine.dispose()
186+
except Exception as exc: # pylint: disable=broad-except
187+
logger.warning(f"Failed to dispose engine: {exc}")
188+
self.engine = None
189+
self.connection_obj = None
164190

165191
def get_database_names(self) -> Iterable[str]:
166192
"""
@@ -780,14 +806,10 @@ def inspector(self) -> Inspector:
780806
return self._inspector_map[thread_id]
781807

782808
def close(self):
783-
if self.connection is not None:
784-
self.connection.close()
785-
for connection in self._connection_map.values():
786-
connection.close()
809+
self._release_engine()
787810
if hasattr(self, "ssl_manager") and self.ssl_manager:
788811
self.ssl_manager = cast(SSLManager, self.ssl_manager)
789812
self.ssl_manager.cleanup_temp_files()
790-
self.engine.dispose()
791813

792814
def fetch_table_tags(
793815
self,

0 commit comments

Comments
 (0)