Skip to content

Commit cac54bf

Browse files
committed
Fix worker pool defaults and async shutdown
1 parent a0e9444 commit cac54bf

4 files changed

Lines changed: 35 additions & 3 deletions

File tree

lib/solid_queue/execution_pools/async_pool.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ module ExecutionPools
55
class AsyncPool
66
include AppExecutor
77

8+
SHUTDOWN_SENTINEL = Object.new
9+
810
class MissingDependencyError < LoadError
911
def initialize(error)
1012
super(
@@ -91,13 +93,13 @@ def idle?
9193
end
9294

9395
def shutdown
94-
should_close = state_mutex.synchronize do
96+
should_enqueue_shutdown = state_mutex.synchronize do
9597
next false if @shutdown
9698

9799
@shutdown = true
98100
end
99101

100-
queue.close if should_close
102+
queue.enqueue(SHUTDOWN_SENTINEL) if should_enqueue_shutdown
101103
end
102104

103105
def shutdown?
@@ -141,6 +143,8 @@ def start_reactor
141143
def drain_queue(task, semaphore)
142144
task.async do
143145
while execution = queue.dequeue
146+
break if execution.equal?(SHUTDOWN_SENTINEL)
147+
144148
semaphore.async(execution) do |_execution_task, scheduled_execution|
145149
perform_execution(scheduled_execution)
146150
end

lib/solid_queue/worker.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class Worker < Processes::Poller
1212

1313
def initialize(**options)
1414
options = options.dup
15-
options[:threads] = options[:capacity] || options[:fibers] || options[:threads]
15+
options[:threads] = options[:capacity] || options[:fibers] if options.key?(:capacity) || options.key?(:fibers)
1616
options = options.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
1717

1818
# Ensure that the queues array is deep frozen to prevent accidental modification

test/unit/execution_pools/async_pool_test.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,21 @@ def test_waits_for_in_flight_executions_during_shutdown
8282
end
8383
end
8484

85+
def test_shutdown_does_not_depend_on_async_queue_close
86+
with_execution_isolation(:fiber) do
87+
pool = SolidQueue::ExecutionPools::AsyncPool.new(1)
88+
queue = pool.instance_variable_get(:@queue)
89+
queue.define_singleton_method(:close) { raise "should not be called" }
90+
91+
pool.shutdown
92+
93+
assert pool.wait_for_termination(1.second)
94+
ensure
95+
pool&.shutdown
96+
pool&.wait_for_termination(1.second)
97+
end
98+
end
99+
85100
def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled
86101
with_execution_isolation(:fiber) do
87102
notifications = Thread::Queue.new

test/unit/worker_test.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,19 @@ class WorkerTest < ActiveSupport::TestCase
123123
wait_for_registered_processes(0, timeout: 1.second)
124124
end
125125

126+
test "defaults thread workers to the configured thread pool size" do
127+
worker = SolidQueue::Worker.new(queues: "background", polling_interval: 0.2)
128+
129+
worker.start
130+
wait_for_registered_processes(1, timeout: 1.second)
131+
132+
assert_equal 3, worker.pool.size
133+
assert_metadata SolidQueue::Process.first, thread_pool_size: 3, capacity: 3, execution_mode: "thread"
134+
ensure
135+
worker&.stop
136+
wait_for_registered_processes(0, timeout: 1.second)
137+
end
138+
126139
test "errors on polling are passed to on_thread_error and re-raised" do
127140
errors = Concurrent::Array.new
128141

0 commit comments

Comments
 (0)