Skip to content

Commit e1ac1b7

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent 733453c commit e1ac1b7

4 files changed

Lines changed: 550 additions & 13 deletions

File tree

src/sentry_sync.c

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,20 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name)
121121
* `done` *from* the worker signaling that it will close down and can be joined.
122122
*/
123123

124+
/**
125+
* Overflow-safe addition that clamps to UINT64_MAX instead of wrapping.
126+
*/
127+
static uint64_t
128+
add_saturate(uint64_t a, uint64_t b)
129+
{
130+
return b <= UINT64_MAX - a ? a + b : UINT64_MAX;
131+
}
132+
124133
struct sentry_bgworker_task_s;
125134
typedef struct sentry_bgworker_task_s {
126135
struct sentry_bgworker_task_s *next_task;
127136
long refcount;
137+
uint64_t execute_after;
128138
sentry_task_exec_func_t exec_func;
129139
void (*cleanup_func)(void *task_data);
130140
void *task_data;
@@ -155,6 +165,7 @@ struct sentry_bgworker_s {
155165
sentry_mutex_t task_lock;
156166
sentry_bgworker_task_t *first_task;
157167
sentry_bgworker_task_t *last_task;
168+
sentry_bgworker_task_t *current_task;
158169
void *state;
159170
void (*free_state)(void *state);
160171
long refcount;
@@ -225,7 +236,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225236
static bool
226237
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227238
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
239+
return (!bgw->first_task
240+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
241+
&& !sentry__atomic_fetch(&bgw->running);
229242
}
230243

231244
SENTRY_THREAD_FN
@@ -260,7 +273,18 @@ worker_thread(void *data)
260273
continue;
261274
}
262275

276+
// wait for a delayed task, wake up to new submissions
277+
{
278+
uint64_t now = sentry__monotonic_time();
279+
if (now < task->execute_after) {
280+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
281+
(uint32_t)MIN(task->execute_after - now, UINT32_MAX));
282+
continue;
283+
}
284+
}
285+
263286
sentry__task_incref(task);
287+
bgw->current_task = task;
264288
sentry__mutex_unlock(&bgw->task_lock);
265289

266290
SENTRY_DEBUG("executing task on worker thread");
@@ -274,6 +298,7 @@ worker_thread(void *data)
274298
// if not, we pop it and `decref` again, removing the _is inside
275299
// list_ refcount.
276300
sentry__mutex_lock(&bgw->task_lock);
301+
bgw->current_task = NULL;
277302
if (bgw->first_task == task) {
278303
bgw->first_task = task->next_task;
279304
if (task == bgw->last_task) {
@@ -350,11 +375,26 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350375
sentry__cond_init(&flush_task->signal);
351376
sentry__mutex_init(&flush_task->lock);
352377

378+
// place the flush sentinel after the last task due within the timeout;
379+
// tasks delayed beyond the timeout cannot complete in time anyway
380+
uint64_t before = sentry__monotonic_time();
381+
uint64_t deadline = add_saturate(before, timeout);
382+
uint64_t execute_after = before;
383+
sentry__mutex_lock(&bgw->task_lock);
384+
for (sentry_bgworker_task_t *t
385+
= bgw->current_task ? bgw->current_task->next_task : bgw->first_task;
386+
t && t->execute_after <= deadline; t = t->next_task) {
387+
if (t->execute_after > execute_after) {
388+
execute_after = t->execute_after;
389+
}
390+
}
391+
sentry__mutex_unlock(&bgw->task_lock);
392+
353393
sentry__mutex_lock(&flush_task->lock);
354394

355395
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
396+
sentry__bgworker_submit_at(bgw, sentry__flush_task,
397+
(void (*)(void *))sentry__flush_task_decref, flush_task, execute_after);
358398

359399
uint64_t started = sentry__monotonic_time();
360400
bool was_flushed = false;
@@ -396,13 +436,8 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396436

397437
uint64_t started = sentry__monotonic_time();
398438
sentry__mutex_lock(&bgw->task_lock);
399-
while (true) {
400-
if (sentry__bgworker_is_done(bgw)) {
401-
sentry__mutex_unlock(&bgw->task_lock);
402-
sentry__thread_join(bgw->thread_id);
403-
return 0;
404-
}
405439

440+
while (true) {
406441
uint64_t now = sentry__monotonic_time();
407442
if (now > started && now - started > timeout) {
408443
sentry__atomic_store(&bgw->running, 0);
@@ -413,6 +448,12 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
413448
return 1;
414449
}
415450

451+
if (!sentry__atomic_fetch(&bgw->running)) {
452+
sentry__mutex_unlock(&bgw->task_lock);
453+
sentry__thread_join(bgw->thread_id);
454+
return 0;
455+
}
456+
416457
// this will implicitly release the lock, and re-acquire on wake
417458
sentry__cond_wait_timeout(&bgw->done_signal, &bgw->task_lock, 250);
418459
}
@@ -422,6 +463,29 @@ int
422463
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423464
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424465
void *task_data)
466+
{
467+
SENTRY_DEBUG("submitting task to background worker thread");
468+
return sentry__bgworker_submit_at(
469+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
470+
}
471+
472+
int
473+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
474+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
475+
void *task_data, uint64_t delay_ms)
476+
{
477+
SENTRY_DEBUGF("submitting %" PRIu64
478+
" ms delayed task to background worker thread",
479+
delay_ms);
480+
uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms);
481+
return sentry__bgworker_submit_at(
482+
bgw, exec_func, cleanup_func, task_data, execute_after);
483+
}
484+
485+
int
486+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
487+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
488+
void *task_data, uint64_t execute_after)
425489
{
426490
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427491
if (!task) {
@@ -432,19 +496,42 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432496
}
433497
task->next_task = NULL;
434498
task->refcount = 1;
499+
task->execute_after = execute_after;
435500
task->exec_func = exec_func;
436501
task->cleanup_func = cleanup_func;
437502
task->task_data = task_data;
438503

439-
SENTRY_DEBUG("submitting task to background worker thread");
440504
sentry__mutex_lock(&bgw->task_lock);
505+
441506
if (!bgw->first_task) {
507+
// empty queue
442508
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
509+
bgw->last_task = task;
510+
} else if (bgw->last_task->execute_after <= task->execute_after) {
511+
// append last (common fast path for FIFO immediates)
445512
bgw->last_task->next_task = task;
513+
bgw->last_task = task;
514+
} else {
515+
// insert sorted by execute_after; skip past current_task which
516+
// may be executing without the lock held
517+
sentry_bgworker_task_t *prev = bgw->current_task;
518+
sentry_bgworker_task_t *cur = prev ? prev->next_task : bgw->first_task;
519+
while (cur && cur->execute_after <= task->execute_after) {
520+
prev = cur;
521+
cur = cur->next_task;
522+
}
523+
524+
task->next_task = cur;
525+
if (prev) {
526+
prev->next_task = task;
527+
} else {
528+
bgw->first_task = task;
529+
}
530+
if (!task->next_task) {
531+
bgw->last_task = task;
532+
}
446533
}
447-
bgw->last_task = task;
534+
448535
sentry__cond_wake(&bgw->submit_signal);
449536
sentry__mutex_unlock(&bgw->task_lock);
450537

@@ -475,6 +562,12 @@ sentry__bgworker_foreach_matching(sentry_bgworker_t *bgw,
475562
} else {
476563
bgw->first_task = next_task;
477564
}
565+
if (bgw->current_task == task) {
566+
bgw->current_task = NULL;
567+
} else if (bgw->current_task
568+
&& bgw->current_task->next_task == task) {
569+
bgw->current_task->next_task = next_task;
570+
}
478571
sentry__task_decref(task);
479572
dropped++;
480573
} else {

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

0 commit comments

Comments
 (0)