Skip to content

Commit 3e3075a

Browse files
Add better cancellation tests.
1 parent 199170e commit 3e3075a

8 files changed

Lines changed: 306 additions & 89 deletions

File tree

ext/extconf.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
if have_header("pthread.h")
6565
append_cflags(["-DHAVE_IO_EVENT_WORKER_POOL"])
6666
$srcs << "io/event/worker_pool.c"
67+
$srcs << "io/event/worker_pool_test.c"
6768
end
6869
end
6970

ext/io/event/worker_pool.c

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
// Copyright, 2025, by Samuel Williams.
33

44
#include "worker_pool.h"
5+
#include "worker_pool_test.h"
56
#include "fiber.h"
67

78
#include <ruby/thread.h>
9+
#include <ruby/fiber/scheduler.h>
810

911
#include <pthread.h>
1012
#include <stdbool.h>
@@ -66,33 +68,17 @@ static void worker_pool_free(void *ptr) {
6668
struct IO_Event_WorkerPool *pool = (struct IO_Event_WorkerPool *)ptr;
6769

6870
if (pool) {
69-
// Signal shutdown
70-
pthread_mutex_lock(&pool->mutex);
71-
pool->shutdown = true;
72-
pthread_cond_broadcast(&pool->work_available);
73-
pthread_mutex_unlock(&pool->mutex);
74-
75-
// Wait for all workers to finish
76-
struct IO_Event_WorkerPool_Worker *thread = pool->workers;
77-
while (thread) {
78-
rb_funcall(thread->thread, rb_intern("join"), 0);
79-
struct IO_Event_WorkerPool_Worker *next = thread->next;
80-
free(thread);
81-
thread = next;
82-
}
83-
84-
// Clean up work queue
85-
struct IO_Event_WorkerPool_Work *work = pool->work_queue;
86-
while (work) {
87-
struct IO_Event_WorkerPool_Work *next = work->next;
88-
free(work);
89-
work = next;
71+
// Signal shutdown to all workers
72+
if (!pool->shutdown) {
73+
pthread_mutex_lock(&pool->mutex);
74+
pool->shutdown = true;
75+
pthread_cond_broadcast(&pool->work_available);
76+
pthread_mutex_unlock(&pool->mutex);
9077
}
9178

92-
pthread_mutex_destroy(&pool->mutex);
93-
pthread_cond_destroy(&pool->work_available);
94-
95-
free(pool);
79+
// Note: We don't free worker structures or wait for threads during GC
80+
// as this can cause deadlocks. The Ruby GC will handle the thread objects.
81+
// Workers will see the shutdown flag and exit cleanly.
9682
}
9783
}
9884

@@ -405,4 +391,7 @@ void Init_IO_Event_WorkerPool(VALUE IO_Event) {
405391
rb_define_method(IO_Event_WorkerPool, "call", worker_pool_call, 1);
406392

407393
rb_define_method(IO_Event_WorkerPool, "statistics", worker_pool_statistics, 0);
394+
395+
// Initialize test functions
396+
Init_IO_Event_WorkerPool_Test(IO_Event_WorkerPool);
408397
}

ext/io/event/worker_pool.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,4 @@
55

66
#include <ruby.h>
77

8-
#ifdef HAVE_RB_FIBER_SCHEDULER_BLOCKING_OPERATION_EXTRACT
9-
#include <ruby/fiber/scheduler.h>
10-
#endif
11-
128
void Init_IO_Event_WorkerPool(VALUE IO_Event);

ext/io/event/worker_pool_test.c

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
// worker_pool_test.c - Test functions for WorkerPool cancellation
2+
// Released under the MIT License.
3+
// Copyright, 2025, by Samuel Williams.
4+
5+
#include "worker_pool_test.h"
6+
7+
#include <ruby/thread.h>
8+
9+
#include <unistd.h>
10+
#include <sys/select.h>
11+
#include <errno.h>
12+
#include <time.h>
13+
14+
struct BusyOperationData {
15+
int read_fd;
16+
int write_fd;
17+
volatile int cancelled;
18+
double duration; // How long to wait (for testing)
19+
clock_t start_time;
20+
clock_t end_time;
21+
int operation_result;
22+
VALUE exception;
23+
};
24+
25+
// The actual blocking operation that can be cancelled
26+
static void* busy_blocking_operation(void *data) {
27+
struct BusyOperationData *busy_data = (struct BusyOperationData*)data;
28+
29+
// Use select() to wait for the pipe to become readable
30+
fd_set read_fds;
31+
struct timeval timeout;
32+
33+
FD_ZERO(&read_fds);
34+
FD_SET(busy_data->read_fd, &read_fds);
35+
36+
// Set timeout based on duration
37+
timeout.tv_sec = (long)busy_data->duration;
38+
timeout.tv_usec = ((busy_data->duration - timeout.tv_sec) * 1000000);
39+
40+
// This will block until:
41+
// 1. The pipe becomes readable (cancellation)
42+
// 2. The timeout expires
43+
// 3. An error occurs
44+
int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout);
45+
46+
if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) {
47+
// Pipe became readable - we were cancelled
48+
char buffer;
49+
read(busy_data->read_fd, &buffer, 1); // Consume the byte
50+
busy_data->cancelled = 1;
51+
return (void*)-1; // Indicate cancellation
52+
} else if (result == 0) {
53+
// Timeout - operation completed normally
54+
return (void*)0; // Indicate success
55+
} else {
56+
// Error occurred
57+
return (void*)-2; // Indicate error
58+
}
59+
}
60+
61+
// Unblock function that writes to the pipe to cancel the operation
62+
static void busy_unblock_function(void *data) {
63+
struct BusyOperationData *busy_data = (struct BusyOperationData*)data;
64+
65+
// Write a byte to the pipe to wake up the select()
66+
char wake_byte = 1;
67+
write(busy_data->write_fd, &wake_byte, 1);
68+
69+
busy_data->cancelled = 1;
70+
}
71+
72+
// Function for the main operation execution (for rb_rescue)
73+
static VALUE busy_operation_execute(VALUE data_value) {
74+
struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value;
75+
76+
// Record start time
77+
busy_data->start_time = clock();
78+
79+
// Execute the blocking operation
80+
void *block_result = rb_nogvl(
81+
busy_blocking_operation,
82+
busy_data,
83+
busy_unblock_function,
84+
busy_data,
85+
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_OFFLOAD_SAFE
86+
);
87+
88+
// Record end time
89+
busy_data->end_time = clock();
90+
91+
// Store the operation result
92+
busy_data->operation_result = (int)(intptr_t)block_result;
93+
94+
return Qnil;
95+
}
96+
97+
// Function for exception handling (for rb_rescue)
98+
static VALUE busy_operation_rescue(VALUE data_value, VALUE exception) {
99+
struct BusyOperationData *busy_data = (struct BusyOperationData*)data_value;
100+
101+
// Record end time even in case of exception
102+
busy_data->end_time = clock();
103+
104+
// Mark that an exception was caught
105+
busy_data->exception = exception;
106+
107+
return exception;
108+
}
109+
110+
// Ruby method: IO::Event::WorkerPool.busy(duration: 1.0)
111+
// This creates a cancellable blocking operation for testing
112+
static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) {
113+
VALUE options = Qnil;
114+
double duration = 1.0; // Default 1 second
115+
116+
// Parse arguments
117+
if (argc == 1) {
118+
options = argv[0];
119+
if (RB_TYPE_P(options, T_HASH)) {
120+
VALUE duration_value = rb_hash_aref(options, ID2SYM(rb_intern("duration")));
121+
if (!NIL_P(duration_value)) {
122+
duration = NUM2DBL(duration_value);
123+
}
124+
} else {
125+
duration = NUM2DBL(options);
126+
}
127+
} else if (argc > 1) {
128+
rb_raise(rb_eArgError, "wrong number of arguments (given %d, expected 0..1)", argc);
129+
}
130+
131+
// Create pipe for cancellation
132+
int pipe_fds[2];
133+
if (pipe(pipe_fds) != 0) {
134+
rb_sys_fail("pipe creation failed");
135+
}
136+
137+
// Stack allocate and initialize operation data with brace initialization
138+
struct BusyOperationData busy_data = {
139+
.read_fd = pipe_fds[0],
140+
.write_fd = pipe_fds[1],
141+
.duration = duration,
142+
.exception = Qnil,
143+
// All other fields are zero-initialized by default
144+
};
145+
146+
// Execute the blocking operation with exception handling using function pointers
147+
rb_rescue(
148+
busy_operation_execute,
149+
(VALUE)&busy_data,
150+
busy_operation_rescue,
151+
(VALUE)&busy_data
152+
);
153+
154+
// Calculate elapsed time from the state stored in busy_data
155+
double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC;
156+
157+
// Cleanup pipes
158+
close(busy_data.read_fd);
159+
close(busy_data.write_fd);
160+
161+
// Create result hash using the state from busy_data
162+
VALUE result = rb_hash_new();
163+
rb_hash_aset(result, ID2SYM(rb_intern("duration")), DBL2NUM(duration));
164+
rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed));
165+
166+
// Determine result based on operation outcome
167+
if (busy_data.exception != Qnil) {
168+
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception")));
169+
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue);
170+
rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception);
171+
} else if (busy_data.operation_result == -1) {
172+
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled")));
173+
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue);
174+
} else if (busy_data.operation_result == 0) {
175+
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed")));
176+
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse);
177+
} else {
178+
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("error")));
179+
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse);
180+
}
181+
182+
return result;
183+
}
184+
185+
// Initialize the test functions
186+
void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool) {
187+
// Add test methods to IO::Event::WorkerPool class
188+
rb_define_singleton_method(IO_Event_WorkerPool, "busy", worker_pool_test_busy, -1);
189+
}

ext/io/event/worker_pool_test.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// worker_pool_test.h - Header for WorkerPool test functions
2+
// Released under the MIT License.
3+
// Copyright, 2025, by Samuel Williams.
4+
5+
#pragma once
6+
7+
#include <ruby.h>
8+
9+
void Init_IO_Event_WorkerPool_Test(VALUE IO_Event_WorkerPool);

fixtures/io/event/test_scheduler.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def kernel_sleep(duration = nil)
128128
end
129129

130130
def fiber(&block)
131-
Fiber.new(&block).tap(&:resume)
131+
Fiber.new(&block).tap(&:transfer)
132132
end
133133

134134
# Run the scheduler event loop

lib/io/event/test_scheduler.rb

Lines changed: 0 additions & 54 deletions
This file was deleted.

0 commit comments

Comments
 (0)