Skip to content

Commit 9424204

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent a783bc3 commit 9424204

4 files changed

Lines changed: 377 additions & 7 deletions

File tree

src/sentry_sync.c

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct sentry_bgworker_task_s;
125125
typedef struct sentry_bgworker_task_s {
126126
struct sentry_bgworker_task_s *next_task;
127127
long refcount;
128+
uint64_t execute_after;
128129
sentry_task_exec_func_t exec_func;
129130
void (*cleanup_func)(void *task_data);
130131
void *task_data;
@@ -225,7 +226,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225226
static bool
226227
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227228
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
229+
return (!bgw->first_task
230+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
231+
&& !sentry__atomic_fetch(&bgw->running);
229232
}
230233

231234
SENTRY_THREAD_FN
@@ -260,6 +263,16 @@ worker_thread(void *data)
260263
continue;
261264
}
262265

266+
// wait for a delayed task, wake up to new submissions
267+
{
268+
uint64_t now = sentry__monotonic_time();
269+
if (now < task->execute_after) {
270+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
271+
(uint32_t)(task->execute_after - now));
272+
continue;
273+
}
274+
}
275+
263276
sentry__task_incref(task);
264277
sentry__mutex_unlock(&bgw->task_lock);
265278

@@ -350,11 +363,22 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350363
sentry__cond_init(&flush_task->signal);
351364
sentry__mutex_init(&flush_task->lock);
352365

366+
// place the flush sentinel after the last task due within the timeout;
367+
// tasks delayed beyond the timeout cannot complete in time anyway
368+
uint64_t delay_ms = 0;
369+
uint64_t before = sentry__monotonic_time();
370+
sentry__mutex_lock(&bgw->task_lock);
371+
if (bgw->last_task && bgw->last_task->execute_after > before
372+
&& bgw->last_task->execute_after - before <= timeout) {
373+
delay_ms = bgw->last_task->execute_after - before;
374+
}
375+
sentry__mutex_unlock(&bgw->task_lock);
376+
353377
sentry__mutex_lock(&flush_task->lock);
354378

355379
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
380+
sentry__bgworker_submit_delayed(bgw, sentry__flush_task,
381+
(void (*)(void *))sentry__flush_task_decref, flush_task, delay_ms);
358382

359383
uint64_t started = sentry__monotonic_time();
360384
bool was_flushed = false;
@@ -396,6 +420,7 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396420

397421
uint64_t started = sentry__monotonic_time();
398422
sentry__mutex_lock(&bgw->task_lock);
423+
399424
while (true) {
400425
if (sentry__bgworker_is_done(bgw)) {
401426
sentry__mutex_unlock(&bgw->task_lock);
@@ -422,6 +447,28 @@ int
422447
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423448
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424449
void *task_data)
450+
{
451+
SENTRY_DEBUG("submitting task to background worker thread");
452+
return sentry__bgworker_submit_at(
453+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
454+
}
455+
456+
int
457+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
458+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
459+
void *task_data, uint64_t delay_ms)
460+
{
461+
SENTRY_DEBUGF("submitting %" PRIu64
462+
" ms delayed task to background worker thread",
463+
delay_ms);
464+
return sentry__bgworker_submit_at(bgw, exec_func, cleanup_func, task_data,
465+
sentry__monotonic_time() + delay_ms);
466+
}
467+
468+
int
469+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
470+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
471+
void *task_data, uint64_t execute_after)
425472
{
426473
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427474
if (!task) {
@@ -432,19 +479,41 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432479
}
433480
task->next_task = NULL;
434481
task->refcount = 1;
482+
task->execute_after = execute_after;
435483
task->exec_func = exec_func;
436484
task->cleanup_func = cleanup_func;
437485
task->task_data = task_data;
438486

439-
SENTRY_DEBUG("submitting task to background worker thread");
440487
sentry__mutex_lock(&bgw->task_lock);
488+
441489
if (!bgw->first_task) {
490+
// empty queue
442491
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
492+
bgw->last_task = task;
493+
} else if (bgw->last_task->execute_after <= task->execute_after) {
494+
// append last (common fast path for FIFO immediates)
445495
bgw->last_task->next_task = task;
496+
bgw->last_task = task;
497+
} else {
498+
// insert sorted by execute_after
499+
sentry_bgworker_task_t *prev = NULL;
500+
sentry_bgworker_task_t *cur = bgw->first_task;
501+
while (cur && cur->execute_after <= task->execute_after) {
502+
prev = cur;
503+
cur = cur->next_task;
504+
}
505+
506+
task->next_task = cur;
507+
if (prev) {
508+
prev->next_task = task;
509+
} else {
510+
bgw->first_task = task;
511+
}
512+
if (!task->next_task) {
513+
bgw->last_task = task;
514+
}
446515
}
447-
bgw->last_task = task;
516+
448517
sentry__cond_wake(&bgw->submit_signal);
449518
sentry__mutex_unlock(&bgw->task_lock);
450519

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

0 commit comments

Comments
 (0)