Skip to content

Commit e4c2d58

Browse files
Use IO_Event_Interrupt for wakeup instead of bespoke eventfd.
Replace the hand-rolled wakeup_fd/eventfd code with struct IO_Event_Interrupt, which already provides open/close/signal/clear with an eventfd on Linux and a pipe fallback on other platforms. The logic is identical; this just removes the duplication. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 0a1d278 commit e4c2d58

1 file changed

Lines changed: 24 additions & 32 deletions

File tree

ext/io/event/selector/uring.c

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
#include <poll.h>
1111
#include <stdint.h>
1212
#include <time.h>
13-
#include <sys/eventfd.h>
13+
14+
#include "../interrupt.h"
1415

1516
#include "pidfd.c"
1617

@@ -36,15 +37,16 @@ struct IO_Event_Selector_URing
3637
// Set to 1 when blocked in io_uring_wait_cqe_timeout() without GVL, 0 otherwise.
3738
int blocked;
3839

39-
// eventfd used to wake the selector from another thread without touching the ring's SQ.
40+
// Interrupt used to wake the selector from another thread without touching the ring's SQ.
4041
// This allows IORING_SETUP_SINGLE_ISSUER: only the owner thread ever submits SQEs.
41-
int wakeup_fd;
42+
// Uses eventfd on Linux, pipe fallback elsewhere.
43+
struct IO_Event_Interrupt interrupt;
4244

43-
// Whether an async read on wakeup_fd is currently pending in the ring.
45+
// Whether an async read on interrupt is currently pending in the ring.
4446
// The read is re-submitted before each blocking wait when not registered.
4547
int wakeup_registered;
4648

47-
// Buffer for the pending async read on wakeup_fd.
49+
// Buffer for the pending async read on the interrupt descriptor.
4850
// Must remain valid for the lifetime of the in-flight SQE.
4951
uint64_t wakeup_value;
5052

@@ -113,9 +115,9 @@ void IO_Event_Selector_URing_Type_compact(void *_selector)
113115
static
114116
void close_internal(struct IO_Event_Selector_URing *selector)
115117
{
116-
if (selector->wakeup_fd >= 0) {
117-
close(selector->wakeup_fd);
118-
selector->wakeup_fd = -1;
118+
if (selector->interrupt.descriptor >= 0) {
119+
IO_Event_Interrupt_close(&selector->interrupt);
120+
selector->interrupt.descriptor = -1;
119121
selector->wakeup_registered = 0;
120122
}
121123

@@ -238,7 +240,7 @@ VALUE IO_Event_Selector_URing_allocate(VALUE self) {
238240

239241
selector->pending = 0;
240242
selector->blocked = 0;
241-
selector->wakeup_fd = -1;
243+
selector->interrupt.descriptor = -1;
242244
selector->wakeup_registered = 0;
243245

244246
IO_Event_List_initialize(&selector->free_list);
@@ -282,16 +284,15 @@ VALUE IO_Event_Selector_URing_initialize(VALUE self, VALUE loop) {
282284

283285
rb_update_max_fd(selector->ring.ring_fd);
284286

285-
// eventfd for cross-thread wakeup: another thread writes to this fd; the owner
286-
// thread registers a one-shot poll_add before each blocking wait so the ring
287-
// wakes up without the waking thread ever touching the SQ.
288-
selector->wakeup_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
289-
if (selector->wakeup_fd < 0) {
287+
// Interrupt for cross-thread wakeup: another thread calls signal(); the owner
288+
// thread submits an async read before each blocking wait so the ring wakes up
289+
// without the waking thread ever touching the SQ.
290+
IO_Event_Interrupt_open(&selector->interrupt);
291+
if (selector->interrupt.descriptor < 0) {
290292
io_uring_queue_exit(&selector->ring);
291293
selector->ring.ring_fd = -1;
292-
rb_sys_fail("IO_Event_Selector_URing_initialize:eventfd");
294+
rb_sys_fail("IO_Event_Selector_URing_initialize:IO_Event_Interrupt_open");
293295
}
294-
rb_update_max_fd(selector->wakeup_fd);
295296

296297
return self;
297298
}
@@ -1126,8 +1127,8 @@ int select_internal_without_gvl(struct select_arguments *arguments) {
11261127
// The address of wakeup_fd serves as a unique sentinel in user_data.
11271128
if (!selector->wakeup_registered) {
11281129
struct io_uring_sqe *sqe = io_get_sqe(selector);
1129-
io_uring_prep_read(sqe, selector->wakeup_fd, &selector->wakeup_value, sizeof(selector->wakeup_value), 0);
1130-
io_uring_sqe_set_data(sqe, &selector->wakeup_fd);
1130+
io_uring_prep_read(sqe, IO_Event_Interrupt_descriptor(&selector->interrupt), &selector->wakeup_value, sizeof(selector->wakeup_value), 0);
1131+
io_uring_sqe_set_data(sqe, &selector->interrupt);
11311132
selector->wakeup_registered = 1;
11321133
selector->pending += 1;
11331134
}
@@ -1175,10 +1176,9 @@ unsigned select_process_completions(struct IO_Event_Selector_URing *selector) {
11751176
continue;
11761177
}
11771178

1178-
// Wakeup eventfd read completion — the read already consumed the counter,
1179-
// no separate drain needed. Clear the flag so the next blocking wait
1180-
// re-submits the read.
1181-
if (cqe->user_data == (uint64_t)(uintptr_t)&selector->wakeup_fd) {
1179+
// Interrupt read completion — the read already consumed the counter.
1180+
// Clear the flag so the next blocking wait re-submits the read.
1181+
if (cqe->user_data == (uint64_t)(uintptr_t)&selector->interrupt) {
11821182
selector->wakeup_registered = 0;
11831183
io_uring_cq_advance(ring, 1);
11841184
continue;
@@ -1269,18 +1269,10 @@ VALUE IO_Event_Selector_URing_wakeup(VALUE self) {
12691269
struct IO_Event_Selector_URing *selector = NULL;
12701270
TypedData_Get_Struct(self, struct IO_Event_Selector_URing, &IO_Event_Selector_URing_Type, selector);
12711271

1272-
// Wake the selector by writing to the eventfd. This is safe from any thread
1272+
// Wake the selector by signalling the interrupt. This is safe from any thread
12731273
// and never touches the ring's SQ, which is required for IORING_SETUP_SINGLE_ISSUER.
12741274
if (selector->blocked) {
1275-
uint64_t value = 1;
1276-
int result = write(selector->wakeup_fd, &value, sizeof(value));
1277-
1278-
// EAGAIN means the eventfd counter is already at its maximum (UINT64_MAX - 1),
1279-
// i.e. a wakeup is already pending — that's fine.
1280-
if (result < 0 && errno != EAGAIN) {
1281-
rb_sys_fail("IO_Event_Selector_URing_wakeup:write");
1282-
}
1283-
1275+
IO_Event_Interrupt_signal(&selector->interrupt);
12841276
return Qtrue;
12851277
}
12861278

0 commit comments

Comments
 (0)