Skip to content

Commit 5f48f12

Browse files
jpnurmiclaude
andcommitted
feat(sync): add delayed task submission for throttling
Add sentry__bgworker_submit_delayed() to defer task execution by a given number of milliseconds. Tasks are sorted by readiness time using monotonic timestamps, so a ready delayed task is not bypassed by a later-submitted immediate task. On shutdown, tasks that exceed the deadline (started + timeout) are pruned while the rest execute normally. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 76fbfd6 commit 5f48f12

4 files changed

Lines changed: 283 additions & 4 deletions

File tree

src/sentry_sync.c

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct sentry_bgworker_task_s;
125125
typedef struct sentry_bgworker_task_s {
126126
struct sentry_bgworker_task_s *next_task;
127127
long refcount;
128+
uint64_t execute_after;
128129
sentry_task_exec_func_t exec_func;
129130
void (*cleanup_func)(void *task_data);
130131
void *task_data;
@@ -260,6 +261,16 @@ worker_thread(void *data)
260261
continue;
261262
}
262263

264+
// wait for a delayed task, wake up to new submissions
265+
{
266+
uint64_t now = sentry__monotonic_time();
267+
if (now < task->execute_after) {
268+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
269+
(uint32_t)(task->execute_after - now));
270+
continue;
271+
}
272+
}
273+
263274
sentry__task_incref(task);
264275
sentry__mutex_unlock(&bgw->task_lock);
265276

@@ -396,6 +407,30 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396407

397408
uint64_t started = sentry__monotonic_time();
398409
sentry__mutex_lock(&bgw->task_lock);
410+
411+
// prune delayed tasks that exceed the shutdown timeout
412+
sentry_bgworker_task_t *prev = NULL;
413+
sentry_bgworker_task_t *cur = bgw->first_task;
414+
while (cur) {
415+
if (cur->execute_after > started + timeout) {
416+
if (prev) {
417+
prev->next_task = NULL;
418+
bgw->last_task = prev;
419+
} else {
420+
bgw->first_task = NULL;
421+
bgw->last_task = NULL;
422+
}
423+
while (cur) {
424+
sentry_bgworker_task_t *next = cur->next_task;
425+
sentry__task_decref(cur);
426+
cur = next;
427+
}
428+
break;
429+
}
430+
prev = cur;
431+
cur = cur->next_task;
432+
}
433+
399434
while (true) {
400435
if (sentry__bgworker_is_done(bgw)) {
401436
sentry__mutex_unlock(&bgw->task_lock);
@@ -422,6 +457,15 @@ int
422457
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423458
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424459
void *task_data)
460+
{
461+
return sentry__bgworker_submit_delayed(
462+
bgw, exec_func, cleanup_func, task_data, 0);
463+
}
464+
465+
int
466+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
467+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
468+
void *task_data, uint64_t delay_ms)
425469
{
426470
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427471
if (!task) {
@@ -432,19 +476,48 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432476
}
433477
task->next_task = NULL;
434478
task->refcount = 1;
479+
task->execute_after = sentry__monotonic_time() + delay_ms;
435480
task->exec_func = exec_func;
436481
task->cleanup_func = cleanup_func;
437482
task->task_data = task_data;
438483

439-
SENTRY_DEBUG("submitting task to background worker thread");
484+
if (delay_ms > 0) {
485+
SENTRY_DEBUGF("submitting %" PRIu64
486+
" ms delayed task to background worker thread",
487+
delay_ms);
488+
} else {
489+
SENTRY_DEBUG("submitting task to background worker thread");
490+
}
440491
sentry__mutex_lock(&bgw->task_lock);
492+
441493
if (!bgw->first_task) {
494+
// empty queue
442495
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
496+
bgw->last_task = task;
497+
} else if (bgw->last_task->execute_after <= task->execute_after) {
498+
// append last (common fast path for FIFO immediates)
445499
bgw->last_task->next_task = task;
500+
bgw->last_task = task;
501+
} else {
502+
// insert sorted by execute_after
503+
sentry_bgworker_task_t *prev = NULL;
504+
sentry_bgworker_task_t *cur = bgw->first_task;
505+
while (cur && cur->execute_after <= task->execute_after) {
506+
prev = cur;
507+
cur = cur->next_task;
508+
}
509+
510+
task->next_task = cur;
511+
if (prev) {
512+
prev->next_task = task;
513+
} else {
514+
bgw->first_task = task;
515+
}
516+
if (!task->next_task) {
517+
bgw->last_task = task;
518+
}
446519
}
447-
bgw->last_task = task;
520+
448521
sentry__cond_wake(&bgw->submit_signal);
449522
sentry__mutex_unlock(&bgw->task_lock);
450523

src/sentry_sync.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,18 @@ int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478478
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479479
void *task_data);
480480

481+
/**
482+
* This will submit a new delayed task to the background thread.
483+
*
484+
* Execution is deferred by `delay_ms` milliseconds.
485+
*
486+
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
487+
* Returns 0 on success.
488+
*/
489+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
490+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
491+
void *task_data, uint64_t delay_ms);
492+
481493
/**
482494
* This function will iterate through all the current tasks of the worker
483495
* thread, and will call the `callback` function for each task with a matching

tests/unit/test_sync.c

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
#include "sentry_core.h"
22
#include "sentry_sync.h"
33
#include "sentry_testsupport.h"
4+
#include "sentry_utils.h"
45

56
#ifdef SENTRY_PLATFORM_WINDOWS
67
# include <windows.h>
78
# define sleep_s(SECONDS) Sleep((SECONDS) * 1000)
9+
# define sleep_ms(MS) Sleep(MS)
810
#else
911
# include <unistd.h>
1012
# define sleep_s(SECONDS) sleep(SECONDS)
13+
# define sleep_ms(MS) usleep((MS) * 1000)
1114
#endif
1215

1316
struct task_state {
@@ -167,3 +170,190 @@ SENTRY_TEST(bgworker_flush)
167170
TEST_CHECK_INT_EQUAL(shutdown, 0);
168171
sentry__bgworker_decref(bgw);
169172
}
173+
174+
static sentry_cond_t blocker_signal;
175+
#ifdef SENTRY__MUTEX_INIT_DYN
176+
SENTRY__MUTEX_INIT_DYN(blocker_lock)
177+
#else
178+
static sentry_mutex_t blocker_lock = SENTRY__MUTEX_INIT;
179+
#endif
180+
static bool blocker_released;
181+
182+
static void
183+
blocker_task(void *UNUSED(data), void *UNUSED(state))
184+
{
185+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
186+
sentry__mutex_lock(&blocker_lock);
187+
while (!blocker_released) {
188+
sentry__cond_wait_timeout(&blocker_signal, &blocker_lock, 100);
189+
}
190+
sentry__mutex_unlock(&blocker_lock);
191+
}
192+
193+
struct order_state {
194+
int order[10];
195+
int count;
196+
};
197+
198+
static void
199+
record_order_task(void *data, void *_state)
200+
{
201+
struct order_state *state = (struct order_state *)_state;
202+
state->order[state->count++] = (int)(size_t)data;
203+
}
204+
205+
SENTRY_TEST(bgworker_task_delay)
206+
{
207+
struct order_state os;
208+
os.count = 0;
209+
210+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
211+
TEST_ASSERT(!!bgw);
212+
213+
uint64_t before = sentry__monotonic_time();
214+
sentry__bgworker_submit_delayed(
215+
bgw, record_order_task, NULL, (void *)1, 50);
216+
217+
sentry__bgworker_start(bgw);
218+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 500), 0);
219+
uint64_t after = sentry__monotonic_time();
220+
221+
TEST_CHECK_INT_EQUAL(os.count, 1);
222+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
223+
TEST_CHECK(after - before >= 50);
224+
225+
sentry__bgworker_decref(bgw);
226+
}
227+
228+
SENTRY_TEST(bgworker_delayed_tasks)
229+
{
230+
struct order_state os;
231+
os.count = 0;
232+
233+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
234+
TEST_ASSERT(!!bgw);
235+
236+
// all tasks sorted by execute_after: immediate (0) first, then delayed
237+
// by deadline
238+
//
239+
// queue after each submit:
240+
// i(1)
241+
// i(1) d100(3)
242+
// i(1) i(6) d100(3)
243+
// i(1) i(6) d50(2) d100(3)
244+
// i(1) i(6) i(7) d50(2) d100(3)
245+
// i(1) i(6) i(7) d50(2) d100(3) d200(5)
246+
// i(1) i(6) i(7) d50(2) d100(3) d150(4) d200(5)
247+
// i(1) i(6) i(7) i(8) d50(2) d100(3) d150(4) d200(5)
248+
// i(1) i(6) i(7) i(8) d50(2) d75(9) d100(3) d150(4) d200(5)
249+
// i(1) i(6) i(7) i(8) i(10) d50(2) d75(9) d100(3) d150(4) d200(5)
250+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)1);
251+
sentry__bgworker_submit_delayed(
252+
bgw, record_order_task, NULL, (void *)3, 100);
253+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)6);
254+
sentry__bgworker_submit_delayed(
255+
bgw, record_order_task, NULL, (void *)2, 50);
256+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)7);
257+
sentry__bgworker_submit_delayed(
258+
bgw, record_order_task, NULL, (void *)5, 200);
259+
sentry__bgworker_submit_delayed(
260+
bgw, record_order_task, NULL, (void *)4, 150);
261+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)8);
262+
sentry__bgworker_submit_delayed(
263+
bgw, record_order_task, NULL, (void *)9, 75);
264+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)10);
265+
266+
sentry__bgworker_start(bgw);
267+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 5000), 0);
268+
269+
// all tasks execute: immediate first, then delayed in deadline order
270+
TEST_CHECK_INT_EQUAL(os.count, 10);
271+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
272+
TEST_CHECK_INT_EQUAL(os.order[1], 6);
273+
TEST_CHECK_INT_EQUAL(os.order[2], 7);
274+
TEST_CHECK_INT_EQUAL(os.order[3], 8);
275+
TEST_CHECK_INT_EQUAL(os.order[4], 10);
276+
TEST_CHECK_INT_EQUAL(os.order[5], 2);
277+
TEST_CHECK_INT_EQUAL(os.order[6], 9);
278+
TEST_CHECK_INT_EQUAL(os.order[7], 3);
279+
TEST_CHECK_INT_EQUAL(os.order[8], 4);
280+
TEST_CHECK_INT_EQUAL(os.order[9], 5);
281+
282+
sentry__bgworker_decref(bgw);
283+
}
284+
285+
SENTRY_TEST(bgworker_delayed_priority)
286+
{
287+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
288+
sentry__cond_init(&blocker_signal);
289+
blocker_released = false;
290+
291+
struct order_state os;
292+
os.count = 0;
293+
294+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
295+
TEST_ASSERT(!!bgw);
296+
297+
// blocker holds the worker busy
298+
sentry__bgworker_submit(bgw, blocker_task, NULL, NULL);
299+
// delayed task queued behind the blocker
300+
sentry__bgworker_submit_delayed(
301+
bgw, record_order_task, NULL, (void *)1, 50);
302+
303+
sentry__bgworker_start(bgw);
304+
305+
// wait for the delayed task to become ready
306+
sleep_ms(100);
307+
308+
// submit an immediate task — should NOT bypass the ready delayed task
309+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
310+
311+
// release the blocker
312+
sentry__mutex_lock(&blocker_lock);
313+
blocker_released = true;
314+
sentry__cond_wake(&blocker_signal);
315+
sentry__mutex_unlock(&blocker_lock);
316+
317+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 5000), 0);
318+
319+
TEST_CHECK_INT_EQUAL(os.count, 2);
320+
TEST_CHECK_INT_EQUAL(os.order[0], 1); // delayed (was ready first)
321+
TEST_CHECK_INT_EQUAL(os.order[1], 2); // immediate (submitted later)
322+
323+
sentry__bgworker_decref(bgw);
324+
}
325+
326+
SENTRY_TEST(bgworker_delayed_shutdown)
327+
{
328+
struct order_state os;
329+
os.count = 0;
330+
331+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
332+
TEST_ASSERT(!!bgw);
333+
334+
// immediate tasks
335+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)1);
336+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
337+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)3);
338+
339+
// short delay fits within shutdown deadline
340+
sentry__bgworker_submit_delayed(
341+
bgw, record_order_task, NULL, (void *)4, 50);
342+
343+
// long delay exceeds shutdown deadline and should be pruned
344+
sentry__bgworker_submit_delayed(
345+
bgw, record_order_task, NULL, (void *)5, 5000);
346+
sentry__bgworker_submit_delayed(
347+
bgw, record_order_task, NULL, (void *)6, 5000);
348+
349+
sentry__bgworker_start(bgw);
350+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 1000), 0);
351+
352+
TEST_CHECK_INT_EQUAL(os.count, 4);
353+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
354+
TEST_CHECK_INT_EQUAL(os.order[1], 2);
355+
TEST_CHECK_INT_EQUAL(os.order[2], 3);
356+
TEST_CHECK_INT_EQUAL(os.order[3], 4);
357+
358+
sentry__bgworker_decref(bgw);
359+
}

tests/unit/tests.inc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ XX(basic_tracing_context)
2424
XX(basic_transaction)
2525
XX(basic_transport_thread_name)
2626
XX(basic_write_envelope_to_file)
27+
XX(bgworker_delayed_priority)
28+
XX(bgworker_delayed_shutdown)
29+
XX(bgworker_delayed_tasks)
2730
XX(bgworker_flush)
31+
XX(bgworker_task_delay)
2832
XX(breadcrumb_without_type_or_message_still_valid)
2933
XX(build_id_parser)
3034
XX(cache_keep)

0 commit comments

Comments
 (0)