Skip to content

Commit 199170e

Browse files
Better cancellation.
1 parent e7519d0 commit 199170e

4 files changed

Lines changed: 204 additions & 106 deletions

File tree

ext/io/event/worker_pool.c

Lines changed: 119 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#include "worker_pool.h"
55
#include "fiber.h"
66

7+
#include <ruby/thread.h>
8+
79
#include <pthread.h>
810
#include <stdbool.h>
911
#include <stdlib.h>
@@ -15,24 +17,35 @@ static VALUE IO_Event_WorkerPool;
1517

1618
// Per-worker state (one instance per worker thread). Workers are chained through `next`.
1719
struct IO_Event_WorkerPool_Worker {
18-
pthread_t thread;
20+
// Ruby thread object running this worker (result of rb_thread_create).
// NOTE(review): this VALUE lives in a heap-allocated struct; confirm the pool's GC mark function keeps it alive.
VALUE thread;
21+
22+
// Flag to indicate this specific worker should exit (set by worker_unblock_func while holding pool->mutex):
23+
bool interrupted;
24+
25+
// Currently executing operation (NULL while the worker is idle):
26+
rb_fiber_scheduler_blocking_operation_t *current_blocking_operation;
27+
1928
// Owning pool; provides the shared mutex, condition variable and work queue.
struct IO_Event_WorkerPool *pool;
2029
// Next worker in the pool's list of workers.
struct IO_Event_WorkerPool_Worker *next;
2130
};
2231

2332
// Work item structure: one queued blocking operation plus the information needed to wake the fiber that submitted it.
2433
struct IO_Event_WorkerPool_Work {
2534
// Operation handle executed by a worker thread.
rb_fiber_scheduler_blocking_operation_t *blocking_operation;
35+
2636
// Set to true by the worker thread once the operation has been executed; checked by worker_pool_call after each wake-up.
bool completed;
2737

38+
// The three values below are passed to rb_fiber_scheduler_unblock when the work finishes.
// NOTE(review): these VALUEs are stored in a heap-allocated struct; confirm they stay reachable by the GC while the work is queued.
VALUE scheduler;
39+
VALUE blocker;
40+
VALUE fiber;
41+
2842
// Next item in the pool's FIFO work queue.
struct IO_Event_WorkerPool_Work *next;
2943
};
3044

3145
// Worker pool structure
3246
struct IO_Event_WorkerPool {
3347
pthread_mutex_t mutex;
3448
pthread_cond_t work_available;
35-
pthread_cond_t work_completed;
3649

3750
struct IO_Event_WorkerPool_Work *work_queue;
3851
struct IO_Event_WorkerPool_Work *work_queue_tail;
@@ -62,7 +75,7 @@ static void worker_pool_free(void *ptr) {
6275
// Wait for all workers to finish
6376
struct IO_Event_WorkerPool_Worker *thread = pool->workers;
6477
while (thread) {
65-
pthread_join(thread->thread, NULL);
78+
rb_funcall(thread->thread, rb_intern("join"), 0);
6679
struct IO_Event_WorkerPool_Worker *next = thread->next;
6780
free(thread);
6881
thread = next;
@@ -78,7 +91,6 @@ static void worker_pool_free(void *ptr) {
7891

7992
pthread_mutex_destroy(&pool->mutex);
8093
pthread_cond_destroy(&pool->work_available);
81-
pthread_cond_destroy(&pool->work_completed);
8294

8395
free(pool);
8496
}
@@ -96,51 +108,104 @@ static const rb_data_type_t IO_Event_WorkerPool_type = {
96108
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
97109
};
98110

99-
// Worker thread function
100-
static void* worker_thread_func(void *arg) {
101-
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)arg;
111+
// Helper function to enqueue work (must be called with mutex held).
// Appends `work` to the tail of the pool's singly-linked queue.
// It does not touch work->next and does not signal pool->work_available;
// both are left to the caller.
112+
static void enqueue_work(struct IO_Event_WorkerPool *pool, struct IO_Event_WorkerPool_Work *work) {
113+
if (pool->work_queue_tail) {
114+
pool->work_queue_tail->next = work;
115+
} else {
116+
// Empty queue: the new item is also the head.
pool->work_queue = work;
117+
}
118+
pool->work_queue_tail = work;
119+
}
120+
121+
// Helper function to dequeue work (must be called with mutex held).
// Removes and returns the item at the head of the pool's queue,
// or returns NULL when the queue is empty.
122+
static struct IO_Event_WorkerPool_Work *dequeue_work(struct IO_Event_WorkerPool *pool) {
123+
struct IO_Event_WorkerPool_Work *work = pool->work_queue;
124+
if (work) {
125+
pool->work_queue = work->next;
126+
if (!pool->work_queue) {
127+
// The last item was removed, so the tail must be reset as well.
pool->work_queue_tail = NULL;
128+
}
129+
work->next = NULL; // Detach the item from the queue before handing it out
130+
}
131+
return work;
132+
}
133+
134+
// Unblock function to interrupt a specific worker.
// Registered as the unblock callback of rb_thread_call_without_gvl in worker_thread_func.
// `_worker` is the struct IO_Event_WorkerPool_Worker to interrupt.
135+
static void worker_unblock_func(void *_worker) {
136+
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker;
137+
struct IO_Event_WorkerPool *pool = worker->pool;
138+
139+
// Mark this specific worker as interrupted
// The broadcast wakes every worker waiting on work_available; only the one whose flag is set leaves its wait loop because of it.
140+
pthread_mutex_lock(&pool->mutex);
141+
worker->interrupted = true;
142+
pthread_cond_broadcast(&pool->work_available);
143+
pthread_mutex_unlock(&pool->mutex);
144+
145+
// If there's a currently executing blocking operation, cancel it
// NOTE(review): current_blocking_operation is read here after the mutex has been released, and the worker thread writes it without holding the mutex — confirm this cannot race with the operation finishing.
146+
if (worker->current_blocking_operation) {
147+
rb_fiber_scheduler_blocking_operation_cancel(worker->current_blocking_operation);
148+
}
149+
}
150+
151+
// Function to wait for work and execute it without GVL.
// Called through rb_thread_call_without_gvl from worker_thread_func.
// Returns the work item that was executed, or NULL when the pool is
// shutting down or this worker has been interrupted.
152+
static void *worker_wait_and_execute(void *_worker) {
153+
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker;
102154
struct IO_Event_WorkerPool *pool = worker->pool;
103155

104156
while (true) {
105157
struct IO_Event_WorkerPool_Work *work = NULL;
106158

107159
pthread_mutex_lock(&pool->mutex);
108160

109-
// Wait for work or shutdown
110-
while (!pool->work_queue && !pool->shutdown) {
161+
// Wait for work, shutdown, or interruption
162+
while (!pool->work_queue && !pool->shutdown && !worker->interrupted) {
111163
pthread_cond_wait(&pool->work_available, &pool->mutex);
112164
}
113165

114-
if (pool->shutdown) {
166+
if (pool->shutdown || worker->interrupted) {
115167
pthread_mutex_unlock(&pool->mutex);
116168
break;
117169
}
118170

119-
// Dequeue work item
120-
if (pool->work_queue) {
121-
work = pool->work_queue;
122-
pool->work_queue = work->next;
123-
if (!pool->work_queue) {
124-
pool->work_queue_tail = NULL;
125-
}
126-
}
171+
work = dequeue_work(pool);
127172

128173
pthread_mutex_unlock(&pool->mutex);
129174

130-
// Execute work
175+
// Execute work WITHOUT GVL (this is the whole point!)
131176
if (work) {
177+
worker->current_blocking_operation = work->blocking_operation;
132178
rb_fiber_scheduler_blocking_operation_execute(work->blocking_operation);
133-
134-
// Mark work as completed (mutex required for worker thread)
135-
pthread_mutex_lock(&pool->mutex);
136-
work->completed = true;
137-
pool->completed_count++;
138-
pthread_cond_signal(&pool->work_completed);
139-
pthread_mutex_unlock(&pool->mutex);
179+
worker->current_blocking_operation = NULL;
140180
}
181+
182+
// Returning from inside the loop means each call handles at most one work item;
// completion bookkeeping is left to the caller.
return work;
141183
}
142184

143-
return NULL;
185+
return NULL; // Shutdown or interruption
186+
}
187+
188+
// Body of each worker thread (started with rb_thread_create).
// Repeatedly executes one work item without the GVL, then, holding the GVL,
// marks it completed and unblocks the fiber that submitted it.
// `_worker` is this thread's struct IO_Event_WorkerPool_Worker. Returns Qnil when the loop ends.
static VALUE worker_thread_func(void *_worker) {
189+
struct IO_Event_WorkerPool_Worker *worker = (struct IO_Event_WorkerPool_Worker *)_worker;
190+
191+
while (true) {
192+
// Wait for work and execute it without holding GVL
193+
struct IO_Event_WorkerPool_Work *work = (struct IO_Event_WorkerPool_Work *)rb_thread_call_without_gvl(worker_wait_and_execute, worker, worker_unblock_func, worker);
194+
195+
if (!work) {
196+
// NULL means pool shutdown or that this worker was interrupted: stop the thread.
197+
break;
198+
}
199+
200+
// Protected by GVL:
201+
work->completed = true;
202+
worker->pool->completed_count++;
203+
204+
// Work was executed without GVL, now unblock the waiting fiber (we have GVL here)
205+
rb_fiber_scheduler_unblock(work->scheduler, work->blocker, work->fiber);
206+
}
207+
208+
return Qnil;
144209
}
145210

146211
// Create a new worker thread
@@ -155,9 +220,12 @@ static int create_worker_thread(struct IO_Event_WorkerPool *pool) {
155220
}
156221

157222
worker->pool = pool;
223+
worker->interrupted = false;
224+
worker->current_blocking_operation = NULL;
158225
worker->next = pool->workers;
159226

160-
if (pthread_create(&worker->thread, NULL, worker_thread_func, worker) != 0) {
227+
worker->thread = rb_thread_create(worker_thread_func, worker);
228+
if (NIL_P(worker->thread)) {
161229
free(worker);
162230
return -1;
163231
}
@@ -203,7 +271,6 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) {
203271

204272
pthread_mutex_init(&pool->mutex, NULL);
205273
pthread_cond_init(&pool->work_available, NULL);
206-
pthread_cond_init(&pool->work_completed, NULL);
207274

208275
pool->work_queue = NULL;
209276
pool->work_queue_tail = NULL;
@@ -228,44 +295,6 @@ static VALUE worker_pool_initialize(int argc, VALUE *argv, VALUE self) {
228295
return self;
229296
}
230297

231-
// Structure to pass both work and pool to rb_ensure functions
232-
struct worker_pool_call_arguments {
233-
struct IO_Event_WorkerPool_Work *work;
234-
struct IO_Event_WorkerPool *pool;
235-
};
236-
237-
// Cleanup function for rb_ensure
238-
static VALUE worker_pool_call_cleanup(VALUE _arguments) {
239-
struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments;
240-
if (arguments && arguments->work) {
241-
// Cancel the blocking operation if possible
242-
if (arguments->work->blocking_operation) {
243-
rb_fiber_scheduler_blocking_operation_cancel(arguments->work->blocking_operation);
244-
245-
// Increment cancelled count (protected by GVL)
246-
arguments->pool->cancelled_count++;
247-
}
248-
free(arguments->work);
249-
}
250-
return Qnil;
251-
}
252-
253-
// Main work execution function
254-
static VALUE worker_pool_call_body(VALUE _arguments) {
255-
struct worker_pool_call_arguments *arguments = (struct worker_pool_call_arguments *)_arguments;
256-
struct IO_Event_WorkerPool_Work *work = arguments->work;
257-
struct IO_Event_WorkerPool *pool = arguments->pool;
258-
259-
// Wait for completion
260-
pthread_mutex_lock(&pool->mutex);
261-
while (!work->completed) {
262-
pthread_cond_wait(&pool->work_completed, &pool->mutex);
263-
}
264-
pthread_mutex_unlock(&pool->mutex);
265-
266-
return Qnil;
267-
}
268-
269298
// Ruby method to submit work and wait for completion
270299
static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
271300
struct IO_Event_WorkerPool *pool;
@@ -278,6 +307,13 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
278307
// Increment call count (protected by GVL)
279308
pool->call_count++;
280309

310+
// Get current fiber and scheduler
311+
VALUE fiber = rb_fiber_current();
312+
VALUE scheduler = rb_fiber_scheduler_current();
313+
if (NIL_P(scheduler)) {
314+
rb_raise(rb_eRuntimeError, "WorkerPool requires a fiber scheduler!");
315+
}
316+
281317
// Extract blocking operation handle
282318
rb_fiber_scheduler_blocking_operation_t *blocking_operation = rb_fiber_scheduler_blocking_operation_extract(_blocking_operation);
283319

@@ -293,26 +329,27 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
293329

294330
work->blocking_operation = blocking_operation;
295331
work->completed = false;
332+
work->scheduler = scheduler;
333+
work->blocker = self;
334+
work->fiber = fiber;
296335
work->next = NULL;
297336

298-
// Enqueue work
337+
// Enqueue work:
299338
pthread_mutex_lock(&pool->mutex);
300-
301-
if (pool->work_queue_tail) {
302-
pool->work_queue_tail->next = work;
303-
} else {
304-
pool->work_queue = work;
305-
}
306-
pool->work_queue_tail = work;
307-
339+
enqueue_work(pool, work);
308340
pthread_cond_signal(&pool->work_available);
309341
pthread_mutex_unlock(&pool->mutex);
310342

311-
// Wait for completion with proper cleanup using rb_ensure
312-
struct worker_pool_call_arguments arguments = {work, pool};
313-
rb_ensure(worker_pool_call_body, (VALUE)&arguments, worker_pool_call_cleanup, (VALUE)&arguments);
343+
// Block the current fiber until work is completed
344+
while (true) {
345+
rb_fiber_scheduler_block(scheduler, work->blocker, Qnil);
346+
347+
if (work->completed) {
348+
break;
349+
}
350+
}
314351

315-
return Qnil;
352+
return Qtrue;
316353
}
317354

318355
static VALUE worker_pool_allocate(VALUE klass) {
@@ -363,7 +400,9 @@ static VALUE worker_pool_statistics(VALUE self) {
363400
// Defines the WorkerPool class under the given IO_Event module and registers
// its allocator and its `initialize`, `call` and `statistics` methods.
void Init_IO_Event_WorkerPool(VALUE IO_Event) {
364401
IO_Event_WorkerPool = rb_define_class_under(IO_Event, "WorkerPool", rb_cObject);
365402
rb_define_alloc_func(IO_Event_WorkerPool, worker_pool_allocate);
403+
366404
// Arity -1: worker_pool_initialize receives (argc, argv, self).
rb_define_method(IO_Event_WorkerPool, "initialize", worker_pool_initialize, -1);
367405
rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1);
406+
368407
rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0);
369408
}

fixtures/io/event/test_scheduler.rb

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,23 @@ module IO::Event
1212
# blocking operations to a WorkerPool instance for testing.
1313
#
1414
# @example Testing usage
15-
# # Create with default selector and worker pool
16-
# scheduler = IO::Event::TestScheduler.new
17-
#
18-
# # Or provide custom selector and/or worker pool
19-
# selector = IO::Event::Selector.new(Fiber.current)
20-
# worker_pool = IO::Event::WorkerPool.new(max_threads: 4)
21-
# scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool)
22-
#
23-
# Fiber.set_scheduler(scheduler)
24-
#
25-
# # Standard Ruby operations that use rb_nogvl will be handled by the worker pool
26-
# # Examples: sleep, file I/O, network operations, etc.
27-
# Fiber.schedule do
28-
# sleep(0.001) # This triggers rb_nogvl and uses the worker pool
29-
# end.resume
15+
# ```ruby
16+
# # Create with default selector and worker pool
17+
# scheduler = IO::Event::TestScheduler.new
18+
#
19+
# # Or provide custom selector and/or worker pool
20+
# selector = IO::Event::Selector.new(Fiber.current)
21+
# worker_pool = IO::Event::WorkerPool.new(max_threads: 4)
22+
# scheduler = IO::Event::TestScheduler.new(selector: selector, worker_pool: worker_pool)
23+
#
24+
# Fiber.set_scheduler(scheduler)
25+
#
26+
# # Standard Ruby operations that use rb_nogvl will be handled by the worker pool
27+
# # Examples: sleep, file I/O, network operations, etc.
28+
# Fiber.schedule do
29+
# sleep(0.001) # This triggers rb_nogvl and uses the worker pool
30+
# end.resume
31+
# ```
3032
class TestScheduler
3133
def initialize(selector: nil, worker_pool: nil, max_threads: 2)
3234
@selector = selector || ::IO::Event::Selector.new(Fiber.current)
@@ -35,9 +37,6 @@ def initialize(selector: nil, worker_pool: nil, max_threads: 2)
3537

3638
# Track the number of fibers that are blocked.
3739
@blocked = 0
38-
39-
# Track how many times blocking_operation_wait was called.
40-
@blocking_operation_count = 0
4140
end
4241

4342
# @attribute [WorkerPool] The worker pool used for executing blocking operations.
@@ -134,12 +133,18 @@ def fiber(&block)
134133

135134
# Run the scheduler event loop.
# Keeps looping while any fiber is blocked or any timer is still registered:
# each pass waits on the selector and then fires timers.
136135
def run
137-
while @blocked > 0 || @timers.size > 0
136+
while @blocked > 0 or @timers.size > 0
138137
# presumably the time until the next timer is due — confirm against the timers implementation
interval = @timers.wait_interval
139138
@selector.select(interval)
140139
@timers.fire
141140
end
142141
end
142+
143+
# Run the event loop until it finishes, then close the scheduler.
# The `ensure` guarantees `close` is called even if `run` raises.
# NOTE(review): presumably the hook Ruby invokes when the fiber scheduler is being shut down — confirm.
# @parameter error [Exception | Nil] Defaults to `$!`; not used by this method.
def scheduler_close(error = $!)
144+
self.run
145+
ensure
146+
self.close
147+
end
143148

144149
private
145150

0 commit comments

Comments
 (0)