Skip to content

Commit ae0efbd

Browse files
committed
Build worker pools after fork
1 parent 444091d commit ae0efbd

2 files changed

Lines changed: 31 additions & 2 deletions

File tree

lib/solid_queue/worker.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ def initialize(**options)
1818
# Ensure that the queues array is deep frozen to prevent accidental modification
1919
@queues = Array(options[:queues]).map(&:freeze).freeze
2020

21-
@pool = ExecutionPools.build(
21+
@pool_options = {
2222
mode: options[:execution_mode],
2323
size: options[:threads],
2424
on_state_change: -> { wake_up }
25-
)
25+
}
2626

2727
super(**options)
2828
end
@@ -50,6 +50,11 @@ def claim_executions
5050
end
5151
end
5252

53+
def boot
54+
build_pool
55+
super
56+
end
57+
5358
def shutdown
5459
pool.shutdown
5560
pool.wait_for_termination(SolidQueue.shutdown_timeout)
@@ -69,5 +74,9 @@ def heartbeat
6974
def set_procline
7075
procline "waiting for jobs in #{queues.join(",")}"
7176
end
77+
78+
def build_pool
79+
@pool ||= ExecutionPools.build(**@pool_options)
80+
end
7281
end
7382
end

test/unit/worker_test.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,26 @@ class WorkerTest < ActiveSupport::TestCase
3030
}
3131
end
3232

33+
test "builds the execution pool on boot instead of initialize" do
34+
pool = SolidQueue::ExecutionPools::ThreadPool.new(3)
35+
36+
SolidQueue::ExecutionPools.expects(:build).once.with do |**options|
37+
options[:mode] == :thread && options[:size] == 3 && options[:on_state_change].respond_to?(:call)
38+
end.returns(pool)
39+
40+
worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2)
41+
42+
assert_nil worker.pool
43+
44+
worker.start
45+
wait_for_registered_processes(1, timeout: 1.second)
46+
47+
assert_equal pool, worker.pool
48+
ensure
49+
worker&.stop
50+
wait_for_registered_processes(0, timeout: 1.second)
51+
end
52+
3353
test "errors on polling are passed to on_thread_error and re-raised" do
3454
errors = Concurrent::Array.new
3555

0 commit comments

Comments
 (0)