Skip to content

Commit 005cf4d

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent a783bc3 commit 005cf4d

File tree

4 files changed

+394
-7
lines changed

4 files changed

+394
-7
lines changed

src/sentry_sync.c

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,17 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name)
121121
* `done` *from* the worker signaling that it will close down and can be joined.
122122
*/
123123

124+
static uint64_t
125+
add_saturate(uint64_t a, uint64_t b)
126+
{
127+
return b <= UINT64_MAX - a ? a + b : UINT64_MAX;
128+
}
129+
124130
struct sentry_bgworker_task_s;
125131
typedef struct sentry_bgworker_task_s {
126132
struct sentry_bgworker_task_s *next_task;
127133
long refcount;
134+
uint64_t execute_after;
128135
sentry_task_exec_func_t exec_func;
129136
void (*cleanup_func)(void *task_data);
130137
void *task_data;
@@ -225,7 +232,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225232
static bool
226233
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227234
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
235+
return (!bgw->first_task
236+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
237+
&& !sentry__atomic_fetch(&bgw->running);
229238
}
230239

231240
SENTRY_THREAD_FN
@@ -260,6 +269,16 @@ worker_thread(void *data)
260269
continue;
261270
}
262271

272+
// wait for a delayed task, wake up to new submissions
273+
{
274+
uint64_t now = sentry__monotonic_time();
275+
if (now < task->execute_after) {
276+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
277+
(uint32_t)(task->execute_after - now));
278+
continue;
279+
}
280+
}
281+
263282
sentry__task_incref(task);
264283
sentry__mutex_unlock(&bgw->task_lock);
265284

@@ -350,11 +369,25 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350369
sentry__cond_init(&flush_task->signal);
351370
sentry__mutex_init(&flush_task->lock);
352371

372+
// place the flush sentinel after the last task due within the timeout;
373+
// tasks delayed beyond the timeout cannot complete in time anyway
374+
uint64_t delay_ms = 0;
375+
uint64_t before = sentry__monotonic_time();
376+
uint64_t deadline = add_saturate(before, timeout);
377+
sentry__mutex_lock(&bgw->task_lock);
378+
for (sentry_bgworker_task_t *t = bgw->first_task;
379+
t && t->execute_after <= deadline; t = t->next_task) {
380+
if (t->execute_after > before) {
381+
delay_ms = t->execute_after - before;
382+
}
383+
}
384+
sentry__mutex_unlock(&bgw->task_lock);
385+
353386
sentry__mutex_lock(&flush_task->lock);
354387

355388
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
389+
sentry__bgworker_submit_delayed(bgw, sentry__flush_task,
390+
(void (*)(void *))sentry__flush_task_decref, flush_task, delay_ms);
358391

359392
uint64_t started = sentry__monotonic_time();
360393
bool was_flushed = false;
@@ -396,6 +429,7 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396429

397430
uint64_t started = sentry__monotonic_time();
398431
sentry__mutex_lock(&bgw->task_lock);
432+
399433
while (true) {
400434
if (sentry__bgworker_is_done(bgw)) {
401435
sentry__mutex_unlock(&bgw->task_lock);
@@ -422,6 +456,29 @@ int
422456
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423457
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424458
void *task_data)
459+
{
460+
SENTRY_DEBUG("submitting task to background worker thread");
461+
return sentry__bgworker_submit_at(
462+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
463+
}
464+
465+
int
466+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
467+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
468+
void *task_data, uint64_t delay_ms)
469+
{
470+
SENTRY_DEBUGF("submitting %" PRIu64
471+
" ms delayed task to background worker thread",
472+
delay_ms);
473+
uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms);
474+
return sentry__bgworker_submit_at(
475+
bgw, exec_func, cleanup_func, task_data, execute_after);
476+
}
477+
478+
int
479+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
480+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
481+
void *task_data, uint64_t execute_after)
425482
{
426483
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427484
if (!task) {
@@ -432,19 +489,41 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432489
}
433490
task->next_task = NULL;
434491
task->refcount = 1;
492+
task->execute_after = execute_after;
435493
task->exec_func = exec_func;
436494
task->cleanup_func = cleanup_func;
437495
task->task_data = task_data;
438496

439-
SENTRY_DEBUG("submitting task to background worker thread");
440497
sentry__mutex_lock(&bgw->task_lock);
498+
441499
if (!bgw->first_task) {
500+
// empty queue
442501
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
502+
bgw->last_task = task;
503+
} else if (bgw->last_task->execute_after <= task->execute_after) {
504+
// append last (common fast path for FIFO immediates)
445505
bgw->last_task->next_task = task;
506+
bgw->last_task = task;
507+
} else {
508+
// insert sorted by execute_after
509+
sentry_bgworker_task_t *prev = NULL;
510+
sentry_bgworker_task_t *cur = bgw->first_task;
511+
while (cur && cur->execute_after <= task->execute_after) {
512+
prev = cur;
513+
cur = cur->next_task;
514+
}
515+
516+
task->next_task = cur;
517+
if (prev) {
518+
prev->next_task = task;
519+
} else {
520+
bgw->first_task = task;
521+
}
522+
if (!task->next_task) {
523+
bgw->last_task = task;
524+
}
446525
}
447-
bgw->last_task = task;
526+
448527
sentry__cond_wake(&bgw->submit_signal);
449528
sentry__mutex_unlock(&bgw->task_lock);
450529

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

0 commit comments

Comments
 (0)