Skip to content

Commit 33e6b0b

Browse files
Allow cancelled operation state to outlive scope of caller.
1 parent 3e3075a commit 33e6b0b

1 file changed

Lines changed: 72 additions & 22 deletions

File tree

ext/io/event/worker_pool_test.c

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
#include "worker_pool_test.h"
66

77
#include <ruby/thread.h>
8+
#include <stdatomic.h>
9+
#include <stdlib.h>
10+
#include <string.h>
811

912
#include <unistd.h>
1013
#include <sys/select.h>
@@ -20,12 +23,49 @@ struct BusyOperationData {
2023
clock_t end_time;
2124
int operation_result;
2225
VALUE exception;
26+
27+
// Reference counting for safe heap management
28+
_Atomic int ref_count;
2329
};
2430

31+
// Reference counting functions for safe heap management
32+
static struct BusyOperationData* busy_data_create(int read_fd, int write_fd, double duration) {
33+
struct BusyOperationData *data = malloc(sizeof(struct BusyOperationData));
34+
if (!data) return NULL;
35+
36+
memset(data, 0, sizeof(struct BusyOperationData));
37+
data->read_fd = read_fd;
38+
data->write_fd = write_fd;
39+
data->duration = duration;
40+
data->exception = Qnil;
41+
atomic_store(&data->ref_count, 1);
42+
43+
return data;
44+
}
45+
46+
static struct BusyOperationData* busy_data_retain(struct BusyOperationData* data) {
47+
if (data) {
48+
atomic_fetch_add(&data->ref_count, 1);
49+
}
50+
return data;
51+
}
52+
53+
static void busy_data_release(struct BusyOperationData* data) {
54+
if (data && atomic_fetch_sub(&data->ref_count, 1) == 1) {
55+
// Last reference, safe to cleanup
56+
close(data->read_fd);
57+
close(data->write_fd);
58+
free(data);
59+
}
60+
}
61+
2562
// The actual blocking operation that can be cancelled
2663
static void* busy_blocking_operation(void *data) {
2764
struct BusyOperationData *busy_data = (struct BusyOperationData*)data;
2865

66+
// Retain reference while we're using it
67+
busy_data_retain(busy_data);
68+
2969
// Use select() to wait for the pipe to become readable
3070
fd_set read_fds;
3171
struct timeval timeout;
@@ -43,30 +83,41 @@ static void* busy_blocking_operation(void *data) {
4383
// 3. An error occurs
4484
int result = select(busy_data->read_fd + 1, &read_fds, NULL, NULL, &timeout);
4585

86+
void* return_value;
4687
if (result > 0 && FD_ISSET(busy_data->read_fd, &read_fds)) {
4788
// Pipe became readable - we were cancelled
4889
char buffer;
4990
read(busy_data->read_fd, &buffer, 1); // Consume the byte
5091
busy_data->cancelled = 1;
51-
return (void*)-1; // Indicate cancellation
92+
return_value = (void*)-1; // Indicate cancellation
5293
} else if (result == 0) {
5394
// Timeout - operation completed normally
54-
return (void*)0; // Indicate success
95+
return_value = (void*)0; // Indicate success
5596
} else {
5697
// Error occurred
57-
return (void*)-2; // Indicate error
98+
return_value = (void*)-2; // Indicate error
5899
}
100+
101+
// Release reference before returning
102+
busy_data_release(busy_data);
103+
return return_value;
59104
}
60105

61106
// Unblock function that writes to the pipe to cancel the operation
62107
static void busy_unblock_function(void *data) {
63108
struct BusyOperationData *busy_data = (struct BusyOperationData*)data;
64109

110+
// Retain reference while we're using it
111+
busy_data_retain(busy_data);
112+
65113
// Write a byte to the pipe to wake up the select()
66114
char wake_byte = 1;
67115
write(busy_data->write_fd, &wake_byte, 1);
68116

69117
busy_data->cancelled = 1;
118+
119+
// Release reference
120+
busy_data_release(busy_data);
70121
}
71122

72123
// Function for the main operation execution (for rb_rescue)
@@ -134,51 +185,50 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) {
134185
rb_sys_fail("pipe creation failed");
135186
}
136187

137-
// Stack allocate and initialize operation data with brace initialization
138-
struct BusyOperationData busy_data = {
139-
.read_fd = pipe_fds[0],
140-
.write_fd = pipe_fds[1],
141-
.duration = duration,
142-
.exception = Qnil,
143-
// All other fields are zero-initialized by default
144-
};
188+
// Heap allocate operation data with reference counting
189+
struct BusyOperationData *busy_data = busy_data_create(pipe_fds[0], pipe_fds[1], duration);
190+
if (!busy_data) {
191+
close(pipe_fds[0]);
192+
close(pipe_fds[1]);
193+
rb_raise(rb_eNoMemError, "failed to allocate busy operation data");
194+
}
145195

146196
// Execute the blocking operation with exception handling using function pointers
147197
rb_rescue(
148198
busy_operation_execute,
149-
(VALUE)&busy_data,
199+
(VALUE)busy_data,
150200
busy_operation_rescue,
151-
(VALUE)&busy_data
201+
(VALUE)busy_data
152202
);
153203

154204
// Calculate elapsed time from the state stored in busy_data
155-
double elapsed = ((double)(busy_data.end_time - busy_data.start_time)) / CLOCKS_PER_SEC;
156-
157-
// Cleanup pipes
158-
close(busy_data.read_fd);
159-
close(busy_data.write_fd);
205+
double elapsed = ((double)(busy_data->end_time - busy_data->start_time)) / CLOCKS_PER_SEC;
160206

161207
// Create result hash using the state from busy_data
162208
VALUE result = rb_hash_new();
163209
rb_hash_aset(result, ID2SYM(rb_intern("duration")), DBL2NUM(duration));
164210
rb_hash_aset(result, ID2SYM(rb_intern("elapsed")), DBL2NUM(elapsed));
165211

166212
// Determine result based on operation outcome
167-
if (busy_data.exception != Qnil) {
213+
if (busy_data->exception != Qnil) {
168214
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("exception")));
169215
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue);
170-
rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data.exception);
171-
} else if (busy_data.operation_result == -1) {
216+
rb_hash_aset(result, ID2SYM(rb_intern("exception")), busy_data->exception);
217+
} else if (busy_data->operation_result == -1) {
172218
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("cancelled")));
173219
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qtrue);
174-
} else if (busy_data.operation_result == 0) {
220+
} else if (busy_data->operation_result == 0) {
175221
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("completed")));
176222
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse);
177223
} else {
178224
rb_hash_aset(result, ID2SYM(rb_intern("result")), ID2SYM(rb_intern("error")));
179225
rb_hash_aset(result, ID2SYM(rb_intern("cancelled")), Qfalse);
180226
}
181227

228+
// Release our reference to the busy_data
229+
// The blocking operation and unblock function may still have references
230+
busy_data_release(busy_data);
231+
182232
return result;
183233
}
184234

0 commit comments

Comments
 (0)