Skip to content

Commit 04472e5

Browse files
jpnurmiclaude
andauthored
feat(sync): add delayed task submission (#1506)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0de39b4 commit 04472e5

4 files changed

Lines changed: 550 additions & 14 deletions

File tree

src/sentry_sync.c

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,20 @@ thread_setname(sentry_threadid_t thread_id, const char *thread_name)
121121
* `done` *from* the worker signaling that it will close down and can be joined.
122122
*/
123123

124+
/**
125+
* Overflow-safe addition that clamps to UINT64_MAX instead of wrapping.
126+
*/
127+
static uint64_t
128+
add_saturate(uint64_t a, uint64_t b)
129+
{
130+
return b <= UINT64_MAX - a ? a + b : UINT64_MAX;
131+
}
132+
124133
struct sentry_bgworker_task_s;
125134
typedef struct sentry_bgworker_task_s {
126135
struct sentry_bgworker_task_s *next_task;
127136
long refcount;
137+
uint64_t execute_after;
128138
sentry_task_exec_func_t exec_func;
129139
void (*cleanup_func)(void *task_data);
130140
void *task_data;
@@ -155,6 +165,7 @@ struct sentry_bgworker_s {
155165
sentry_mutex_t task_lock;
156166
sentry_bgworker_task_t *first_task;
157167
sentry_bgworker_task_t *last_task;
168+
sentry_bgworker_task_t *current_task;
158169
void *state;
159170
void (*free_state)(void *state);
160171
long refcount;
@@ -225,7 +236,9 @@ sentry__bgworker_get_state(sentry_bgworker_t *bgw)
225236
static bool
226237
sentry__bgworker_is_done(sentry_bgworker_t *bgw)
227238
{
228-
return !bgw->first_task && !sentry__atomic_fetch(&bgw->running);
239+
return (!bgw->first_task
240+
|| sentry__monotonic_time() < bgw->first_task->execute_after)
241+
&& !sentry__atomic_fetch(&bgw->running);
229242
}
230243

231244
SENTRY_THREAD_FN
@@ -260,7 +273,18 @@ worker_thread(void *data)
260273
continue;
261274
}
262275

276+
// wait for a delayed task, wake up to new submissions
277+
{
278+
uint64_t now = sentry__monotonic_time();
279+
if (now < task->execute_after) {
280+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
281+
(uint32_t)MIN(task->execute_after - now, UINT32_MAX));
282+
continue;
283+
}
284+
}
285+
263286
sentry__task_incref(task);
287+
bgw->current_task = task;
264288
sentry__mutex_unlock(&bgw->task_lock);
265289

266290
SENTRY_DEBUG("executing task on worker thread");
@@ -274,6 +298,7 @@ worker_thread(void *data)
274298
// if not, we pop it and `decref` again, removing the _is inside
275299
// list_ refcount.
276300
sentry__mutex_lock(&bgw->task_lock);
301+
bgw->current_task = NULL;
277302
if (bgw->first_task == task) {
278303
bgw->first_task = task->next_task;
279304
if (task == bgw->last_task) {
@@ -350,11 +375,28 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350375
sentry__cond_init(&flush_task->signal);
351376
sentry__mutex_init(&flush_task->lock);
352377

378+
// place the flush sentinel after the last task due within the timeout;
379+
// tasks delayed beyond the timeout cannot complete in time anyway
380+
uint64_t before = sentry__monotonic_time();
381+
uint64_t deadline = add_saturate(before, timeout);
382+
uint64_t execute_after = before;
383+
sentry__mutex_lock(&bgw->task_lock);
384+
for (sentry_bgworker_task_t *t
385+
= bgw->current_task ? bgw->current_task->next_task : bgw->first_task;
386+
t && t->execute_after <= deadline; t = t->next_task) {
387+
if (t->execute_after > execute_after) {
388+
execute_after = t->execute_after;
389+
}
390+
}
391+
// NOTE: another thread could submit between unlock and submit_at, making
392+
// execute_after stale. Flush semantics make this harmless.
393+
sentry__mutex_unlock(&bgw->task_lock);
394+
353395
sentry__mutex_lock(&flush_task->lock);
354396

355397
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
398+
sentry__bgworker_submit_at(bgw, sentry__flush_task,
399+
(void (*)(void *))sentry__flush_task_decref, flush_task, execute_after);
358400

359401
uint64_t started = sentry__monotonic_time();
360402
bool was_flushed = false;
@@ -397,12 +439,6 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
397439
uint64_t started = sentry__monotonic_time();
398440
sentry__mutex_lock(&bgw->task_lock);
399441
while (true) {
400-
if (sentry__bgworker_is_done(bgw)) {
401-
sentry__mutex_unlock(&bgw->task_lock);
402-
sentry__thread_join(bgw->thread_id);
403-
return 0;
404-
}
405-
406442
uint64_t now = sentry__monotonic_time();
407443
if (now > started && now - started > timeout) {
408444
sentry__atomic_store(&bgw->running, 0);
@@ -413,6 +449,12 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
413449
return 1;
414450
}
415451

452+
if (!sentry__atomic_fetch(&bgw->running)) {
453+
sentry__mutex_unlock(&bgw->task_lock);
454+
sentry__thread_join(bgw->thread_id);
455+
return 0;
456+
}
457+
416458
// this will implicitly release the lock, and re-acquire on wake
417459
sentry__cond_wait_timeout(&bgw->done_signal, &bgw->task_lock, 250);
418460
}
@@ -422,6 +464,29 @@ int
422464
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423465
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424466
void *task_data)
467+
{
468+
SENTRY_DEBUG("submitting task to background worker thread");
469+
return sentry__bgworker_submit_at(
470+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
471+
}
472+
473+
int
474+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
475+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
476+
void *task_data, uint64_t delay_ms)
477+
{
478+
SENTRY_DEBUGF("submitting %" PRIu64
479+
" ms delayed task to background worker thread",
480+
delay_ms);
481+
uint64_t execute_after = add_saturate(sentry__monotonic_time(), delay_ms);
482+
return sentry__bgworker_submit_at(
483+
bgw, exec_func, cleanup_func, task_data, execute_after);
484+
}
485+
486+
int
487+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
488+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
489+
void *task_data, uint64_t execute_after)
425490
{
426491
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427492
if (!task) {
@@ -432,19 +497,42 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432497
}
433498
task->next_task = NULL;
434499
task->refcount = 1;
500+
task->execute_after = execute_after;
435501
task->exec_func = exec_func;
436502
task->cleanup_func = cleanup_func;
437503
task->task_data = task_data;
438504

439-
SENTRY_DEBUG("submitting task to background worker thread");
440505
sentry__mutex_lock(&bgw->task_lock);
506+
441507
if (!bgw->first_task) {
508+
// empty queue
442509
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
510+
bgw->last_task = task;
511+
} else if (bgw->last_task->execute_after <= task->execute_after) {
512+
// append last (common fast path for FIFO immediates)
445513
bgw->last_task->next_task = task;
514+
bgw->last_task = task;
515+
} else {
516+
// insert sorted by execute_after; skip past current_task which
517+
// may be executing without the lock held
518+
sentry_bgworker_task_t *prev = bgw->current_task;
519+
sentry_bgworker_task_t *cur = prev ? prev->next_task : bgw->first_task;
520+
while (cur && cur->execute_after <= task->execute_after) {
521+
prev = cur;
522+
cur = cur->next_task;
523+
}
524+
525+
task->next_task = cur;
526+
if (prev) {
527+
prev->next_task = task;
528+
} else {
529+
bgw->first_task = task;
530+
}
531+
if (!task->next_task) {
532+
bgw->last_task = task;
533+
}
446534
}
447-
bgw->last_task = task;
535+
448536
sentry__cond_wake(&bgw->submit_signal);
449537
sentry__mutex_unlock(&bgw->task_lock);
450538

@@ -475,6 +563,12 @@ sentry__bgworker_foreach_matching(sentry_bgworker_t *bgw,
475563
} else {
476564
bgw->first_task = next_task;
477565
}
566+
if (bgw->current_task == task) {
567+
bgw->current_task = NULL;
568+
} else if (bgw->current_task
569+
&& bgw->current_task->next_task == task) {
570+
bgw->current_task->next_task = next_task;
571+
}
478572
sentry__task_decref(task);
479573
dropped++;
480574
} else {

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,12 +489,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
489489
/**
490490
* This will submit a new task to the background thread.
491491
*
492+
* The `_delayed` variant delays execution by the specified delay in
493+
* milliseconds, and the `_at` variant executes after the specified monotonic
494+
* timestamp. The latter is mostly useful for testing to ensure deterministic
495+
* ordering of tasks regardless of OS preemption between submissions.
496+
*
492497
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
493498
* Returns 0 on success.
494499
*/
495500
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
496501
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
497502
void *task_data);
503+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
504+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
505+
void *task_data, uint64_t delay_ms);
506+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
507+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
508+
void *task_data, uint64_t execute_after);
498509

499510
/**
500511
* This function will iterate through all the current tasks of the worker

0 commit comments

Comments
 (0)