Skip to content

Commit 996b3d5

Browse files
authored
[DBMON-6589] Clean up dangling references on cancel to improve GC (#23640)
* Close and dereference main db connection on cancel * Remove unused db_pool reference * Avoid extra reference to db_pool * Deference cyclical check variable on shutdown * Clear functools.partial closures in cancel() to release pool references _dynamic_queries holds partial(db_pool.get_connection, ...) closures and _query_manager.executor holds a partial(self.execute_query_raw, db=self.db) closure. Both root the check instance and pool manager in memory after cancel. * Null _job_loop_future after join in cancel() to release completed Futures * dbms="postgres", _compiled_patterns_cache ⌘K to generate command ok next commit message suggestion Null _job_loop_future after join in cancel() to release completed Futures Completed Future objects are lightweight but keep the async job reachable in the reference graph. Nulling them immediately after .result() returns removes an unnecessary link that delays GC of the check instance. ok next Let me check the current state of each _shutdown method and the caches each job holds. Now I'll update each _shutdown to clear caches. StatementMetrics -- _full_statement_text_cache, _state, _query_calls_cache, _baseline_metrics: self._check = None self._full_statement_text_cache = None self._state = None self._query_calls_cache = None self._baseline_metrics = None except Exception: StatementSamples -- _collection_strategy_cache, _explain_errors_cache, _explained_statements_ratelimiter, _seen_samples_ratelimiter, _raw_statement_text_cache: self._check = None self._collection_strategy_cache = None self._explain_errors_cache = None self._explained_statements_ratelimiter = None self._seen_samples_ratelimiter = None self._raw_statement_text_cache = None except Exception: Metadata -- _schema_collector (holds back-ref to check) and _compiled_patterns_cache: self._check = None self._schema_collector = None self._compiled_patterns_cache = None except Exception: DataObservability only has a small _last_execution dict -- not worth clearing. Its _shutdown already just nulls _check, which is sufficient. All pre-existing. Done. Ready for your commit. commit message suggestion Clear caches in async job shutdown callbacks to free memory eagerly * Add changelog * Only invoke shutdown logic is cancel called * Update tests * Call shutdown explicitly * Find and break cyclical references on cancel
1 parent 9b11284 commit 996b3d5

10 files changed

Lines changed: 158 additions & 40 deletions

File tree

postgres/changelog.d/23640.fixed

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Close dangling connections and break reference cycles on check cancel to reduce memory retention when checks are restarted or rescheduled

postgres/datadog_checks/postgres/data_observability.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
3838
job_name="data-observability",
3939
)
4040

41+
def _shutdown(self):
42+
self._check = None
43+
4144
@property
4245
def _do_config(self):
4346
return self._config.data_observability

postgres/datadog_checks/postgres/metadata.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
108108
)
109109
self._check = check
110110
self._config = config
111-
self.db_pool = self._check.db_pool
112111
self._collect_pg_settings_enabled = config.collect_settings.enabled
113112
self._collect_extensions_enabled = self._collect_pg_settings_enabled
114113
self._collect_schemas_enabled = config.collect_schemas.enabled
@@ -122,6 +121,11 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
122121
self._tags_no_db = None
123122
self.tags = None
124123

124+
def _shutdown(self):
125+
self._check = None
126+
self._schema_collector = None
127+
self._compiled_patterns_cache = None
128+
125129
def _dbtags(self, db, *extra_tags):
126130
"""
127131
Returns the default instance tags with the initial "db" tag replaced with the provided tag

postgres/datadog_checks/postgres/postgres.py

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from datadog_checks.base.utils.db.core import QueryManager
1818
from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus
1919
from datadog_checks.base.utils.db.utils import (
20+
DBMAsyncJob,
2021
default_json_event_encoding,
2122
tracked_query,
2223
)
@@ -301,6 +302,15 @@ def execute_query_raw(self, query, db):
301302
rows = cursor.fetchall()
302303
return rows
303304

305+
def _close_db(self):
306+
if self._db:
307+
try:
308+
self._db.close()
309+
except Exception:
310+
pass
311+
finally:
312+
self._db = None
313+
304314
@contextlib.contextmanager
305315
def db(self):
306316
"""
@@ -321,12 +331,7 @@ def db(self):
321331
self.log.warning(
322332
"Connection to the database %s has been interrupted, closing connection", self._config.dbname
323333
)
324-
try:
325-
self._db.close()
326-
except Exception:
327-
pass
328-
finally:
329-
self._db = None
334+
self._close_db()
330335
raise
331336
except Exception:
332337
self.log.exception("Unhandled exception while using database connection %s", self._config.dbname)
@@ -468,29 +473,38 @@ def dynamic_queries(self):
468473

469474
return self._dynamic_queries
470475

476+
@staticmethod
477+
def _cancel_async_job(job: DBMAsyncJob):
478+
job.cancel()
479+
if job._job_loop_future:
480+
job._job_loop_future.result()
481+
job._job_loop_future = None
482+
job._shutdown()
483+
471484
def cancel(self):
472485
"""
473486
Cancels and sends cancel signal to all threads.
474487
"""
475488
if self._config.dbm:
476-
self.statement_samples.cancel()
477-
self.statement_metrics.cancel()
478-
self.metadata_samples.cancel()
479-
if self.statement_metrics._job_loop_future:
480-
self.statement_metrics._job_loop_future.result()
481-
if self.statement_samples._job_loop_future:
482-
self.statement_samples._job_loop_future.result()
483-
if self.metadata_samples._job_loop_future:
484-
self.metadata_samples._job_loop_future.result()
489+
self._cancel_async_job(self.statement_metrics)
490+
self._cancel_async_job(self.statement_samples)
491+
self._cancel_async_job(self.metadata_samples)
485492
elif self._config.data_observability.enabled:
486-
self.metadata_samples.cancel()
487-
if self.metadata_samples._job_loop_future:
488-
self.metadata_samples._job_loop_future.result()
493+
self._cancel_async_job(self.metadata_samples)
489494
if self._config.data_observability.enabled:
490-
self.data_observability.cancel()
491-
if self.data_observability._job_loop_future:
492-
self.data_observability._job_loop_future.result()
495+
self._cancel_async_job(self.data_observability)
496+
self._clean_state()
497+
self._query_manager = None
498+
self.health = None
499+
self.check_initializations.clear()
500+
# TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class
501+
self._diagnosis = None
502+
self._close_db()
493503
self._close_db_pool()
504+
# CheckLoggingAdapter holds self.check until check_id is resolved via
505+
# process(), which only happens after the agent scheduler calls run().
506+
# If cancel() is called before that, the back-reference is never cleared.
507+
self.log.check = None
494508

495509
def _clean_state(self):
496510
self.log.debug("Cleaning state")

postgres/datadog_checks/postgres/statement_samples.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
152152
if not config.query_samples.enabled:
153153
collection_interval = config.query_activity.collection_interval
154154

155-
self.db_pool = check.db_pool
156-
157155
super(PostgresStatementSamples, self).__init__(
158156
check,
159157
rate_limit=1 / collection_interval,
@@ -222,6 +220,15 @@ def __init__(self, check: PostgreSql, config: InstanceConfig):
222220
self._time_since_last_activity_event = 0
223221
self._pg_stat_activity_cols = None
224222

223+
def _shutdown(self):
224+
self._check = None
225+
self._explain_parameterized_queries = None
226+
self._collection_strategy_cache = None
227+
self._explain_errors_cache = None
228+
self._explained_statements_ratelimiter = None
229+
self._seen_samples_ratelimiter = None
230+
self._raw_statement_text_cache = None
231+
225232
def _dbtags(self, db, *extra_tags):
226233
"""
227234
Returns the default instance tags with the initial "db" tag replaced with the provided tag
@@ -646,7 +653,7 @@ def _can_explain_statement(self, obfuscated_statement):
646653
def _get_db_explain_setup_state(self, dbname):
647654
# type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]]
648655
try:
649-
self.db_pool.get_connection(dbname)
656+
self._check.db_pool.get_connection(dbname)
650657
except psycopg.OperationalError as e:
651658
self._log.warning(
652659
"cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e)
@@ -712,7 +719,7 @@ def _run_explain(self, dbname, statement, obfuscated_statement):
712719
start_time = time.time()
713720
if self._cancel_event.is_set():
714721
raise Exception("Job loop cancelled. Aborting query.")
715-
with self.db_pool.get_connection(dbname) as conn:
722+
with self._check.db_pool.get_connection(dbname) as conn:
716723
# When sending potentially non-ascii data, e.g. UTF8, we need to force
717724
# the client encoding to UTF-8 to match Python string encoding
718725
if conn.info.encoding.lower() in ["ascii", "sqlascii", "sql_ascii"]:

postgres/datadog_checks/postgres/statements.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ def __init__(self, check, config: InstanceConfig):
202202
ttl=60 * 60 / config.query_metrics.full_statement_text_samples_per_hour_per_query,
203203
)
204204

205+
def _shutdown(self):
206+
self._check = None
207+
self._full_statement_text_cache = None
208+
self._state = None
209+
self._query_calls_cache = None
210+
self._baseline_metrics = None
211+
205212
def _execute_query(self, query, params=(), binary=False, row_factory=None) -> Tuple[list, list]:
206213
if self._cancel_event.is_set():
207214
raise Exception("Job loop cancelled. Aborting query.")

postgres/tests/test_pg_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report
857857
'database_instance:{}'.format(expected_database_instance),
858858
]
859859
check = integration_check(pg_instance)
860-
run_one_check(check)
860+
run_one_check(check, cancel=False)
861861

862862
# These tags are a bit dynamic in value, so we get them from the check and ensure they are present
863863
expected_tags.append('postgresql_version:{}'.format(check.raw_version))

postgres/tests/test_statements.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,7 @@ def test_activity_reported_hostname(
12151215
check = integration_check(dbm_instance)
12161216
check._connect()
12171217

1218-
run_one_check(check)
1218+
run_one_check(check, cancel=False)
12191219
run_one_check(check)
12201220

12211221
dbm_activity = aggregator.get_event_platform_events("dbm-activity")
@@ -1480,7 +1480,7 @@ def test_async_job_enabled(
14801480
dbm_instance['query_metrics'] = {'enabled': statement_metrics_enabled, 'run_sync': False}
14811481
check = integration_check(dbm_instance)
14821482
check._connect()
1483-
run_one_check(check)
1483+
run_one_check(check, cancel=False)
14841484
if statement_samples_enabled or statement_activity_enabled:
14851485
assert check.statement_samples._job_loop_future is not None
14861486
else:
@@ -1713,8 +1713,8 @@ def test_async_job_cancel_cancel(aggregator, integration_check, dbm_instance):
17131713
check = integration_check(dbm_instance)
17141714
check._connect()
17151715
run_one_check(check)
1716-
assert not check.statement_samples._job_loop_future.running(), "samples thread should be stopped"
1717-
assert not check.statement_metrics._job_loop_future.running(), "metrics thread should be stopped"
1716+
assert check.statement_samples._job_loop_future is None, "samples future should be cleaned up after cancel"
1717+
assert check.statement_metrics._job_loop_future is None, "metrics future should be cleaned up after cancel"
17181718
# if the thread doesn't start until after the cancel signal is set then the db connection will never
17191719
# be created in the first place
17201720
assert check.db_pool.pools.get(dbm_instance['dbname']) is None, "db connection should be gone"

postgres/tests/test_unit.py

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
# All rights reserved
33
# Licensed under Simplified BSD License (see LICENSE)
44
import copy
5+
import gc
6+
import weakref
57

68
import mock
79
import psycopg
810
import pytest
911
from pytest import fail
1012
from semver import VersionInfo
1113

12-
from datadog_checks.postgres import util
14+
from datadog_checks.postgres import PostgreSql, util
1315
from datadog_checks.postgres.schemas import PostgresSchemaCollector
1416

1517
pytestmark = pytest.mark.unit
@@ -315,7 +317,7 @@ def test_run_explain_uses_parameterized_statement(pg_instance, integration_check
315317
conn_cm = mock.MagicMock()
316318
conn_cm.__enter__.return_value = mock_conn
317319

318-
with mock.patch.object(check.statement_samples.db_pool, 'get_connection', return_value=conn_cm):
320+
with mock.patch.object(check.db_pool, 'get_connection', return_value=conn_cm):
319321
with mock.patch.object(check, 'histogram'):
320322
check.statement_samples._run_explain('testdb', statement, statement)
321323

@@ -336,3 +338,89 @@ def test_new_connection_closes_conn_when_configure_raises(integration_check, pg_
336338
with pytest.raises(psycopg.Error):
337339
check._new_connection(check._config.dbname)
338340
conn.close.assert_called_once()
341+
342+
343+
def test_close_db_closes_open_connection(integration_check, pg_instance):
344+
check = integration_check(pg_instance)
345+
conn = mock.MagicMock()
346+
conn.closed = False
347+
check._db = conn
348+
349+
check._close_db()
350+
351+
conn.close.assert_called_once()
352+
assert check._db is None
353+
354+
355+
def test_close_db_handles_already_closed_connection(integration_check, pg_instance):
356+
check = integration_check(pg_instance)
357+
conn = mock.MagicMock()
358+
conn.close.side_effect = Exception("already closed")
359+
check._db = conn
360+
361+
check._close_db()
362+
363+
assert check._db is None
364+
365+
366+
def test_close_db_noop_when_no_connection(integration_check, pg_instance):
367+
check = integration_check(pg_instance)
368+
check._db = None
369+
370+
check._close_db()
371+
372+
assert check._db is None
373+
374+
375+
def test_cancel_closes_main_db_connection(integration_check, pg_instance):
376+
check = integration_check(pg_instance)
377+
conn = mock.MagicMock()
378+
check._db = conn
379+
380+
check.cancel()
381+
382+
conn.close.assert_called_once()
383+
assert check._db is None
384+
385+
386+
def test_check_gc_after_cancel(pg_instance):
387+
"""Verify cancel() breaks all reference cycles so refcount alone reclaims the check.
388+
389+
If this test fails, the assertion message lists the types still holding a
390+
reference to the check. To fix it:
391+
392+
1. Identify the referrer type in the failure message (e.g. ``QueryManager``).
393+
2. Find which attribute on that object points back to the check (usually
394+
``self.check`` or ``self._check``).
395+
3. Null that attribute in ``cancel()`` or add it to the relevant
396+
``_shutdown()`` method.
397+
4. If the referrer is a closure or ``functools.partial``, find the
398+
registration site and null or clear the container that holds it.
399+
"""
400+
pg_instance['dbm'] = True
401+
pg_instance['query_samples'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1}
402+
pg_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 10}
403+
pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 1}
404+
pg_instance['data_observability'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1}
405+
406+
check = PostgreSql('postgres', {}, [pg_instance])
407+
ref = weakref.ref(check)
408+
409+
check.cancel()
410+
411+
gc.collect()
412+
gc.disable()
413+
try:
414+
del check
415+
obj = ref()
416+
if obj is not None:
417+
import inspect
418+
419+
referrers = [
420+
f"bound method {r.__qualname__}" if inspect.ismethod(r) else type(r).__name__
421+
for r in gc.get_referrers(obj)
422+
]
423+
del obj
424+
fail(f"Check still alive after cancel() + del -- pinned by: {referrers}")
425+
finally:
426+
gc.enable()

postgres/tests/utils.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,11 @@ def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'):
137137
def run_one_check(check: AgentCheck, cancel=True):
138138
"""
139139
Run check and immediately cancel.
140-
Waits for all threads to close before continuing.
140+
cancel() joins all threads and nulls futures, so no extra .result() calls needed.
141141
"""
142142
check.run()
143143
if cancel:
144144
check.cancel()
145-
if check.statement_samples._job_loop_future is not None:
146-
check.statement_samples._job_loop_future.result()
147-
if check.statement_metrics._job_loop_future is not None:
148-
check.statement_metrics._job_loop_future.result()
149-
if check.metadata_samples._job_loop_future is not None:
150-
check.metadata_samples._job_loop_future.result()
151145

152146

153147
def normalize_object(obj):

0 commit comments

Comments
 (0)