Skip to content

Commit 08d2a26

Browse files
ulixius9claude
andcommitted
fix(ingestion): keep connection_obj in sync with engine across DB switches
self.connection_obj is set once in __init__ to the initial engine and never updated. After set_inspector rebuilds self.engine, connection_obj still points at the disposed original engine — pinning its dialect and compiled_cache alive for the source's lifetime. Rebind connection_obj when creating the new engine in set_inspector, and clear it in _release_engine so close() leaves nothing dangling. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 67c4cea commit 08d2a26

2 files changed

Lines changed: 13 additions & 0 deletions

File tree

ingestion/src/metadata/ingestion/source/database/common_db_source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ def set_inspector(self, database_name: str) -> None:
158158
new_service_connection.database = database_name
159159
self.engine = get_connection(new_service_connection)
160160
self.session = create_and_bind_thread_safe_session(self.engine)
161+
self.connection_obj = self.engine
161162

162163
def _release_engine(self) -> None:
163164
# Close fairies first so _ConnectionRecord drops its pool reference;
@@ -185,6 +186,7 @@ def _release_engine(self) -> None:
185186
except Exception as exc: # pylint: disable=broad-except
186187
logger.warning(f"Failed to dispose engine: {exc}")
187188
self.engine = None
189+
self.connection_obj = None
188190

189191
def get_database_names(self) -> Iterable[str]:
190192
"""

ingestion/tests/unit/topology/database/test_common_db_source.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ class _ReleaseOnlySurrogate(CommonDbSourceService):
353353

354354
def __init__(self, engine=None): # pylint: disable=super-init-not-called
355355
self.engine = engine
356+
self.connection_obj = engine
356357
self._connection_map = {}
357358
self._inspector_map = {}
358359
self.session = None
@@ -454,6 +455,16 @@ def test_tolerates_already_closed_connection(self, surrogate):
454455
assert healthy.closed is True
455456
assert surrogate._connection_map == {}
456457

458+
def test_clears_connection_obj_alongside_engine(self, surrogate):
459+
# connection_obj is set in __init__ to the initial engine and used by
460+
# test_connection(); without clearing it on release, it pins the
461+
# original Engine alive for the source's lifetime even after dispose.
462+
assert surrogate.connection_obj is surrogate.engine
463+
464+
surrogate._release_engine()
465+
466+
assert surrogate.connection_obj is None
467+
457468
def test_closes_connections_from_arbitrary_thread_ids(self, surrogate):
458469
"""Key property of Option B: close-all, not detach-current-thread.
459470
Every fairy in _connection_map must close regardless of the caller's

0 commit comments

Comments
 (0)