-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[DBMON-6602] Avoid cleanup when cancel called while check running #23728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix a crash caused by cancel closing database connections while the check is still running. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| import copy | ||
| import functools | ||
| import os | ||
| import threading | ||
| from string import Template | ||
| from time import time | ||
|
|
||
|
|
@@ -17,7 +18,6 @@ | |
| from datadog_checks.base.utils.db.core import QueryManager | ||
| from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus | ||
| from datadog_checks.base.utils.db.utils import ( | ||
| DBMAsyncJob, | ||
| default_json_event_encoding, | ||
| tracked_query, | ||
| ) | ||
|
|
@@ -194,6 +194,10 @@ def __init__(self, name, init_config, instances): | |
|
|
||
| self.diagnosis.register(functools.partial(run_diagnostics, self)) | ||
|
|
||
| self._cancel_lock = threading.Lock() | ||
| self._is_running = False | ||
| self._cancelled = False | ||
|
|
||
| def database_monitoring_column_statistics(self, raw_event: str): | ||
| self.event_platform_event(raw_event, "dbm-column-statistics") | ||
|
|
||
|
|
@@ -476,38 +480,87 @@ def dynamic_queries(self): | |
|
|
||
| return self._dynamic_queries | ||
|
|
||
| @staticmethod | ||
| def _cancel_async_job(job: DBMAsyncJob): | ||
| job.cancel() | ||
| if job._job_loop_future: | ||
| job._job_loop_future.result() | ||
| job._job_loop_future = None | ||
| job._shutdown() | ||
| def run(self): | ||
| # TODO: move this lock into the base class | ||
| with self._cancel_lock: | ||
| if self._cancelled: | ||
| self.log.debug("run() skipped, check already cancelled") | ||
| return '' | ||
| self._is_running = True | ||
| try: | ||
| return super().run() | ||
| finally: | ||
| needs_finalize = False | ||
| with self._cancel_lock: | ||
| self._is_running = False | ||
| if self._cancelled: | ||
| needs_finalize = True | ||
| if needs_finalize: | ||
| self.log.debug("Check cancel has been signaled, finalizing now that run() is complete") | ||
| self._finalize() | ||
|
|
||
| def cancel(self): | ||
| """Signal that the check is being unscheduled. | ||
|
|
||
| This method can be called while check() is running on another thread | ||
| (the GIL is released during psycopg I/O). It must not perform any | ||
| destructive operations — closing connections or nulling attributes that | ||
| check() depends on — because that causes a SIGSEGV in libpq when | ||
| check() resumes. | ||
|
|
||
| Destructive cleanup is deferred to _finalize(), which is called either | ||
| here (if the check is idle) or by run()'s finally block (if the check | ||
| is in-flight). The Agent guarantees it will not call run() again after | ||
| cancel(). | ||
| """ | ||
| Cancels and sends cancel signal to all threads. | ||
| """ | ||
| self.log.debug("Marking check as cancelled") | ||
| self._cancel_async_jobs() | ||
| needs_finalize = False | ||
| with self._cancel_lock: | ||
| self._cancelled = True | ||
| if not self._is_running: | ||
| needs_finalize = True | ||
| if needs_finalize: | ||
| self.log.debug("cancel() finalizing immediately, check is idle") | ||
| self._finalize() | ||
| else: | ||
| self.log.debug("cancel() deferred finalize, check is still running") | ||
|
|
||
| @property | ||
| def _async_jobs(self): | ||
| """Return the async jobs active for this check's configuration.""" | ||
| jobs = [] | ||
| if self._config.dbm: | ||
| self._cancel_async_job(self.statement_metrics) | ||
| self._cancel_async_job(self.statement_samples) | ||
| self._cancel_async_job(self.metadata_samples) | ||
| jobs.extend([self.statement_metrics, self.statement_samples, self.metadata_samples]) | ||
| elif self._config.data_observability.enabled: | ||
| self._cancel_async_job(self.metadata_samples) | ||
| jobs.append(self.metadata_samples) | ||
| if self._config.data_observability.enabled: | ||
| self._cancel_async_job(self.data_observability) | ||
| jobs.append(self.data_observability) | ||
| return jobs | ||
|
|
||
| def _cancel_async_jobs(self): | ||
| """Signal async jobs to stop. Safe to call while check() is running.""" | ||
| for job in self._async_jobs: | ||
| job.cancel() | ||
|
Comment on lines +543 to +544
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
When cancellation happens while … [review comment truncated in extraction] Useful? React with 👍 / 👎. |
||
|
|
||
| def _finalize(self): | ||
| """Tear down check state. Must not run while check() is executing.""" | ||
| self.log.debug("Finalizing check: closing connections and clearing state") | ||
| for job in self._async_jobs: | ||
| if job._job_loop_future: | ||
| job._job_loop_future.result() | ||
| job._job_loop_future = None | ||
| job._shutdown() | ||
| self._clean_state() | ||
| self._query_manager = None | ||
| self.health = None | ||
| self.check_initializations.clear() | ||
| # TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class | ||
| self._diagnosis = None | ||
| self.log.check = None | ||
| self._query_manager = None | ||
| self.health = None | ||
| self._close_db() | ||
| self._close_db_pool() | ||
| # CheckLoggingAdapter holds self.check until check_id is resolved via | ||
| # process(), which only happens after the agent scheduler calls run(). | ||
| # If cancel() is called before that, the back-reference is never cleared. | ||
| self.log.check = None | ||
| self.log.debug("Check cleanup complete") | ||
|
|
||
| def _clean_state(self): | ||
| self.log.debug("Cleaning state") | ||
|
|
@@ -1191,14 +1244,15 @@ def check(self, _): | |
|
|
||
| if not self._config.only_custom_queries: | ||
| self._collect_stats(tags) | ||
| if self._config.dbm: | ||
| self.statement_metrics.run_job_loop(tags) | ||
| self.statement_samples.run_job_loop(tags) | ||
| self.metadata_samples.run_job_loop(tags) | ||
| elif self._config.data_observability.enabled: | ||
| self.metadata_samples.run_job_loop(tags) | ||
| if self._config.data_observability.enabled: | ||
| self.data_observability.run_job_loop(tags) | ||
| if not self._cancelled: | ||
| if self._config.dbm: | ||
| self.statement_metrics.run_job_loop(tags) | ||
| self.statement_samples.run_job_loop(tags) | ||
| self.metadata_samples.run_job_loop(tags) | ||
| elif self._config.data_observability.enabled: | ||
| self.metadata_samples.run_job_loop(tags) | ||
| if self._config.data_observability.enabled: | ||
| self.data_observability.run_job_loop(tags) | ||
| if self._config.collect_wal_metrics is True: | ||
| # collect wal metrics for pg < 10 only when explicitly enabled | ||
| # (requires local filesystem access to the WAL directory) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.