Skip to content

Commit 286d79f

Browse files
authored
[DBMON-5571] Regularly check for cancel event between intervals (DataDog#21150)
* Regularly check for cancel event between intervals * Add type hinting * Add changelog * Ignore internal dd.mongo.async_job.cancel * Avoid canceling the check too quickly
1 parent 2ce8cba commit 286d79f

7 files changed

Lines changed: 80 additions & 18 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Regularly check for cancel event between DBMAsyncJob check intervals

datadog_checks_base/datadog_checks/base/utils/db/utils.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,21 +107,33 @@ class ConstantRateLimiter:
107107
Basic rate limiter that sleeps long enough to ensure the rate limit is not exceeded. Not thread safe.
108108
"""
109109

110-
def __init__(self, rate_limit_s):
110+
def __init__(self, rate_limit_s, max_sleep_chunk_s=5):
111111
"""
112112
:param rate_limit_s: rate limit in seconds
113+
:param max_sleep_chunk_s: maximum size of each sleep chunk while waiting for the next period
113114
"""
114115
self.rate_limit_s = max(rate_limit_s, 0)
115116
self.period_s = 1.0 / self.rate_limit_s if self.rate_limit_s > 0 else 0
116117
self.last_event = 0
118+
self.max_sleep_chunk_s = max(0, max_sleep_chunk_s)
117119

118-
def update_last_time_and_sleep(self):
120+
def update_last_time_and_sleep(self, cancel_event: Optional[threading.Event] = None):
119121
"""
120122
Sleeps long enough to enforce the rate limit
121123
"""
122-
elapsed_s = time.time() - self.last_event
123-
sleep_amount = max(self.period_s - elapsed_s, 0)
124-
time.sleep(sleep_amount)
124+
if self.period_s <= 0:
125+
self.update_last_time()
126+
return
127+
128+
deadline = self.last_event + self.period_s
129+
while True:
130+
now = time.time()
131+
remaining = deadline - now
132+
if remaining <= 0:
133+
break
134+
if cancel_event is not None and cancel_event.is_set():
135+
break
136+
time.sleep(min(remaining, self.max_sleep_chunk_s if self.max_sleep_chunk_s > 0 else remaining))
125137
self.update_last_time()
126138

127139
def shall_execute(self):
@@ -275,6 +287,7 @@ def __init__(
275287
min_collection_interval=15,
276288
dbms="TODO",
277289
rate_limit=1,
290+
max_sleep_chunk_s=1,
278291
run_sync=False,
279292
enabled=True,
280293
expected_db_exceptions=(),
@@ -295,7 +308,8 @@ def __init__(
295308
self._last_check_run = 0
296309
self._shutdown_callback = shutdown_callback
297310
self._dbms = dbms
298-
self._rate_limiter = ConstantRateLimiter(rate_limit)
311+
self._rate_limiter = ConstantRateLimiter(rate_limit, max_sleep_chunk_s=max_sleep_chunk_s)
312+
self._max_sleep_chunk_s = max_sleep_chunk_s
299313
self._run_sync = run_sync
300314
self._enabled = enabled
301315
self._expected_db_exceptions = expected_db_exceptions
@@ -387,7 +401,7 @@ def _job_loop(self):
387401

388402
def _set_rate_limit(self, rate_limit):
389403
if self._rate_limiter.rate_limit_s != rate_limit:
390-
self._rate_limiter = ConstantRateLimiter(rate_limit)
404+
self._rate_limiter = ConstantRateLimiter(rate_limit, max_sleep_chunk_s=self._max_sleep_chunk_s)
391405

392406
def _run_sync_job_rate_limited(self):
393407
if self._rate_limiter.shall_execute():
@@ -401,7 +415,7 @@ def _run_job_rate_limited(self):
401415
raise
402416
finally:
403417
if not self._cancel_event.is_set():
404-
self._rate_limiter.update_last_time_and_sleep()
418+
self._rate_limiter.update_last_time_and_sleep(cancel_event=self._cancel_event)
405419
else:
406420
self._rate_limiter.update_last_time()
407421

datadog_checks_base/tests/base/utils/db/test_util.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,14 @@ def _mock_obfuscate_sql(query, options=None):
242242

243243
class JobForTesting(DBMAsyncJob):
244244
def __init__(
245-
self, check, run_sync=False, enabled=True, rate_limit=10, min_collection_interval=15, job_execution_time=0
245+
self,
246+
check,
247+
run_sync=False,
248+
enabled=True,
249+
rate_limit=10,
250+
min_collection_interval=15,
251+
job_execution_time=0,
252+
max_sleep_chunk_s=5,
246253
):
247254
super(JobForTesting, self).__init__(
248255
check,
@@ -253,6 +260,7 @@ def __init__(
253260
config_host="test-host",
254261
dbms="test-dbms",
255262
rate_limit=rate_limit,
263+
max_sleep_chunk_s=max_sleep_chunk_s,
256264
job_name="test-job",
257265
shutdown_callback=self.test_shutdown,
258266
)
@@ -307,6 +315,20 @@ def test_dbm_async_job_cancel(aggregator):
307315
aggregator.assert_metric("dbm.async_job_test.shutdown")
308316

309317

318+
def test_dbm_async_job_cancel_returns_early_on_long_sleep():
319+
# Configure a very low rate so the sleep interval would be ~60s without cancellation
320+
job = JobForTesting(AgentCheck(), rate_limit=1 / 60.0, max_sleep_chunk_s=0.1)
321+
job.run_job_loop([])
322+
# Allow the thread to start and enter the sleep window
323+
time.sleep(0.2)
324+
start = time.time()
325+
job.cancel()
326+
# Should finish well before the full ~10s timeout and 60s rate-limiter interval
327+
job._job_loop_future.result(timeout=10)
328+
elapsed = time.time() - start
329+
assert elapsed < 10, "Job did not cancel before the full sleep interval"
330+
331+
310332
def test_dbm_async_job_run_sync(aggregator):
311333
job = JobForTesting(AgentCheck(), run_sync=True)
312334
job.run_job_loop([])

mongo/tests/test_integration.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def test_integration_mongos(instance_integration_cluster, aggregator, check, dd_
107107
'dd.custom.mongo.query_a.amount',
108108
'dd.custom.mongo.query_a.el',
109109
'dd.mongo.operation.time',
110+
'dd.mongo.async_job.cancel',
110111
],
111112
check_submission_type=True,
112113
)
@@ -200,6 +201,7 @@ def test_integration_replicaset_primary_in_shard(instance_integration, aggregato
200201
'dd.custom.mongo.count',
201202
'dd.custom.mongo.query_a.amount',
202203
'dd.custom.mongo.query_a.el',
204+
'dd.mongo.async_job.cancel',
203205
],
204206
check_submission_type=True,
205207
)
@@ -299,6 +301,7 @@ def test_integration_replicaset_secondary_in_shard(instance_integration, aggrega
299301
'dd.custom.mongo.count',
300302
'dd.custom.mongo.query_a.amount',
301303
'dd.custom.mongo.query_a.el',
304+
'dd.mongo.async_job.cancel',
302305
],
303306
check_submission_type=True,
304307
)
@@ -355,6 +358,7 @@ def test_integration_replicaset_arbiter_in_shard(instance_integration, aggregato
355358
'dd.custom.mongo.count',
356359
'dd.custom.mongo.query_a.amount',
357360
'dd.custom.mongo.query_a.el',
361+
'dd.mongo.async_job.cancel',
358362
],
359363
check_submission_type=True,
360364
)
@@ -419,6 +423,7 @@ def test_integration_configsvr_primary(instance_integration, aggregator, check,
419423
'dd.custom.mongo.count',
420424
'dd.custom.mongo.query_a.amount',
421425
'dd.custom.mongo.query_a.el',
426+
'dd.mongo.async_job.cancel',
422427
],
423428
check_submission_type=True,
424429
)
@@ -516,6 +521,7 @@ def test_integration_configsvr_secondary(instance_integration, aggregator, check
516521
'dd.custom.mongo.count',
517522
'dd.custom.mongo.query_a.amount',
518523
'dd.custom.mongo.query_a.el',
524+
'dd.mongo.async_job.cancel',
519525
],
520526
check_submission_type=True,
521527
)
@@ -583,6 +589,7 @@ def test_integration_replicaset_primary(instance_integration, aggregator, check,
583589
'dd.custom.mongo.count',
584590
'dd.custom.mongo.query_a.amount',
585591
'dd.custom.mongo.query_a.el',
592+
'dd.mongo.async_job.cancel',
586593
],
587594
check_submission_type=True,
588595
)
@@ -688,6 +695,7 @@ def test_integration_replicaset_primary_config(instance_integration, aggregator,
688695
'dd.custom.mongo.count',
689696
'dd.custom.mongo.query_a.amount',
690697
'dd.custom.mongo.query_a.el',
698+
'dd.mongo.async_job.cancel',
691699
],
692700
check_submission_type=True,
693701
)
@@ -797,6 +805,7 @@ def test_integration_replicaset_secondary(
797805
'dd.custom.mongo.count',
798806
'dd.custom.mongo.query_a.amount',
799807
'dd.custom.mongo.query_a.el',
808+
'dd.mongo.async_job.cancel',
800809
],
801810
check_submission_type=True,
802811
)
@@ -852,6 +861,7 @@ def test_integration_replicaset_arbiter(instance_integration, aggregator, check,
852861
'dd.custom.mongo.count',
853862
'dd.custom.mongo.query_a.amount',
854863
'dd.custom.mongo.query_a.el',
864+
'dd.mongo.async_job.cancel',
855865
],
856866
check_submission_type=True,
857867
)
@@ -908,6 +918,7 @@ def test_standalone(instance_integration, aggregator, check, dd_run_check):
908918
'dd.custom.mongo.count',
909919
'dd.custom.mongo.query_a.amount',
910920
'dd.custom.mongo.query_a.el',
921+
'dd.mongo.async_job.cancel',
911922
],
912923
check_submission_type=True,
913924
)
@@ -974,6 +985,7 @@ def test_db_names_with_nonexistent_database(check, instance_integration, aggrega
974985
'dd.custom.mongo.count',
975986
'dd.custom.mongo.query_a.amount',
976987
'dd.custom.mongo.query_a.el',
988+
'dd.mongo.async_job.cancel',
977989
],
978990
check_submission_type=True,
979991
)
@@ -1009,6 +1021,7 @@ def test_db_names_missing_existent_database(check, instance_integration, aggrega
10091021
'dd.custom.mongo.count',
10101022
'dd.custom.mongo.query_a.amount',
10111023
'dd.custom.mongo.query_a.el',
1024+
'dd.mongo.async_job.cancel',
10121025
],
10131026
check_submission_type=True,
10141027
)

mongo/tests/test_integration_shard.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ def test_mongo_arbiter(aggregator, check, instance_arbiter, dd_run_check):
2727
if metric_name in METRIC_VAL_CHECKS:
2828
metric = aggregator.metrics(metric_name)[0]
2929
assert METRIC_VAL_CHECKS[metric_name](metric.value)
30-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
30+
aggregator.assert_metrics_using_metadata(
31+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
32+
)
3133

3234
expected_metrics = {
3335
'mongodb.replset.health': 1.0,
@@ -77,4 +79,6 @@ def test_mongo_replset(instance_shard, aggregator, check, dd_run_check):
7779
'mongodb.replset.optime_lag',
7880
tags=replset_common_tags + ['replset_state:secondary', 'member:shard01b:27019', 'replset_me:shard01a:27018'],
7981
)
80-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
82+
aggregator.assert_metrics_using_metadata(
83+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
84+
)

mongo/tests/test_integration_standalone.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ def test_mongo_authdb(aggregator, check, instance_authdb, dd_run_check):
4242
if metric_name in METRIC_VAL_CHECKS:
4343
metric = aggregator.metrics(metric_name)[0]
4444
assert METRIC_VAL_CHECKS[metric_name](metric.value)
45-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
45+
aggregator.assert_metrics_using_metadata(
46+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
47+
)
4648

4749

4850
@pytest.mark.parametrize(
@@ -63,7 +65,9 @@ def test_mongo_db_test(aggregator, check, instance_user, dd_run_check):
6365
if metric_name in METRIC_VAL_CHECKS:
6466
metric = aggregator.metrics(metric_name)[0]
6567
assert METRIC_VAL_CHECKS[metric_name](metric.value)
66-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
68+
aggregator.assert_metrics_using_metadata(
69+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
70+
)
6771

6872

6973
def test_mongo_old_config(aggregator, check, instance, dd_run_check):
@@ -77,7 +81,9 @@ def test_mongo_old_config(aggregator, check, instance, dd_run_check):
7781
if metric_name in METRIC_VAL_CHECKS_OLD:
7882
metric = aggregator.metrics(metric_name)[0]
7983
assert METRIC_VAL_CHECKS_OLD[metric_name](metric.value)
80-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
84+
aggregator.assert_metrics_using_metadata(
85+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
86+
)
8187

8288

8389
def test_mongo_dbstats_tag(aggregator, check, instance_dbstats_tag_dbname, dd_run_check):
@@ -91,7 +97,9 @@ def test_mongo_dbstats_tag(aggregator, check, instance_dbstats_tag_dbname, dd_ru
9197
if metric_name in METRIC_VAL_CHECKS:
9298
metric = aggregator.metrics(metric_name)[0]
9399
assert METRIC_VAL_CHECKS[metric_name](metric.value)
94-
aggregator.assert_metrics_using_metadata(get_metadata_metrics(), check_submission_type=True)
100+
aggregator.assert_metrics_using_metadata(
101+
get_metadata_metrics(), check_submission_type=True, exclude=['dd.mongo.async_job.cancel']
102+
)
95103

96104
expected_metrics = {
97105
'mongodb.stats.avgobjsize': None,

sqlserver/tests/test_statements.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,16 +382,16 @@ def _obfuscate_sql(sql_query, options=None):
382382
# 3) emit the query metrics based on the diff of current and last state
383383
with mock.patch.object(datadog_agent, 'obfuscate_sql', passthrough=True) as mock_agent:
384384
mock_agent.side_effect = _obfuscate_sql
385-
dd_run_check(check)
385+
dd_run_check(check, cancel=False)
386386
for _ in range(0, exe_count):
387387
for params in param_groups:
388388
bob_conn.execute_with_retries(query, params, database=database)
389-
dd_run_check(check)
389+
dd_run_check(check, cancel=False)
390390
aggregator.reset()
391391
for _ in range(0, exe_count):
392392
for params in param_groups:
393393
bob_conn.execute_with_retries(query, params, database=database)
394-
dd_run_check(check)
394+
dd_run_check(check, cancel=False)
395395

396396
_conn_key_prefix = "dbm-"
397397
with check.connection.open_managed_default_connection(key_prefix=_conn_key_prefix):

0 commit comments

Comments
 (0)