55#include "worker_pool_test.h"
66
77#include <ruby/thread.h>
8+ #include <stdatomic.h>
9+ #include <stdlib.h>
10+ #include <string.h>
811
912#include <unistd.h>
1013#include <sys/select.h>
@@ -20,12 +23,49 @@ struct BusyOperationData {
2023 clock_t end_time ;
2124 int operation_result ;
2225 VALUE exception ;
26+
27+ // Reference counting for safe heap management
28+ _Atomic int ref_count ;
2329};
2430
31+ // Reference counting functions for safe heap management
32+ static struct BusyOperationData * busy_data_create (int read_fd , int write_fd , double duration ) {
33+ struct BusyOperationData * data = malloc (sizeof (struct BusyOperationData ));
34+ if (!data ) return NULL ;
35+
36+ memset (data , 0 , sizeof (struct BusyOperationData ));
37+ data -> read_fd = read_fd ;
38+ data -> write_fd = write_fd ;
39+ data -> duration = duration ;
40+ data -> exception = Qnil ;
41+ atomic_store (& data -> ref_count , 1 );
42+
43+ return data ;
44+ }
45+
46+ static struct BusyOperationData * busy_data_retain (struct BusyOperationData * data ) {
47+ if (data ) {
48+ atomic_fetch_add (& data -> ref_count , 1 );
49+ }
50+ return data ;
51+ }
52+
53+ static void busy_data_release (struct BusyOperationData * data ) {
54+ if (data && atomic_fetch_sub (& data -> ref_count , 1 ) == 1 ) {
55+ // Last reference, safe to cleanup
56+ close (data -> read_fd );
57+ close (data -> write_fd );
58+ free (data );
59+ }
60+ }
61+
2562// The actual blocking operation that can be cancelled
2663static void * busy_blocking_operation (void * data ) {
2764 struct BusyOperationData * busy_data = (struct BusyOperationData * )data ;
2865
66+ // Retain reference while we're using it
67+ busy_data_retain (busy_data );
68+
2969 // Use select() to wait for the pipe to become readable
3070 fd_set read_fds ;
3171 struct timeval timeout ;
@@ -43,30 +83,41 @@ static void* busy_blocking_operation(void *data) {
4383 // 3. An error occurs
4484 int result = select (busy_data -> read_fd + 1 , & read_fds , NULL , NULL , & timeout );
4585
86+ void * return_value ;
4687 if (result > 0 && FD_ISSET (busy_data -> read_fd , & read_fds )) {
4788 // Pipe became readable - we were cancelled
4889 char buffer ;
4990 read (busy_data -> read_fd , & buffer , 1 ); // Consume the byte
5091 busy_data -> cancelled = 1 ;
51- return (void * )-1 ; // Indicate cancellation
92+ return_value = (void * )-1 ; // Indicate cancellation
5293 } else if (result == 0 ) {
5394 // Timeout - operation completed normally
54- return (void * )0 ; // Indicate success
95+ return_value = (void * )0 ; // Indicate success
5596 } else {
5697 // Error occurred
57- return (void * )-2 ; // Indicate error
98+ return_value = (void * )-2 ; // Indicate error
5899 }
100+
101+ // Release reference before returning
102+ busy_data_release (busy_data );
103+ return return_value ;
59104}
60105
61106// Unblock function that writes to the pipe to cancel the operation
62107static void busy_unblock_function (void * data ) {
63108 struct BusyOperationData * busy_data = (struct BusyOperationData * )data ;
64109
110+ // Retain reference while we're using it
111+ busy_data_retain (busy_data );
112+
65113 // Write a byte to the pipe to wake up the select()
66114 char wake_byte = 1 ;
67115 write (busy_data -> write_fd , & wake_byte , 1 );
68116
69117 busy_data -> cancelled = 1 ;
118+
119+ // Release reference
120+ busy_data_release (busy_data );
70121}
71122
72123// Function for the main operation execution (for rb_rescue)
@@ -134,51 +185,50 @@ static VALUE worker_pool_test_busy(int argc, VALUE *argv, VALUE self) {
134185 rb_sys_fail ("pipe creation failed" );
135186 }
136187
137- // Stack allocate and initialize operation data with brace initialization
138- struct BusyOperationData busy_data = {
139- .read_fd = pipe_fds [0 ],
140- .write_fd = pipe_fds [1 ],
141- .duration = duration ,
142- .exception = Qnil ,
143- // All other fields are zero-initialized by default
144- };
188+ // Heap allocate operation data with reference counting
189+ struct BusyOperationData * busy_data = busy_data_create (pipe_fds [0 ], pipe_fds [1 ], duration );
190+ if (!busy_data ) {
191+ close (pipe_fds [0 ]);
192+ close (pipe_fds [1 ]);
193+ rb_raise (rb_eNoMemError , "failed to allocate busy operation data" );
194+ }
145195
146196 // Execute the blocking operation with exception handling using function pointers
147197 rb_rescue (
148198 busy_operation_execute ,
149- (VALUE )& busy_data ,
199+ (VALUE )busy_data ,
150200 busy_operation_rescue ,
151- (VALUE )& busy_data
201+ (VALUE )busy_data
152202 );
153203
154204 // Calculate elapsed time from the state stored in busy_data
155- double elapsed = ((double )(busy_data .end_time - busy_data .start_time )) / CLOCKS_PER_SEC ;
156-
157- // Cleanup pipes
158- close (busy_data .read_fd );
159- close (busy_data .write_fd );
205+ double elapsed = ((double )(busy_data -> end_time - busy_data -> start_time )) / CLOCKS_PER_SEC ;
160206
161207 // Create result hash using the state from busy_data
162208 VALUE result = rb_hash_new ();
163209 rb_hash_aset (result , ID2SYM (rb_intern ("duration" )), DBL2NUM (duration ));
164210 rb_hash_aset (result , ID2SYM (rb_intern ("elapsed" )), DBL2NUM (elapsed ));
165211
166212 // Determine result based on operation outcome
167- if (busy_data . exception != Qnil ) {
213+ if (busy_data -> exception != Qnil ) {
168214 rb_hash_aset (result , ID2SYM (rb_intern ("result" )), ID2SYM (rb_intern ("exception" )));
169215 rb_hash_aset (result , ID2SYM (rb_intern ("cancelled" )), Qtrue );
170- rb_hash_aset (result , ID2SYM (rb_intern ("exception" )), busy_data . exception );
171- } else if (busy_data . operation_result == -1 ) {
216+ rb_hash_aset (result , ID2SYM (rb_intern ("exception" )), busy_data -> exception );
217+ } else if (busy_data -> operation_result == -1 ) {
172218 rb_hash_aset (result , ID2SYM (rb_intern ("result" )), ID2SYM (rb_intern ("cancelled" )));
173219 rb_hash_aset (result , ID2SYM (rb_intern ("cancelled" )), Qtrue );
174- } else if (busy_data . operation_result == 0 ) {
220+ } else if (busy_data -> operation_result == 0 ) {
175221 rb_hash_aset (result , ID2SYM (rb_intern ("result" )), ID2SYM (rb_intern ("completed" )));
176222 rb_hash_aset (result , ID2SYM (rb_intern ("cancelled" )), Qfalse );
177223 } else {
178224 rb_hash_aset (result , ID2SYM (rb_intern ("result" )), ID2SYM (rb_intern ("error" )));
179225 rb_hash_aset (result , ID2SYM (rb_intern ("cancelled" )), Qfalse );
180226 }
181227
228+ // Release our reference to the busy_data
229+ // The blocking operation and unblock function may still have references
230+ busy_data_release (busy_data );
231+
182232 return result ;
183233}
184234
0 commit comments