Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ struct IO_Event_Selector_URing
{
struct IO_Event_Selector backend;
struct io_uring ring;
size_t pending;

// Flag indicating whether the selector is currently blocked in a system call.
// Set to 1 when blocked in io_uring_wait_cqe_timeout() without GVL, 0 otherwise.
Expand Down Expand Up @@ -239,7 +238,6 @@ VALUE IO_Event_Selector_URing_allocate(VALUE self) {
IO_Event_Selector_initialize(&selector->backend, self, Qnil);
selector->ring.ring_fd = -1;

selector->pending = 0;
selector->blocked = 0;
selector->interrupt.descriptor = -1;
selector->wakeup_registered = 0;
Expand Down Expand Up @@ -421,15 +419,14 @@ void IO_Event_Selector_URing_dump_completion_queue(struct IO_Event_Selector_URin
// Flush the submission queue, optionally yielding if unsuccessful.
static
int io_uring_submit_all(struct IO_Event_Selector_URing *selector, bool yield) {
while (selector->pending > 0) {
struct io_uring *ring = &selector->ring;

while (io_uring_sq_ready(ring) > 0) {
int result = io_uring_submit(&selector->ring);

if (result >= 0) {
// io_uring_submit() returns the number of submitted SQEs
selector->pending -= result;
} else if (result == -EBUSY || result == -EAGAIN) {
if (result == -EBUSY || result == -EAGAIN) {
if (yield) IO_Event_Selector_yield(&selector->backend);
} else {
} else if (result < 0) {
rb_syserr_fail(-result, "io_uring_submit_all:io_uring_submit");
return result;
}
Expand All @@ -442,23 +439,32 @@ int io_uring_submit_all(struct IO_Event_Selector_URing *selector, bool yield) {
// Flush the submission queue if pending operations are present.
static
int io_uring_submit_flush(struct IO_Event_Selector_URing *selector) {
if (DEBUG) fprintf(stderr, "io_uring_submit_now(pending=%ld)\n", selector->pending);
if (DEBUG) {
unsigned pending = io_uring_sq_ready(&selector->ring);
fprintf(stderr, "io_uring_submit_flush(pending=%u)\n", pending);
}

return io_uring_submit_all(selector, false);
}

// Immediately flush the submission queue, yielding to the event loop if it was not successful.
static
int io_uring_submit_now(struct IO_Event_Selector_URing *selector) {
if (DEBUG) fprintf(stderr, "io_uring_submit_now(pending=%ld)\n", selector->pending);

if (DEBUG) {
unsigned pending = io_uring_sq_ready(&selector->ring);
fprintf(stderr, "io_uring_submit_now(pending=%u)\n", pending);
}

return io_uring_submit_all(selector, true);
}

// Submit a pending operation. This does not submit the operation immediately, but instead defers it to the next call to `io_uring_submit_flush` or `io_uring_submit_now`. This is useful for operations that are not urgent, but should be used with care as it can lead to a deadlock if the submission queue is not flushed.
static
void io_uring_submit_pending(struct IO_Event_Selector_URing *selector) {
if (DEBUG) fprintf(stderr, "io_uring_submit_pending(ring=%p, pending=%ld)\n", &selector->ring, selector->pending);
if (DEBUG) {
unsigned pending = io_uring_sq_ready(&selector->ring);
fprintf(stderr, "io_uring_submit_pending(ring=%p, pending=%u)\n", &selector->ring, pending);
}
}

struct io_uring_sqe * io_get_sqe(struct IO_Event_Selector_URing *selector) {
Expand All @@ -471,7 +477,6 @@ struct io_uring_sqe * io_get_sqe(struct IO_Event_Selector_URing *selector) {
sqe = io_uring_get_sqe(&selector->ring);
}

selector->pending += 1;
return sqe;
}

Expand Down
Loading