Skip to content

Commit 32adf2c

Browse files
Fix URing io_wait unmatched poll events
1 parent 725193e commit 32adf2c

1 file changed

Lines changed: 48 additions & 25 deletions

File tree

ext/io/event/selector/uring.c

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -584,9 +584,24 @@ int events_from_poll_flags(short flags) {
584584
struct io_wait_arguments {
585585
struct IO_Event_Selector_URing *selector;
586586
struct IO_Event_Selector_URing_Waiting *waiting;
587+
int events;
587588
short flags;
589+
int descriptor;
588590
};
589591

592+
static
593+
void io_wait_submit(struct io_wait_arguments *arguments)
594+
{
595+
struct IO_Event_Selector_URing *selector = arguments->selector;
596+
struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, arguments->waiting);
597+
598+
struct io_uring_sqe *sqe = io_get_sqe(selector);
599+
io_uring_prep_poll_add(sqe, arguments->descriptor, arguments->flags);
600+
io_uring_sqe_set_data(sqe, completion);
601+
// If we are going to wait, we assume that we are waiting for a while:
602+
io_uring_submit_pending(selector);
603+
}
604+
590605
static
591606
VALUE io_wait_ensure(VALUE _arguments) {
592607
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
@@ -609,20 +624,31 @@ static
609624
VALUE io_wait_transfer(VALUE _arguments) {
610625
struct io_wait_arguments *arguments = (struct io_wait_arguments *)_arguments;
611626
struct IO_Event_Selector_URing *selector = arguments->selector;
612-
613-
IO_Event_Selector_loop_yield(&selector->backend);
614-
615-
if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);
616-
617-
int32_t result = arguments->waiting->result;
618-
if (result < 0) {
619-
rb_syserr_fail(-result, "io_wait_transfer:io_uring_poll_add");
620-
} else if (result > 0) {
621-
// We explicitly filter the resulting events based on the requested events.
622-
// In some cases, poll will report events we didn't ask for.
623-
return RB_INT2NUM(events_from_poll_flags(arguments->waiting->result & arguments->flags));
624-
} else {
625-
return Qfalse;
627+
628+
while (true) {
629+
IO_Event_Selector_loop_yield(&selector->backend);
630+
631+
if (DEBUG) fprintf(stderr, "io_wait_transfer:waiting=%p, result=%d\n", (void*)arguments->waiting, arguments->waiting->result);
632+
633+
int32_t result = arguments->waiting->result;
634+
if (result < 0) {
635+
rb_syserr_fail(-result, "io_wait_transfer:io_uring_poll_add");
636+
} else if (result > 0) {
637+
// We explicitly filter the resulting events based on the requested events.
638+
// In some cases, poll will report events we didn't ask for.
639+
int events = events_from_poll_flags(result) & arguments->events;
640+
if (events) {
641+
return RB_INT2NUM(events);
642+
}
643+
644+
if (result & POLLNVAL) {
645+
rb_syserr_fail(EBADF, "io_wait_transfer:io_uring_poll_add");
646+
}
647+
648+
io_wait_submit(arguments);
649+
} else {
650+
return Qfalse;
651+
}
626652
}
627653
};
628654

@@ -632,7 +658,8 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
632658

633659
int descriptor = IO_Event_Selector_io_descriptor(io);
634660

635-
short flags = poll_flags_from_events(NUM2INT(events));
661+
int requested_events = NUM2INT(events);
662+
short flags = poll_flags_from_events(requested_events);
636663

637664
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_io_wait:io_uring_prep_poll_add(descriptor=%d, flags=%d, fiber=%p)\n", descriptor, flags, (void*)fiber);
638665

@@ -642,20 +669,16 @@ VALUE IO_Event_Selector_URing_io_wait(VALUE self, VALUE fiber, VALUE io, VALUE e
642669

643670
RB_OBJ_WRITTEN(self, Qundef, fiber);
644671

645-
struct IO_Event_Selector_URing_Completion *completion = IO_Event_Selector_URing_Completion_acquire(selector, &waiting);
646-
647-
struct io_uring_sqe *sqe = io_get_sqe(selector);
648-
io_uring_prep_poll_add(sqe, descriptor, flags);
649-
io_uring_sqe_set_data(sqe, completion);
650-
// If we are going to wait, we assume that we are waiting for a while:
651-
io_uring_submit_pending(selector);
652-
653672
struct io_wait_arguments io_wait_arguments = {
654673
.selector = selector,
655674
.waiting = &waiting,
656-
.flags = flags
675+
.events = requested_events,
676+
.flags = flags,
677+
.descriptor = descriptor,
657678
};
658-
679+
680+
io_wait_submit(&io_wait_arguments);
681+
659682
return rb_ensure(io_wait_transfer, (VALUE)&io_wait_arguments, io_wait_ensure, (VALUE)&io_wait_arguments);
660683
}
661684

0 commit comments

Comments
 (0)