Skip to content

Commit e176424

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent a783bc3 commit e176424

File tree

4 files changed

+546
-13
lines changed

4 files changed

+546
-13
lines changed

src/sentry_sync.c

Lines changed: 102 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,17 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name)
121121
* `done` *from* the worker signaling that it will close down and can be joined.
122122
*/
123123

124+
static uint64_t
125+
add_saturate(uint64_t a, uint64_t b)
126+
{
127+
return b <= UINT64_MAX - a ? a + b : UINT64_MAX;
128+
}
129+
124130
struct sentry_bgworker_task_s;
125131
typedef struct sentry_bgworker_task_s {
126132
struct sentry_bgworker_task_s *next_task;
127133
long refcount;
134+
uint64_t execute_after;
128135
sentry_task_exec_func_t exec_func;
129136
void (*cleanup_func)(void *task_data);
130137
void *task_data;
@@ -155,6 +162,7 @@ struct sentry_bgworker_s {
155162
sentry_mutex_t task_lock;
156163
sentry_bgworker_task_t *first_task;
157164
sentry_bgworker_task_t *last_task;
165+
sentry_bgworker_task_t *current_task;
158166
void *state;
159167
void (*free_state)(void *state);
160168
long refcount;
@@ -225,7 +233,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225233
static bool
226234
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227235
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
236+
return (!bgw->first_task
237+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
238+
&& !sentry__atomic_fetch(&bgw->running);
229239
}
230240

231241
SENTRY_THREAD_FN
@@ -260,7 +270,18 @@ worker_thread(void *data)
260270
continue;
261271
}
262272

273+
// wait for a delayed task, wake up to new submissions
274+
{
275+
uint64_t now = sentry__monotonic_time();
276+
if (now < task->execute_after) {
277+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
278+
(uint32_t)(task->execute_after - now));
279+
continue;
280+
}
281+
}
282+
263283
sentry__task_incref(task);
284+
bgw->current_task = task;
264285
sentry__mutex_unlock(&bgw->task_lock);
265286

266287
SENTRY_DEBUG("executing task on worker thread");
@@ -274,6 +295,7 @@ worker_thread(void *data)
274295
// if not, we pop it and `decref` again, removing the _is inside
275296
// list_ refcount.
276297
sentry__mutex_lock(&bgw->task_lock);
298+
bgw->current_task = NULL;
277299
if (bgw->first_task == task) {
278300
bgw->first_task = task->next_task;
279301
if (task == bgw->last_task) {
@@ -350,11 +372,25 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350372
sentry__cond_init(&flush_task->signal);
351373
sentry__mutex_init(&flush_task->lock);
352374

375+
// place the flush sentinel after the last task due within the timeout;
376+
// tasks delayed beyond the timeout cannot complete in time anyway
377+
uint64_t before = sentry__monotonic_time();
378+
uint64_t deadline = add_saturate(before, timeout);
379+
uint64_t execute_after = before;
380+
sentry__mutex_lock(&bgw->task_lock);
381+
for (sentry_bgworker_task_t *t = bgw->first_task;
382+
t && t->execute_after <= deadline; t = t->next_task) {
383+
if (t->execute_after > execute_after) {
384+
execute_after = t->execute_after;
385+
}
386+
}
387+
sentry__mutex_unlock(&bgw->task_lock);
388+
353389
sentry__mutex_lock(&flush_task->lock);
354390

355391
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
392+
sentry__bgworker_submit_at(bgw, sentry__flush_task,
393+
(void (*)(void *))sentry__flush_task_decref, flush_task, execute_after);
358394

359395
uint64_t started = sentry__monotonic_time();
360396
bool was_flushed = false;
@@ -396,13 +432,8 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396432

397433
uint64_t started = sentry__monotonic_time();
398434
sentry__mutex_lock(&bgw->task_lock);
399-
while (true) {
400-
if (sentry__bgworker_is_done(bgw)) {
401-
sentry__mutex_unlock(&bgw->task_lock);
402-
sentry__thread_join(bgw->thread_id);
403-
return 0;
404-
}
405435

436+
while (true) {
406437
uint64_t now = sentry__monotonic_time();
407438
if (now > started && now - started > timeout) {
408439
sentry__atomic_store(&bgw->running, 0);
@@ -413,6 +444,12 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
413444
return 1;
414445
}
415446

447+
if (!sentry__atomic_fetch(&bgw->running)) {
448+
sentry__mutex_unlock(&bgw->task_lock);
449+
sentry__thread_join(bgw->thread_id);
450+
return 0;
451+
}
452+
416453
// this will implicitly release the lock, and re-acquire on wake
417454
sentry__cond_wait_timeout(&bgw->done_signal, &bgw->task_lock, 250);
418455
}
@@ -422,6 +459,29 @@ int
422459
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423460
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424461
void *task_data)
462+
{
463+
SENTRY_DEBUG("submitting task to background worker thread");
464+
return sentry__bgworker_submit_at(
465+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
466+
}
467+
468+
int
469+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
470+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
471+
void *task_data, uint64_t delay_ms)
472+
{
473+
SENTRY_DEBUGF("submitting %" PRIu64
474+
" ms delayed task to background worker thread",
475+
delay_ms);
476+
uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms);
477+
return sentry__bgworker_submit_at(
478+
bgw, exec_func, cleanup_func, task_data, execute_after);
479+
}
480+
481+
int
482+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
483+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
484+
void *task_data, uint64_t execute_after)
425485
{
426486
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427487
if (!task) {
@@ -432,19 +492,42 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432492
}
433493
task->next_task = NULL;
434494
task->refcount = 1;
495+
task->execute_after = execute_after;
435496
task->exec_func = exec_func;
436497
task->cleanup_func = cleanup_func;
437498
task->task_data = task_data;
438499

439-
SENTRY_DEBUG("submitting task to background worker thread");
440500
sentry__mutex_lock(&bgw->task_lock);
501+
441502
if (!bgw->first_task) {
503+
// empty queue
442504
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
505+
bgw->last_task = task;
506+
} else if (bgw->last_task->execute_after <= task->execute_after) {
507+
// append last (common fast path for FIFO immediates)
445508
bgw->last_task->next_task = task;
509+
bgw->last_task = task;
510+
} else {
511+
// insert sorted by execute_after; skip past current_task which
512+
// may be executing without the lock held
513+
sentry_bgworker_task_t *prev = bgw->current_task;
514+
sentry_bgworker_task_t *cur = prev ? prev->next_task : bgw->first_task;
515+
while (cur && cur->execute_after <= task->execute_after) {
516+
prev = cur;
517+
cur = cur->next_task;
518+
}
519+
520+
task->next_task = cur;
521+
if (prev) {
522+
prev->next_task = task;
523+
} else {
524+
bgw->first_task = task;
525+
}
526+
if (!task->next_task) {
527+
bgw->last_task = task;
528+
}
446529
}
447-
bgw->last_task = task;
530+
448531
sentry__cond_wake(&bgw->submit_signal);
449532
sentry__mutex_unlock(&bgw->task_lock);
450533

@@ -475,6 +558,12 @@ sentry__bgworker_foreach_matching(sentry_bgworker_t *bgw,
475558
} else {
476559
bgw->first_task = next_task;
477560
}
561+
if (bgw->current_task == task) {
562+
bgw->current_task = NULL;
563+
} else if (bgw->current_task
564+
&& bgw->current_task->next_task == task) {
565+
bgw->current_task->next_task = next_task;
566+
}
478567
sentry__task_decref(task);
479568
dropped++;
480569
} else {

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

0 commit comments

Comments
 (0)