|
5 | 5 | import copy |
6 | 6 | import functools |
7 | 7 | import os |
| 8 | +import threading |
8 | 9 | from string import Template |
9 | 10 | from time import time |
10 | 11 |
|
|
17 | 18 | from datadog_checks.base.utils.db.core import QueryManager |
18 | 19 | from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus |
19 | 20 | from datadog_checks.base.utils.db.utils import ( |
20 | | - DBMAsyncJob, |
21 | 21 | default_json_event_encoding, |
22 | 22 | tracked_query, |
23 | 23 | ) |
@@ -194,6 +194,10 @@ def __init__(self, name, init_config, instances): |
194 | 194 |
|
195 | 195 | self.diagnosis.register(functools.partial(run_diagnostics, self)) |
196 | 196 |
|
| 197 | + self._cancel_lock = threading.Lock() |
| 198 | + self._is_running = False |
| 199 | + self._cancelled = False |
| 200 | + |
def database_monitoring_column_statistics(self, raw_event: str):
    """Forward a raw column-statistics event to ``event_platform_event``.

    :param raw_event: the already-serialized event payload; it is passed
        through unchanged.
    """
    # Second argument handed to event_platform_event for this kind of payload.
    event_kind = "dbm-column-statistics"
    self.event_platform_event(raw_event, event_kind)
199 | 203 |
|
@@ -476,38 +480,87 @@ def dynamic_queries(self): |
476 | 480 |
|
477 | 481 | return self._dynamic_queries |
478 | 482 |
|
479 | | - @staticmethod |
480 | | - def _cancel_async_job(job: DBMAsyncJob): |
481 | | - job.cancel() |
482 | | - if job._job_loop_future: |
483 | | - job._job_loop_future.result() |
484 | | - job._job_loop_future = None |
485 | | - job._shutdown() |
def run(self):
    """Run the check unless it has been cancelled; finalize afterwards if needed.

    The ``_cancelled`` / ``_is_running`` flags are only read and written while
    holding ``_cancel_lock`` so that ``cancel()`` and ``run()`` agree on which
    of them is responsible for calling ``_finalize()``.

    :return: ``''`` when the check was already cancelled, otherwise whatever
        the parent class's ``run()`` returns.
    """
    # TODO: move this lock into the base class
    with self._cancel_lock:
        already_cancelled = self._cancelled
        if not already_cancelled:
            # Tell cancel() that teardown must be deferred until we are done.
            self._is_running = True
    if already_cancelled:
        self.log.debug("run() skipped, check already cancelled")
        return ''

    try:
        return super().run()
    finally:
        with self._cancel_lock:
            self._is_running = False
            cancelled_mid_run = self._cancelled
        # cancel() saw _is_running set and skipped teardown; do it here,
        # outside the lock, now that the parent run() has returned.
        if cancelled_mid_run:
            self.log.debug("Check cancel has been signaled, finalizing now that run() is complete")
            self._finalize()
486 | 501 |
|
def cancel(self):
    """Mark the check as unscheduled and tear it down when that is safe.

    Per the design notes for this method, cancel() may be invoked while
    check() is still executing on another thread (the GIL is released during
    psycopg I/O). For that reason nothing destructive happens here directly:
    closing connections or nulling attributes that check() still relies on
    is reported to cause a SIGSEGV in libpq once check() resumes.

    The destructive part lives in _finalize(). It is invoked from here when
    the check is idle, or from the finally block of run() when a run is
    in flight. The Agent guarantees it will not call run() again after
    cancel().
    """
    self.log.debug("Marking check as cancelled")
    # Only signals the jobs to stop; waiting for them happens in _finalize().
    self._cancel_async_jobs()

    with self._cancel_lock:
        self._cancelled = True
        run_in_flight = self._is_running

    if run_in_flight:
        # run() will observe _cancelled in its finally block and finalize.
        self.log.debug("cancel() deferred finalize, check is still running")
        return

    self.log.debug("cancel() finalizing immediately, check is idle")
    self._finalize()
| 528 | + |
@property
def _async_jobs(self):
    """List the async jobs that this check's configuration enables.

    With ``dbm`` on: statement metrics, statement samples and metadata
    samples. Without ``dbm`` but with data observability on: metadata samples
    only. The data observability job is appended whenever data observability
    is enabled, regardless of ``dbm``. A new list is built on every access.
    """
    settings = self._config
    observability_enabled = settings.data_observability.enabled

    if settings.dbm:
        selected = [self.statement_metrics, self.statement_samples, self.metadata_samples]
    elif observability_enabled:
        selected = [self.metadata_samples]
    else:
        selected = []

    if observability_enabled:
        selected.append(self.data_observability)
    return selected
| 540 | + |
def _cancel_async_jobs(self):
    """Ask every active async job to stop.

    Only ``cancel()`` is called on each job; nothing is awaited or shut down
    here, which is why this is documented as safe to call while check() is
    running.
    """
    active_jobs = self._async_jobs
    for async_job in active_jobs:
        async_job.cancel()
| 545 | + |
def _finalize(self):
    """Tear down check state. Must not run while check() is executing.

    Steps, in order: wait for each active async job's loop future and shut
    the job down, clean the check state, drop references held by the check
    (initializations, diagnosis, query manager, health), close the database
    connection and pool, and finally detach the logger's back-reference to
    the check.

    Any exception raised by a job's future propagates to the caller.
    """
    self.log.debug("Finalizing check: closing connections and clearing state")
    for job in self._async_jobs:
        if job._job_loop_future:
            # Block until the job loop has actually exited before shutting down.
            job._job_loop_future.result()
            job._job_loop_future = None
        job._shutdown()
    self._clean_state()
    self.check_initializations.clear()
    # TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class
    self._diagnosis = None
    self._query_manager = None
    self.health = None
    self._close_db()
    self._close_db_pool()
    self.log.debug("Check cleanup complete")
    # CheckLoggingAdapter holds self.check until check_id is resolved via
    # process(), which only happens after the agent scheduler calls run().
    # If cancel() is called before that, the back-reference is never cleared,
    # so clear it explicitly. This must stay the LAST step: every log call
    # above (and any logging done while closing connections) still goes
    # through the adapter and may need the reference.
    # NOTE(review): confirm against CheckLoggingAdapter.process() that it
    # dereferences self.check while check_id is unresolved.
    self.log.check = None
511 | 564 |
|
512 | 565 | def _clean_state(self): |
513 | 566 | self.log.debug("Cleaning state") |
@@ -1191,14 +1244,15 @@ def check(self, _): |
1191 | 1244 |
|
1192 | 1245 | if not self._config.only_custom_queries: |
1193 | 1246 | self._collect_stats(tags) |
1194 | | - if self._config.dbm: |
1195 | | - self.statement_metrics.run_job_loop(tags) |
1196 | | - self.statement_samples.run_job_loop(tags) |
1197 | | - self.metadata_samples.run_job_loop(tags) |
1198 | | - elif self._config.data_observability.enabled: |
1199 | | - self.metadata_samples.run_job_loop(tags) |
1200 | | - if self._config.data_observability.enabled: |
1201 | | - self.data_observability.run_job_loop(tags) |
| 1247 | + if not self._cancelled: |
| 1248 | + if self._config.dbm: |
| 1249 | + self.statement_metrics.run_job_loop(tags) |
| 1250 | + self.statement_samples.run_job_loop(tags) |
| 1251 | + self.metadata_samples.run_job_loop(tags) |
| 1252 | + elif self._config.data_observability.enabled: |
| 1253 | + self.metadata_samples.run_job_loop(tags) |
| 1254 | + if self._config.data_observability.enabled: |
| 1255 | + self.data_observability.run_job_loop(tags) |
1202 | 1256 | if self._config.collect_wal_metrics is True: |
1203 | 1257 | # collect wal metrics for pg < 10 only when explicitly enabled |
1204 | 1258 | # (requires local filesystem access to the WAL directory) |
|
0 commit comments