diff --git a/ext/extconf.rb b/ext/extconf.rb index 7aa6d0f8..60bd519f 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -58,6 +58,16 @@ have_header("ruby/io/buffer.h") +# Feature detection for blocking operation support +if have_func("rb_fiber_scheduler_blocking_operation_extract") + # Feature detection for pthread support (needed for WorkerPool) + if have_header("pthread.h") + append_cflags(["-DHAVE_IO_EVENT_WORKER_POOL"]) + $srcs << "io/event/worker_pool.c" + $srcs << "io/event/worker_pool_test.c" + end +end + if ENV.key?("RUBY_SANITIZE") $stderr.puts "Enabling sanitizers..." diff --git a/ext/io/event/event.c b/ext/io/event/event.c index 5c947773..59492544 100644 --- a/ext/io/event/event.c +++ b/ext/io/event/event.c @@ -14,7 +14,11 @@ void Init_IO_Event(void) VALUE IO_Event = rb_define_module_under(rb_cIO, "Event"); Init_IO_Event_Fiber(IO_Event); - + + #ifdef HAVE_IO_EVENT_WORKER_POOL + Init_IO_Event_WorkerPool(IO_Event); + #endif + VALUE IO_Event_Selector = rb_define_module_under(IO_Event, "Selector"); Init_IO_Event_Selector(IO_Event_Selector); diff --git a/ext/io/event/event.h b/ext/io/event/event.h index b2884b46..fe69d2c2 100644 --- a/ext/io/event/event.h +++ b/ext/io/event/event.h @@ -18,3 +18,7 @@ void Init_IO_Event(void); #ifdef HAVE_SYS_EVENT_H #include "selector/kqueue.h" #endif + +#ifdef HAVE_IO_EVENT_WORKER_POOL +#include "worker_pool.h" +#endif diff --git a/ext/io/event/fiber.c b/ext/io/event/fiber.c index d049cc26..95d19394 100644 --- a/ext/io/event/fiber.c +++ b/ext/io/event/fiber.c @@ -35,7 +35,7 @@ VALUE IO_Event_Fiber_raise(VALUE fiber, int argc, VALUE *argv) { #ifndef HAVE_RB_FIBER_CURRENT static ID id_current; -static VALUE IO_Event_Fiber_current(void) { +VALUE IO_Event_Fiber_current(void) { return rb_funcall(rb_cFiber, id_current, 0); } #endif diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c new file mode 100644 index 00000000..a6975f4c --- /dev/null +++ b/ext/io/event/worker_pool.c @@ -0,0 +1,464 @@ +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#include "worker_pool.h" +#include "worker_pool_test.h" +#include "fiber.h" + +#include +#include + +#include +#include +#include +#include +#include + +enum { + DEBUG = 0, +}; + +static VALUE IO_Event_WorkerPool; +static ID id_maximum_worker_count; + +// Thread pool structure +struct IO_Event_WorkerPool_Worker { + VALUE thread; + + // Flag to indicate this specific worker should exit: + bool interrupted; + + // Currently executing operation: + rb_fiber_scheduler_blocking_operation_t *current_blocking_operation; + + struct IO_Event_WorkerPool *pool; + struct IO_Event_WorkerPool_Worker *next; +}; + +// Work item structure +struct IO_Event_WorkerPool_Work { + rb_fiber_scheduler_blocking_operation_t *blocking_operation; + + bool completed; + + VALUE scheduler; + VALUE blocker; + VALUE fiber; + + struct IO_Event_WorkerPool_Work *next; +}; + +// Worker pool structure +struct IO_Event_WorkerPool { + pthread_mutex_t mutex; + pthread_cond_t work_available; + + struct IO_Event_WorkerPool_Work *work_queue; + struct IO_Event_WorkerPool_Work *work_queue_tail; + + struct IO_Event_WorkerPool_Worker *workers; + size_t current_worker_count; + size_t maximum_worker_count; + + size_t call_count; + size_t completed_count; + size_t cancelled_count; + + bool shutdown; +}; + +// Free functions for Ruby GC +static void worker_pool_free(void *ptr) { + struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr; + + if (pool) { + // Signal shutdown to all workers + if (!pool->shutdown) { + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + } + + // Note: We don't free worker structures or wait for threads during GC + // as this can cause deadlocks. The Ruby GC will handle the thread objects. + // Workers will see the shutdown flag and exit cleanly. + } +} + +// Size functions for Ruby GC +static size_t worker_pool_size(const void *ptr) { + return sizeof(struct IO_Event_WorkerPool); +} + +// Ruby TypedData structures +static const rb_data_type_t IO_Event_WorkerPool_type = { + "IO::Event::WorkerPool", + {0, worker_pool_free, worker_pool_size,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +}; + +// Helper function to enqueue work (must be called with mutex held) +static void enqueue_work(struct IO_Event_WorkerPool *pool, struct IO_Event_WorkerPool_Work *work) { + if (pool->work_queue_tail) { + pool->work_queue_tail->next = work; + } else { + pool->work_queue = work; + } + pool->work_queue_tail = work; +} + +// Helper function to dequeue work (must be called with mutex held) +static struct IO_Event_WorkerPool_Work *dequeue_work(struct IO_Event_WorkerPool *pool) { + struct IO_Event_WorkerPool_Work *work = pool->work_queue; + if (work) { + pool->work_queue = work->next; + if (!pool->work_queue) { + pool->work_queue_tail = NULL; + } + work->next = NULL; // Clear the next pointer for safety + } + return work; +} + +// Unblock function to interrupt a specific worker. +static void worker_unblock_func(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; + struct IO_Event_WorkerPool *pool = worker->pool; + + // Mark this specific worker as interrupted + pthread_mutex_lock(&pool->mutex); + worker->interrupted = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // If there's a currently executing blocking operation, cancel it + if (worker->current_blocking_operation) { + rb_fiber_scheduler_blocking_operation_cancel(worker->current_blocking_operation); + } +} + +// Function to wait for work and execute it without GVL. +static void *worker_wait_and_execute(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; + struct IO_Event_WorkerPool *pool = worker->pool; + + while (true) { + struct IO_Event_WorkerPool_Work *work = NULL; + + pthread_mutex_lock(&pool->mutex); + + // Wait for work, shutdown, or interruption + while (!pool->work_queue && !pool->shutdown && !worker->interrupted) { + pthread_cond_wait(&pool->work_available, &pool->mutex); + } + + if (pool->shutdown || worker->interrupted) { + pthread_mutex_unlock(&pool->mutex); + break; + } + + work = dequeue_work(pool); + + pthread_mutex_unlock(&pool->mutex); + + // Execute work WITHOUT GVL (this is the whole point!) + if (work) { + worker->current_blocking_operation = work->blocking_operation; + rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); + worker->current_blocking_operation = NULL; + } + + return work; + } + + return NULL; // Shutdown signal +} + +static VALUE worker_thread_func(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; + + while (true) { + // Wait for work and execute it without holding GVL + struct IO_Event_WorkerPool_Work *work = (struct IO_Event_WorkerPool_Work *)rb_thread_call_without_gvl(worker_wait_and_execute, worker, worker_unblock_func, worker); + + if (!work) { + // Shutdown signal received + break; + } + + // Protected by GVL: + work->completed = true; + worker->pool->completed_count++; + + // Work was executed without GVL, now unblock the waiting fiber (we have GVL here) + rb_fiber_scheduler_unblock(work->scheduler, work->blocker, work->fiber); + } + + return Qnil; +} + +// Create a new worker thread +static int create_worker_thread(struct IO_Event_WorkerPool *pool) { + if (pool->current_worker_count >= pool->maximum_worker_count) { + return -1; + } + + struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker)); + if (!worker) { + return -1; + } + + worker->pool = pool; + worker->interrupted = false; + worker->current_blocking_operation = NULL; + worker->next = pool->workers; + + worker->thread = rb_thread_create(worker_thread_func, worker); + if (NIL_P(worker->thread)) { + free(worker); + return -1; + } + + pool->workers = worker; + pool->current_worker_count++; + + return 0; +} + +// Ruby constructor for WorkerPool +static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { + size_t maximum_worker_count = 1; // Default + + // Extract keyword arguments + VALUE kwargs = Qnil; + VALUE rb_maximum_worker_count = Qnil; + + rb_scan_args(argc, argv, "0:", &kwargs); + + if (!NIL_P(kwargs)) { + VALUE kwvals[1]; + ID kwkeys[1] = {id_maximum_worker_count}; + rb_get_kwargs(kwargs, kwkeys, 0, 1, kwvals); + rb_maximum_worker_count = kwvals[0]; + } + + if (!NIL_P(rb_maximum_worker_count)) { + maximum_worker_count = NUM2SIZET(rb_maximum_worker_count); + if (maximum_worker_count == 0) { + rb_raise(rb_eArgError, "maximum_worker_count must be greater than 0!"); + } + } + + // Get the pool that was allocated by worker_pool_allocate + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool allocation failed!"); + } + + pthread_mutex_init(&pool->mutex, NULL); + pthread_cond_init(&pool->work_available, NULL); + + pool->work_queue = NULL; + pool->work_queue_tail = NULL; + pool->workers = NULL; + pool->current_worker_count = 0; + pool->maximum_worker_count = maximum_worker_count; + pool->call_count = 0; + pool->completed_count = 0; + pool->cancelled_count = 0; + pool->shutdown = false; + + // Create initial workers + for (size_t i = 0; i < maximum_worker_count; i++) { + if (create_worker_thread(pool) != 0) { + // Just set the maximum_worker_count for debugging, don't fail completely + // worker_pool_free(pool); + // rb_raise(rb_eRuntimeError, "Failed to create workers"); + break; + } + } + + return self; +} + +static VALUE worker_pool_work_begin(VALUE _work) { + struct IO_Event_WorkerPool_Work *work = (void*)_work; + + if (DEBUG) fprintf(stderr, "worker_pool_work_begin:rb_fiber_scheduler_block work=%p\n", work); + rb_fiber_scheduler_block(work->scheduler, work->blocker, Qnil); + + return Qnil; +} + +// Ruby method to submit work and wait for completion +static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (pool->shutdown) { + rb_raise(rb_eRuntimeError, "Worker pool is shut down!"); + } + + // Increment call count (protected by GVL) + pool->call_count++; + + // Get current fiber and scheduler + VALUE fiber = rb_fiber_current(); + VALUE scheduler = rb_fiber_scheduler_current(); + if (NIL_P(scheduler)) { + rb_raise(rb_eRuntimeError, "WorkerPool requires a fiber scheduler!"); + } + + // Extract blocking operation handle + rb_fiber_scheduler_blocking_operation_t *blocking_operation = rb_fiber_scheduler_blocking_operation_extract(_blocking_operation); + + if (!blocking_operation) { + rb_raise(rb_eArgError, "Invalid blocking operation!"); + } + + // Create work item + struct IO_Event_WorkerPool_Work work = { + .blocking_operation = blocking_operation, + .completed = false, + .scheduler = scheduler, + .blocker = self, + .fiber = fiber, + .next = NULL + }; + + // Enqueue work: + pthread_mutex_lock(&pool->mutex); + enqueue_work(pool, &work); + pthread_cond_signal(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // Block the current fiber until work is completed: + int state; + while (true) { + rb_protect(worker_pool_work_begin, (VALUE)&work, &state); + + if (work.completed) { + break; + } else { + if (DEBUG) fprintf(stderr, "worker_pool_call:rb_fiber_scheduler_blocking_operation_cancel\n"); + rb_fiber_scheduler_blocking_operation_cancel(blocking_operation); + // The work was not completed, we need to wait for it to be completed. + } + } + + if (state) { + rb_jump_tag(state); + } else { + return Qtrue; + } +} + +static VALUE worker_pool_allocate(VALUE klass) { + struct IO_Event_WorkerPool *pool; + VALUE self = TypedData_Make_Struct(klass, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + // Initialize to NULL/zero so we can detect uninitialized pools + memset(pool, 0, sizeof(struct IO_Event_WorkerPool)); + + return self; +} + +// Ruby method to close the worker pool +static VALUE worker_pool_close(VALUE self) { + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool not initialized!"); + } + + if (pool->shutdown) { + return Qnil; // Already closed + } + + // Signal shutdown to all workers + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // Wait for all worker threads to finish + struct IO_Event_WorkerPool_Worker *worker = pool->workers; + while (worker) { + if (!NIL_P(worker->thread)) { + rb_funcall(worker->thread, rb_intern("join"), 0); + } + worker = worker->next; + } + + // Clean up worker structures + worker = pool->workers; + while (worker) { + struct IO_Event_WorkerPool_Worker *next = worker->next; + free(worker); + worker = next; + } + pool->workers = NULL; + pool->current_worker_count = 0; + + // Clean up mutex and condition variable + pthread_mutex_destroy(&pool->mutex); + pthread_cond_destroy(&pool->work_available); + + return Qnil; +} + +// Test helper: get pool statistics for debugging/testing +static VALUE worker_pool_statistics(VALUE self) { + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool not initialized!"); + } + + VALUE stats = rb_hash_new(); + rb_hash_aset(stats, ID2SYM(rb_intern("current_worker_count")), SIZET2NUM(pool->current_worker_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("maximum_worker_count")), SIZET2NUM(pool->maximum_worker_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("call_count")), SIZET2NUM(pool->call_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("completed_count")), SIZET2NUM(pool->completed_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("cancelled_count")), SIZET2NUM(pool->cancelled_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("shutdown")), pool->shutdown ? Qtrue : Qfalse); + + // Count work items in queue (only if properly initialized) + if (pool->maximum_worker_count > 0) { + pthread_mutex_lock(&pool->mutex); + size_t current_queue_size = 0; + struct IO_Event_WorkerPool_Work *work = pool->work_queue; + while (work) { + current_queue_size++; + work = work->next; + } + pthread_mutex_unlock(&pool->mutex); + rb_hash_aset(stats, ID2SYM(rb_intern("current_queue_size")), SIZET2NUM(current_queue_size)); + } else { + rb_hash_aset(stats, ID2SYM(rb_intern("current_queue_size")), SIZET2NUM(0)); + } + + return stats; +} + +void Init_IO_Event_WorkerPool(VALUE IO_Event) { + // Initialize symbols + id_maximum_worker_count = rb_intern("maximum_worker_count"); + + IO_Event_WorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); + rb_define_alloc_func(IO_Event_WorkerPool, worker_pool_allocate); + + rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1); + rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); + rb_define_method(IO_Event_WorkerPool, "close", worker_pool_close, 0); + + rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); + + // Initialize test functions + Init_IO_Event_WorkerPool_Test(IO_Event_WorkerPool); +} diff --git a/ext/io/event/worker_pool.h b/ext/io/event/worker_pool.h new file mode 100644 index 00000000..dbec34ca --- /dev/null +++ b/ext/io/event/worker_pool.h @@ -0,0 +1,8 @@ +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#pragma once + +#include + +void Init_IO_Event_WorkerPool(VALUE IO_Event); diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c new file mode 100644 index 00000000..4c913551 --- /dev/null +++ b/ext/io/event/worker_pool_test.c @@ -0,0 +1,200 @@ +// worker_pool_test.c - Test functions for WorkerPool cancellation +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#include "worker_pool_test.h" + +#include +#include +#include + +#include +#include +#include +#include + +static ID id_duration; + +struct BusyOperationData { + int read_fd; + int write_fd; + volatile int cancelled; + double duration; // How long to wait (for testing) + clock_t start_time; + clock_t end_time; + int operation_result; + VALUE exception; +}; + +// The actual blocking operation that can be cancelled +static void* busy_blocking_operation(void *data) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + + // Use select() to wait for the pipe to become readable + fd_set read_fds; + struct timeval timeout; + + FD_ZERO(&read_fds); + FD_SET(busy_data->read_fd, &read_fds); + + // Set timeout based on duration + timeout.tv_sec = (long)busy_data->duration; + timeout.tv_usec = ((busy_data->duration - timeout.tv_sec) * 1000000); + + // This will block until: + // 1. The pipe becomes readable (cancellation) + // 2. The timeout expires + // 3. An error occurs + int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout); + + if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) { + // Pipe became readable - we were cancelled + char buffer; + read(busy_data->read_fd, &buffer, 1); // Consume the byte + busy_data->cancelled = 1; + return (void*)-1; // Indicate cancellation + } else if (result == 0) { + // Timeout - operation completed normally + return (void*)0; // Indicate success + } else { + // Error occurred + return (void*)-2; // Indicate error + } +} + +// Unblock function that writes to the pipe to cancel the operation +static void busy_unblock_function(void *data) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + + busy_data->cancelled = 1; + + // Write a byte to the pipe to wake up the select() + char wake_byte = 1; + write(busy_data->write_fd, &wake_byte, 1); +} + +// Function for the main operation execution (for rb_rescue) +static VALUE busy_operation_execute(VALUE data_value) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value; + + // Record start time + busy_data->start_time = clock(); + + // Execute the blocking operation + void *block_result = rb_nogvl( + busy_blocking_operation, + busy_data, + busy_unblock_function, + busy_data, + RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_OFFLOAD_SAFE + ); + + // Record end time + busy_data->end_time = clock(); + + // Store the operation result + busy_data->operation_result = (int)(intptr_t)block_result; + + return Qnil; +} + +// Function for exception handling (for rb_rescue) +static VALUE busy_operation_rescue(VALUE data_value, VALUE exception) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value; + + // Record end time even in case of exception + busy_data->end_time = clock(); + + // Mark that an exception was caught + busy_data->exception = exception; + + return exception; +} + +// Ruby method: IO::Event::WorkerPool.busy(duration: 1.0) +// This creates a cancellable blocking operation for testing +static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { + double duration = 1.0; // Default 1 second + + // Extract keyword arguments + VALUE kwargs = Qnil; + VALUE rb_duration = Qnil; + + rb_scan_args(argc, argv, "0:", &kwargs); + + if (!NIL_P(kwargs)) { + VALUE kwvals[1]; + ID kwkeys[1] = {id_duration}; + rb_get_kwargs(kwargs, kwkeys, 0, 1, kwvals); + rb_duration = kwvals[0]; + } + + if (!NIL_P(rb_duration)) { + duration = NUM2DBL(rb_duration); + } + + // Create pipe for cancellation + int pipe_fds[2]; + if (pipe(pipe_fds) != 0) { + rb_sys_fail("pipe creation failed"); + } + + // Stack allocate operation data + struct BusyOperationData busy_data = { + .read_fd = pipe_fds[0], + .write_fd = pipe_fds[1], + .cancelled = 0, + .duration = duration, + .start_time = 0, + .end_time = 0, + .operation_result = 0, + .exception = Qnil + }; + + // Execute the blocking operation with exception handling using function pointers + rb_rescue( + busy_operation_execute, + (VALUE)&busy_data, + busy_operation_rescue, + (VALUE)&busy_data + ); + + // Calculate elapsed time from the state stored in busy_data + double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC; + + // Create result hash using the state from busy_data + VALUE result = rb_hash_new(); + rb_hash_aset(result, ID2SYM(rb_intern("duration")), DBL2NUM(duration)); + rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed)); + + // Determine result based on operation outcome + if (busy_data.exception != Qnil) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); + rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception); + } else if (busy_data.operation_result == -1) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); + } else if (busy_data.operation_result == 0) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); + } else { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("error"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); + } + + // Clean up pipe file descriptors + close(pipe_fds[0]); + close(pipe_fds[1]); + + return result; +} + +// Initialize the test functions +void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool) { + // Initialize symbols + id_duration = rb_intern("duration"); + + // Add test methods to IO::Event::WorkerPool class + rb_define_singleton_method(IO_Event_WorkerPool, "busy", worker_pool_test_busy, -1); +} diff --git a/ext/io/event/worker_pool_test.h b/ext/io/event/worker_pool_test.h new file mode 100644 index 00000000..f58ce713 --- /dev/null +++ b/ext/io/event/worker_pool_test.h @@ -0,0 +1,9 @@ +// worker_pool_test.h - Header for WorkerPool test functions +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#pragma once + +#include + +void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool); diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb new file mode 100644 index 00000000..47345d01 --- /dev/null +++ b/fixtures/io/event/test_scheduler.rb @@ -0,0 +1,163 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event/timers" + +module IO::Event + # A test fiber scheduler that uses WorkerPool for blocking operations. + # + # This scheduler implements the fiber scheduler interface and delegates + # blocking operations to a WorkerPool instance for testing. + # + # @example Testing usage + # ```ruby + # # Create with default selector and worker pool + # scheduler = IO::Event::TestScheduler.new + # + # # Or provide custom selector and/or worker pool + # selector = IO::Event::Selector.new(Fiber.current) + # worker_pool = IO::Event::WorkerPool.new(maximum_worker_count: 4) + # scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool) + # + # Fiber.set_scheduler(scheduler) + # + # # Standard Ruby operations that use rb_nogvl will be handled by the worker pool + # # Examples: sleep, file I/O, network operations, etc. + # Fiber.schedule do + # sleep(0.001) # This triggers rb_nogvl and uses the worker pool + # end.resume + # ``` + class TestScheduler + def initialize(selector: nil, worker_pool: nil, maximum_worker_count: nil) + @selector = selector || ::IO::Event::Selector.new(Fiber.current) + @worker_pool = worker_pool || WorkerPool.new(maximum_worker_count: maximum_worker_count) + @timers = ::IO::Event::Timers.new + + # Track the number of fibers that are blocked. + @blocked = 0 + end + + # @attribute [WorkerPool] The worker pool used for executing blocking operations. + attr_reader :worker_pool + + # @attribute [IO::Event::Selector] The I/O event selector used for managing fiber scheduling. + attr_reader :selector + + # Required fiber scheduler hook - delegates to WorkerPool + def blocking_operation_wait(operation) + # Submit the operation to the worker pool and wait for completion + @worker_pool.call(operation) + end + + # Required fiber scheduler hooks + def close + @selector&.close + @worker_pool&.close + @worker_pool = nil + end + + def block(blocker, timeout = nil) + fiber = Fiber.current + + if timeout + timer = @timers.after(timeout) do + if fiber.alive? + fiber.transfer(false) + end + end + end + + begin + @blocked += 1 + @selector.transfer + ensure + @blocked -= 1 + end + ensure + timer&.cancel! + end + + def unblock(blocker, fiber) + if selector = @selector + selector.push(fiber) + selector.wakeup + end + end + + class FiberInterrupt + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + def alive? + @fiber.alive? + end + + def transfer + @fiber.raise(@exception) + end + end + + def fiber_interrupt(fiber, exception) + unblock(nil, FiberInterrupt.new(fiber, exception)) + end + + def io_wait(io, events, timeout = nil) + fiber = Fiber.current + + if timeout + timer = @timers.after(timeout) do + fiber.transfer + end + end + + return @selector.io_wait(fiber, io, events) + ensure + timer&.cancel! + end + + def kernel_sleep(duration = nil) + if duration + self.block(nil, duration) + else + @selector.transfer + end + end + + def fiber(&block) + Fiber.new(&block).tap(&:transfer) + end + + # Run the scheduler event loop + def run + while @blocked > 0 or @timers.size > 0 + interval = @timers.wait_interval + @selector.select(interval) + @timers.fire + end + end + + def scheduler_close(error = $!) + self.run + ensure + self.close + end + + private + + def transfer + @selector.transfer + end + + def push(fiber) + @selector.push(fiber) + end + + def wakeup + @selector.wakeup + end + end +end \ No newline at end of file diff --git a/releases.md b/releases.md index a9370be1..bddf4a72 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,31 @@ # Releases +## Unreleased + +### Introduce `IO::Event::WorkerPool` for off-loading blocking operations. + +The {ruby IO::Event::WorkerPool} provides a mechanism for executing blocking operations on separate OS threads while properly integrating with Ruby's fiber scheduler and GVL (Global VM Lock) management. This enables true parallelism for CPU-intensive or blocking operations that would otherwise block the event loop. + +```ruby +# Fiber scheduler integration via blocking_operation_wait hook +class MyScheduler + def initialize + @worker_pool = IO::Event::WorkerPool.new + end + + def blocking_operation_wait(operation) + @worker_pool.call(operation) + end +end + +# Usage with automatic offloading +Fiber.set_scheduler(MyScheduler.new) +# Automatically offload `rb_nogvl(..., RB_NOGVL_OFFLOAD_SAFE)` to a background thread: +result = some_blocking_operation() +``` + +The implementation uses one or more background threads and a list of pending blocking operations. Those operations either execute through to completion or may be cancelled, which executes the "unblock function" provided to `rb_nogvl`. + ## v1.10.2 - Improved consistency of handling closed IO when invoking `#select`. diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb new file mode 100644 index 00000000..fdbb697a --- /dev/null +++ b/test/io/event/worker_pool.rb @@ -0,0 +1,177 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event" +require "io/event/test_scheduler" + +return unless defined?(IO::Event::WorkerPool) + +describe IO::Event::WorkerPool do + with "an instance" do + let(:worker_pool) {subject.new} + + after do + worker_pool&.close + end + + it "can create a worker pool" do + expect(worker_pool).to be_a(IO::Event::WorkerPool) + end + + it "provides statistics" do + # Force initialization by calling a method on the pool + pool = worker_pool # This should trigger initialization + statistics = pool.statistics + + expect(statistics).to be_a(Hash) + expect(statistics).to have_keys( + current_worker_count: be_a(Integer), + maximum_worker_count: be == 1, + current_queue_size: be == 0, + shutdown: be == false + ) + end + + it "can close the worker pool" do + pool = worker_pool + + # Check that it's not shut down initially + expect(pool.statistics[:shutdown]).to be == false + + # Close the pool + result = pool.close + expect(result).to be_nil + + # Check that it's now shut down + expect(pool.statistics[:shutdown]).to be == true + expect(pool.statistics[:current_worker_count]).to be == 0 + end + + it "can close the worker pool multiple times safely" do + pool = worker_pool + + # Close the pool twice + pool.close + pool.close + + # Should still be shut down + expect(pool.statistics[:shutdown]).to be == true + end + end + + with "TestScheduler integration" do + let(:scheduler) {IO::Event::TestScheduler.new} + + it "can create a test scheduler" do + expect(scheduler).to be_a(IO::Event::TestScheduler) + expect(scheduler.worker_pool).to be_a(IO::Event::WorkerPool) + end + + it "interrupts IO::Buffer.copy operations larger than 1MiB" do + skip "IO::Buffer not available" unless defined?(IO::Buffer) + + # Create buffers larger than 1MiB to trigger GVL release + buffer_size = 2 * 1024 * 1024 # 2MiB + source = IO::Buffer.new(buffer_size) + destination = IO::Buffer.new(buffer_size) + + # Fill source buffer with some data + source.clear("A".ord) + worker_pool = nil + + Thread.new do + Fiber.set_scheduler(scheduler) + worker_pool = scheduler.worker_pool + + # Perform the large copy operation in a scheduled fiber + Fiber.schedule do + destination.copy(source, 0, buffer_size, 0) + end + end.join + + # Confirm that the copy worked: + expect(destination.get_string(0, 10)).to be == "AAAAAAAAAA" + + expect(worker_pool.statistics[:call_count]).to be > 0 + expect(worker_pool.statistics[:completed_count]).to be > 0 + inform worker_pool.statistics + end + end + + with "cancellable busy operation" do + let(:scheduler) {IO::Event::TestScheduler.new} + + it "can perform a busy operation that completes normally" do + start_time = Time.now + result = IO::Event::WorkerPool.busy(duration: 0.1) + end_time = Time.now + elapsed = end_time - start_time + + expect(result).to be_a(Hash) + expect(result[:cancelled]).to be == false + expect(result[:result]).to be == :completed + end + + it "can perform a busy operation with different durations" do + result = IO::Event::WorkerPool.busy(duration: 0.05) + + expect(result).to be_a(Hash) + expect(result[:cancelled]).to be == false + expect(result[:result]).to be == :completed + expect(result[:duration]).to be == 0.05 + end + + it "can cancel a busy operation using unblock function" do + # This tests the cancellation mechanism through rb_thread_call_without_gvl + completed = false + thread = Thread.new do + start_time = Time.now + result = IO::Event::WorkerPool.busy(duration: 1.0) # Long operation + end_time = Time.now + elapsed = end_time - start_time + completed = true + + {result: result, elapsed: elapsed} + end + + # Let it start, then kill the thread (which should trigger the unblock function) + sleep(0.1) + thread.kill + thread.join(0.5) # Wait up to 0.5s for thread to finish + + # The operation should have been interrupted before completion + expect(completed).to be == false + end + + it "can be cancelled when executed in a worker pool" do + result = nil + elapsed = nil + error = nil + + Thread.new do + Fiber.set_scheduler(scheduler) + + busy_fiber = Fiber.schedule do + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = IO::Event::WorkerPool.busy(duration: 2.0) + rescue Interrupt => error + # Ignore. + ensure + end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + elapsed = end_time - start_time + end + + Fiber.schedule do + sleep(0.5) + Fiber.scheduler.fiber_interrupt(busy_fiber, StandardError) + end + end.join + + expect(result[:cancelled]).to be == true + expect(elapsed).to be < 1.0 + expect(error).to be_nil + end + end +end