Skip to content

Commit 217a8bb

Browse files
Add Fiber#runtime, Fiber#quantum, and back-edge based preemption
## Background Ruby's Fiber Scheduler is cooperative: fibers yield voluntarily when waiting for I/O or calling Fiber.yield. This breaks down when CPU-bound work enters the picture — a tight loop blocks the scheduler's event loop indefinitely, starving every waiting fiber. This change follows Erlang's reduction-counting model. Erlang assigns each process a fixed budget of "reductions" (function calls) per scheduling slot; when the budget is exhausted the scheduler preempts it without OS involvement. YARV already calls rb_vm_check_ints at every loop back-edge. We add a single counter increment there, and yield to the fiber scheduler when the count reaches a configurable quantum. ## New API Fiber#runtime -> Integer Back-edges executed in the current (or most recently completed) scheduling slot. Resets to 0 on every resume/transfer. Fiber#quantum -> Integer Fiber#quantum= Integer Slot size in back-edges before forced preemption. Default: 50_000 (~7 ms at ~7M iterations/s). Also settable at construction time via Fiber.new(quantum: n). rb_fiber_runtime_advance(uint32_t n) [C API] Advance the current fiber's runtime counter by n units. Intended for C extensions doing CPU work outside YARV dispatch (JSON parsers, image codecs, matrix math). Triggers preemption if the quantum is exhausted. ## Scheduler integration When a non-blocking fiber exhausts its quantum, rb_fiber_scheduler_yield is called. Schedulers that implement the `yield` hook (introduced previously) can use this for work-conserving preemption. The scheduler sees Fiber#runtime before the fiber is resumed. A scheduler can order its ready queue by runtime — lowest first — so that I/O-bound fibers (which yield early with low runtime) naturally sort ahead of CPU-bound ones. This is stride scheduling: accumulated CPU consumption drives priority without explicit priority assignments. test/fiber/fair_scheduler.rb demonstrates a complete implementation: a min-heap ordered by Fiber#runtime that gives I/O-bound fibers automatic scheduling priority over CPU-bound ones. ## Implementation details Three new fields in rb_execution_context_t: uint32_t runtime — back-edges this slot (reset to 0 on resume) uint32_t quantum — preemption threshold (default RUBY_FIBER_QUANTUM_DEFAULT) uint8_t preempted — re-entrancy guard; prevents the scheduler's own Ruby code from immediately re-triggering preemption All three are correctly initialized in: - fiber_t_alloc (new fibers) - rb_threadptr_root_fiber_setup (root fiber per thread) - rb_fiber_atfork (child process after fork) The preempted flag is cleared in fiber_switch on every resume, giving each scheduling slot a fresh start. ## Compatibility Existing schedulers that do not implement `yield` see no change: the call falls back to kernel_sleep(0). Blocking fibers are never preempted. Programs without a scheduler pay only the cost of one counter increment and one comparison per loop back-edge in rb_vm_check_ints. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9fefb48 commit 217a8bb

17 files changed

Lines changed: 952 additions & 25 deletions

File tree

cont.c

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ enum {
5353
#define RB_PAGE_MASK (~(RB_PAGE_SIZE - 1))
5454
static long pagesize;
5555

56+
// Default scheduling quantum: back-edges per slot before forced preemption.
57+
// At ~7M iterations/s, 50_000 gives roughly a 7ms slice.
58+
#define RUBY_FIBER_QUANTUM_DEFAULT 50000u
59+
5660
static const rb_data_type_t rb_cont_data_type;
5761
static const rb_data_type_t rb_fiber_data_type;
5862
static VALUE rb_cContinuation;
@@ -296,7 +300,7 @@ rb_free_shared_fiber_pool(void)
296300
}
297301
}
298302

299-
static ID fiber_initialize_keywords[3] = {0};
303+
static ID fiber_initialize_keywords[4] = {0};
300304

301305
/*
302306
* FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL
@@ -2171,6 +2175,9 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
21712175

21722176
fiber->cont.saved_ec.fiber_ptr = fiber;
21732177
fiber->cont.saved_ec.serial = next_ec_serial(th->ractor);
2178+
fiber->cont.saved_ec.runtime = 0;
2179+
fiber->cont.saved_ec.preempted = 0;
2180+
fiber->cont.saved_ec.quantum = RUBY_FIBER_QUANTUM_DEFAULT;
21742181
rb_ec_clear_vm_stack(&fiber->cont.saved_ec);
21752182

21762183
fiber->prev = NULL;
@@ -2362,7 +2369,7 @@ rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value)
23622369
}
23632370

23642371
static VALUE
2365-
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage)
2372+
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage, VALUE quantum)
23662373
{
23672374
if (storage == Qundef || storage == Qtrue) {
23682375
// The default, inherit storage (dup) from the current fiber:
@@ -2380,6 +2387,12 @@ fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigne
23802387
fiber->stack.base = NULL;
23812388
fiber->stack.pool = fiber_pool;
23822389

2390+
if (!UNDEF_P(quantum)) {
2391+
uint32_t q = NUM2UINT(quantum);
2392+
if (q == 0) rb_raise(rb_eArgError, "quantum must be positive");
2393+
fiber->cont.saved_ec.quantum = q;
2394+
}
2395+
23832396
return self;
23842397
}
23852398

@@ -2422,13 +2435,14 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
24222435
VALUE pool = Qnil;
24232436
VALUE blocking = Qfalse;
24242437
VALUE storage = Qundef;
2438+
VALUE quantum = Qundef;
24252439

24262440
if (kw_splat != RB_NO_KEYWORDS) {
24272441
VALUE options = Qnil;
2428-
VALUE arguments[3] = {Qundef};
2442+
VALUE arguments[4] = {Qundef};
24292443

24302444
argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
2431-
rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments);
2445+
rb_get_kwargs(options, fiber_initialize_keywords, 0, 4, arguments);
24322446

24332447
if (!UNDEF_P(arguments[0])) {
24342448
blocking = arguments[0];
@@ -2439,9 +2453,10 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
24392453
}
24402454

24412455
storage = arguments[2];
2456+
quantum = arguments[3];
24422457
}
24432458

2444-
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage);
2459+
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage, quantum);
24452460
}
24462461

24472462
/*
@@ -2502,7 +2517,7 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
25022517
VALUE
25032518
rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage)
25042519
{
2505-
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 0, storage);
2520+
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 0, storage, Qundef);
25062521
}
25072522

25082523
VALUE
@@ -2698,6 +2713,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
26982713
fiber->cont.saved_ec.fiber_ptr = fiber;
26992714
fiber->cont.saved_ec.serial = next_ec_serial(th->ractor);
27002715
fiber->cont.saved_ec.thread_ptr = th;
2716+
fiber->cont.saved_ec.quantum = RUBY_FIBER_QUANTUM_DEFAULT;
27012717
fiber->blocking = 1;
27022718
fiber->killed = 0;
27032719
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
@@ -2887,6 +2903,16 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi
28872903
cont->kw_splat = kw_splat;
28882904
cont->value = make_passing_arg(argc, argv);
28892905

2906+
// Give the incoming fiber a fresh scheduling slot on resume:
2907+
// - runtime resets to 0 so it gets a full new quantum
2908+
// - preempted clears so the quantum check re-engages
2909+
// Suspended fibers retain their runtime/preempted values between yield
2910+
// and resume so the scheduler can read them for ordering decisions.
2911+
if (!fiber->blocking) {
2912+
fiber->cont.saved_ec.runtime = 0;
2913+
fiber->cont.saved_ec.preempted = 0;
2914+
}
2915+
28902916
fiber_store(fiber, th);
28912917

28922918
// We cannot free the stack until the pthread is joined:
@@ -2945,6 +2971,62 @@ rb_fiber_blocking_p(VALUE fiber)
29452971
return RBOOL(fiber_ptr(fiber)->blocking);
29462972
}
29472973

2974+
// Advance the fiber's runtime counter by n back-edges worth of work.
2975+
// Intended for C extensions doing CPU-bound work outside YARV's instruction
2976+
// dispatch loop (e.g. JSON parsers, codec loops, matrix operations).
2977+
// Triggers preemption and scheduler yield if the quantum is exhausted.
2978+
// Safe to call with the GVL held; no-op when there is no scheduler or the
2979+
// current fiber is blocking.
2980+
void
2981+
rb_fiber_runtime_advance(uint32_t runtime)
2982+
{
2983+
rb_execution_context_t *ec = GET_EC();
2984+
ec->runtime += runtime;
2985+
rb_fiber_scheduler_maybe_preempt(ec);
2986+
}
2987+
2988+
/*
2989+
* call-seq: fiber.runtime -> integer
2990+
*
2991+
* Back-edges executed in the current (or last completed) scheduling slot.
2992+
* Resets to 0 on resume/transfer.
2993+
*/
2994+
static VALUE
2995+
rb_fiber_runtime(VALUE self)
2996+
{
2997+
rb_fiber_t *fiber = fiber_ptr(self);
2998+
return UINT2NUM(fiber->cont.saved_ec.runtime);
2999+
}
3000+
3001+
/*
3002+
* call-seq: fiber.quantum -> integer
3003+
*
3004+
* Back-edges per scheduling slot before forced preemption. Default: 50_000.
3005+
*/
3006+
static VALUE
3007+
rb_fiber_quantum_get(VALUE self)
3008+
{
3009+
rb_fiber_t *fiber = fiber_ptr(self);
3010+
uint32_t q = fiber->cont.saved_ec.quantum;
3011+
return UINT2NUM(q ? q : RUBY_FIBER_QUANTUM_DEFAULT);
3012+
}
3013+
3014+
/*
3015+
* call-seq: fiber.quantum = integer
3016+
*
3017+
* Sets the per-slot quantum. Must be a positive integer.
3018+
*/
3019+
static VALUE
3020+
rb_fiber_quantum_set(VALUE self, VALUE val)
3021+
{
3022+
rb_fiber_t *fiber = fiber_ptr(self);
3023+
uint32_t q = NUM2UINT(val);
3024+
if (q == 0) rb_raise(rb_eArgError, "quantum must be positive");
3025+
fiber->cont.saved_ec.quantum = q;
3026+
return val;
3027+
}
3028+
3029+
29483030
static VALUE
29493031
fiber_blocking_yield(VALUE fiber_value)
29503032
{
@@ -3508,6 +3590,9 @@ rb_fiber_atfork(rb_thread_t *th)
35083590
}
35093591
th->root_fiber->prev = 0;
35103592
th->root_fiber->blocking = 1;
3593+
th->root_fiber->cont.saved_ec.runtime = 0;
3594+
th->root_fiber->cont.saved_ec.preempted = 0;
3595+
th->root_fiber->cont.saved_ec.quantum = RUBY_FIBER_QUANTUM_DEFAULT;
35113596
th->blocking = 1;
35123597
}
35133598
}
@@ -3658,6 +3743,7 @@ Init_Cont(void)
36583743
fiber_initialize_keywords[0] = rb_intern_const("blocking");
36593744
fiber_initialize_keywords[1] = rb_intern_const("pool");
36603745
fiber_initialize_keywords[2] = rb_intern_const("storage");
3746+
fiber_initialize_keywords[3] = rb_intern_const("quantum");
36613747

36623748
const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
36633749
if (fiber_shared_fiber_pool_free_stacks) {
@@ -3684,6 +3770,9 @@ Init_Cont(void)
36843770

36853771
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
36863772
rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
3773+
rb_define_method(rb_cFiber, "runtime", rb_fiber_runtime, 0);
3774+
rb_define_method(rb_cFiber, "quantum", rb_fiber_quantum_get, 0);
3775+
rb_define_method(rb_cFiber, "quantum=", rb_fiber_quantum_set, 1);
36873776
rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0);
36883777
rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1);
36893778
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);

0 commit comments

Comments
 (0)