Skip to content

Commit 84e03fb

Browse files
committed
Ensure work can be cancelled while still in the queue.
1 parent bd6d105 commit 84e03fb

2 files changed

Lines changed: 43 additions & 8 deletions

File tree

ext/io/event/worker_pool.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -336,19 +336,32 @@ static VALUE worker_pool_call(VALUE self, VALUE _blocking_operation) {
336336
pthread_mutex_unlock(&pool->mutex);
337337

338338
// Block the current fiber until work is completed:
339-
int state;
339+
int state = 0;
340340
while (true) {
341-
rb_protect(worker_pool_work_begin, (VALUE)&work, &state);
342-
341+
int current_state = 0;
342+
rb_protect(worker_pool_work_begin, (VALUE)&work, &current_state);
343+
if (DEBUG) fprintf(stderr, "-- worker_pool_call:work completed=%d, current_state=%d, state=%d\n", work.completed, current_state, state);
344+
345+
// Store the first exception state:
346+
if (!state) {
347+
state = current_state;
348+
}
349+
350+
// If the work is still in the queue, we must wait for a worker to complete it (even if cancelled):
343351
if (work.completed) {
352+
// The work was completed, we can exit the loop:
344353
break;
345354
} else {
346355
if (DEBUG) fprintf(stderr, "worker_pool_call:rb_fiber_scheduler_blocking_operation_cancel\n");
356+
// Ensure the blocking operation is cancelled:
347357
rb_fiber_scheduler_blocking_operation_cancel(blocking_operation);
348-
// The work was not completed, we need to wait for it to be completed.
358+
359+
// The work was not completed, we need to wait for it to be completed, so we go around the loop again.
349360
}
350361
}
351362

363+
if (DEBUG) fprintf(stderr, "<- worker_pool_call:work completed=%d, state=%d\n", work.completed, state);
364+
352365
if (state) {
353366
rb_jump_tag(state);
354367
} else {

test/io/event/worker_pool.rb

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
return unless defined?(IO::Event::WorkerPool)
1010

11-
describe IO::Event::WorkerPool do
11+
describe IO::Event::WorkerPool do
1212
with "an instance" do
1313
let(:worker_pool) {subject.new}
1414

@@ -61,7 +61,7 @@
6161
end
6262
end
6363

64-
with "TestScheduler integration" do
64+
with IO::Event::TestScheduler do
6565
let(:scheduler) {IO::Event::TestScheduler.new}
6666

6767
it "can create a test scheduler" do
@@ -102,7 +102,7 @@
102102

103103
with "cancellable busy operation" do
104104
let(:scheduler) {IO::Event::TestScheduler.new}
105-
105+
106106
it "can perform a busy operation that completes normally" do
107107
start_time = Time.now
108108
result = IO::Event::WorkerPool.busy(duration: 0.1)
@@ -144,7 +144,7 @@
144144
# The operation should have been interrupted before completion
145145
expect(completed).to be == false
146146
end
147-
147+
148148
it "can be cancelled when executed in a worker pool" do
149149
result = nil
150150
elapsed = nil
@@ -173,5 +173,27 @@
173173
expect(elapsed).to be < 1.0
174174
expect(error).to be_nil
175175
end
176+
177+
it "can be cancelled before even starting" do
178+
result = nil
179+
180+
Thread.new do
181+
Fiber.set_scheduler(scheduler)
182+
183+
busy_fiber = Fiber.schedule do
184+
result = IO::Event::WorkerPool.busy(duration: 2.0)
185+
end
186+
187+
Fiber.schedule do
188+
Fiber.scheduler.fiber_interrupt(busy_fiber, StandardError)
189+
end
190+
end.join
191+
192+
expect(result).to have_keys(
193+
cancelled: be == true,
194+
result: be == :exception,
195+
exception: be_a(StandardError)
196+
)
197+
end
176198
end
177199
end

0 commit comments

Comments
 (0)