Skip to content

Commit ad7e469

Browse files
committed
feat(sync): add delayed task submission for throttling (#1506)
1 parent 9e8a954 commit ad7e469

4 files changed

Lines changed: 310 additions & 6 deletions

File tree

src/sentry_sync.c

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ struct sentry_bgworker_task_s;
125125
typedef struct sentry_bgworker_task_s {
126126
struct sentry_bgworker_task_s *next_task;
127127
long refcount;
128+
uint64_t execute_after;
128129
sentry_task_exec_func_t exec_func;
129130
void (*cleanup_func)(void *task_data);
130131
void *task_data;
@@ -260,6 +261,16 @@ worker_thread(void *data)
260261
continue;
261262
}
262263

264+
// wait for a delayed task, wake up to new submissions
265+
{
266+
uint64_t now = sentry__monotonic_time();
267+
if (now < task->execute_after) {
268+
sentry__cond_wait_timeout(&bgw->submit_signal, &bgw->task_lock,
269+
(uint32_t)(task->execute_after - now));
270+
continue;
271+
}
272+
}
273+
263274
sentry__task_incref(task);
264275
sentry__mutex_unlock(&bgw->task_lock);
265276

@@ -350,11 +361,23 @@ sentry__bgworker_flush(sentry_bgworker_t *bgw, uint64_t timeout)
350361
sentry__cond_init(&flush_task->signal);
351362
sentry__mutex_init(&flush_task->lock);
352363

364+
// flush potential delayed tasks up until the timeout
365+
uint64_t delay_ms = 0;
366+
uint64_t before = sentry__monotonic_time();
367+
sentry__mutex_lock(&bgw->task_lock);
368+
if (bgw->last_task && bgw->last_task->execute_after > before) {
369+
delay_ms = bgw->last_task->execute_after - before;
370+
if (delay_ms > timeout) {
371+
delay_ms = timeout;
372+
}
373+
}
374+
sentry__mutex_unlock(&bgw->task_lock);
375+
353376
sentry__mutex_lock(&flush_task->lock);
354377

355378
/* submit the task that triggers our condvar once it runs */
356-
sentry__bgworker_submit(bgw, sentry__flush_task,
357-
(void (*)(void *))sentry__flush_task_decref, flush_task);
379+
sentry__bgworker_submit_delayed(bgw, sentry__flush_task,
380+
(void (*)(void *))sentry__flush_task_decref, flush_task, delay_ms);
358381

359382
uint64_t started = sentry__monotonic_time();
360383
bool was_flushed = false;
@@ -396,6 +419,30 @@ sentry__bgworker_shutdown(sentry_bgworker_t *bgw, uint64_t timeout)
396419

397420
uint64_t started = sentry__monotonic_time();
398421
sentry__mutex_lock(&bgw->task_lock);
422+
423+
// prune delayed tasks that exceed the shutdown timeout
424+
sentry_bgworker_task_t *prev = NULL;
425+
sentry_bgworker_task_t *cur = bgw->first_task;
426+
while (cur) {
427+
if (cur->execute_after > started + timeout) {
428+
if (prev) {
429+
prev->next_task = NULL;
430+
bgw->last_task = prev;
431+
} else {
432+
bgw->first_task = NULL;
433+
bgw->last_task = NULL;
434+
}
435+
while (cur) {
436+
sentry_bgworker_task_t *next = cur->next_task;
437+
sentry__task_decref(cur);
438+
cur = next;
439+
}
440+
break;
441+
}
442+
prev = cur;
443+
cur = cur->next_task;
444+
}
445+
399446
while (true) {
400447
if (sentry__bgworker_is_done(bgw)) {
401448
sentry__mutex_unlock(&bgw->task_lock);
@@ -422,6 +469,28 @@ int
422469
sentry__bgworker_submit(sentry_bgworker_t *bgw,
423470
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
424471
void *task_data)
472+
{
473+
SENTRY_DEBUG("submitting task to background worker thread");
474+
return sentry__bgworker_submit_at(
475+
bgw, exec_func, cleanup_func, task_data, sentry__monotonic_time());
476+
}
477+
478+
int
479+
sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
480+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
481+
void *task_data, uint64_t delay_ms)
482+
{
483+
SENTRY_DEBUGF("submitting %" PRIu64
484+
" ms delayed task to background worker thread",
485+
delay_ms);
486+
return sentry__bgworker_submit_at(bgw, exec_func, cleanup_func, task_data,
487+
sentry__monotonic_time() + delay_ms);
488+
}
489+
490+
int
491+
sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
492+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
493+
void *task_data, uint64_t execute_after)
425494
{
426495
sentry_bgworker_task_t *task = SENTRY_MAKE(sentry_bgworker_task_t);
427496
if (!task) {
@@ -432,19 +501,41 @@ sentry__bgworker_submit(sentry_bgworker_t *bgw,
432501
}
433502
task->next_task = NULL;
434503
task->refcount = 1;
504+
task->execute_after = execute_after;
435505
task->exec_func = exec_func;
436506
task->cleanup_func = cleanup_func;
437507
task->task_data = task_data;
438508

439-
SENTRY_DEBUG("submitting task to background worker thread");
440509
sentry__mutex_lock(&bgw->task_lock);
510+
441511
if (!bgw->first_task) {
512+
// empty queue
442513
bgw->first_task = task;
443-
}
444-
if (bgw->last_task) {
514+
bgw->last_task = task;
515+
} else if (bgw->last_task->execute_after <= task->execute_after) {
516+
// append last (common fast path for FIFO immediates)
445517
bgw->last_task->next_task = task;
518+
bgw->last_task = task;
519+
} else {
520+
// insert sorted by execute_after
521+
sentry_bgworker_task_t *prev = NULL;
522+
sentry_bgworker_task_t *cur = bgw->first_task;
523+
while (cur && cur->execute_after <= task->execute_after) {
524+
prev = cur;
525+
cur = cur->next_task;
526+
}
527+
528+
task->next_task = cur;
529+
if (prev) {
530+
prev->next_task = task;
531+
} else {
532+
bgw->first_task = task;
533+
}
534+
if (!task->next_task) {
535+
bgw->last_task = task;
536+
}
446537
}
447-
bgw->last_task = task;
538+
448539
sentry__cond_wake(&bgw->submit_signal);
449540
sentry__mutex_unlock(&bgw->task_lock);
450541

src/sentry_sync.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,12 +471,23 @@ const char *sentry__bgworker_get_thread_name(sentry_bgworker_t *bgw);
471471
/**
472472
* This will submit a new task to the background thread.
473473
*
474+
* The `_delayed` variant delays execution by the specified delay in
475+
* milliseconds, and the `_at` variant executes after the specified monotonic
476+
* timestamp. The latter is mostly useful for testing to ensure deterministic
477+
* ordering of tasks regardless of OS preemption between submissions.
478+
*
474479
* Takes ownership of `data`, freeing it using the provided `cleanup_func`.
475480
* Returns 0 on success.
476481
*/
477482
int sentry__bgworker_submit(sentry_bgworker_t *bgw,
478483
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
479484
void *task_data);
485+
int sentry__bgworker_submit_delayed(sentry_bgworker_t *bgw,
486+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
487+
void *task_data, uint64_t delay_ms);
488+
int sentry__bgworker_submit_at(sentry_bgworker_t *bgw,
489+
sentry_task_exec_func_t exec_func, void (*cleanup_func)(void *task_data),
490+
void *task_data, uint64_t execute_after);
480491

481492
/**
482493
* This function will iterate through all the current tasks of the worker

tests/unit/test_sync.c

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
#include "sentry_core.h"
22
#include "sentry_sync.h"
33
#include "sentry_testsupport.h"
4+
#include "sentry_utils.h"
45

56
#ifdef SENTRY_PLATFORM_WINDOWS
67
# include <windows.h>
78
# define sleep_s(SECONDS) Sleep((SECONDS) * 1000)
9+
# define sleep_ms(MS) Sleep(MS)
810
#else
911
# include <unistd.h>
1012
# define sleep_s(SECONDS) sleep(SECONDS)
13+
# define sleep_ms(MS) usleep((MS) * 1000)
1114
#endif
1215

1316
struct task_state {
@@ -167,3 +170,198 @@ SENTRY_TEST(bgworker_flush)
167170
TEST_CHECK_INT_EQUAL(shutdown, 0);
168171
sentry__bgworker_decref(bgw);
169172
}
173+
174+
static sentry_cond_t blocker_signal;
175+
#ifdef SENTRY__MUTEX_INIT_DYN
176+
SENTRY__MUTEX_INIT_DYN(blocker_lock)
177+
#else
178+
static sentry_mutex_t blocker_lock = SENTRY__MUTEX_INIT;
179+
#endif
180+
static bool blocker_released;
181+
182+
static void
183+
blocker_task(void *UNUSED(data), void *UNUSED(state))
184+
{
185+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
186+
sentry__mutex_lock(&blocker_lock);
187+
while (!blocker_released) {
188+
sentry__cond_wait_timeout(&blocker_signal, &blocker_lock, 100);
189+
}
190+
sentry__mutex_unlock(&blocker_lock);
191+
}
192+
193+
struct order_state {
194+
int order[10];
195+
int count;
196+
};
197+
198+
static void
199+
record_order_task(void *data, void *_state)
200+
{
201+
struct order_state *state = (struct order_state *)_state;
202+
state->order[state->count++] = (int)(size_t)data;
203+
}
204+
205+
SENTRY_TEST(bgworker_task_delay)
206+
{
207+
struct order_state os;
208+
os.count = 0;
209+
210+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
211+
TEST_ASSERT(!!bgw);
212+
213+
uint64_t before = sentry__monotonic_time();
214+
sentry__bgworker_submit_delayed(
215+
bgw, record_order_task, NULL, (void *)1, 50);
216+
217+
sentry__bgworker_start(bgw);
218+
TEST_CHECK_INT_EQUAL(sentry__bgworker_flush(bgw, 500), 0);
219+
uint64_t after = sentry__monotonic_time();
220+
221+
TEST_CHECK_INT_EQUAL(os.count, 1);
222+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
223+
TEST_CHECK(after - before >= 50);
224+
225+
sentry__bgworker_shutdown(bgw, 500);
226+
sentry__bgworker_decref(bgw);
227+
}
228+
229+
SENTRY_TEST(bgworker_delayed_tasks)
230+
{
231+
struct order_state os;
232+
os.count = 0;
233+
234+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
235+
TEST_ASSERT(!!bgw);
236+
237+
// submit_at with a fixed base so ordering is deterministic regardless
238+
// of OS preemption between submissions (submit_delayed reads the clock
239+
// per call, so a pause between calls could shift execute_after values
240+
// and change the expected sort order)
241+
uint64_t base = sentry__monotonic_time();
242+
243+
// all tasks sorted by execute_after: immediate (0) first, then delayed
244+
// by deadline
245+
//
246+
// queue after each submit:
247+
// i(1)
248+
// i(1) d100(3)
249+
// i(1) i(6) d100(3)
250+
// i(1) i(6) d50(2) d100(3)
251+
// i(1) i(6) i(7) d50(2) d100(3)
252+
// i(1) i(6) i(7) d50(2) d100(3) d200(5)
253+
// i(1) i(6) i(7) d50(2) d100(3) d150(4) d200(5)
254+
// i(1) i(6) i(7) i(8) d50(2) d100(3) d150(4) d200(5)
255+
// i(1) i(6) i(7) i(8) d50(2) d75(9) d100(3) d150(4) d200(5)
256+
// i(1) i(6) i(7) i(8) i(10) d50(2) d75(9) d100(3) d150(4) d200(5)
257+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)1, base);
258+
sentry__bgworker_submit_at(
259+
bgw, record_order_task, NULL, (void *)3, base + 100);
260+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)6, base);
261+
sentry__bgworker_submit_at(
262+
bgw, record_order_task, NULL, (void *)2, base + 50);
263+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)7, base);
264+
sentry__bgworker_submit_at(
265+
bgw, record_order_task, NULL, (void *)5, base + 200);
266+
sentry__bgworker_submit_at(
267+
bgw, record_order_task, NULL, (void *)4, base + 150);
268+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)8, base);
269+
sentry__bgworker_submit_at(
270+
bgw, record_order_task, NULL, (void *)9, base + 75);
271+
sentry__bgworker_submit_at(bgw, record_order_task, NULL, (void *)10, base);
272+
273+
sentry__bgworker_start(bgw);
274+
TEST_CHECK_INT_EQUAL(sentry__bgworker_flush(bgw, 5000), 0);
275+
276+
// all tasks execute: immediate first, then delayed in deadline order
277+
TEST_CHECK_INT_EQUAL(os.count, 10);
278+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
279+
TEST_CHECK_INT_EQUAL(os.order[1], 6);
280+
TEST_CHECK_INT_EQUAL(os.order[2], 7);
281+
TEST_CHECK_INT_EQUAL(os.order[3], 8);
282+
TEST_CHECK_INT_EQUAL(os.order[4], 10);
283+
TEST_CHECK_INT_EQUAL(os.order[5], 2);
284+
TEST_CHECK_INT_EQUAL(os.order[6], 9);
285+
TEST_CHECK_INT_EQUAL(os.order[7], 3);
286+
TEST_CHECK_INT_EQUAL(os.order[8], 4);
287+
TEST_CHECK_INT_EQUAL(os.order[9], 5);
288+
289+
sentry__bgworker_shutdown(bgw, 500);
290+
sentry__bgworker_decref(bgw);
291+
}
292+
293+
SENTRY_TEST(bgworker_delayed_priority)
294+
{
295+
SENTRY__MUTEX_INIT_DYN_ONCE(blocker_lock);
296+
sentry__cond_init(&blocker_signal);
297+
blocker_released = false;
298+
299+
struct order_state os;
300+
os.count = 0;
301+
302+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
303+
TEST_ASSERT(!!bgw);
304+
305+
// blocker holds the worker busy
306+
sentry__bgworker_submit(bgw, blocker_task, NULL, NULL);
307+
// delayed task queued behind the blocker
308+
sentry__bgworker_submit_delayed(
309+
bgw, record_order_task, NULL, (void *)1, 50);
310+
311+
sentry__bgworker_start(bgw);
312+
313+
// wait for the delayed task to become ready
314+
sleep_ms(100);
315+
316+
// submit an immediate task — should NOT bypass the ready delayed task
317+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
318+
319+
// release the blocker
320+
sentry__mutex_lock(&blocker_lock);
321+
blocker_released = true;
322+
sentry__cond_wake(&blocker_signal);
323+
sentry__mutex_unlock(&blocker_lock);
324+
325+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 5000), 0);
326+
327+
TEST_CHECK_INT_EQUAL(os.count, 2);
328+
TEST_CHECK_INT_EQUAL(os.order[0], 1); // delayed (was ready first)
329+
TEST_CHECK_INT_EQUAL(os.order[1], 2); // immediate (submitted later)
330+
331+
sentry__bgworker_decref(bgw);
332+
}
333+
334+
SENTRY_TEST(bgworker_delayed_shutdown)
335+
{
336+
struct order_state os;
337+
os.count = 0;
338+
339+
sentry_bgworker_t *bgw = sentry__bgworker_new(&os, NULL);
340+
TEST_ASSERT(!!bgw);
341+
342+
// immediate tasks
343+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)1);
344+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)2);
345+
sentry__bgworker_submit(bgw, record_order_task, NULL, (void *)3);
346+
347+
// short delay fits within shutdown deadline
348+
sentry__bgworker_submit_delayed(
349+
bgw, record_order_task, NULL, (void *)4, 50);
350+
351+
// long delay exceeds shutdown deadline and should be pruned
352+
sentry__bgworker_submit_delayed(
353+
bgw, record_order_task, NULL, (void *)5, 5000);
354+
sentry__bgworker_submit_delayed(
355+
bgw, record_order_task, NULL, (void *)6, 5000);
356+
357+
sentry__bgworker_start(bgw);
358+
TEST_CHECK_INT_EQUAL(sentry__bgworker_shutdown(bgw, 1000), 0);
359+
360+
TEST_CHECK_INT_EQUAL(os.count, 4);
361+
TEST_CHECK_INT_EQUAL(os.order[0], 1);
362+
TEST_CHECK_INT_EQUAL(os.order[1], 2);
363+
TEST_CHECK_INT_EQUAL(os.order[2], 3);
364+
TEST_CHECK_INT_EQUAL(os.order[3], 4);
365+
366+
sentry__bgworker_decref(bgw);
367+
}

0 commit comments

Comments
 (0)