|
5 | 5 | import copy |
6 | 6 | import functools |
7 | 7 | import os |
| 8 | +import threading |
8 | 9 | from string import Template |
9 | 10 | from time import time |
10 | 11 |
|
|
17 | 18 | from datadog_checks.base.utils.db.core import QueryManager |
18 | 19 | from datadog_checks.base.utils.db.health import HealthEvent, HealthStatus |
19 | 20 | from datadog_checks.base.utils.db.utils import ( |
20 | | - DBMAsyncJob, |
21 | 21 | default_json_event_encoding, |
22 | 22 | tracked_query, |
23 | 23 | ) |
@@ -194,6 +194,10 @@ def __init__(self, name, init_config, instances): |
194 | 194 |
|
195 | 195 | self.diagnosis.register(functools.partial(run_diagnostics, self)) |
196 | 196 |
|
| 197 | + self._cancel_lock = threading.Lock() |
| 198 | + self._is_running = False |
| 199 | + self._cancelled = False |
| 200 | + |
197 | 201 | def database_monitoring_column_statistics(self, raw_event: str): |
198 | 202 | self.event_platform_event(raw_event, "dbm-column-statistics") |
199 | 203 |
|
@@ -476,38 +480,79 @@ def dynamic_queries(self): |
476 | 480 |
|
477 | 481 | return self._dynamic_queries |
478 | 482 |
|
479 | | - @staticmethod |
480 | | - def _cancel_async_job(job: DBMAsyncJob): |
481 | | - job.cancel() |
482 | | - if job._job_loop_future: |
483 | | - job._job_loop_future.result() |
484 | | - job._job_loop_future = None |
485 | | - job._shutdown() |
| 483 | + def run(self): |
| 484 | + # TODO: move this lock into the base class |
| 485 | + with self._cancel_lock: |
| 486 | + if self._cancelled: |
| 487 | + return '' |
| 488 | + self._is_running = True |
| 489 | + try: |
| 490 | + return super().run() |
| 491 | + finally: |
| 492 | + needs_finalize = False |
| 493 | + with self._cancel_lock: |
| 494 | + self._is_running = False |
| 495 | + if self._cancelled: |
| 496 | + needs_finalize = True |
| 497 | + if needs_finalize: |
| 498 | + self._finalize() |
486 | 499 |
|
487 | 500 | def cancel(self): |
| 501 | + """Signal that the check is being unscheduled. |
| 502 | +
|
| 503 | + This method can be called while check() is running on another thread |
| 504 | + (the GIL is released during psycopg I/O). It must not perform any |
| 505 | + destructive operations — closing connections or nulling attributes that |
| 506 | + check() depends on — because that causes a SIGSEGV in libpq when |
| 507 | + check() resumes. |
| 508 | +
|
| 509 | + Destructive cleanup is deferred to _finalize(), which is called either |
| 510 | + here (if the check is idle) or by run()'s finally block (if the check |
| 511 | + is in-flight). The Agent guarantees it will not call run() again after |
| 512 | + cancel(). |
488 | 513 | """ |
489 | | - Cancels and sends cancel signal to all threads. |
490 | | - """ |
| 514 | + self._cancel_async_jobs() |
| 515 | + needs_finalize = False |
| 516 | + with self._cancel_lock: |
| 517 | + self._cancelled = True |
| 518 | + if not self._is_running: |
| 519 | + needs_finalize = True |
| 520 | + if needs_finalize: |
| 521 | + self._finalize() |
| 522 | + |
| 523 | + @property |
| 524 | + def _async_jobs(self): |
| 525 | + """Return the async jobs active for this check's configuration.""" |
| 526 | + jobs = [] |
491 | 527 | if self._config.dbm: |
492 | | - self._cancel_async_job(self.statement_metrics) |
493 | | - self._cancel_async_job(self.statement_samples) |
494 | | - self._cancel_async_job(self.metadata_samples) |
| 528 | + jobs.extend([self.statement_metrics, self.statement_samples, self.metadata_samples]) |
495 | 529 | elif self._config.data_observability.enabled: |
496 | | - self._cancel_async_job(self.metadata_samples) |
| 530 | + jobs.append(self.metadata_samples) |
497 | 531 | if self._config.data_observability.enabled: |
498 | | - self._cancel_async_job(self.data_observability) |
| 532 | + jobs.append(self.data_observability) |
| 533 | + return jobs |
| 534 | + |
| 535 | + def _cancel_async_jobs(self): |
| 536 | + """Signal async jobs to stop. Safe to call while check() is running.""" |
| 537 | + for job in self._async_jobs: |
| 538 | + job.cancel() |
| 539 | + |
| 540 | + def _finalize(self): |
| 541 | + """Tear down check state. Must not run while check() is executing.""" |
| 542 | + for job in self._async_jobs: |
| 543 | + if job._job_loop_future: |
| 544 | + job._job_loop_future.result() |
| 545 | + job._job_loop_future = None |
| 546 | + job._shutdown() |
499 | 547 | self._clean_state() |
500 | | - self._query_manager = None |
501 | | - self.health = None |
502 | 548 | self.check_initializations.clear() |
503 | 549 | # TODO: move diagnosis cleanup into AgentCheck.cancel() in the base class |
504 | 550 | self._diagnosis = None |
| 551 | + self.log.check = None |
| 552 | + self._query_manager = None |
| 553 | + self.health = None |
505 | 554 | self._close_db() |
506 | 555 | self._close_db_pool() |
507 | | - # CheckLoggingAdapter holds self.check until check_id is resolved via |
508 | | - # process(), which only happens after the agent scheduler calls run(). |
509 | | - # If cancel() is called before that, the back-reference is never cleared. |
510 | | - self.log.check = None |
511 | 556 |
|
512 | 557 | def _clean_state(self): |
513 | 558 | self.log.debug("Cleaning state") |
|
0 commit comments