Skip to content

Commit 444091d

Browse files
committed
Handle async pool cancellations as fatal
1 parent 191883e commit 444091d

2 files changed

Lines changed: 33 additions & 0 deletions

File tree

lib/solid_queue/execution_pools/async_pool.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ def drain_queue(task, semaphore)
126126

127127
def perform_execution(execution)
128128
wrap_in_app_executor { execution.perform }
129+
rescue Async::Cancel => error
130+
handle_thread_error(error)
131+
register_fatal_error(error)
129132
rescue Exception => error
130133
handle_thread_error(error)
131134
ensure

test/unit/execution_pools/async_pool_test.rb

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ def perform
99
end
1010
end
1111

12+
CancelledExecution = Struct.new(:started) do
13+
def perform
14+
started << true if started
15+
raise Async::Cancel.new
16+
end
17+
end
18+
1219
def test_raises_a_clear_error_when_the_async_gem_is_unavailable
1320
load_error = LoadError.new("cannot load such file -- async")
1421

@@ -59,4 +66,27 @@ def test_waits_for_in_flight_executions_during_shutdown
5966
assert_nil pool.wait_for_termination(0.01)
6067
assert pool.wait_for_termination(1.second)
6168
end
69+
70+
def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled
71+
notifications = Thread::Queue.new
72+
started = Thread::Queue.new
73+
reported_errors = []
74+
original_on_thread_error = SolidQueue.on_thread_error
75+
SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name }
76+
77+
pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed })
78+
79+
pool.post CancelledExecution.new(started)
80+
Timeout.timeout(1.second) { started.pop }
81+
Timeout.timeout(1.second) { notifications.pop }
82+
83+
error = assert_raises(Async::Cancel) { pool.available_capacity }
84+
assert_equal "Task was cancelled", error.message
85+
assert_equal [ "Async::Cancel" ], reported_errors
86+
assert_raises(Async::Cancel) { pool.metadata }
87+
ensure
88+
SolidQueue.on_thread_error = original_on_thread_error
89+
pool&.shutdown
90+
pool&.wait_for_termination(1.second)
91+
end
6292
end

0 commit comments

Comments
 (0)