Skip to content

Commit 6181c70

Browse files
Direct dispatch: bypass worker threads, execute blocking op inline
Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 8841faa commit 6181c70

1 file changed

Lines changed: 3 additions & 51 deletions

File tree

ext/io/event/worker_pool.c

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -342,58 +342,10 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
342342
rb_raise(rb_eArgError, "Invalid blocking operation!");
343343
}
344344

345-
// Create work item.
346-
// Store the VALUE alongside the raw pointer — the compacting GC may move the
347-
// TypedData object while the calling fiber is suspended, making the raw pointer
348-
// stale. Registering the VALUEs as precise GC roots ensures they are updated
349-
// to the new address, allowing safe re-extraction in the worker thread.
350-
struct IO_Event_WorkerPool_Work work = {
351-
.blocking_operation = blocking_operation,
352-
.completed = false,
353-
.scheduler = scheduler,
354-
.blocker = self,
355-
.fiber = fiber,
356-
.next = NULL
357-
};
358-
359-
// Enqueue work:
360-
pthread_mutex_lock(&pool->mutex);
361-
enqueue_work(pool, &work);
362-
pthread_cond_signal(&pool->work_available);
363-
pthread_mutex_unlock(&pool->mutex);
345+
// Direct dispatch: execute the blocking operation inline without worker threads.
346+
rb_fiber_scheduler_blocking_operation_execute(blocking_operation);
364347

365-
// Block the current fiber until work is completed:
366-
int state = 0;
367-
while (true) {
368-
int current_state = 0;
369-
rb_protect(worker_pool_work_begin, (VALUE)&work, &current_state);
370-
if (DEBUG) fprintf(stderr, "-- worker_pool_call:work completed=%d, current_state=%d, state=%d\n", work.completed, current_state, state);
371-
372-
// Store the first exception state:
373-
if (!state) {
374-
state = current_state;
375-
}
376-
377-
// If the work is still in the queue, we must wait for a worker to complete it (even if cancelled):
378-
if (work.completed) {
379-
// The work was completed, we can exit the loop:
380-
break;
381-
} else {
382-
if (DEBUG) fprintf(stderr, "worker_pool_call:rb_fiber_scheduler_blocking_operation_cancel\n");
383-
// Ensure the blocking operation is cancelled:
384-
rb_fiber_scheduler_blocking_operation_cancel(work.blocking_operation);
385-
386-
// The work was not completed, we need to wait for it to be completed, so we go around the loop again.
387-
}
388-
}
389-
390-
if (DEBUG) fprintf(stderr, "<- worker_pool_call:work completed=%d, state=%d\n", work.completed, state);
391-
392-
if (state) {
393-
rb_jump_tag(state);
394-
} else {
395-
return Qtrue;
396-
}
348+
return Qtrue;
397349
}
398350

399351
static VALUE worker_pool_allocate(VALUE klass) {

0 commit comments

Comments
 (0)