From f9164a2e5ae9b7e99652f365a19a1db347a729c2 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 9 Jun 2025 08:59:18 +0900 Subject: [PATCH 01/13] Introduce `IO::Event::WorkerPool` for blocking operation execution. --- ext/extconf.rb | 7 +- ext/io/event/event.c | 2 + ext/io/event/fiber.c | 2 +- ext/io/event/worker_pool.c | 463 ++++++++++++++++++++++++++++ ext/io/event/worker_pool.h | 13 + fixtures/io/event/test_scheduler.rb | 171 ++++++++++ test/io/event/worker_pool.rb | 73 +++++ 7 files changed, 729 insertions(+), 2 deletions(-) create mode 100644 ext/io/event/worker_pool.c create mode 100644 ext/io/event/worker_pool.h create mode 100644 fixtures/io/event/test_scheduler.rb create mode 100644 test/io/event/worker_pool.rb diff --git a/ext/extconf.rb b/ext/extconf.rb index 7aa6d0f8..483e59ca 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -23,7 +23,7 @@ append_cflags(["-DRUBY_DEBUG", "-O0"]) end -$srcs = ["io/event/event.c", "io/event/time.c", "io/event/fiber.c", "io/event/selector/selector.c"] +$srcs = ["io/event/event.c", "io/event/time.c", "io/event/fiber.c", "io/event/worker_pool.c", "io/event/selector/selector.c"] $VPATH << "$(srcdir)/io/event" $VPATH << "$(srcdir)/io/event/selector" @@ -56,6 +56,11 @@ have_func("&rb_fiber_raise") have_func("epoll_pwait2") +# Feature detection for blocking operation support +have_func("rb_fiber_scheduler_blocking_operation_extract") +have_func("rb_fiber_scheduler_blocking_operation_execute") +have_func("rb_fiber_scheduler_blocking_operation_cancel") + have_header("ruby/io/buffer.h") if ENV.key?("RUBY_SANITIZE") diff --git a/ext/io/event/event.c b/ext/io/event/event.c index 5c947773..435fc02c 100644 --- a/ext/io/event/event.c +++ b/ext/io/event/event.c @@ -3,6 +3,7 @@ #include "event.h" #include "fiber.h" +#include "worker_pool.h" #include "selector/selector.h" void Init_IO_Event(void) @@ -14,6 +15,7 @@ void Init_IO_Event(void) VALUE IO_Event = rb_define_module_under(rb_cIO, "Event"); Init_IO_Event_Fiber(IO_Event); + Init_IO_Event_WorkerPool(IO_Event); VALUE IO_Event_Selector = rb_define_module_under(IO_Event, "Selector"); Init_IO_Event_Selector(IO_Event_Selector); diff --git a/ext/io/event/fiber.c b/ext/io/event/fiber.c index d049cc26..95d19394 100644 --- a/ext/io/event/fiber.c +++ b/ext/io/event/fiber.c @@ -35,7 +35,7 @@ VALUE IO_Event_Fiber_raise(VALUE fiber, int argc, VALUE *argv) { #ifndef HAVE_RB_FIBER_CURRENT static ID id_current; -static VALUE IO_Event_Fiber_current(void) { +VALUE IO_Event_Fiber_current(void) { return rb_funcall(rb_cFiber, id_current, 0); } #endif diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c new file mode 100644 index 00000000..8038c636 --- /dev/null +++ b/ext/io/event/worker_pool.c @@ -0,0 +1,463 @@ +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#include "worker_pool.h" +#include "fiber.h" + +#include +#include +#include +#include +#include + +#ifdef HAVE_RB_FIBER_SCHEDULER_BLOCKING_OPERATION_EXTRACT + +// Forward declarations +static VALUE cWorkerPool; +static VALUE cWorkerPoolPromise; + +// Thread pool structure +typedef struct worker_thread { + pthread_t thread; + struct worker_pool *pool; + struct worker_thread *next; +} worker_thread_t; + +// Work item structure +typedef struct work_item { + rb_fiber_scheduler_blocking_operation_t *blocking_operation; + VALUE promise; + bool cancelled; + struct work_item *next; +} work_item_t; + +// Worker pool structure +typedef struct worker_pool { + pthread_mutex_t mutex; + pthread_cond_t work_available; + pthread_cond_t work_completed; + + work_item_t *work_queue; + work_item_t *work_queue_tail; + + worker_thread_t *threads; + size_t thread_count; + size_t max_threads; + + bool shutdown; +} worker_pool_t; + +// Promise structure +typedef struct worker_pool_promise { + work_item_t *work_item; + worker_pool_t *pool; + VALUE fiber; + bool completed; + bool cancelled; +} worker_pool_promise_t; + +// Free functions for Ruby GC +static void worker_pool_free(void *ptr) { + worker_pool_t *pool = (worker_pool_t *)ptr; + + if (pool) { + // Signal shutdown + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // Wait for all threads to finish + worker_thread_t *thread = pool->threads; + while (thread) { + pthread_join(thread->thread, NULL); + worker_thread_t *next = thread->next; + free(thread); + thread = next; + } + + // Clean up work queue + work_item_t *work = pool->work_queue; + while (work) { + work_item_t *next = work->next; + free(work); + work = next; + } + + pthread_mutex_destroy(&pool->mutex); + pthread_cond_destroy(&pool->work_available); + pthread_cond_destroy(&pool->work_completed); + + free(pool); + } +} + +static void worker_pool_promise_free(void *ptr) { + worker_pool_promise_t *promise = (worker_pool_promise_t *)ptr; + if (promise) { + free(promise); + } +} + +// Size functions for Ruby GC +static size_t worker_pool_size(const void *ptr) { + return sizeof(worker_pool_t); +} + +static size_t worker_pool_promise_size(const void *ptr) { + return sizeof(worker_pool_promise_t); +} + +// Ruby TypedData structures +static const rb_data_type_t worker_pool_type = { + "IO::Event::WorkerPool", + {0, worker_pool_free, worker_pool_size,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +}; + +static const rb_data_type_t worker_pool_promise_type = { + "IO::Event::WorkerPool::Promise", + {0, worker_pool_promise_free, worker_pool_promise_size,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +}; + +// Worker thread function +static void* worker_thread_func(void *arg) { + worker_thread_t *worker = (worker_thread_t *)arg; + worker_pool_t *pool = worker->pool; + + while (true) { + work_item_t *work = NULL; + + pthread_mutex_lock(&pool->mutex); + + // Wait for work or shutdown + while (!pool->work_queue && !pool->shutdown) { + pthread_cond_wait(&pool->work_available, &pool->mutex); + } + + if (pool->shutdown) { + pthread_mutex_unlock(&pool->mutex); + break; + } + + // Dequeue work item + if (pool->work_queue) { + work = pool->work_queue; + pool->work_queue = work->next; + if (!pool->work_queue) { + pool->work_queue_tail = NULL; + } + } + + pthread_mutex_unlock(&pool->mutex); + + // Execute work if not cancelled + if (work && !work->cancelled) { + rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); + + // Mark promise as completed + worker_pool_promise_t *promise_data; + TypedData_Get_Struct(work->promise, worker_pool_promise_t, &worker_pool_promise_type, promise_data); + promise_data->completed = true; + } + + // Signal completion + if (work) { + pthread_mutex_lock(&pool->mutex); + pthread_cond_signal(&pool->work_completed); + pthread_mutex_unlock(&pool->mutex); + } + } + + return NULL; +} + +// Create a new worker thread +static int create_worker_thread(worker_pool_t *pool) { + if (pool->thread_count >= pool->max_threads) { + return -1; + } + + worker_thread_t *worker = malloc(sizeof(worker_thread_t)); + if (!worker) { + return -1; + } + + worker->pool = pool; + worker->next = pool->threads; + + if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) { + free(worker); + return -1; + } + + pool->threads = worker; + pool->thread_count++; + + return 0; +} + +// Ruby constructor for WorkerPool +static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { + + + VALUE rb_max_threads = Qnil; + size_t max_threads = 4; // Default + + // Handle keyword arguments + if (argc == 1 && RB_TYPE_P(argv[0], T_HASH)) { + VALUE hash = argv[0]; + VALUE max_threads_key = ID2SYM(rb_intern("max_threads")); + if (rb_hash_lookup(hash, max_threads_key) != Qnil) { + rb_max_threads = rb_hash_aref(hash, max_threads_key); + } + } else if (argc == 1) { + rb_max_threads = argv[0]; + } else if (argc > 1) { + rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)", argc); + } + + if (!NIL_P(rb_max_threads)) { + max_threads = NUM2SIZET(rb_max_threads); + if (max_threads == 0) { + rb_raise(rb_eArgError, "max_threads must be greater than 0"); + } + } + + // Get the pool that was allocated by worker_pool_allocate + worker_pool_t *pool; + TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool allocation failed"); + } + + pthread_mutex_init(&pool->mutex, NULL); + pthread_cond_init(&pool->work_available, NULL); + pthread_cond_init(&pool->work_completed, NULL); + + pool->work_queue = NULL; + pool->work_queue_tail = NULL; + pool->threads = NULL; + pool->thread_count = 0; + pool->max_threads = max_threads; + pool->shutdown = false; + + + + // Create initial worker threads + for (size_t i = 0; i < max_threads; i++) { + if (create_worker_thread(pool) != 0) { + // Just set the max_threads for debugging, don't fail completely + // worker_pool_free(pool); + // rb_raise(rb_eRuntimeError, "Failed to create worker threads"); + break; + } + } + + return self; +} + +// Ruby method to submit work +static VALUE worker_pool_call(VALUE self, VALUE work) { + worker_pool_t *pool; + TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); + + if (pool->shutdown) { + rb_raise(rb_eRuntimeError, "Worker pool is shut down"); + } + + // Extract blocking operation handle + rb_fiber_scheduler_blocking_operation_t *blocking_operation = + rb_fiber_scheduler_blocking_operation_extract(work); + + if (!blocking_operation) { + rb_raise(rb_eArgError, "Invalid blocking operation"); + } + + // Create work item + work_item_t *work_item = malloc(sizeof(work_item_t)); + if (!work_item) { + rb_raise(rb_eNoMemError, "Failed to allocate work item"); + } + + work_item->blocking_operation = blocking_operation; + work_item->cancelled = false; + work_item->next = NULL; + + // Create promise + worker_pool_promise_t *promise_data; + VALUE promise = TypedData_Make_Struct(cWorkerPoolPromise, worker_pool_promise_t, + &worker_pool_promise_type, promise_data); + + promise_data->work_item = work_item; + promise_data->pool = pool; +#ifdef HAVE_RB_FIBER_CURRENT + promise_data->fiber = rb_fiber_current(); +#else + promise_data->fiber = IO_Event_Fiber_current(); +#endif + promise_data->completed = false; + promise_data->cancelled = false; + + work_item->promise = promise; + + // Enqueue work + pthread_mutex_lock(&pool->mutex); + + if (pool->work_queue_tail) { + pool->work_queue_tail->next = work_item; + } else { + pool->work_queue = work_item; + } + pool->work_queue_tail = work_item; + + pthread_cond_signal(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + return promise; +} + +// Promise cancel method +static VALUE worker_pool_promise_cancel(VALUE self) { + worker_pool_promise_t *promise; + TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); + + if (promise->completed || promise->cancelled) { + return Qfalse; + } + + promise->cancelled = true; + promise->work_item->cancelled = true; + + // Try to cancel the blocking operation + if (promise->work_item->blocking_operation) { + rb_fiber_scheduler_blocking_operation_cancel(promise->work_item->blocking_operation); + } + + return Qtrue; +} + +// Promise cancelled? predicate +static VALUE worker_pool_promise_cancelled_p(VALUE self) { + worker_pool_promise_t *promise; + TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); + + return promise->cancelled ? Qtrue : Qfalse; +} + +// Promise completed? predicate +static VALUE worker_pool_promise_completed_p(VALUE self) { + worker_pool_promise_t *promise; + TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); + + return promise->completed ? Qtrue : Qfalse; +} + +// Promise wait method +static VALUE worker_pool_promise_wait(VALUE self) { + worker_pool_promise_t *promise; + TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); + + if (promise->completed) { + return self; + } + + if (promise->cancelled) { + rb_raise(rb_eRuntimeError, "Operation was cancelled"); + } + + worker_pool_t *pool = promise->pool; + + // Wait for completion + pthread_mutex_lock(&pool->mutex); + while (!promise->completed && !promise->cancelled) { + pthread_cond_wait(&pool->work_completed, &pool->mutex); + } + pthread_mutex_unlock(&pool->mutex); + + if (promise->cancelled) { + rb_raise(rb_eRuntimeError, "Operation was cancelled"); + } + + return self; +} + +static VALUE worker_pool_allocate(VALUE klass) { + worker_pool_t *pool; + VALUE obj = TypedData_Make_Struct(klass, worker_pool_t, &worker_pool_type, pool); + + // Initialize to NULL/zero so we can detect uninitialized pools + memset(pool, 0, sizeof(worker_pool_t)); + + return obj; +} + +static VALUE worker_pool_promise_allocate(VALUE klass) { + worker_pool_promise_t *promise; + return TypedData_Make_Struct(klass, worker_pool_promise_t, &worker_pool_promise_type, promise); +} + +// Test helper: get pool statistics for debugging/testing +static VALUE worker_pool_stats(VALUE self) { + worker_pool_t *pool; + TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool not initialized"); + } + + + + VALUE stats = rb_hash_new(); + rb_hash_aset(stats, ID2SYM(rb_intern("thread_count")), SIZET2NUM(pool->thread_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("max_threads")), SIZET2NUM(pool->max_threads)); + rb_hash_aset(stats, ID2SYM(rb_intern("shutdown")), pool->shutdown ? Qtrue : Qfalse); + + // Count work items in queue (only if properly initialized) + if (pool->max_threads > 0) { + pthread_mutex_lock(&pool->mutex); + size_t queue_size = 0; + work_item_t *work = pool->work_queue; + while (work) { + queue_size++; + work = work->next; + } + pthread_mutex_unlock(&pool->mutex); + rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(queue_size)); + } else { + rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(0)); + } + + return stats; +} + +// Test helper: check if blocking operations are supported +static VALUE worker_pool_blocking_operations_supported_p(VALUE self) { + return Qtrue; +} + +void Init_IO_Event_WorkerPool(VALUE IO_Event) { + cWorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); + rb_define_alloc_func(cWorkerPool, worker_pool_allocate); + rb_define_method(cWorkerPool, "initialize", worker_pool_initialize, -1); + rb_define_method(cWorkerPool, "call", worker_pool_call, 1); + rb_define_method(cWorkerPool, "stats", worker_pool_stats, 0); + rb_define_singleton_method(cWorkerPool, "blocking_operations_supported?", worker_pool_blocking_operations_supported_p, 0); + + cWorkerPoolPromise = rb_define_class_under(cWorkerPool, "Promise", rb_cObject); + rb_define_alloc_func(cWorkerPoolPromise, worker_pool_promise_allocate); + rb_define_method(cWorkerPoolPromise, "cancel", worker_pool_promise_cancel, 0); + rb_define_method(cWorkerPoolPromise, "cancelled?", worker_pool_promise_cancelled_p, 0); + rb_define_method(cWorkerPoolPromise, "completed?", worker_pool_promise_completed_p, 0); + rb_define_method(cWorkerPoolPromise, "wait", worker_pool_promise_wait, 0); +} + +#else + +void Init_IO_Event_WorkerPool(VALUE IO_Event) { + // No-op. +} + +#endif diff --git a/ext/io/event/worker_pool.h b/ext/io/event/worker_pool.h new file mode 100644 index 00000000..4f03bef5 --- /dev/null +++ b/ext/io/event/worker_pool.h @@ -0,0 +1,13 @@ +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#pragma once + +#include + +#ifdef HAVE_RB_FIBER_SCHEDULER_BLOCKING_OPERATION_EXTRACT +#include +#endif + +// Worker pool functionality +void Init_IO_Event_WorkerPool(VALUE IO_Event); \ No newline at end of file diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb new file mode 100644 index 00000000..fc18f836 --- /dev/null +++ b/fixtures/io/event/test_scheduler.rb @@ -0,0 +1,171 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event/timers" + +module IO::Event + # A test fiber scheduler that uses WorkerPool for blocking operations. + # + # This scheduler implements the fiber scheduler interface and delegates + # blocking operations to a WorkerPool instance for testing. + # + # @example Testing usage + # # Create with default selector and worker pool + # scheduler = IO::Event::TestScheduler.new + # + # # Or provide custom selector and/or worker pool + # selector = IO::Event::Selector.new(Fiber.current) + # worker_pool = IO::Event::WorkerPool.new(max_threads: 4) + # scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool) + # + # Fiber.set_scheduler(scheduler) + # + # # Standard Ruby operations that use rb_nogvl will be handled by the worker pool + # # Examples: sleep, file I/O, network operations, etc. + # Fiber.schedule do + # sleep(0.001) # This triggers rb_nogvl and uses the worker pool + # end.resume + class TestScheduler + def initialize(selector: nil, worker_pool: nil, max_threads: 2) + @selector = selector || ::IO::Event::Selector.new(Fiber.current) + @worker_pool = worker_pool || WorkerPool.new(max_threads: max_threads) + @timers = ::IO::Event::Timers.new + + # Track the number of fibers that are blocked. + @blocked = 0 + + # Track how many times blocking_operation_wait was called. + @blocking_operation_count = 0 + end + + # @attribute [WorkerPool] The worker pool used for executing blocking operations. + attr_reader :worker_pool + + # @attribute [IO::Event::Selector] The I/O event selector used for managing fiber scheduling. + attr_reader :selector + + # @attribute [Integer] The number of times blocking_operation_wait was called. + attr_reader :blocking_operation_count + + # Required fiber scheduler hook - delegates to WorkerPool + def blocking_operation_wait(operation) + @blocking_operation_count += 1 + + # Submit the operation to the worker pool + promise = @worker_pool.call(operation) + + # Wait for completion + result = promise&.wait + + promise = nil + return result + ensure + promise&.cancel + end + + # Required fiber scheduler hooks + def close + @selector&.close + # WorkerPool doesn't have a close method, just clear the reference + @worker_pool = nil + end + + def block(blocker, timeout = nil) + fiber = Fiber.current + + if timeout + timer = @timers.after(timeout) do + if fiber.alive? + fiber.transfer(false) + end + end + end + + begin + @blocked += 1 + @selector.transfer + ensure + @blocked -= 1 + end + ensure + timer&.cancel! + end + + def unblock(blocker, fiber) + if selector = @selector + selector.push(fiber) + selector.wakeup + end + end + + class FiberInterrupt + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + def alive? + @fiber.alive? + end + + def transfer + @fiber.raise(@exception) + end + end + + def fiber_interrupt(fiber, exception) + unblock(nil, FiberInterrupt.new(fiber, exception)) + end + + def io_wait(io, events, timeout = nil) + fiber = Fiber.current + + if timeout + timer = @timers.after(timeout) do + fiber.transfer + end + end + + return @selector.io_wait(fiber, io, events) + ensure + timer&.cancel! + end + + def kernel_sleep(duration = nil) + if duration + self.block(nil, duration) + else + @selector.transfer + end + end + + def fiber(&block) + Fiber.new(&block).tap(&:resume) + end + + # Run the scheduler event loop + def run + while @blocked > 0 || @timers.size > 0 + interval = @timers.wait_interval + @selector.select(interval) + @timers.fire + end + end + + private + + def transfer + @selector.transfer + end + + def push(fiber) + @selector.push(fiber) + end + + def wakeup + @selector.wakeup + end + end +end \ No newline at end of file diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb new file mode 100644 index 00000000..0d2d7284 --- /dev/null +++ b/test/io/event/worker_pool.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event" +require "io/event/test_scheduler" + +return unless defined?(IO::Event::WorkerPool) + +describe IO::Event::WorkerPool do + with "an instance" do + let(:worker_pool) { subject.new(max_threads: 2) } + + after do + worker_pool = nil # This should trigger GC cleanup + end + + it "can create a worker pool" do + expect(worker_pool).to be_a(IO::Event::WorkerPool) + end + + it "provides stats" do + # Force initialization by calling a method on the pool + pool = worker_pool # This should trigger initialization + stats = pool.stats + + expect(stats).to be_a(Hash) + expect(stats[:thread_count]).to be_a(Integer) + expect(stats[:max_threads]).to be == 2 + expect(stats[:queue_size]).to be == 0 + expect(stats[:shutdown]).to be == false + end + end + + with "TestScheduler integration" do + let(:scheduler) {IO::Event::TestScheduler.new(max_threads: 1)} + + it "can create a test scheduler" do + expect(scheduler).to be_a(IO::Event::TestScheduler) + expect(scheduler.worker_pool).to be_a(IO::Event::WorkerPool) + end + + it "intercepts IO::Buffer.copy operations larger than 1MiB" do + skip "IO::Buffer not available" unless defined?(IO::Buffer) + + # Create buffers larger than 1MiB to trigger GVL release + buffer_size = 2 * 1024 * 1024 # 2MiB + source = IO::Buffer.new(buffer_size) + destination = IO::Buffer.new(buffer_size) + + # Fill source buffer with some data + source.clear("A".ord) + + # Track initial count + initial_count = scheduler.blocking_operation_count + + Thread.new do + Fiber.set_scheduler(scheduler) + + # Perform the large copy operation in a scheduled fiber + completed = false + fiber = Fiber.schedule do + destination.copy(source, 0, buffer_size, 0) + end + end.join + + # Confirm that the copy worked: + expect(destination.get_string(0, 10)).to be == "AAAAAAAAAA" + expect(scheduler.blocking_operation_count).to be > initial_count + end + end +end From e939ae8aab62ec1460df0caecd197871fb904741 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 9 Jun 2025 10:19:17 +0900 Subject: [PATCH 02/13] Better conditional compilation + naming. --- ext/extconf.rb | 16 +- ext/io/event/event.c | 6 +- ext/io/event/event.h | 4 + ext/io/event/worker_pool.c | 726 +++++++++++++++++------------------ ext/io/event/worker_pool.h | 3 +- test/io/event/worker_pool.rb | 14 +- 6 files changed, 376 insertions(+), 393 deletions(-) diff --git a/ext/extconf.rb b/ext/extconf.rb index 483e59ca..49667520 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -23,7 +23,7 @@ append_cflags(["-DRUBY_DEBUG", "-O0"]) end -$srcs = ["io/event/event.c", "io/event/time.c", "io/event/fiber.c", "io/event/worker_pool.c", "io/event/selector/selector.c"] +$srcs = ["io/event/event.c", "io/event/time.c", "io/event/fiber.c", "io/event/selector/selector.c"] $VPATH << "$(srcdir)/io/event" $VPATH << "$(srcdir)/io/event/selector" @@ -56,13 +56,17 @@ have_func("&rb_fiber_raise") have_func("epoll_pwait2") -# Feature detection for blocking operation support -have_func("rb_fiber_scheduler_blocking_operation_extract") -have_func("rb_fiber_scheduler_blocking_operation_execute") -have_func("rb_fiber_scheduler_blocking_operation_cancel") - have_header("ruby/io/buffer.h") +# Feature detection for blocking operation support +if have_func("rb_fiber_scheduler_blocking_operation_extract") + # Feature detection for pthread support (needed for WorkerPool) + if have_header("pthread.h") + append_cflags(["-DHAVE_IO_EVENT_WORKER_POOL"]) + $srcs << "io/event/worker_pool.c" + end +end + if ENV.key?("RUBY_SANITIZE") $stderr.puts "Enabling sanitizers..." diff --git a/ext/io/event/event.c b/ext/io/event/event.c index 435fc02c..59492544 100644 --- a/ext/io/event/event.c +++ b/ext/io/event/event.c @@ -3,7 +3,6 @@ #include "event.h" #include "fiber.h" -#include "worker_pool.h" #include "selector/selector.h" void Init_IO_Event(void) @@ -15,8 +14,11 @@ void Init_IO_Event(void) VALUE IO_Event = rb_define_module_under(rb_cIO, "Event"); Init_IO_Event_Fiber(IO_Event); + + #ifdef HAVE_IO_EVENT_WORKER_POOL Init_IO_Event_WorkerPool(IO_Event); - + #endif + VALUE IO_Event_Selector = rb_define_module_under(IO_Event, "Selector"); Init_IO_Event_Selector(IO_Event_Selector); diff --git a/ext/io/event/event.h b/ext/io/event/event.h index b2884b46..fe69d2c2 100644 --- a/ext/io/event/event.h +++ b/ext/io/event/event.h @@ -18,3 +18,7 @@ void Init_IO_Event(void); #ifdef HAVE_SYS_EVENT_H #include "selector/kqueue.h" #endif + +#ifdef HAVE_IO_EVENT_WORKER_POOL +#include "worker_pool.h" +#endif diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 8038c636..90ea683d 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -10,454 +10,428 @@ #include #include -#ifdef HAVE_RB_FIBER_SCHEDULER_BLOCKING_OPERATION_EXTRACT - // Forward declarations -static VALUE cWorkerPool; -static VALUE cWorkerPoolPromise; +static VALUE IO_Event_WorkerPool; +static VALUE IO_Event_WorkerPool_Promise; // Thread pool structure typedef struct worker_thread { - pthread_t thread; - struct worker_pool *pool; - struct worker_thread *next; + pthread_t thread; + struct IO_Event_WorkerPool *pool; + struct worker_thread *next; } worker_thread_t; // Work item structure typedef struct work_item { - rb_fiber_scheduler_blocking_operation_t *blocking_operation; - VALUE promise; - bool cancelled; - struct work_item *next; + rb_fiber_scheduler_blocking_operation_t *blocking_operation; + VALUE promise; + bool cancelled; + struct work_item *next; } work_item_t; // Worker pool structure -typedef struct worker_pool { - pthread_mutex_t mutex; - pthread_cond_t work_available; - pthread_cond_t work_completed; - - work_item_t *work_queue; - work_item_t *work_queue_tail; - - worker_thread_t *threads; - size_t thread_count; - size_t max_threads; - - bool shutdown; -} worker_pool_t; +struct IO_Event_WorkerPool { + pthread_mutex_t mutex; + pthread_cond_t work_available; + pthread_cond_t work_completed; + + work_item_t *work_queue; + work_item_t *work_queue_tail; + + worker_thread_t *threads; + size_t thread_count; + size_t max_threads; + + bool shutdown; +}; // Promise structure -typedef struct worker_pool_promise { - work_item_t *work_item; - worker_pool_t *pool; - VALUE fiber; - bool completed; - bool cancelled; -} worker_pool_promise_t; +struct IO_Event_WorkerPool_Promise { + work_item_t *work_item; + struct IO_Event_WorkerPool *pool; + VALUE fiber; + bool completed; + bool cancelled; +}; // Free functions for Ruby GC static void worker_pool_free(void *ptr) { - worker_pool_t *pool = (worker_pool_t *)ptr; - - if (pool) { - // Signal shutdown - pthread_mutex_lock(&pool->mutex); - pool->shutdown = true; - pthread_cond_broadcast(&pool->work_available); - pthread_mutex_unlock(&pool->mutex); - - // Wait for all threads to finish - worker_thread_t *thread = pool->threads; - while (thread) { - pthread_join(thread->thread, NULL); - worker_thread_t *next = thread->next; - free(thread); - thread = next; - } - - // Clean up work queue - work_item_t *work = pool->work_queue; - while (work) { - work_item_t *next = work->next; - free(work); - work = next; - } - - pthread_mutex_destroy(&pool->mutex); - pthread_cond_destroy(&pool->work_available); - pthread_cond_destroy(&pool->work_completed); - - free(pool); - } + struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr; + + if (pool) { + // Signal shutdown + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // Wait for all threads to finish + worker_thread_t *thread = pool->threads; + while (thread) { + pthread_join(thread->thread, NULL); + worker_thread_t *next = thread->next; + free(thread); + thread = next; + } + + // Clean up work queue + work_item_t *work = pool->work_queue; + while (work) { + work_item_t *next = work->next; + free(work); + work = next; + } + + pthread_mutex_destroy(&pool->mutex); + pthread_cond_destroy(&pool->work_available); + pthread_cond_destroy(&pool->work_completed); + + free(pool); + } } static void worker_pool_promise_free(void *ptr) { - worker_pool_promise_t *promise = (worker_pool_promise_t *)ptr; - if (promise) { - free(promise); - } + struct IO_Event_WorkerPool_Promise *promise = (struct IO_Event_WorkerPool_Promise *)ptr; + if (promise) { + free(promise); + } } // Size functions for Ruby GC static size_t worker_pool_size(const void *ptr) { - return sizeof(worker_pool_t); + return sizeof(struct IO_Event_WorkerPool); } static size_t worker_pool_promise_size(const void *ptr) { - return sizeof(worker_pool_promise_t); + return sizeof(struct IO_Event_WorkerPool_Promise); } // Ruby TypedData structures -static const rb_data_type_t worker_pool_type = { - "IO::Event::WorkerPool", - {0, worker_pool_free, worker_pool_size,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +static const rb_data_type_t IO_Event_WorkerPool_type = { + "IO::Event::WorkerPool", + {0, worker_pool_free, worker_pool_size,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; -static const rb_data_type_t worker_pool_promise_type = { - "IO::Event::WorkerPool::Promise", - {0, worker_pool_promise_free, worker_pool_promise_size,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +static const rb_data_type_t IO_Event_WorkerPool_Promise_type = { + "IO::Event::WorkerPool::Promise", + {0, worker_pool_promise_free, worker_pool_promise_size,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; // Worker thread function static void* worker_thread_func(void *arg) { - worker_thread_t *worker = (worker_thread_t *)arg; - worker_pool_t *pool = worker->pool; - - while (true) { - work_item_t *work = NULL; - - pthread_mutex_lock(&pool->mutex); - - // Wait for work or shutdown - while (!pool->work_queue && !pool->shutdown) { - pthread_cond_wait(&pool->work_available, &pool->mutex); - } - - if (pool->shutdown) { - pthread_mutex_unlock(&pool->mutex); - break; - } - - // Dequeue work item - if (pool->work_queue) { - work = pool->work_queue; - pool->work_queue = work->next; - if (!pool->work_queue) { - pool->work_queue_tail = NULL; - } - } - - pthread_mutex_unlock(&pool->mutex); - - // Execute work if not cancelled - if (work && !work->cancelled) { - rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); - - // Mark promise as completed - worker_pool_promise_t *promise_data; - TypedData_Get_Struct(work->promise, worker_pool_promise_t, &worker_pool_promise_type, promise_data); - promise_data->completed = true; - } - - // Signal completion - if (work) { - pthread_mutex_lock(&pool->mutex); - pthread_cond_signal(&pool->work_completed); - pthread_mutex_unlock(&pool->mutex); - } - } - - return NULL; + worker_thread_t *worker = (worker_thread_t *)arg; + struct IO_Event_WorkerPool *pool = worker->pool; + + while (true) { + work_item_t *work = NULL; + + pthread_mutex_lock(&pool->mutex); + + // Wait for work or shutdown + while (!pool->work_queue && !pool->shutdown) { + pthread_cond_wait(&pool->work_available, &pool->mutex); + } + + if (pool->shutdown) { + pthread_mutex_unlock(&pool->mutex); + break; + } + + // Dequeue work item + if (pool->work_queue) { + work = pool->work_queue; + pool->work_queue = work->next; + if (!pool->work_queue) { + pool->work_queue_tail = NULL; + } + } + + pthread_mutex_unlock(&pool->mutex); + + // Execute work if not cancelled + if (work && !work->cancelled) { + rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); + + // Mark promise as completed + struct IO_Event_WorkerPool_Promise *promise_data; + TypedData_Get_Struct(work->promise, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise_data); + promise_data->completed = true; + } + + // Signal completion + if (work) { + pthread_mutex_lock(&pool->mutex); + pthread_cond_signal(&pool->work_completed); + pthread_mutex_unlock(&pool->mutex); + } + } + + return NULL; } // Create a new worker thread -static int create_worker_thread(worker_pool_t *pool) { - if (pool->thread_count >= pool->max_threads) { - return -1; - } - - worker_thread_t *worker = malloc(sizeof(worker_thread_t)); - if (!worker) { - return -1; - } - - worker->pool = pool; - worker->next = pool->threads; - - if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) { - free(worker); - return -1; - } - - pool->threads = worker; - pool->thread_count++; - - return 0; +static int create_worker_thread(struct IO_Event_WorkerPool *pool) { + if (pool->thread_count >= pool->max_threads) { + return -1; + } + + worker_thread_t *worker = malloc(sizeof(worker_thread_t)); + if (!worker) { + return -1; + } + + worker->pool = pool; + worker->next = pool->threads; + + if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) { + free(worker); + return -1; + } + + pool->threads = worker; + pool->thread_count++; + + return 0; } // Ruby constructor for WorkerPool static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { - - - VALUE rb_max_threads = Qnil; - size_t max_threads = 4; // Default - - // Handle keyword arguments - if (argc == 1 && RB_TYPE_P(argv[0], T_HASH)) { - VALUE hash = argv[0]; - VALUE max_threads_key = ID2SYM(rb_intern("max_threads")); - if (rb_hash_lookup(hash, max_threads_key) != Qnil) { - rb_max_threads = rb_hash_aref(hash, max_threads_key); - } - } else if (argc == 1) { - rb_max_threads = argv[0]; - } else if (argc > 1) { - rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)", argc); - } - - if (!NIL_P(rb_max_threads)) { - max_threads = NUM2SIZET(rb_max_threads); - if (max_threads == 0) { - rb_raise(rb_eArgError, "max_threads must be greater than 0"); - } - } - - // Get the pool that was allocated by worker_pool_allocate - worker_pool_t *pool; - TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); - - if (!pool) { - rb_raise(rb_eRuntimeError, "WorkerPool allocation failed"); - } - - pthread_mutex_init(&pool->mutex, NULL); - pthread_cond_init(&pool->work_available, NULL); - pthread_cond_init(&pool->work_completed, NULL); - - pool->work_queue = NULL; - pool->work_queue_tail = NULL; - pool->threads = NULL; - pool->thread_count = 0; - pool->max_threads = max_threads; - pool->shutdown = false; - - - - // Create initial worker threads - for (size_t i = 0; i < max_threads; i++) { - if (create_worker_thread(pool) != 0) { - // Just set the max_threads for debugging, don't fail completely - // worker_pool_free(pool); - // rb_raise(rb_eRuntimeError, "Failed to create worker threads"); - break; - } - } - - return self; + VALUE rb_max_threads = Qnil; + size_t max_threads = 4; // Default + + // Handle keyword arguments + if (argc == 1 && RB_TYPE_P(argv[0], T_HASH)) { + VALUE hash = argv[0]; + VALUE max_threads_key = ID2SYM(rb_intern("max_threads")); + if (rb_hash_lookup(hash, max_threads_key) != Qnil) { + rb_max_threads = rb_hash_aref(hash, max_threads_key); + } + } else if (argc == 1) { + rb_max_threads = argv[0]; + } else if (argc > 1) { + rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)!", argc); + } + + if (!NIL_P(rb_max_threads)) { + max_threads = NUM2SIZET(rb_max_threads); + if (max_threads == 0) { + rb_raise(rb_eArgError, "max_threads must be greater than 0!"); + } + } + + // Get the pool that was allocated by worker_pool_allocate + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool allocation failed!"); + } + + pthread_mutex_init(&pool->mutex, NULL); + pthread_cond_init(&pool->work_available, NULL); + pthread_cond_init(&pool->work_completed, NULL); + + pool->work_queue = NULL; + pool->work_queue_tail = NULL; + pool->threads = NULL; + pool->thread_count = 0; + pool->max_threads = max_threads; + pool->shutdown = false; + + // Create initial worker threads + for (size_t i = 0; i < max_threads; i++) { + if (create_worker_thread(pool) != 0) { + // Just set the max_threads for debugging, don't fail completely + // worker_pool_free(pool); + // rb_raise(rb_eRuntimeError, "Failed to create worker threads"); + break; + } + } + + return self; } // Ruby method to submit work static VALUE worker_pool_call(VALUE self, VALUE work) { - worker_pool_t *pool; - TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); - - if (pool->shutdown) { - rb_raise(rb_eRuntimeError, "Worker pool is shut down"); - } - - // Extract blocking operation handle - rb_fiber_scheduler_blocking_operation_t *blocking_operation = - rb_fiber_scheduler_blocking_operation_extract(work); - - if (!blocking_operation) { - rb_raise(rb_eArgError, "Invalid blocking operation"); - } - - // Create work item - work_item_t *work_item = malloc(sizeof(work_item_t)); - if (!work_item) { - rb_raise(rb_eNoMemError, "Failed to allocate work item"); - } - - work_item->blocking_operation = blocking_operation; - work_item->cancelled = false; - work_item->next = NULL; - - // Create promise - worker_pool_promise_t *promise_data; - VALUE promise = TypedData_Make_Struct(cWorkerPoolPromise, worker_pool_promise_t, - &worker_pool_promise_type, promise_data); - - promise_data->work_item = work_item; - promise_data->pool = pool; -#ifdef HAVE_RB_FIBER_CURRENT - promise_data->fiber = rb_fiber_current(); -#else - promise_data->fiber = IO_Event_Fiber_current(); -#endif - promise_data->completed = false; - promise_data->cancelled = false; - - work_item->promise = promise; - - // Enqueue work - pthread_mutex_lock(&pool->mutex); - - if (pool->work_queue_tail) { - pool->work_queue_tail->next = work_item; - } else { - pool->work_queue = work_item; - } - pool->work_queue_tail = work_item; - - pthread_cond_signal(&pool->work_available); - pthread_mutex_unlock(&pool->mutex); - - return promise; + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (pool->shutdown) { + rb_raise(rb_eRuntimeError, "Worker pool is shut down!"); + } + + // Extract blocking operation handle + rb_fiber_scheduler_blocking_operation_t *blocking_operation = + rb_fiber_scheduler_blocking_operation_extract(work); + + if (!blocking_operation) { + rb_raise(rb_eArgError, "Invalid blocking operation!"); + } + + // Create work item + work_item_t *work_item = malloc(sizeof(work_item_t)); + if (!work_item) { + rb_raise(rb_eNoMemError, "Failed to allocate work item!"); + } + + work_item->blocking_operation = blocking_operation; + work_item->cancelled = false; + work_item->next = NULL; + + // Create promise + struct IO_Event_WorkerPool_Promise *promise_data; + VALUE promise = TypedData_Make_Struct(IO_Event_WorkerPool_Promise, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise_data); + + promise_data->work_item = work_item; + promise_data->pool = pool; + promise_data->fiber = IO_Event_Fiber_current(); + promise_data->completed = false; + promise_data->cancelled = false; + + work_item->promise = promise; + + // Enqueue work + pthread_mutex_lock(&pool->mutex); + + if (pool->work_queue_tail) { + pool->work_queue_tail->next = work_item; + } else { + pool->work_queue = work_item; + } + pool->work_queue_tail = work_item; + + pthread_cond_signal(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + return promise; } // Promise cancel method static VALUE worker_pool_promise_cancel(VALUE self) { - worker_pool_promise_t *promise; - TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); - - if (promise->completed || promise->cancelled) { - return Qfalse; - } - - promise->cancelled = true; - promise->work_item->cancelled = true; - - // Try to cancel the blocking operation - if (promise->work_item->blocking_operation) { - rb_fiber_scheduler_blocking_operation_cancel(promise->work_item->blocking_operation); - } - - return Qtrue; + struct IO_Event_WorkerPool_Promise *promise; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); + + if (promise->completed || promise->cancelled) { + return Qfalse; + } + + promise->cancelled = true; + promise->work_item->cancelled = true; + + // Try to cancel the blocking operation + if (promise->work_item->blocking_operation) { + rb_fiber_scheduler_blocking_operation_cancel(promise->work_item->blocking_operation); + } + + return Qtrue; } // Promise cancelled? predicate static VALUE worker_pool_promise_cancelled_p(VALUE self) { - worker_pool_promise_t *promise; - TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); - - return promise->cancelled ? Qtrue : Qfalse; + struct IO_Event_WorkerPool_Promise *promise; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); + + return promise->cancelled ? Qtrue : Qfalse; } // Promise completed? predicate static VALUE worker_pool_promise_completed_p(VALUE self) { - worker_pool_promise_t *promise; - TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); - - return promise->completed ? Qtrue : Qfalse; + struct IO_Event_WorkerPool_Promise *promise; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); + + return promise->completed ? Qtrue : Qfalse; } // Promise wait method static VALUE worker_pool_promise_wait(VALUE self) { - worker_pool_promise_t *promise; - TypedData_Get_Struct(self, worker_pool_promise_t, &worker_pool_promise_type, promise); - - if (promise->completed) { - return self; - } - - if (promise->cancelled) { - rb_raise(rb_eRuntimeError, "Operation was cancelled"); - } - - worker_pool_t *pool = promise->pool; - - // Wait for completion - pthread_mutex_lock(&pool->mutex); - while (!promise->completed && !promise->cancelled) { - pthread_cond_wait(&pool->work_completed, &pool->mutex); - } - pthread_mutex_unlock(&pool->mutex); - - if (promise->cancelled) { - rb_raise(rb_eRuntimeError, "Operation was cancelled"); - } - - return self; + struct IO_Event_WorkerPool_Promise *promise; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); + + if (promise->completed) { + return self; + } + + if (promise->cancelled) { + rb_raise(rb_eRuntimeError, "Operation was cancelled!"); + } + + struct IO_Event_WorkerPool *pool = promise->pool; + + // Wait for completion + pthread_mutex_lock(&pool->mutex); + while (!promise->completed && !promise->cancelled) { + pthread_cond_wait(&pool->work_completed, &pool->mutex); + } + pthread_mutex_unlock(&pool->mutex); + + if (promise->cancelled) { + rb_raise(rb_eRuntimeError, "Operation was cancelled!"); + } + + return self; } static VALUE worker_pool_allocate(VALUE klass) { - worker_pool_t *pool; - VALUE obj = TypedData_Make_Struct(klass, worker_pool_t, &worker_pool_type, pool); - - // Initialize to NULL/zero so we can detect uninitialized pools - memset(pool, 0, sizeof(worker_pool_t)); - - return obj; + struct IO_Event_WorkerPool *pool; + VALUE obj = TypedData_Make_Struct(klass, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + // Initialize to NULL/zero so we can detect uninitialized pools + memset(pool, 0, sizeof(struct IO_Event_WorkerPool)); + + return obj; } static VALUE worker_pool_promise_allocate(VALUE klass) { - worker_pool_promise_t *promise; - return TypedData_Make_Struct(klass, worker_pool_promise_t, &worker_pool_promise_type, promise); + struct IO_Event_WorkerPool_Promise *promise; + return TypedData_Make_Struct(klass, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); } // Test helper: get pool statistics for debugging/testing -static VALUE worker_pool_stats(VALUE self) { - worker_pool_t *pool; - TypedData_Get_Struct(self, worker_pool_t, &worker_pool_type, pool); - - if (!pool) { - rb_raise(rb_eRuntimeError, "WorkerPool not initialized"); - } - - - - VALUE stats = rb_hash_new(); - rb_hash_aset(stats, ID2SYM(rb_intern("thread_count")), SIZET2NUM(pool->thread_count)); - rb_hash_aset(stats, ID2SYM(rb_intern("max_threads")), SIZET2NUM(pool->max_threads)); - rb_hash_aset(stats, ID2SYM(rb_intern("shutdown")), pool->shutdown ? Qtrue : Qfalse); - - // Count work items in queue (only if properly initialized) - if (pool->max_threads > 0) { - pthread_mutex_lock(&pool->mutex); - size_t queue_size = 0; - work_item_t *work = pool->work_queue; - while (work) { - queue_size++; - work = work->next; - } - pthread_mutex_unlock(&pool->mutex); - rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(queue_size)); - } else { - rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(0)); - } - - return stats; +static VALUE worker_pool_statistics(VALUE self) { + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool not initialized!"); + } + + VALUE stats = rb_hash_new(); + rb_hash_aset(stats, ID2SYM(rb_intern("thread_count")), SIZET2NUM(pool->thread_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("max_threads")), SIZET2NUM(pool->max_threads)); + rb_hash_aset(stats, ID2SYM(rb_intern("shutdown")), pool->shutdown ? Qtrue : Qfalse); + + // Count work items in queue (only if properly initialized) + if (pool->max_threads > 0) { + pthread_mutex_lock(&pool->mutex); + size_t queue_size = 0; + work_item_t *work = pool->work_queue; + while (work) { + queue_size++; + work = work->next; + } + pthread_mutex_unlock(&pool->mutex); + rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(queue_size)); + } else { + rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(0)); + } + + return stats; } -// Test helper: check if blocking operations are supported -static VALUE worker_pool_blocking_operations_supported_p(VALUE self) { - return Qtrue; -} - -void Init_IO_Event_WorkerPool(VALUE IO_Event) { - cWorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); - rb_define_alloc_func(cWorkerPool, worker_pool_allocate); - rb_define_method(cWorkerPool, "initialize", worker_pool_initialize, -1); - rb_define_method(cWorkerPool, "call", worker_pool_call, 1); - rb_define_method(cWorkerPool, "stats", worker_pool_stats, 0); - rb_define_singleton_method(cWorkerPool, "blocking_operations_supported?", worker_pool_blocking_operations_supported_p, 0); - - cWorkerPoolPromise = rb_define_class_under(cWorkerPool, "Promise", rb_cObject); - rb_define_alloc_func(cWorkerPoolPromise, worker_pool_promise_allocate); - rb_define_method(cWorkerPoolPromise, "cancel", worker_pool_promise_cancel, 0); - rb_define_method(cWorkerPoolPromise, "cancelled?", worker_pool_promise_cancelled_p, 0); - rb_define_method(cWorkerPoolPromise, "completed?", worker_pool_promise_completed_p, 0); - rb_define_method(cWorkerPoolPromise, "wait", worker_pool_promise_wait, 0); -} - -#else - void Init_IO_Event_WorkerPool(VALUE IO_Event) { - // No-op. + IO_Event_WorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); + rb_define_alloc_func(IO_Event_WorkerPool, worker_pool_allocate); + rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1); + rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); + rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); + + + IO_Event_WorkerPool_Promise = rb_define_class_under(IO_Event_WorkerPool, "Promise", rb_cObject); + rb_define_alloc_func(IO_Event_WorkerPool_Promise, worker_pool_promise_allocate); + rb_define_method(IO_Event_WorkerPool_Promise, "cancel", worker_pool_promise_cancel, 0); + rb_define_method(IO_Event_WorkerPool_Promise, "cancelled?", worker_pool_promise_cancelled_p, 0); + rb_define_method(IO_Event_WorkerPool_Promise, "completed?", worker_pool_promise_completed_p, 0); + rb_define_method(IO_Event_WorkerPool_Promise, "wait", worker_pool_promise_wait, 0); } - -#endif diff --git a/ext/io/event/worker_pool.h b/ext/io/event/worker_pool.h index 4f03bef5..09085868 100644 --- a/ext/io/event/worker_pool.h +++ b/ext/io/event/worker_pool.h @@ -9,5 +9,4 @@ #include #endif -// Worker pool functionality -void Init_IO_Event_WorkerPool(VALUE IO_Event); \ No newline at end of file +void Init_IO_Event_WorkerPool(VALUE IO_Event); diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 0d2d7284..d25cea62 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -20,16 +20,16 @@ expect(worker_pool).to be_a(IO::Event::WorkerPool) end - it "provides stats" do + it "provides statistics" do # Force initialization by calling a method on the pool pool = worker_pool # This should trigger initialization - stats = pool.stats + statistics = pool.statistics - expect(stats).to be_a(Hash) - expect(stats[:thread_count]).to be_a(Integer) - expect(stats[:max_threads]).to be == 2 - expect(stats[:queue_size]).to be == 0 - expect(stats[:shutdown]).to be == false + expect(statistics).to be_a(Hash) + expect(statistics[:thread_count]).to be_a(Integer) + expect(statistics[:max_threads]).to be == 2 + expect(statistics[:queue_size]).to be == 0 + expect(statistics[:shutdown]).to be == false end end From e7519d04f7ee42215ef8ac874c22aae6d6b3fbb7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 9 Jun 2025 13:50:31 +0900 Subject: [PATCH 03/13] Inline promise + internal naming + statistics. --- ext/io/event/worker_pool.c | 310 +++++++++++----------------- fixtures/io/event/test_scheduler.rb | 17 +- test/io/event/worker_pool.rb | 15 +- 3 files changed, 132 insertions(+), 210 deletions(-) diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 90ea683d..d3dda180 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -12,22 +12,21 @@ // Forward declarations static VALUE IO_Event_WorkerPool; -static VALUE IO_Event_WorkerPool_Promise; // Thread pool structure -typedef struct worker_thread { +struct IO_Event_WorkerPool_Worker { pthread_t thread; struct IO_Event_WorkerPool *pool; - struct worker_thread *next; -} worker_thread_t; + struct IO_Event_WorkerPool_Worker *next; +}; // Work item structure -typedef struct work_item { +struct IO_Event_WorkerPool_Work { rb_fiber_scheduler_blocking_operation_t *blocking_operation; - VALUE promise; - bool cancelled; - struct work_item *next; -} work_item_t; + bool completed; + + struct IO_Event_WorkerPool_Work *next; +}; // Worker pool structure struct IO_Event_WorkerPool { @@ -35,25 +34,20 @@ struct IO_Event_WorkerPool { pthread_cond_t work_available; pthread_cond_t work_completed; - work_item_t *work_queue; - work_item_t *work_queue_tail; + struct IO_Event_WorkerPool_Work *work_queue; + struct IO_Event_WorkerPool_Work *work_queue_tail; + + struct IO_Event_WorkerPool_Worker *workers; + size_t current_worker_count; + size_t maximum_worker_count; - worker_thread_t *threads; - size_t thread_count; - size_t max_threads; + size_t call_count; + size_t completed_count; + size_t cancelled_count; bool shutdown; }; -// Promise structure -struct IO_Event_WorkerPool_Promise { - work_item_t *work_item; - struct IO_Event_WorkerPool *pool; - VALUE fiber; - bool completed; - bool cancelled; -}; - // Free functions for Ruby GC static void worker_pool_free(void *ptr) { struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr; @@ -65,19 +59,19 @@ static void worker_pool_free(void *ptr) { pthread_cond_broadcast(&pool->work_available); pthread_mutex_unlock(&pool->mutex); - // Wait for all threads to finish - worker_thread_t *thread = pool->threads; + // Wait for all workers to finish + struct IO_Event_WorkerPool_Worker *thread = pool->workers; while (thread) { pthread_join(thread->thread, NULL); - worker_thread_t *next = thread->next; + struct IO_Event_WorkerPool_Worker *next = thread->next; free(thread); thread = next; } // Clean up work queue - work_item_t *work = pool->work_queue; + struct IO_Event_WorkerPool_Work *work = pool->work_queue; while (work) { - work_item_t *next = work->next; + struct IO_Event_WorkerPool_Work *next = work->next; free(work); work = next; } @@ -90,22 +84,11 @@ static void worker_pool_free(void *ptr) { } } -static void worker_pool_promise_free(void *ptr) { - struct IO_Event_WorkerPool_Promise *promise = (struct IO_Event_WorkerPool_Promise *)ptr; - if (promise) { - free(promise); - } -} - // Size functions for Ruby GC static size_t worker_pool_size(const void *ptr) { return sizeof(struct IO_Event_WorkerPool); } -static size_t worker_pool_promise_size(const void *ptr) { - return sizeof(struct IO_Event_WorkerPool_Promise); -} - // Ruby TypedData structures static const rb_data_type_t IO_Event_WorkerPool_type = { "IO::Event::WorkerPool", @@ -113,19 +96,13 @@ static const rb_data_type_t IO_Event_WorkerPool_type = { 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; -static const rb_data_type_t IO_Event_WorkerPool_Promise_type = { - "IO::Event::WorkerPool::Promise", - {0, worker_pool_promise_free, worker_pool_promise_size,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY -}; - // Worker thread function static void* worker_thread_func(void *arg) { - worker_thread_t *worker = (worker_thread_t *)arg; - struct IO_Event_WorkerPool *pool = worker->pool; + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)arg; + struct IO_Event_WorkerPool *pool = worker->pool; while (true) { - work_item_t *work = NULL; + struct IO_Event_WorkerPool_Work *work = NULL; pthread_mutex_lock(&pool->mutex); @@ -150,19 +127,14 @@ static void* worker_thread_func(void *arg) { pthread_mutex_unlock(&pool->mutex); - // Execute work if not cancelled - if (work && !work->cancelled) { + // Execute work + if (work) { rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); - // Mark promise as completed - struct IO_Event_WorkerPool_Promise *promise_data; - TypedData_Get_Struct(work->promise, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise_data); - promise_data->completed = true; - } - - // Signal completion - if (work) { + // Mark work as completed (mutex required for worker thread) pthread_mutex_lock(&pool->mutex); + work->completed = true; + pool->completed_count++; pthread_cond_signal(&pool->work_completed); pthread_mutex_unlock(&pool->mutex); } @@ -173,50 +145,50 @@ static void* worker_thread_func(void *arg) { // Create a new worker thread static int create_worker_thread(struct IO_Event_WorkerPool *pool) { - if (pool->thread_count >= pool->max_threads) { + if (pool->current_worker_count >= pool->maximum_worker_count) { return -1; } - worker_thread_t *worker = malloc(sizeof(worker_thread_t)); + struct IO_Event_WorkerPool_Worker *worker = malloc(sizeof(struct IO_Event_WorkerPool_Worker)); if (!worker) { return -1; } worker->pool = pool; - worker->next = pool->threads; + worker->next = pool->workers; if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) { free(worker); return -1; } - pool->threads = worker; - pool->thread_count++; + pool->workers = worker; + pool->current_worker_count++; return 0; } // Ruby constructor for WorkerPool static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { - VALUE rb_max_threads = Qnil; - size_t max_threads = 4; // Default + VALUE rb_maximum_worker_count = Qnil; + size_t maximum_worker_count = 4; // Default // Handle keyword arguments if (argc == 1 && RB_TYPE_P(argv[0], T_HASH)) { VALUE hash = argv[0]; VALUE max_threads_key = ID2SYM(rb_intern("max_threads")); if (rb_hash_lookup(hash, max_threads_key) != Qnil) { - rb_max_threads = rb_hash_aref(hash, max_threads_key); + rb_maximum_worker_count = rb_hash_aref(hash, max_threads_key); } } else if (argc == 1) { - rb_max_threads = argv[0]; + rb_maximum_worker_count = argv[0]; } else if (argc > 1) { rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)!", argc); } - if (!NIL_P(rb_max_threads)) { - max_threads = NUM2SIZET(rb_max_threads); - if (max_threads == 0) { + if (!NIL_P(rb_maximum_worker_count)) { + maximum_worker_count = NUM2SIZET(rb_maximum_worker_count); + if (maximum_worker_count == 0) { rb_raise(rb_eArgError, "max_threads must be greater than 0!"); } } @@ -235,17 +207,20 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { pool->work_queue = NULL; pool->work_queue_tail = NULL; - pool->threads = NULL; - pool->thread_count = 0; - pool->max_threads = max_threads; + pool->workers = NULL; + pool->current_worker_count = 0; + pool->maximum_worker_count = maximum_worker_count; + pool->call_count = 0; + pool->completed_count = 0; + pool->cancelled_count = 0; pool->shutdown = false; - // Create initial worker threads - for (size_t i = 0; i < max_threads; i++) { + // Create initial workers + for (size_t i = 0; i < maximum_worker_count; i++) { if (create_worker_thread(pool) != 0) { - // Just set the max_threads for debugging, don't fail completely + // Just set the maximum_worker_count for debugging, don't fail completely // worker_pool_free(pool); - // rb_raise(rb_eRuntimeError, "Failed to create worker threads"); + // rb_raise(rb_eRuntimeError, "Failed to create workers"); break; } } @@ -253,8 +228,46 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { return self; } -// Ruby method to submit work -static VALUE worker_pool_call(VALUE self, VALUE work) { +// Structure to pass both work and pool to rb_ensure functions +struct worker_pool_call_arguments { + struct IO_Event_WorkerPool_Work *work; + struct IO_Event_WorkerPool *pool; +}; + +// Cleanup function for rb_ensure +static VALUE worker_pool_call_cleanup(VALUE _arguments) { + struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments; + if (arguments && arguments->work) { + // Cancel the blocking operation if possible + if (arguments->work->blocking_operation) { + rb_fiber_scheduler_blocking_operation_cancel(arguments->work->blocking_operation); + + // Increment cancelled count (protected by GVL) + arguments->pool->cancelled_count++; + } + free(arguments->work); + } + return Qnil; +} + +// Main work execution function +static VALUE worker_pool_call_body(VALUE _arguments) { + struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments; + struct IO_Event_WorkerPool_Work *work = arguments->work; + struct IO_Event_WorkerPool *pool = arguments->pool; + + // Wait for completion + pthread_mutex_lock(&pool->mutex); + while (!work->completed) { + pthread_cond_wait(&pool->work_completed, &pool->mutex); + } + pthread_mutex_unlock(&pool->mutex); + + return Qnil; +} + +// Ruby method to submit work and wait for completion +static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { struct IO_Event_WorkerPool *pool; TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); @@ -262,130 +275,54 @@ static VALUE worker_pool_call(VALUE self, VALUE work) { rb_raise(rb_eRuntimeError, "Worker pool is shut down!"); } + // Increment call count (protected by GVL) + pool->call_count++; + // Extract blocking operation handle - rb_fiber_scheduler_blocking_operation_t *blocking_operation = - rb_fiber_scheduler_blocking_operation_extract(work); + rb_fiber_scheduler_blocking_operation_t *blocking_operation = rb_fiber_scheduler_blocking_operation_extract(_blocking_operation); if (!blocking_operation) { rb_raise(rb_eArgError, "Invalid blocking operation!"); } // Create work item - work_item_t *work_item = malloc(sizeof(work_item_t)); - if (!work_item) { + struct IO_Event_WorkerPool_Work *work = malloc(sizeof(struct IO_Event_WorkerPool_Work)); + if (!work) { rb_raise(rb_eNoMemError, "Failed to allocate work item!"); } - work_item->blocking_operation = blocking_operation; - work_item->cancelled = false; - work_item->next = NULL; - - // Create promise - struct IO_Event_WorkerPool_Promise *promise_data; - VALUE promise = TypedData_Make_Struct(IO_Event_WorkerPool_Promise, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise_data); - - promise_data->work_item = work_item; - promise_data->pool = pool; - promise_data->fiber = IO_Event_Fiber_current(); - promise_data->completed = false; - promise_data->cancelled = false; - - work_item->promise = promise; + work->blocking_operation = blocking_operation; + work->completed = false; + work->next = NULL; // Enqueue work pthread_mutex_lock(&pool->mutex); if (pool->work_queue_tail) { - pool->work_queue_tail->next = work_item; + pool->work_queue_tail->next = work; } else { - pool->work_queue = work_item; + pool->work_queue = work; } - pool->work_queue_tail = work_item; + pool->work_queue_tail = work; pthread_cond_signal(&pool->work_available); pthread_mutex_unlock(&pool->mutex); - return promise; -} - -// Promise cancel method -static VALUE worker_pool_promise_cancel(VALUE self) { - struct IO_Event_WorkerPool_Promise *promise; - TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); - - if (promise->completed || promise->cancelled) { - return Qfalse; - } - - promise->cancelled = true; - promise->work_item->cancelled = true; - - // Try to cancel the blocking operation - if (promise->work_item->blocking_operation) { - rb_fiber_scheduler_blocking_operation_cancel(promise->work_item->blocking_operation); - } - - return Qtrue; -} - -// Promise cancelled? predicate -static VALUE worker_pool_promise_cancelled_p(VALUE self) { - struct IO_Event_WorkerPool_Promise *promise; - TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); - - return promise->cancelled ? Qtrue : Qfalse; -} - -// Promise completed? predicate -static VALUE worker_pool_promise_completed_p(VALUE self) { - struct IO_Event_WorkerPool_Promise *promise; - TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); - - return promise->completed ? Qtrue : Qfalse; -} - -// Promise wait method -static VALUE worker_pool_promise_wait(VALUE self) { - struct IO_Event_WorkerPool_Promise *promise; - TypedData_Get_Struct(self, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); - - if (promise->completed) { - return self; - } - - if (promise->cancelled) { - rb_raise(rb_eRuntimeError, "Operation was cancelled!"); - } - - struct IO_Event_WorkerPool *pool = promise->pool; - - // Wait for completion - pthread_mutex_lock(&pool->mutex); - while (!promise->completed && !promise->cancelled) { - pthread_cond_wait(&pool->work_completed, &pool->mutex); - } - pthread_mutex_unlock(&pool->mutex); - - if (promise->cancelled) { - rb_raise(rb_eRuntimeError, "Operation was cancelled!"); - } + // Wait for completion with proper cleanup using rb_ensure + struct worker_pool_call_arguments arguments = {work, pool}; + rb_ensure(worker_pool_call_body, (VALUE)&arguments, worker_pool_call_cleanup, (VALUE)&arguments); - return self; + return Qnil; } static VALUE worker_pool_allocate(VALUE klass) { struct IO_Event_WorkerPool *pool; - VALUE obj = TypedData_Make_Struct(klass, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + VALUE self = TypedData_Make_Struct(klass, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); // Initialize to NULL/zero so we can detect uninitialized pools memset(pool, 0, sizeof(struct IO_Event_WorkerPool)); - return obj; -} - -static VALUE worker_pool_promise_allocate(VALUE klass) { - struct IO_Event_WorkerPool_Promise *promise; - return TypedData_Make_Struct(klass, struct IO_Event_WorkerPool_Promise, &IO_Event_WorkerPool_Promise_type, promise); + return self; } // Test helper: get pool statistics for debugging/testing @@ -398,23 +335,26 @@ static VALUE worker_pool_statistics(VALUE self) { } VALUE stats = rb_hash_new(); - rb_hash_aset(stats, ID2SYM(rb_intern("thread_count")), SIZET2NUM(pool->thread_count)); - rb_hash_aset(stats, ID2SYM(rb_intern("max_threads")), SIZET2NUM(pool->max_threads)); + rb_hash_aset(stats, ID2SYM(rb_intern("current_worker_count")), SIZET2NUM(pool->current_worker_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("maximum_worker_count")), SIZET2NUM(pool->maximum_worker_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("call_count")), SIZET2NUM(pool->call_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("completed_count")), SIZET2NUM(pool->completed_count)); + rb_hash_aset(stats, ID2SYM(rb_intern("cancelled_count")), SIZET2NUM(pool->cancelled_count)); rb_hash_aset(stats, ID2SYM(rb_intern("shutdown")), pool->shutdown ? Qtrue : Qfalse); // Count work items in queue (only if properly initialized) - if (pool->max_threads > 0) { + if (pool->maximum_worker_count > 0) { pthread_mutex_lock(&pool->mutex); - size_t queue_size = 0; - work_item_t *work = pool->work_queue; + size_t current_queue_size = 0; + struct IO_Event_WorkerPool_Work *work = pool->work_queue; while (work) { - queue_size++; + current_queue_size++; work = work->next; } pthread_mutex_unlock(&pool->mutex); - rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(queue_size)); + rb_hash_aset(stats, ID2SYM(rb_intern("current_queue_size")), SIZET2NUM(current_queue_size)); } else { - rb_hash_aset(stats, ID2SYM(rb_intern("queue_size")), SIZET2NUM(0)); + rb_hash_aset(stats, ID2SYM(rb_intern("current_queue_size")), SIZET2NUM(0)); } return stats; @@ -426,12 +366,4 @@ void Init_IO_Event_WorkerPool(VALUE IO_Event) { rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1); rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); - - - IO_Event_WorkerPool_Promise = rb_define_class_under(IO_Event_WorkerPool, "Promise", rb_cObject); - rb_define_alloc_func(IO_Event_WorkerPool_Promise, worker_pool_promise_allocate); - rb_define_method(IO_Event_WorkerPool_Promise, "cancel", worker_pool_promise_cancel, 0); - rb_define_method(IO_Event_WorkerPool_Promise, "cancelled?", worker_pool_promise_cancelled_p, 0); - rb_define_method(IO_Event_WorkerPool_Promise, "completed?", worker_pool_promise_completed_p, 0); - rb_define_method(IO_Event_WorkerPool_Promise, "wait", worker_pool_promise_wait, 0); } diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index fc18f836..35c4cff2 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -45,24 +45,11 @@ def initialize(selector: nil, worker_pool: nil, max_threads: 2) # @attribute [IO::Event::Selector] The I/O event selector used for managing fiber scheduling. attr_reader :selector - - # @attribute [Integer] The number of times blocking_operation_wait was called. - attr_reader :blocking_operation_count # Required fiber scheduler hook - delegates to WorkerPool def blocking_operation_wait(operation) - @blocking_operation_count += 1 - - # Submit the operation to the worker pool - promise = @worker_pool.call(operation) - - # Wait for completion - result = promise&.wait - - promise = nil - return result - ensure - promise&.cancel + # Submit the operation to the worker pool and wait for completion + @worker_pool.call(operation) end # Required fiber scheduler hooks diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index d25cea62..8c575c28 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -26,10 +26,12 @@ statistics = pool.statistics expect(statistics).to be_a(Hash) - expect(statistics[:thread_count]).to be_a(Integer) - expect(statistics[:max_threads]).to be == 2 - expect(statistics[:queue_size]).to be == 0 - expect(statistics[:shutdown]).to be == false + expect(statistics).to have_keys( + current_worker_count: be_a(Integer), + maximum_worker_count: be == 2, + current_queue_size: be == 0, + shutdown: be == false + ) end end @@ -53,7 +55,8 @@ source.clear("A".ord) # Track initial count - initial_count = scheduler.blocking_operation_count + worker_pool = scheduler.worker_pool + initial_count = worker_pool.statistics[:completed_count] Thread.new do Fiber.set_scheduler(scheduler) @@ -67,7 +70,7 @@ # Confirm that the copy worked: expect(destination.get_string(0, 10)).to be == "AAAAAAAAAA" - expect(scheduler.blocking_operation_count).to be > initial_count + expect(worker_pool.statistics[:completed_count]).to be > initial_count end end end From 199170ee1592ffd3427687acb8375dd3450156be Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 9 Jun 2025 16:35:00 +0900 Subject: [PATCH 04/13] Better cancellation. --- ext/io/event/worker_pool.c | 199 +++++++++++++++++----------- fixtures/io/event/test_scheduler.rb | 43 +++--- lib/io/event/test_scheduler.rb | 54 ++++++++ test/io/event/worker_pool.rb | 14 +- 4 files changed, 204 insertions(+), 106 deletions(-) create mode 100644 lib/io/event/test_scheduler.rb diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index d3dda180..583d35fd 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -4,6 +4,8 @@ #include "worker_pool.h" #include "fiber.h" +#include + #include #include #include @@ -15,7 +17,14 @@ static VALUE IO_Event_WorkerPool; // Thread pool structure struct IO_Event_WorkerPool_Worker { - pthread_t thread; + VALUE thread; + + // Flag to indicate this specific worker should exit: + bool interrupted; + + // Currently executing operation: + rb_fiber_scheduler_blocking_operation_t *current_blocking_operation; + struct IO_Event_WorkerPool *pool; struct IO_Event_WorkerPool_Worker *next; }; @@ -23,8 +32,13 @@ struct IO_Event_WorkerPool_Worker { // Work item structure struct IO_Event_WorkerPool_Work { rb_fiber_scheduler_blocking_operation_t *blocking_operation; + bool completed; + VALUE scheduler; + VALUE blocker; + VALUE fiber; + struct IO_Event_WorkerPool_Work *next; }; @@ -32,7 +46,6 @@ struct IO_Event_WorkerPool_Work { struct IO_Event_WorkerPool { pthread_mutex_t mutex; pthread_cond_t work_available; - pthread_cond_t work_completed; struct IO_Event_WorkerPool_Work *work_queue; struct IO_Event_WorkerPool_Work *work_queue_tail; @@ -62,7 +75,7 @@ static void worker_pool_free(void *ptr) { // Wait for all workers to finish struct IO_Event_WorkerPool_Worker *thread = pool->workers; while (thread) { - pthread_join(thread->thread, NULL); + rb_funcall(thread->thread, rb_intern("join"), 0); struct IO_Event_WorkerPool_Worker *next = thread->next; free(thread); thread = next; @@ -78,7 +91,6 @@ static void worker_pool_free(void *ptr) { pthread_mutex_destroy(&pool->mutex); pthread_cond_destroy(&pool->work_available); - pthread_cond_destroy(&pool->work_completed); free(pool); } @@ -96,9 +108,49 @@ static const rb_data_type_t IO_Event_WorkerPool_type = { 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; -// Worker thread function -static void* worker_thread_func(void *arg) { - struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)arg; +// Helper function to enqueue work (must be called with mutex held) +static void enqueue_work(struct IO_Event_WorkerPool *pool, struct IO_Event_WorkerPool_Work *work) { + if (pool->work_queue_tail) { + pool->work_queue_tail->next = work; + } else { + pool->work_queue = work; + } + pool->work_queue_tail = work; +} + +// Helper function to dequeue work (must be called with mutex held) +static struct IO_Event_WorkerPool_Work *dequeue_work(struct IO_Event_WorkerPool *pool) { + struct IO_Event_WorkerPool_Work *work = pool->work_queue; + if (work) { + pool->work_queue = work->next; + if (!pool->work_queue) { + pool->work_queue_tail = NULL; + } + work->next = NULL; // Clear the next pointer for safety + } + return work; +} + +// Unblock function to interrupt a specific worker. +static void worker_unblock_func(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; + struct IO_Event_WorkerPool *pool = worker->pool; + + // Mark this specific worker as interrupted + pthread_mutex_lock(&pool->mutex); + worker->interrupted = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // If there's a currently executing blocking operation, cancel it + if (worker->current_blocking_operation) { + rb_fiber_scheduler_blocking_operation_cancel(worker->current_blocking_operation); + } +} + +// Function to wait for work and execute it without GVL. +static void *worker_wait_and_execute(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; struct IO_Event_WorkerPool *pool = worker->pool; while (true) { @@ -106,41 +158,54 @@ static void* worker_thread_func(void *arg) { pthread_mutex_lock(&pool->mutex); - // Wait for work or shutdown - while (!pool->work_queue && !pool->shutdown) { + // Wait for work, shutdown, or interruption + while (!pool->work_queue && !pool->shutdown && !worker->interrupted) { pthread_cond_wait(&pool->work_available, &pool->mutex); } - if (pool->shutdown) { + if (pool->shutdown || worker->interrupted) { pthread_mutex_unlock(&pool->mutex); break; } - // Dequeue work item - if (pool->work_queue) { - work = pool->work_queue; - pool->work_queue = work->next; - if (!pool->work_queue) { - pool->work_queue_tail = NULL; - } - } + work = dequeue_work(pool); pthread_mutex_unlock(&pool->mutex); - // Execute work + // Execute work WITHOUT GVL (this is the whole point!) if (work) { + worker->current_blocking_operation = work->blocking_operation; rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation); - - // Mark work as completed (mutex required for worker thread) - pthread_mutex_lock(&pool->mutex); - work->completed = true; - pool->completed_count++; - pthread_cond_signal(&pool->work_completed); - pthread_mutex_unlock(&pool->mutex); + worker->current_blocking_operation = NULL; } + + return work; } - return NULL; + return NULL; // Shutdown signal +} + +static VALUE worker_thread_func(void *_worker) { + struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker; + + while (true) { + // Wait for work and execute it without holding GVL + struct IO_Event_WorkerPool_Work *work = (struct IO_Event_WorkerPool_Work *)rb_thread_call_without_gvl(worker_wait_and_execute, worker, worker_unblock_func, worker); + + if (!work) { + // Shutdown signal received + break; + } + + // Protected by GVL: + work->completed = true; + worker->pool->completed_count++; + + // Work was executed without GVL, now unblock the waiting fiber (we have GVL here) + rb_fiber_scheduler_unblock(work->scheduler, work->blocker, work->fiber); + } + + return Qnil; } // Create a new worker thread @@ -155,9 +220,12 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) { } worker->pool = pool; + worker->interrupted = false; + worker->current_blocking_operation = NULL; worker->next = pool->workers; - if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) { + worker->thread = rb_thread_create(worker_thread_func, worker); + if (NIL_P(worker->thread)) { free(worker); return -1; } @@ -203,7 +271,6 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->work_available, NULL); - pthread_cond_init(&pool->work_completed, NULL); pool->work_queue = NULL; pool->work_queue_tail = NULL; @@ -228,44 +295,6 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { return self; } -// Structure to pass both work and pool to rb_ensure functions -struct worker_pool_call_arguments { - struct IO_Event_WorkerPool_Work *work; - struct IO_Event_WorkerPool *pool; -}; - -// Cleanup function for rb_ensure -static VALUE worker_pool_call_cleanup(VALUE _arguments) { - struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments; - if (arguments && arguments->work) { - // Cancel the blocking operation if possible - if (arguments->work->blocking_operation) { - rb_fiber_scheduler_blocking_operation_cancel(arguments->work->blocking_operation); - - // Increment cancelled count (protected by GVL) - arguments->pool->cancelled_count++; - } - free(arguments->work); - } - return Qnil; -} - -// Main work execution function -static VALUE worker_pool_call_body(VALUE _arguments) { - struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments; - struct IO_Event_WorkerPool_Work *work = arguments->work; - struct IO_Event_WorkerPool *pool = arguments->pool; - - // Wait for completion - pthread_mutex_lock(&pool->mutex); - while (!work->completed) { - pthread_cond_wait(&pool->work_completed, &pool->mutex); - } - pthread_mutex_unlock(&pool->mutex); - - return Qnil; -} - // Ruby method to submit work and wait for completion static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { struct IO_Event_WorkerPool *pool; @@ -278,6 +307,13 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { // Increment call count (protected by GVL) pool->call_count++; + // Get current fiber and scheduler + VALUE fiber = rb_fiber_current(); + VALUE scheduler = rb_fiber_scheduler_current(); + if (NIL_P(scheduler)) { + rb_raise(rb_eRuntimeError, "WorkerPool requires a fiber scheduler!"); + } + // Extract blocking operation handle rb_fiber_scheduler_blocking_operation_t *blocking_operation = rb_fiber_scheduler_blocking_operation_extract(_blocking_operation); @@ -293,26 +329,27 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { work->blocking_operation = blocking_operation; work->completed = false; + work->scheduler = scheduler; + work->blocker = self; + work->fiber = fiber; work->next = NULL; - // Enqueue work + // Enqueue work: pthread_mutex_lock(&pool->mutex); - - if (pool->work_queue_tail) { - pool->work_queue_tail->next = work; - } else { - pool->work_queue = work; - } - pool->work_queue_tail = work; - + enqueue_work(pool, work); pthread_cond_signal(&pool->work_available); pthread_mutex_unlock(&pool->mutex); - // Wait for completion with proper cleanup using rb_ensure - struct worker_pool_call_arguments arguments = {work, pool}; - rb_ensure(worker_pool_call_body, (VALUE)&arguments, worker_pool_call_cleanup, (VALUE)&arguments); + // Block the current fiber until work is completed + while (true) { + rb_fiber_scheduler_block(scheduler, work->blocker, Qnil); + + if (work->completed) { + break; + } + } - return Qnil; + return Qtrue; } static VALUE worker_pool_allocate(VALUE klass) { @@ -363,7 +400,9 @@ static VALUE worker_pool_statistics(VALUE self) { void Init_IO_Event_WorkerPool(VALUE IO_Event) { IO_Event_WorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); rb_define_alloc_func(IO_Event_WorkerPool, worker_pool_allocate); + rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1); rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); + rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); } diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index 35c4cff2..705a2df8 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -12,21 +12,23 @@ module IO::Event # blocking operations to a WorkerPool instance for testing. # # @example Testing usage - # # Create with default selector and worker pool - # scheduler = IO::Event::TestScheduler.new - # - # # Or provide custom selector and/or worker pool - # selector = IO::Event::Selector.new(Fiber.current) - # worker_pool = IO::Event::WorkerPool.new(max_threads: 4) - # scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool) - # - # Fiber.set_scheduler(scheduler) - # - # # Standard Ruby operations that use rb_nogvl will be handled by the worker pool - # # Examples: sleep, file I/O, network operations, etc. - # Fiber.schedule do - # sleep(0.001) # This triggers rb_nogvl and uses the worker pool - # end.resume + # ```ruby + # # Create with default selector and worker pool + # scheduler = IO::Event::TestScheduler.new + # + # # Or provide custom selector and/or worker pool + # selector = IO::Event::Selector.new(Fiber.current) + # worker_pool = IO::Event::WorkerPool.new(max_threads: 4) + # scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool) + # + # Fiber.set_scheduler(scheduler) + # + # # Standard Ruby operations that use rb_nogvl will be handled by the worker pool + # # Examples: sleep, file I/O, network operations, etc. + # Fiber.schedule do + # sleep(0.001) # This triggers rb_nogvl and uses the worker pool + # end.resume + # ``` class TestScheduler def initialize(selector: nil, worker_pool: nil, max_threads: 2) @selector = selector || ::IO::Event::Selector.new(Fiber.current) @@ -35,9 +37,6 @@ def initialize(selector: nil, worker_pool: nil, max_threads: 2) # Track the number of fibers that are blocked. @blocked = 0 - - # Track how many times blocking_operation_wait was called. - @blocking_operation_count = 0 end # @attribute [WorkerPool] The worker pool used for executing blocking operations. @@ -134,12 +133,18 @@ def fiber(&block) # Run the scheduler event loop def run - while @blocked > 0 || @timers.size > 0 + while @blocked > 0 or @timers.size > 0 interval = @timers.wait_interval @selector.select(interval) @timers.fire end end + + def scheduler_close(error = $!) + self.run + ensure + self.close + end private diff --git a/lib/io/event/test_scheduler.rb b/lib/io/event/test_scheduler.rb new file mode 100644 index 00000000..c4b50a8d --- /dev/null +++ b/lib/io/event/test_scheduler.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "io/event" + +module IO::Event + class TestScheduler + def initialize(max_threads: 4) + @worker_pool = IO::Event::WorkerPool.new(max_threads: max_threads) + end + + attr_reader :worker_pool + + def fiber(&block) + Fiber.new(blocking: false, &block).tap(&:resume) + end + + def blocking_operation_wait(blocking_operation) + @worker_pool.call(blocking_operation) + end + + def close + # Close worker pool if needed + end + + def run + # Simple run implementation + end + + # Placeholder implementations for required scheduler methods + def io_wait(io, events, timeout) + # Simple blocking implementation for testing + io.wait_readable if events & IO::READABLE != 0 + io.wait_writable if events & IO::WRITABLE != 0 + end + + def kernel_sleep(duration) + # Simple blocking implementation + sleep(duration) if duration + end + + def block(blocker, timeout = nil) + # Simple blocking implementation + true + end + + def unblock(blocker, fiber) + # Simple unblock implementation + fiber.resume if fiber.alive? + end + end +end \ No newline at end of file diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 8c575c28..469f31b9 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -53,24 +53,24 @@ # Fill source buffer with some data source.clear("A".ord) - - # Track initial count - worker_pool = scheduler.worker_pool - initial_count = worker_pool.statistics[:completed_count] + worker_pool = nil Thread.new do Fiber.set_scheduler(scheduler) + worker_pool = scheduler.worker_pool # Perform the large copy operation in a scheduled fiber - completed = false - fiber = Fiber.schedule do + Fiber.schedule do destination.copy(source, 0, buffer_size, 0) end end.join # Confirm that the copy worked: expect(destination.get_string(0, 10)).to be == "AAAAAAAAAA" - expect(worker_pool.statistics[:completed_count]).to be > initial_count + + expect(worker_pool.statistics[:call_count]).to be > 0 + expect(worker_pool.statistics[:completed_count]).to be > 0 + inform worker_pool.statistics end end end From 3e3075abeecb53bad2abeeeb604d6c5e33d8edf3 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 9 Jun 2025 22:48:02 +0900 Subject: [PATCH 05/13] Add better cancellation tests. --- ext/extconf.rb | 1 + ext/io/event/worker_pool.c | 39 +++--- ext/io/event/worker_pool.h | 4 - ext/io/event/worker_pool_test.c | 189 ++++++++++++++++++++++++++++ ext/io/event/worker_pool_test.h | 9 ++ fixtures/io/event/test_scheduler.rb | 2 +- lib/io/event/test_scheduler.rb | 54 -------- test/io/event/worker_pool.rb | 97 +++++++++++++- 8 files changed, 306 insertions(+), 89 deletions(-) create mode 100644 ext/io/event/worker_pool_test.c create mode 100644 ext/io/event/worker_pool_test.h delete mode 100644 lib/io/event/test_scheduler.rb diff --git a/ext/extconf.rb b/ext/extconf.rb index 49667520..60bd519f 100755 --- a/ext/extconf.rb +++ b/ext/extconf.rb @@ -64,6 +64,7 @@ if have_header("pthread.h") append_cflags(["-DHAVE_IO_EVENT_WORKER_POOL"]) $srcs << "io/event/worker_pool.c" + $srcs << "io/event/worker_pool_test.c" end end diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 583d35fd..de7371c2 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -2,9 +2,11 @@ // Copyright, 2025, by Samuel Williams. #include "worker_pool.h" +#include "worker_pool_test.h" #include "fiber.h" #include +#include #include #include @@ -66,33 +68,17 @@ static void worker_pool_free(void *ptr) { struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr; if (pool) { - // Signal shutdown - pthread_mutex_lock(&pool->mutex); - pool->shutdown = true; - pthread_cond_broadcast(&pool->work_available); - pthread_mutex_unlock(&pool->mutex); - - // Wait for all workers to finish - struct IO_Event_WorkerPool_Worker *thread = pool->workers; - while (thread) { - rb_funcall(thread->thread, rb_intern("join"), 0); - struct IO_Event_WorkerPool_Worker *next = thread->next; - free(thread); - thread = next; - } - - // Clean up work queue - struct IO_Event_WorkerPool_Work *work = pool->work_queue; - while (work) { - struct IO_Event_WorkerPool_Work *next = work->next; - free(work); - work = next; + // Signal shutdown to all workers + if (!pool->shutdown) { + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); } - pthread_mutex_destroy(&pool->mutex); - pthread_cond_destroy(&pool->work_available); - - free(pool); + // Note: We don't free worker structures or wait for threads during GC + // as this can cause deadlocks. The Ruby GC will handle the thread objects. + // Workers will see the shutdown flag and exit cleanly. } } @@ -405,4 +391,7 @@ void Init_IO_Event_WorkerPool(VALUE IO_Event) { rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); + + // Initialize test functions + Init_IO_Event_WorkerPool_Test(IO_Event_WorkerPool); } diff --git a/ext/io/event/worker_pool.h b/ext/io/event/worker_pool.h index 09085868..dbec34ca 100644 --- a/ext/io/event/worker_pool.h +++ b/ext/io/event/worker_pool.h @@ -5,8 +5,4 @@ #include -#ifdef HAVE_RB_FIBER_SCHEDULER_BLOCKING_OPERATION_EXTRACT -#include -#endif - void Init_IO_Event_WorkerPool(VALUE IO_Event); diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c new file mode 100644 index 00000000..5c3eb957 --- /dev/null +++ b/ext/io/event/worker_pool_test.c @@ -0,0 +1,189 @@ +// worker_pool_test.c - Test functions for WorkerPool cancellation +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#include "worker_pool_test.h" + +#include + +#include +#include +#include +#include + +struct BusyOperationData { + int read_fd; + int write_fd; + volatile int cancelled; + double duration; // How long to wait (for testing) + clock_t start_time; + clock_t end_time; + int operation_result; + VALUE exception; +}; + +// The actual blocking operation that can be cancelled +static void* busy_blocking_operation(void *data) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + + // Use select() to wait for the pipe to become readable + fd_set read_fds; + struct timeval timeout; + + FD_ZERO(&read_fds); + FD_SET(busy_data->read_fd, &read_fds); + + // Set timeout based on duration + timeout.tv_sec = (long)busy_data->duration; + timeout.tv_usec = ((busy_data->duration - timeout.tv_sec) * 1000000); + + // This will block until: + // 1. The pipe becomes readable (cancellation) + // 2. The timeout expires + // 3. An error occurs + int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout); + + if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) { + // Pipe became readable - we were cancelled + char buffer; + read(busy_data->read_fd, &buffer, 1); // Consume the byte + busy_data->cancelled = 1; + return (void*)-1; // Indicate cancellation + } else if (result == 0) { + // Timeout - operation completed normally + return (void*)0; // Indicate success + } else { + // Error occurred + return (void*)-2; // Indicate error + } +} + +// Unblock function that writes to the pipe to cancel the operation +static void busy_unblock_function(void *data) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + + // Write a byte to the pipe to wake up the select() + char wake_byte = 1; + write(busy_data->write_fd, &wake_byte, 1); + + busy_data->cancelled = 1; +} + +// Function for the main operation execution (for rb_rescue) +static VALUE busy_operation_execute(VALUE data_value) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value; + + // Record start time + busy_data->start_time = clock(); + + // Execute the blocking operation + void *block_result = rb_nogvl( + busy_blocking_operation, + busy_data, + busy_unblock_function, + busy_data, + RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_OFFLOAD_SAFE + ); + + // Record end time + busy_data->end_time = clock(); + + // Store the operation result + busy_data->operation_result = (int)(intptr_t)block_result; + + return Qnil; +} + +// Function for exception handling (for rb_rescue) +static VALUE busy_operation_rescue(VALUE data_value, VALUE exception) { + struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value; + + // Record end time even in case of exception + busy_data->end_time = clock(); + + // Mark that an exception was caught + busy_data->exception = exception; + + return exception; +} + +// Ruby method: IO::Event::WorkerPool.busy(duration: 1.0) +// This creates a cancellable blocking operation for testing +static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { + VALUE options = Qnil; + double duration = 1.0; // Default 1 second + + // Parse arguments + if (argc == 1) { + options = argv[0]; + if (RB_TYPE_P(options, T_HASH)) { + VALUE duration_value = rb_hash_aref(options, ID2SYM(rb_intern("duration"))); + if (!NIL_P(duration_value)) { + duration = NUM2DBL(duration_value); + } + } else { + duration = NUM2DBL(options); + } + } else if (argc > 1) { + rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)", argc); + } + + // Create pipe for cancellation + int pipe_fds[2]; + if (pipe(pipe_fds) != 0) { + rb_sys_fail("pipe creation failed"); + } + + // Stack allocate and initialize operation data with brace initialization + struct BusyOperationData busy_data = { + .read_fd = pipe_fds[0], + .write_fd = pipe_fds[1], + .duration = duration, + .exception = Qnil, + // All other fields are zero-initialized by default + }; + + // Execute the blocking operation with exception handling using function pointers + rb_rescue( + busy_operation_execute, + (VALUE)&busy_data, + busy_operation_rescue, + (VALUE)&busy_data + ); + + // Calculate elapsed time from the state stored in busy_data + double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC; + + // Cleanup pipes + close(busy_data.read_fd); + close(busy_data.write_fd); + + // Create result hash using the state from busy_data + VALUE result = rb_hash_new(); + rb_hash_aset(result, ID2SYM(rb_intern("duration")), DBL2NUM(duration)); + rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed)); + + // Determine result based on operation outcome + if (busy_data.exception != Qnil) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); + rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception); + } else if (busy_data.operation_result == -1) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); + } else if (busy_data.operation_result == 0) { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); + } else { + rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("error"))); + rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); + } + + return result; +} + +// Initialize the test functions +void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool) { + // Add test methods to IO::Event::WorkerPool class + rb_define_singleton_method(IO_Event_WorkerPool, "busy", worker_pool_test_busy, -1); +} diff --git a/ext/io/event/worker_pool_test.h b/ext/io/event/worker_pool_test.h new file mode 100644 index 00000000..f58ce713 --- /dev/null +++ b/ext/io/event/worker_pool_test.h @@ -0,0 +1,9 @@ +// worker_pool_test.h - Header for WorkerPool test functions +// Released under the MIT License. +// Copyright, 2025, by Samuel Williams. + +#pragma once + +#include + +void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool); diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index 705a2df8..04471b21 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -128,7 +128,7 @@ def kernel_sleep(duration = nil) end def fiber(&block) - Fiber.new(&block).tap(&:resume) + Fiber.new(&block).tap(&:transfer) end # Run the scheduler event loop diff --git a/lib/io/event/test_scheduler.rb b/lib/io/event/test_scheduler.rb deleted file mode 100644 index c4b50a8d..00000000 --- a/lib/io/event/test_scheduler.rb +++ /dev/null @@ -1,54 +0,0 @@ -# frozen_string_literal: true - -# Released under the MIT License. -# Copyright, 2025, by Samuel Williams. - -require "io/event" - -module IO::Event - class TestScheduler - def initialize(max_threads: 4) - @worker_pool = IO::Event::WorkerPool.new(max_threads: max_threads) - end - - attr_reader :worker_pool - - def fiber(&block) - Fiber.new(blocking: false, &block).tap(&:resume) - end - - def blocking_operation_wait(blocking_operation) - @worker_pool.call(blocking_operation) - end - - def close - # Close worker pool if needed - end - - def run - # Simple run implementation - end - - # Placeholder implementations for required scheduler methods - def io_wait(io, events, timeout) - # Simple blocking implementation for testing - io.wait_readable if events & IO::READABLE != 0 - io.wait_writable if events & IO::WRITABLE != 0 - end - - def kernel_sleep(duration) - # Simple blocking implementation - sleep(duration) if duration - end - - def block(blocker, timeout = nil) - # Simple blocking implementation - true - end - - def unblock(blocker, fiber) - # Simple unblock implementation - fiber.resume if fiber.alive? - end - end -end \ No newline at end of file diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 469f31b9..ba7a1333 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -10,11 +10,7 @@ describe IO::Event::WorkerPool do with "an instance" do - let(:worker_pool) { subject.new(max_threads: 2) } - - after do - worker_pool = nil # This should trigger GC cleanup - end + let(:worker_pool) {subject.new(max_threads: 2)} it "can create a worker pool" do expect(worker_pool).to be_a(IO::Event::WorkerPool) @@ -73,4 +69,95 @@ inform worker_pool.statistics end end + + with "cancellable busy operation" do + let(:scheduler) {IO::Event::TestScheduler.new(max_threads: 1)} + + it "can perform a busy operation that completes normally" do + start_time = Time.now + result = IO::Event::WorkerPool.busy(duration: 0.1) + end_time = Time.now + elapsed = end_time - start_time + + expect(result).to be_a(Hash) + expect(result[:cancelled]).to be == false + expect(result[:result]).to be == :completed + expect(elapsed).to be_within(0.05).of(0.1) + end + + it "can perform a busy operation with different durations" do + result = IO::Event::WorkerPool.busy(duration: 0.05) + + expect(result).to be_a(Hash) + expect(result[:cancelled]).to be == false + expect(result[:result]).to be == :completed + expect(result[:duration]).to be == 0.05 + end + + it "can cancel a busy operation using unblock function" do + # This tests the cancellation mechanism through rb_thread_call_without_gvl + completed = false + thread = Thread.new do + start_time = Time.now + result = IO::Event::WorkerPool.busy(duration: 1.0) # Long operation + end_time = Time.now + elapsed = end_time - start_time + completed = true + + {result: result, elapsed: elapsed} + end + + # Let it start, then kill the thread (which should trigger the unblock function) + sleep(0.1) + thread.kill + thread.join(0.5) # Wait up to 0.5s for thread to finish + + # The operation should have been interrupted before completion + expect(completed).to be == false + end + + it "provides accurate timing information" do + # Test that the timing is reasonably accurate + durations = [0.05, 0.1, 0.2] + + durations.each do |expected_duration| + start_time = Time.now + result = IO::Event::WorkerPool.busy(duration: expected_duration) + end_time = Time.now + actual_duration = end_time - start_time + + expect(result[:duration]).to be == expected_duration + expect(actual_duration).to be_within(0.05).of(expected_duration) + end + end + + it "can be cancelled when executed in a worker pool" do + result = nil + elapsed = nil + error = nil + + Thread.new do + Fiber.set_scheduler(scheduler) + + busy_fiber = Fiber.schedule do + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + result = IO::Event::WorkerPool.busy(duration: 2.0) + rescue Interrupt => error + # Ignore. + ensure + end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + elapsed = end_time - start_time + end + + Fiber.schedule do + sleep(0.5) + Fiber.scheduler.fiber_interrupt(busy_fiber, StandardError) + end + end.join + + expect(result[:cancelled]).to be == true + expect(elapsed).to be < 1.0 + expect(error).to be_nil + end + end end From 791b4edf6f25b8116f00cb53cff652c9605b585f Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 10 Jun 2025 08:43:19 +0900 Subject: [PATCH 06/13] Allow cancelled operation state to outlive scope of caller. --- ext/io/event/worker_pool_test.c | 94 +++++++++++++++++++++++++-------- test/io/event/worker_pool.rb | 16 ------ 2 files changed, 72 insertions(+), 38 deletions(-) diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c index 5c3eb957..fefff8d9 100644 --- a/ext/io/event/worker_pool_test.c +++ b/ext/io/event/worker_pool_test.c @@ -5,6 +5,9 @@ #include "worker_pool_test.h" #include +#include +#include +#include #include #include @@ -20,12 +23,49 @@ struct BusyOperationData { clock_t end_time; int operation_result; VALUE exception; + + // Reference counting for safe heap management + _Atomic int ref_count; }; +// Reference counting functions for safe heap management +static struct BusyOperationData* busy_data_create(int read_fd, int write_fd, double duration) { + struct BusyOperationData *data = malloc(sizeof(struct BusyOperationData)); + if (!data) return NULL; + + memset(data, 0, sizeof(struct BusyOperationData)); + data->read_fd = read_fd; + data->write_fd = write_fd; + data->duration = duration; + data->exception = Qnil; + atomic_store(&data->ref_count, 1); + + return data; +} + +static struct BusyOperationData* busy_data_retain(struct BusyOperationData* data) { + if (data) { + atomic_fetch_add(&data->ref_count, 1); + } + return data; +} + +static void busy_data_release(struct BusyOperationData* data) { + if (data && atomic_fetch_sub(&data->ref_count, 1) == 1) { + // Last reference, safe to cleanup + close(data->read_fd); + close(data->write_fd); + free(data); + } +} + // The actual blocking operation that can be cancelled static void* busy_blocking_operation(void *data) { struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + // Retain reference while we're using it + busy_data_retain(busy_data); + // Use select() to wait for the pipe to become readable fd_set read_fds; struct timeval timeout; @@ -43,30 +83,41 @@ static void* busy_blocking_operation(void *data) { // 3. An error occurs int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout); + void* return_value; if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) { // Pipe became readable - we were cancelled char buffer; read(busy_data->read_fd, &buffer, 1); // Consume the byte busy_data->cancelled = 1; - return (void*)-1; // Indicate cancellation + return_value = (void*)-1; // Indicate cancellation } else if (result == 0) { // Timeout - operation completed normally - return (void*)0; // Indicate success + return_value = (void*)0; // Indicate success } else { // Error occurred - return (void*)-2; // Indicate error + return_value = (void*)-2; // Indicate error } + + // Release reference before returning + busy_data_release(busy_data); + return return_value; } // Unblock function that writes to the pipe to cancel the operation static void busy_unblock_function(void *data) { struct BusyOperationData *busy_data = (struct BusyOperationData*)data; + // Retain reference while we're using it + busy_data_retain(busy_data); + // Write a byte to the pipe to wake up the select() char wake_byte = 1; write(busy_data->write_fd, &wake_byte, 1); busy_data->cancelled = 1; + + // Release reference + busy_data_release(busy_data); } // Function for the main operation execution (for rb_rescue) @@ -134,29 +185,24 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_sys_fail("pipe creation failed"); } - // Stack allocate and initialize operation data with brace initialization - struct BusyOperationData busy_data = { - .read_fd = pipe_fds[0], - .write_fd = pipe_fds[1], - .duration = duration, - .exception = Qnil, - // All other fields are zero-initialized by default - }; + // Heap allocate operation data with reference counting + struct BusyOperationData *busy_data = busy_data_create(pipe_fds[0], pipe_fds[1], duration); + if (!busy_data) { + close(pipe_fds[0]); + close(pipe_fds[1]); + rb_raise(rb_eNoMemError, "failed to allocate busy operation data"); + } // Execute the blocking operation with exception handling using function pointers rb_rescue( busy_operation_execute, - (VALUE)&busy_data, + (VALUE)busy_data, busy_operation_rescue, - (VALUE)&busy_data + (VALUE)busy_data ); // Calculate elapsed time from the state stored in busy_data - double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC; - - // Cleanup pipes - close(busy_data.read_fd); - close(busy_data.write_fd); + double elapsed = ((double)(busy_data->end_time - busy_data->start_time)) / CLOCKS_PER_SEC; // Create result hash using the state from busy_data VALUE result = rb_hash_new(); @@ -164,14 +210,14 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed)); // Determine result based on operation outcome - if (busy_data.exception != Qnil) { + if (busy_data->exception != Qnil) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); - rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception); - } else if (busy_data.operation_result == -1) { + rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data->exception); + } else if (busy_data->operation_result == -1) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); - } else if (busy_data.operation_result == 0) { + } else if (busy_data->operation_result == 0) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); } else { @@ -179,6 +225,10 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); } + // Release our reference to the busy_data + // The blocking operation and unblock function may still have references + busy_data_release(busy_data); + return result; } diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index ba7a1333..a9533a18 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -82,7 +82,6 @@ expect(result).to be_a(Hash) expect(result[:cancelled]).to be == false expect(result[:result]).to be == :completed - expect(elapsed).to be_within(0.05).of(0.1) end it "can perform a busy operation with different durations" do @@ -115,22 +114,7 @@ # The operation should have been interrupted before completion expect(completed).to be == false end - - it "provides accurate timing information" do - # Test that the timing is reasonably accurate - durations = [0.05, 0.1, 0.2] - - durations.each do |expected_duration| - start_time = Time.now - result = IO::Event::WorkerPool.busy(duration: expected_duration) - end_time = Time.now - actual_duration = end_time - start_time - expect(result[:duration]).to be == expected_duration - expect(actual_duration).to be_within(0.05).of(expected_duration) - end - end - it "can be cancelled when executed in a worker pool" do result = nil elapsed = nil From c67bccc908111142ef79ddf8f6865992c891621b Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 10 Jun 2025 22:04:09 +0900 Subject: [PATCH 07/13] Cancellation fixes. --- ext/io/event/worker_pool.c | 55 ++++++++++++++++++++++----------- ext/io/event/worker_pool_test.c | 10 +++--- test/io/event/worker_pool.rb | 3 +- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index de7371c2..7e1f66eb 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -14,6 +14,10 @@ #include #include +enum { + DEBUG = 0, +}; + // Forward declarations static VALUE IO_Event_WorkerPool; @@ -281,6 +285,15 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { return self; } +static VALUE worker_pool_work_begin(VALUE _work) { + struct IO_Event_WorkerPool_Work *work = (void*)_work; + + if (DEBUG) fprintf(stderr, "worker_pool_work_begin:rb_fiber_scheduler_block work=%p\n", work); + rb_fiber_scheduler_block(work->scheduler, work->blocker, Qnil); + + return Qnil; +} + // Ruby method to submit work and wait for completion static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { struct IO_Event_WorkerPool *pool; @@ -308,34 +321,40 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) { } // Create work item - struct IO_Event_WorkerPool_Work *work = malloc(sizeof(struct IO_Event_WorkerPool_Work)); - if (!work) { - rb_raise(rb_eNoMemError, "Failed to allocate work item!"); - } - - work->blocking_operation = blocking_operation; - work->completed = false; - work->scheduler = scheduler; - work->blocker = self; - work->fiber = fiber; - work->next = NULL; - + struct IO_Event_WorkerPool_Work work = { + .blocking_operation = blocking_operation, + .completed = false, + .scheduler = scheduler, + .blocker = self, + .fiber = fiber, + .next = NULL + }; + // Enqueue work: pthread_mutex_lock(&pool->mutex); - enqueue_work(pool, work); + enqueue_work(pool, &work); pthread_cond_signal(&pool->work_available); pthread_mutex_unlock(&pool->mutex); - // Block the current fiber until work is completed + // Block the current fiber until work is completed: + int state; while (true) { - rb_fiber_scheduler_block(scheduler, work->blocker, Qnil); - - if (work->completed) { + rb_protect(worker_pool_work_begin, (VALUE)&work, &state); + + if (work.completed) { break; + } else { + if (DEBUG) fprintf(stderr, "worker_pool_call:rb_fiber_scheduler_blocking_operation_cancel\n"); + rb_fiber_scheduler_blocking_operation_cancel(blocking_operation); + // The work was not completed, we need to wait for it to be completed. } } - return Qtrue; + if (state) { + rb_jump_tag(state); + } else { + return Qtrue; + } } static VALUE worker_pool_allocate(VALUE klass) { diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c index fefff8d9..ca78695e 100644 --- a/ext/io/event/worker_pool_test.c +++ b/ext/io/event/worker_pool_test.c @@ -62,7 +62,7 @@ static void busy_data_release(struct BusyOperationData* data) { // The actual blocking operation that can be cancelled static void* busy_blocking_operation(void *data) { struct BusyOperationData *busy_data = (struct BusyOperationData*)data; - + // Retain reference while we're using it busy_data_retain(busy_data); @@ -82,7 +82,7 @@ static void* busy_blocking_operation(void *data) { // 2. The timeout expires // 3. An error occurs int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout); - + void* return_value; if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) { // Pipe became readable - we were cancelled @@ -110,12 +110,12 @@ static void busy_unblock_function(void *data) { // Retain reference while we're using it busy_data_retain(busy_data); + busy_data->cancelled = 1; + // Write a byte to the pipe to wake up the select() char wake_byte = 1; write(busy_data->write_fd, &wake_byte, 1); - - busy_data->cancelled = 1; - + // Release reference busy_data_release(busy_data); } diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index a9533a18..597b60f7 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -39,7 +39,7 @@ expect(scheduler.worker_pool).to be_a(IO::Event::WorkerPool) end - it "intercepts IO::Buffer.copy operations larger than 1MiB" do + it "interrupts IO::Buffer.copy operations larger than 1MiB" do skip "IO::Buffer not available" unless defined?(IO::Buffer) # Create buffers larger than 1MiB to trigger GVL release @@ -116,6 +116,7 @@ end it "can be cancelled when executed in a worker pool" do + # puts Process.pid; $stdin.gets result = nil elapsed = nil error = nil From 161a70a4ea398a50116c85761ff73133bf378ddc Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 00:00:19 +0900 Subject: [PATCH 08/13] Add release notes. --- releases.md | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/releases.md b/releases.md index a9370be1..bddf4a72 100644 --- a/releases.md +++ b/releases.md @@ -1,5 +1,31 @@ # Releases +## Unreleased + +### Introduce `IO::Event::WorkerPool` for off-loading blocking operations. + +The {ruby IO::Event::WorkerPool} provides a mechanism for executing blocking operations on separate OS threads while properly integrating with Ruby's fiber scheduler and GVL (Global VM Lock) management. This enables true parallelism for CPU-intensive or blocking operations that would otherwise block the event loop. + +```ruby +# Fiber scheduler integration via blocking_operation_wait hook +class MyScheduler + def initialize + @worker_pool = IO::Event::WorkerPool.new + end + + def blocking_operation_wait(operation) + @worker_pool.call(operation) + end +end + +# Usage with automatic offloading +Fiber.set_scheduler(MyScheduler.new) +# Automatically offload `rb_nogvl(..., RB_NOGVL_OFFLOAD_SAFE)` to a background thread: +result = some_blocking_operation() +``` + +The implementation uses one or more background threads and a list of pending blocking operations. Those operations either execute through to completion or may be cancelled, which executes the "unblock function" provided to `rb_nogvl`. + ## v1.10.2 - Improved consistency of handling closed IO when invoking `#select`. From aa7f4b945e80174d3727f398f5224ea85d26baac Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 00:00:40 +0900 Subject: [PATCH 09/13] Remove dead code. --- test/io/event/worker_pool.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 597b60f7..0df640cf 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -116,7 +116,6 @@ end it "can be cancelled when executed in a worker pool" do - # puts Process.pid; $stdin.gets result = nil elapsed = nil error = nil From c06843a7e9660570726e56626963066eced46609 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 00:10:22 +0900 Subject: [PATCH 10/13] Tidy up keyword argument handling. --- ext/io/event/worker_pool.c | 30 +++++++++++++------------ ext/io/event/worker_pool_test.c | 34 +++++++++++++++++------------ fixtures/io/event/test_scheduler.rb | 4 ++-- test/io/event/worker_pool.rb | 8 +++---- 4 files changed, 42 insertions(+), 34 deletions(-) diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 7e1f66eb..20df96d0 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -18,8 +18,8 @@ enum { DEBUG = 0, }; -// Forward declarations static VALUE IO_Event_WorkerPool; +static ID id_maximum_worker_count; // Thread pool structure struct IO_Event_WorkerPool_Worker { @@ -228,20 +228,19 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) { // Ruby constructor for WorkerPool static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { + size_t maximum_worker_count = 1; // Default + + // Extract keyword arguments + VALUE kwargs = Qnil; VALUE rb_maximum_worker_count = Qnil; - size_t maximum_worker_count = 4; // Default - - // Handle keyword arguments - if (argc == 1 && RB_TYPE_P(argv[0], T_HASH)) { - VALUE hash = argv[0]; - VALUE max_threads_key = ID2SYM(rb_intern("max_threads")); - if (rb_hash_lookup(hash, max_threads_key) != Qnil) { - rb_maximum_worker_count = rb_hash_aref(hash, max_threads_key); - } - } else if (argc == 1) { - rb_maximum_worker_count = argv[0]; - } else if (argc > 1) { - rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)!", argc); + + rb_scan_args(argc, argv, "0:", &kwargs); + + if (!NIL_P(kwargs)) { + VALUE kwvals[1]; + ID kwkeys[1] = {id_maximum_worker_count}; + rb_get_kwargs(kwargs, kwkeys, 0, 1, kwvals); + rb_maximum_worker_count = kwvals[0]; } if (!NIL_P(rb_maximum_worker_count)) { @@ -403,6 +402,9 @@ static VALUE worker_pool_statistics(VALUE self) { } void Init_IO_Event_WorkerPool(VALUE IO_Event) { + // Initialize symbols + id_maximum_worker_count = rb_intern("maximum_worker_count"); + IO_Event_WorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject); rb_define_alloc_func(IO_Event_WorkerPool, worker_pool_allocate); diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c index ca78695e..c73582bd 100644 --- a/ext/io/event/worker_pool_test.c +++ b/ext/io/event/worker_pool_test.c @@ -14,6 +14,8 @@ #include #include +static ID id_duration; + struct BusyOperationData { int read_fd; int write_fd; @@ -161,22 +163,23 @@ static VALUE busy_operation_rescue(VALUE data_value, VALUE exception) { // Ruby method: IO::Event::WorkerPool.busy(duration: 1.0) // This creates a cancellable blocking operation for testing static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { - VALUE options = Qnil; double duration = 1.0; // Default 1 second - // Parse arguments - if (argc == 1) { - options = argv[0]; - if (RB_TYPE_P(options, T_HASH)) { - VALUE duration_value = rb_hash_aref(options, ID2SYM(rb_intern("duration"))); - if (!NIL_P(duration_value)) { - duration = NUM2DBL(duration_value); - } - } else { - duration = NUM2DBL(options); - } - } else if (argc > 1) { - rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)", argc); + // Extract keyword arguments + VALUE kwargs = Qnil; + VALUE rb_duration = Qnil; + + rb_scan_args(argc, argv, "0:", &kwargs); + + if (!NIL_P(kwargs)) { + VALUE kwvals[1]; + ID kwkeys[1] = {id_duration}; + rb_get_kwargs(kwargs, kwkeys, 0, 1, kwvals); + rb_duration = kwvals[0]; + } + + if (!NIL_P(rb_duration)) { + duration = NUM2DBL(rb_duration); } // Create pipe for cancellation @@ -234,6 +237,9 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { // Initialize the test functions void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool) { + // Initialize symbols + id_duration = rb_intern("duration"); + // Add test methods to IO::Event::WorkerPool class rb_define_singleton_method(IO_Event_WorkerPool, "busy", worker_pool_test_busy, -1); } diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index 04471b21..f324410c 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -18,7 +18,7 @@ module IO::Event # # # Or provide custom selector and/or worker pool # selector = IO::Event::Selector.new(Fiber.current) - # worker_pool = IO::Event::WorkerPool.new(max_threads: 4) + # worker_pool = IO::Event::WorkerPool.new(maximum_worker_count: 4) # scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool) # # Fiber.set_scheduler(scheduler) @@ -32,7 +32,7 @@ module IO::Event class TestScheduler def initialize(selector: nil, worker_pool: nil, max_threads: 2) @selector = selector || ::IO::Event::Selector.new(Fiber.current) - @worker_pool = worker_pool || WorkerPool.new(max_threads: max_threads) + @worker_pool = worker_pool || WorkerPool.new(maximum_worker_count: max_threads) @timers = ::IO::Event::Timers.new # Track the number of fibers that are blocked. diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 0df640cf..5dd94ee9 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -10,7 +10,7 @@ describe IO::Event::WorkerPool do with "an instance" do - let(:worker_pool) {subject.new(max_threads: 2)} + let(:worker_pool) {subject.new} it "can create a worker pool" do expect(worker_pool).to be_a(IO::Event::WorkerPool) @@ -24,7 +24,7 @@ expect(statistics).to be_a(Hash) expect(statistics).to have_keys( current_worker_count: be_a(Integer), - maximum_worker_count: be == 2, + maximum_worker_count: be == 1, current_queue_size: be == 0, shutdown: be == false ) @@ -32,7 +32,7 @@ end with "TestScheduler integration" do - let(:scheduler) {IO::Event::TestScheduler.new(max_threads: 1)} + let(:scheduler) {IO::Event::TestScheduler.new} it "can create a test scheduler" do expect(scheduler).to be_a(IO::Event::TestScheduler) @@ -71,7 +71,7 @@ end with "cancellable busy operation" do - let(:scheduler) {IO::Event::TestScheduler.new(max_threads: 1)} + let(:scheduler) {IO::Event::TestScheduler.new} it "can perform a busy operation that completes normally" do start_time = Time.now From e702183e52456af5976051f7311b595b59c4c9b0 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 00:43:06 +0900 Subject: [PATCH 11/13] Remove reference counting implementation. --- ext/io/event/worker_pool_test.c | 93 +++++++++------------------------ 1 file changed, 24 insertions(+), 69 deletions(-) diff --git a/ext/io/event/worker_pool_test.c b/ext/io/event/worker_pool_test.c index c73582bd..4c913551 100644 --- a/ext/io/event/worker_pool_test.c +++ b/ext/io/event/worker_pool_test.c @@ -5,7 +5,6 @@ #include "worker_pool_test.h" #include -#include #include #include @@ -25,48 +24,11 @@ struct BusyOperationData { clock_t end_time; int operation_result; VALUE exception; - - // Reference counting for safe heap management - _Atomic int ref_count; }; -// Reference counting functions for safe heap management -static struct BusyOperationData* busy_data_create(int read_fd, int write_fd, double duration) { - struct BusyOperationData *data = malloc(sizeof(struct BusyOperationData)); - if (!data) return NULL; - - memset(data, 0, sizeof(struct BusyOperationData)); - data->read_fd = read_fd; - data->write_fd = write_fd; - data->duration = duration; - data->exception = Qnil; - atomic_store(&data->ref_count, 1); - - return data; -} - -static struct BusyOperationData* busy_data_retain(struct BusyOperationData* data) { - if (data) { - atomic_fetch_add(&data->ref_count, 1); - } - return data; -} - -static void busy_data_release(struct BusyOperationData* data) { - if (data && atomic_fetch_sub(&data->ref_count, 1) == 1) { - // Last reference, safe to cleanup - close(data->read_fd); - close(data->write_fd); - free(data); - } -} - // The actual blocking operation that can be cancelled static void* busy_blocking_operation(void *data) { struct BusyOperationData *busy_data = (struct BusyOperationData*)data; - - // Retain reference while we're using it - busy_data_retain(busy_data); // Use select() to wait for the pipe to become readable fd_set read_fds; @@ -85,41 +47,30 @@ static void* busy_blocking_operation(void *data) { // 3. An error occurs int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout); - void* return_value; if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) { // Pipe became readable - we were cancelled char buffer; read(busy_data->read_fd, &buffer, 1); // Consume the byte busy_data->cancelled = 1; - return_value = (void*)-1; // Indicate cancellation + return (void*)-1; // Indicate cancellation } else if (result == 0) { // Timeout - operation completed normally - return_value = (void*)0; // Indicate success + return (void*)0; // Indicate success } else { // Error occurred - return_value = (void*)-2; // Indicate error + return (void*)-2; // Indicate error } - - // Release reference before returning - busy_data_release(busy_data); - return return_value; } // Unblock function that writes to the pipe to cancel the operation static void busy_unblock_function(void *data) { struct BusyOperationData *busy_data = (struct BusyOperationData*)data; - // Retain reference while we're using it - busy_data_retain(busy_data); - busy_data->cancelled = 1; // Write a byte to the pipe to wake up the select() char wake_byte = 1; write(busy_data->write_fd, &wake_byte, 1); - - // Release reference - busy_data_release(busy_data); } // Function for the main operation execution (for rb_rescue) @@ -188,24 +139,28 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_sys_fail("pipe creation failed"); } - // Heap allocate operation data with reference counting - struct BusyOperationData *busy_data = busy_data_create(pipe_fds[0], pipe_fds[1], duration); - if (!busy_data) { - close(pipe_fds[0]); - close(pipe_fds[1]); - rb_raise(rb_eNoMemError, "failed to allocate busy operation data"); - } + // Stack allocate operation data + struct BusyOperationData busy_data = { + .read_fd = pipe_fds[0], + .write_fd = pipe_fds[1], + .cancelled = 0, + .duration = duration, + .start_time = 0, + .end_time = 0, + .operation_result = 0, + .exception = Qnil + }; // Execute the blocking operation with exception handling using function pointers rb_rescue( busy_operation_execute, - (VALUE)busy_data, + (VALUE)&busy_data, busy_operation_rescue, - (VALUE)busy_data + (VALUE)&busy_data ); // Calculate elapsed time from the state stored in busy_data - double elapsed = ((double)(busy_data->end_time - busy_data->start_time)) / CLOCKS_PER_SEC; + double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC; // Create result hash using the state from busy_data VALUE result = rb_hash_new(); @@ -213,14 +168,14 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed)); // Determine result based on operation outcome - if (busy_data->exception != Qnil) { + if (busy_data.exception != Qnil) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); - rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data->exception); - } else if (busy_data->operation_result == -1) { + rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception); + } else if (busy_data.operation_result == -1) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue); - } else if (busy_data->operation_result == 0) { + } else if (busy_data.operation_result == 0) { rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed"))); rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); } else { @@ -228,9 +183,9 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) { rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse); } - // Release our reference to the busy_data - // The blocking operation and unblock function may still have references - busy_data_release(busy_data); + // Clean up pipe file descriptors + close(pipe_fds[0]); + close(pipe_fds[1]); return result; } From b6a74a20b9fd7e82948c72caf2601ab73bc576ef Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 07:47:34 +0900 Subject: [PATCH 12/13] Clean up last remaining use of `max_threads`. --- ext/io/event/worker_pool.c | 2 +- fixtures/io/event/test_scheduler.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index 20df96d0..b6fa0119 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -246,7 +246,7 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) { if (!NIL_P(rb_maximum_worker_count)) { maximum_worker_count = NUM2SIZET(rb_maximum_worker_count); if (maximum_worker_count == 0) { - rb_raise(rb_eArgError, "max_threads must be greater than 0!"); + rb_raise(rb_eArgError, "maximum_worker_count must be greater than 0!"); } } diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index f324410c..f5415d7c 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -30,9 +30,9 @@ module IO::Event # end.resume # ``` class TestScheduler - def initialize(selector: nil, worker_pool: nil, max_threads: 2) + def initialize(selector: nil, worker_pool: nil, maximum_worker_count: nil) @selector = selector || ::IO::Event::Selector.new(Fiber.current) - @worker_pool = worker_pool || WorkerPool.new(maximum_worker_count: max_threads) + @worker_pool = worker_pool || WorkerPool.new(maximum_worker_count: maximum_worker_count) @timers = ::IO::Event::Timers.new # Track the number of fibers that are blocked. From 8c3237b8840edf698876057728e9777cc48e9d82 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 11 Jun 2025 09:59:01 +0900 Subject: [PATCH 13/13] Add `WorkerPool#close`. --- ext/io/event/worker_pool.c | 46 +++++++++++++++++++++++++++++ fixtures/io/event/test_scheduler.rb | 2 +- test/io/event/worker_pool.rb | 30 +++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) diff --git a/ext/io/event/worker_pool.c b/ext/io/event/worker_pool.c index b6fa0119..a6975f4c 100644 --- a/ext/io/event/worker_pool.c +++ b/ext/io/event/worker_pool.c @@ -366,6 +366,51 @@ static VALUE worker_pool_allocate(VALUE klass) { return self; } +// Ruby method to close the worker pool +static VALUE worker_pool_close(VALUE self) { + struct IO_Event_WorkerPool *pool; + TypedData_Get_Struct(self, struct IO_Event_WorkerPool, &IO_Event_WorkerPool_type, pool); + + if (!pool) { + rb_raise(rb_eRuntimeError, "WorkerPool not initialized!"); + } + + if (pool->shutdown) { + return Qnil; // Already closed + } + + // Signal shutdown to all workers + pthread_mutex_lock(&pool->mutex); + pool->shutdown = true; + pthread_cond_broadcast(&pool->work_available); + pthread_mutex_unlock(&pool->mutex); + + // Wait for all worker threads to finish + struct IO_Event_WorkerPool_Worker *worker = pool->workers; + while (worker) { + if (!NIL_P(worker->thread)) { + rb_funcall(worker->thread, rb_intern("join"), 0); + } + worker = worker->next; + } + + // Clean up worker structures + worker = pool->workers; + while (worker) { + struct IO_Event_WorkerPool_Worker *next = worker->next; + free(worker); + worker = next; + } + pool->workers = NULL; + pool->current_worker_count = 0; + + // Clean up mutex and condition variable + pthread_mutex_destroy(&pool->mutex); + pthread_cond_destroy(&pool->work_available); + + return Qnil; +} + // Test helper: get pool statistics for debugging/testing static VALUE worker_pool_statistics(VALUE self) { struct IO_Event_WorkerPool *pool; @@ -410,6 +455,7 @@ void Init_IO_Event_WorkerPool(VALUE IO_Event) { rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1); rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1); + rb_define_method(IO_Event_WorkerPool, "close", worker_pool_close, 0); rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0); diff --git a/fixtures/io/event/test_scheduler.rb b/fixtures/io/event/test_scheduler.rb index f5415d7c..47345d01 100644 --- a/fixtures/io/event/test_scheduler.rb +++ b/fixtures/io/event/test_scheduler.rb @@ -54,7 +54,7 @@ def blocking_operation_wait(operation) # Required fiber scheduler hooks def close @selector&.close - # WorkerPool doesn't have a close method, just clear the reference + @worker_pool&.close @worker_pool = nil end diff --git a/test/io/event/worker_pool.rb b/test/io/event/worker_pool.rb index 5dd94ee9..fdbb697a 100644 --- a/test/io/event/worker_pool.rb +++ b/test/io/event/worker_pool.rb @@ -12,6 +12,10 @@ with "an instance" do let(:worker_pool) {subject.new} + after do + worker_pool&.close + end + it "can create a worker pool" do expect(worker_pool).to be_a(IO::Event::WorkerPool) end @@ -29,6 +33,32 @@ shutdown: be == false ) end + + it "can close the worker pool" do + pool = worker_pool + + # Check that it's not shut down initially + expect(pool.statistics[:shutdown]).to be == false + + # Close the pool + result = pool.close + expect(result).to be_nil + + # Check that it's now shut down + expect(pool.statistics[:shutdown]).to be == true + expect(pool.statistics[:current_worker_count]).to be == 0 + end + + it "can close the worker pool multiple times safely" do + pool = worker_pool + + # Close the pool twice + pool.close + pool.close + + # Should still be shut down + expect(pool.statistics[:shutdown]).to be == true + end end with "TestScheduler integration" do