From 996b3d59cfc71dfed809d491fe777582da9d53d5 Mon Sep 17 00:00:00 2001 From: Eric Weaver Date: Mon, 11 May 2026 09:46:08 -0400 Subject: [PATCH 1/2] [DBMON-6589] Clean up dangling references on cancel to improve GC (#23640) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Close and dereference main db connection on cancel * Remove unused db_pool reference * Avoid extra reference to db_pool * Dereference cyclical check variable on shutdown * Clear functools.partial closures in cancel() to release pool references _dynamic_queries holds partial(db_pool.get_connection, ...) closures and _query_manager.executor holds a partial(self.execute_query_raw, db=self.db) closure. Both root the check instance and pool manager in memory after cancel. * Null _job_loop_future after join in cancel() to release completed Futures * dbms="postgres", _compiled_patterns_cache ⌘K to generate command ok next commit message suggestion Null _job_loop_future after join in cancel() to release completed Futures Completed Future objects are lightweight but keep the async job reachable in the reference graph. Nulling them immediately after .result() returns removes an unnecessary link that delays GC of the check instance. ok next Let me check the current state of each _shutdown method and the caches each job holds. Now I'll update each _shutdown to clear caches. 
StatementMetrics -- _full_statement_text_cache, _state, _query_calls_cache, _baseline_metrics: self._check = None self._full_statement_text_cache = None self._state = None self._query_calls_cache = None self._baseline_metrics = None except Exception: StatementSamples -- _collection_strategy_cache, _explain_errors_cache, _explained_statements_ratelimiter, _seen_samples_ratelimiter, _raw_statement_text_cache: self._check = None self._collection_strategy_cache = None self._explain_errors_cache = None self._explained_statements_ratelimiter = None self._seen_samples_ratelimiter = None self._raw_statement_text_cache = None except Exception: Metadata -- _schema_collector (holds back-ref to check) and _compiled_patterns_cache: self._check = None self._schema_collector = None self._compiled_patterns_cache = None except Exception: DataObservability only has a small _last_execution dict -- not worth clearing. Its _shutdown already just nulls _check, which is sufficient. All pre-existing. Done. Ready for your commit. 
commit message suggestion Clear caches in async job shutdown callbacks to free memory eagerly * Add changelog * Only invoke shutdown logic if cancel is called * Update tests * Call shutdown explicitly * Find and break cyclical references on cancel --- postgres/changelog.d/23640.fixed | 1 + .../postgres/data_observability.py | 3 + postgres/datadog_checks/postgres/metadata.py | 6 +- postgres/datadog_checks/postgres/postgres.py | 56 ++++++----- .../postgres/statement_samples.py | 15 ++- .../datadog_checks/postgres/statements.py | 7 ++ postgres/tests/test_pg_integration.py | 2 +- postgres/tests/test_statements.py | 8 +- postgres/tests/test_unit.py | 92 ++++++++++++++++++- postgres/tests/utils.py | 8 +- 10 files changed, 158 insertions(+), 40 deletions(-) create mode 100644 postgres/changelog.d/23640.fixed diff --git a/postgres/changelog.d/23640.fixed b/postgres/changelog.d/23640.fixed new file mode 100644 index 0000000000000..92778f0dd54ad --- /dev/null +++ b/postgres/changelog.d/23640.fixed @@ -0,0 +1 @@ +Close dangling connections and break reference cycles on check cancel to reduce memory retention when checks are restarted or rescheduled \ No newline at end of file diff --git a/postgres/datadog_checks/postgres/data_observability.py b/postgres/datadog_checks/postgres/data_observability.py index 6f4107b29e5ab..5dce7eabb5b01 100644 --- a/postgres/datadog_checks/postgres/data_observability.py +++ b/postgres/datadog_checks/postgres/data_observability.py @@ -38,6 +38,9 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): job_name="data-observability", ) + def _shutdown(self): + self._check = None + @property def _do_config(self): return self._config.data_observability diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py index cd341d12b23e5..d8a2959754291 100644 --- a/postgres/datadog_checks/postgres/metadata.py +++ b/postgres/datadog_checks/postgres/metadata.py @@ -108,7 +108,6 @@ def __init__(self, check: 
PostgreSql, config: InstanceConfig): ) self._check = check self._config = config - self.db_pool = self._check.db_pool self._collect_pg_settings_enabled = config.collect_settings.enabled self._collect_extensions_enabled = self._collect_pg_settings_enabled self._collect_schemas_enabled = config.collect_schemas.enabled @@ -122,6 +121,11 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): self._tags_no_db = None self.tags = None + def _shutdown(self): + self._check = None + self._schema_collector = None + self._compiled_patterns_cache = None + def _dbtags(self, db, *extra_tags): """ Returns the default instance tags with the initial "db" tag replaced with the provided tag diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index 0d9223662d657..caab366180792 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -17,6 +17,7 @@ from datadog_checks.base.utils.db.core import QueryManager from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus from datadog_checks.base.utils.db.utils import ( + DBMAsyncJob, default_json_event_encoding, tracked_query, ) @@ -301,6 +302,15 @@ def execute_query_raw(self, query, db): rows = cursor.fetchall() return rows + def _close_db(self): + if self._db: + try: + self._db.close() + except Exception: + pass + finally: + self._db = None + @contextlib.contextmanager def db(self): """ @@ -321,12 +331,7 @@ def db(self): self.log.warning( "Connection to the database %s has been interrupted, closing connection", self._config.dbname ) - try: - self._db.close() - except Exception: - pass - finally: - self._db = None + self._close_db() raise except Exception: self.log.exception("Unhandled exception while using database connection %s", self._config.dbname) @@ -468,29 +473,38 @@ def dynamic_queries(self): return self._dynamic_queries + @staticmethod + def _cancel_async_job(job: DBMAsyncJob): + job.cancel() + if 
job._job_loop_future: + job._job_loop_future.result() + job._job_loop_future = None + job._shutdown() + def cancel(self): """ Cancels and sends cancel signal to all threads. """ if self._config.dbm: - self.statement_samples.cancel() - self.statement_metrics.cancel() - self.metadata_samples.cancel() - if self.statement_metrics._job_loop_future: - self.statement_metrics._job_loop_future.result() - if self.statement_samples._job_loop_future: - self.statement_samples._job_loop_future.result() - if self.metadata_samples._job_loop_future: - self.metadata_samples._job_loop_future.result() + self._cancel_async_job(self.statement_metrics) + self._cancel_async_job(self.statement_samples) + self._cancel_async_job(self.metadata_samples) elif self._config.data_observability.enabled: - self.metadata_samples.cancel() - if self.metadata_samples._job_loop_future: - self.metadata_samples._job_loop_future.result() + self._cancel_async_job(self.metadata_samples) if self._config.data_observability.enabled: - self.data_observability.cancel() - if self.data_observability._job_loop_future: - self.data_observability._job_loop_future.result() + self._cancel_async_job(self.data_observability) + self._clean_state() + self._query_manager = None + self.health = None + self.check_initializations.clear() + # TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class + self._diagnosis = None + self._close_db() self._close_db_pool() + # CheckLoggingAdapter holds self.check until check_id is resolved via + # process(), which only happens after the agent scheduler calls run(). + # If cancel() is called before that, the back-reference is never cleared. 
+ self.log.check = None def _clean_state(self): self.log.debug("Cleaning state") diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py index e8b45dd1ef891..b4c731a270cc3 100644 --- a/postgres/datadog_checks/postgres/statement_samples.py +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -152,8 +152,6 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): if not config.query_samples.enabled: collection_interval = config.query_activity.collection_interval - self.db_pool = check.db_pool - super(PostgresStatementSamples, self).__init__( check, rate_limit=1 / collection_interval, @@ -222,6 +220,15 @@ def __init__(self, check: PostgreSql, config: InstanceConfig): self._time_since_last_activity_event = 0 self._pg_stat_activity_cols = None + def _shutdown(self): + self._check = None + self._explain_parameterized_queries = None + self._collection_strategy_cache = None + self._explain_errors_cache = None + self._explained_statements_ratelimiter = None + self._seen_samples_ratelimiter = None + self._raw_statement_text_cache = None + def _dbtags(self, db, *extra_tags): """ Returns the default instance tags with the initial "db" tag replaced with the provided tag @@ -646,7 +653,7 @@ def _can_explain_statement(self, obfuscated_statement): def _get_db_explain_setup_state(self, dbname): # type: (str) -> Tuple[Optional[DBExplainError], Optional[Exception]] try: - self.db_pool.get_connection(dbname) + self._check.db_pool.get_connection(dbname) except psycopg.OperationalError as e: self._log.warning( "cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e) @@ -712,7 +719,7 @@ def _run_explain(self, dbname, statement, obfuscated_statement): start_time = time.time() if self._cancel_event.is_set(): raise Exception("Job loop cancelled. 
Aborting query.") - with self.db_pool.get_connection(dbname) as conn: + with self._check.db_pool.get_connection(dbname) as conn: # When sending potentially non-ascii data, e.g. UTF8, we need to force # the client encoding to UTF-8 to match Python string encoding if conn.info.encoding.lower() in ["ascii", "sqlascii", "sql_ascii"]: diff --git a/postgres/datadog_checks/postgres/statements.py b/postgres/datadog_checks/postgres/statements.py index 8934812ecc79f..b515b717c599d 100644 --- a/postgres/datadog_checks/postgres/statements.py +++ b/postgres/datadog_checks/postgres/statements.py @@ -202,6 +202,13 @@ def __init__(self, check, config: InstanceConfig): ttl=60 * 60 / config.query_metrics.full_statement_text_samples_per_hour_per_query, ) + def _shutdown(self): + self._check = None + self._full_statement_text_cache = None + self._state = None + self._query_calls_cache = None + self._baseline_metrics = None + def _execute_query(self, query, params=(), binary=False, row_factory=None) -> Tuple[list, list]: if self._cancel_event.is_set(): raise Exception("Job loop cancelled. 
Aborting query.") diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index 18aa7a510d4a8..6e27a1f974fa4 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -857,7 +857,7 @@ def test_database_instance_metadata(aggregator, pg_instance, dbm_enabled, report 'database_instance:{}'.format(expected_database_instance), ] check = integration_check(pg_instance) - run_one_check(check) + run_one_check(check, cancel=False) # These tags are a bit dynamic in value, so we get them from the check and ensure they are present expected_tags.append('postgresql_version:{}'.format(check.raw_version)) diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py index 10c3c363f7f49..f2983eae8befe 100644 --- a/postgres/tests/test_statements.py +++ b/postgres/tests/test_statements.py @@ -1215,7 +1215,7 @@ def test_activity_reported_hostname( check = integration_check(dbm_instance) check._connect() - run_one_check(check) + run_one_check(check, cancel=False) run_one_check(check) dbm_activity = aggregator.get_event_platform_events("dbm-activity") @@ -1480,7 +1480,7 @@ def test_async_job_enabled( dbm_instance['query_metrics'] = {'enabled': statement_metrics_enabled, 'run_sync': False} check = integration_check(dbm_instance) check._connect() - run_one_check(check) + run_one_check(check, cancel=False) if statement_samples_enabled or statement_activity_enabled: assert check.statement_samples._job_loop_future is not None else: @@ -1713,8 +1713,8 @@ def test_async_job_cancel_cancel(aggregator, integration_check, dbm_instance): check = integration_check(dbm_instance) check._connect() run_one_check(check) - assert not check.statement_samples._job_loop_future.running(), "samples thread should be stopped" - assert not check.statement_metrics._job_loop_future.running(), "metrics thread should be stopped" + assert check.statement_samples._job_loop_future is None, "samples future should be cleaned up 
after cancel" + assert check.statement_metrics._job_loop_future is None, "metrics future should be cleaned up after cancel" # if the thread doesn't start until after the cancel signal is set then the db connection will never # be created in the first place assert check.db_pool.pools.get(dbm_instance['dbname']) is None, "db connection should be gone" diff --git a/postgres/tests/test_unit.py b/postgres/tests/test_unit.py index ce7d0505880df..decd10285d07d 100644 --- a/postgres/tests/test_unit.py +++ b/postgres/tests/test_unit.py @@ -2,6 +2,8 @@ # All rights reserved # Licensed under Simplified BSD License (see LICENSE) import copy +import gc +import weakref import mock import psycopg @@ -9,7 +11,7 @@ from pytest import fail from semver import VersionInfo -from datadog_checks.postgres import util +from datadog_checks.postgres import PostgreSql, util from datadog_checks.postgres.schemas import PostgresSchemaCollector pytestmark = pytest.mark.unit @@ -315,7 +317,7 @@ def test_run_explain_uses_parameterized_statement(pg_instance, integration_check conn_cm = mock.MagicMock() conn_cm.__enter__.return_value = mock_conn - with mock.patch.object(check.statement_samples.db_pool, 'get_connection', return_value=conn_cm): + with mock.patch.object(check.db_pool, 'get_connection', return_value=conn_cm): with mock.patch.object(check, 'histogram'): check.statement_samples._run_explain('testdb', statement, statement) @@ -336,3 +338,89 @@ def test_new_connection_closes_conn_when_configure_raises(integration_check, pg_ with pytest.raises(psycopg.Error): check._new_connection(check._config.dbname) conn.close.assert_called_once() + + +def test_close_db_closes_open_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + conn = mock.MagicMock() + conn.closed = False + check._db = conn + + check._close_db() + + conn.close.assert_called_once() + assert check._db is None + + +def test_close_db_handles_already_closed_connection(integration_check, pg_instance): + 
check = integration_check(pg_instance) + conn = mock.MagicMock() + conn.close.side_effect = Exception("already closed") + check._db = conn + + check._close_db() + + assert check._db is None + + +def test_close_db_noop_when_no_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + check._db = None + + check._close_db() + + assert check._db is None + + +def test_cancel_closes_main_db_connection(integration_check, pg_instance): + check = integration_check(pg_instance) + conn = mock.MagicMock() + check._db = conn + + check.cancel() + + conn.close.assert_called_once() + assert check._db is None + + +def test_check_gc_after_cancel(pg_instance): + """Verify cancel() breaks all reference cycles so refcount alone reclaims the check. + + If this test fails, the assertion message lists the types still holding a + reference to the check. To fix it: + + 1. Identify the referrer type in the failure message (e.g. ``QueryManager``). + 2. Find which attribute on that object points back to the check (usually + ``self.check`` or ``self._check``). + 3. Null that attribute in ``cancel()`` or add it to the relevant + ``_shutdown()`` method. + 4. If the referrer is a closure or ``functools.partial``, find the + registration site and null or clear the container that holds it. 
+ """ + pg_instance['dbm'] = True + pg_instance['query_samples'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1} + pg_instance['query_metrics'] = {'enabled': True, 'run_sync': True, 'collection_interval': 10} + pg_instance['query_activity'] = {'enabled': True, 'collection_interval': 1} + pg_instance['data_observability'] = {'enabled': True, 'run_sync': True, 'collection_interval': 1} + + check = PostgreSql('postgres', {}, [pg_instance]) + ref = weakref.ref(check) + + check.cancel() + + gc.collect() + gc.disable() + try: + del check + obj = ref() + if obj is not None: + import inspect + + referrers = [ + f"bound method {r.__qualname__}" if inspect.ismethod(r) else type(r).__name__ + for r in gc.get_referrers(obj) + ] + del obj + fail(f"Check still alive after cancel() + del -- pinned by: {referrers}") + finally: + gc.enable() diff --git a/postgres/tests/utils.py b/postgres/tests/utils.py index 30d0181ec3ad1..fd948c4781ba0 100644 --- a/postgres/tests/utils.py +++ b/postgres/tests/utils.py @@ -137,17 +137,11 @@ def run_vacuum_thread(pg_instance, vacuum_query, application_name='test'): def run_one_check(check: AgentCheck, cancel=True): """ Run check and immediately cancel. - Waits for all threads to close before continuing. + cancel() joins all threads and nulls futures, so no extra .result() calls needed. 
""" check.run() if cancel: check.cancel() - if check.statement_samples._job_loop_future is not None: - check.statement_samples._job_loop_future.result() - if check.statement_metrics._job_loop_future is not None: - check.statement_metrics._job_loop_future.result() - if check.metadata_samples._job_loop_future is not None: - check.metadata_samples._job_loop_future.result() def normalize_object(obj): From 019c285ce74f4ba973ca5f0766dfd0c5780b6b73 Mon Sep 17 00:00:00 2001 From: Nick Sollecito Date: Mon, 11 May 2026 10:07:03 -0400 Subject: [PATCH 2/2] [WEB-8990] Migrate guarddog and supply_chain_firewall dashboard logos off imgix (#23605) * Migrate guarddog and supply_chain_firewall dashboard logos off imgix Replace datadog-securitylabs.imgix.net logo URLs with canonical DRUID paths on static.datadoghq.com as part of the Imgix CDN migration (WEB-8990). * add dark_theme --- guarddog/assets/dashboards/guarddog_rules_overview.json | 4 ++-- .../dashboards/guarddog_scan_and_ecosystem_overview.json | 4 ++-- .../assets/dashboards/supply_chain_firewall_events.json | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/guarddog/assets/dashboards/guarddog_rules_overview.json b/guarddog/assets/dashboards/guarddog_rules_overview.json index 0ffd0be516ce2..c082d1cead55a 100644 --- a/guarddog/assets/dashboards/guarddog_rules_overview.json +++ b/guarddog/assets/dashboards/guarddog_rules_overview.json @@ -6,8 +6,8 @@ "id": 5827532687075026, "definition": { "type": "image", - "url": "https://datadog-securitylabs.imgix.net/img/guarddog-2-0-release/logo.png?auto=format", - "url_dark_theme": "https://datadog-securitylabs.imgix.net/img/guarddog-2-0-release/logo.png?auto=format", + "url": "https://static.datadoghq.com/static/images/logos/guarddog_large.svg", + "url_dark_theme": "https://static.datadoghq.com/static/images/logos/guarddog_reversed_large.svg", "sizing": "contain", "has_background": false, "has_border": false, diff --git 
a/guarddog/assets/dashboards/guarddog_scan_and_ecosystem_overview.json b/guarddog/assets/dashboards/guarddog_scan_and_ecosystem_overview.json index f0e298e0bcde8..6f0d49845fcc5 100644 --- a/guarddog/assets/dashboards/guarddog_scan_and_ecosystem_overview.json +++ b/guarddog/assets/dashboards/guarddog_scan_and_ecosystem_overview.json @@ -6,8 +6,8 @@ "id": 2918516675459237, "definition": { "type": "image", - "url": "https://datadog-securitylabs.imgix.net/img/guarddog-2-0-release/logo.png?auto=format", - "url_dark_theme": "https://datadog-securitylabs.imgix.net/img/guarddog-2-0-release/logo.png?auto=format", + "url": "https://static.datadoghq.com/static/images/logos/guarddog_large.svg", + "url_dark_theme": "https://static.datadoghq.com/static/images/logos/guarddog_reversed_large.svg", "sizing": "contain", "has_background": false, "has_border": false, diff --git a/supply_chain_firewall/assets/dashboards/supply_chain_firewall_events.json b/supply_chain_firewall/assets/dashboards/supply_chain_firewall_events.json index 2cdd9b59febe0..3526a25d9812c 100644 --- a/supply_chain_firewall/assets/dashboards/supply_chain_firewall_events.json +++ b/supply_chain_firewall/assets/dashboards/supply_chain_firewall_events.json @@ -6,8 +6,8 @@ "id": 3929729493655156, "definition": { "type": "image", - "url": "https://datadog-securitylabs.imgix.net/img/introducing-supply-chain-firewall/logo.png", - "url_dark_theme": "https://datadog-securitylabs.imgix.net/img/introducing-supply-chain-firewall/logo.png", + "url": "https://static.datadoghq.com/static/images/logos/supply-chain-firewall_large.svg", + "url_dark_theme": "https://static.datadoghq.com/static/images/logos/supply-chain-firewall_reversed_large.svg", "sizing": "contain", "has_background": false, "has_border": false,