Skip to content

Commit ff58c4d

Browse files
committed
Refactor async mode and restore tests for it
The tests were removed in 4b3483a. This also restores support for "standalone" async mode, in which Solid Queue's supervisor runs in its own process, via `bin/jobs`, but all other processes (workers, dispatchers, schedulers) run as threads instead of processes. When Solid Queue is run via the Puma plugin, it isn't considered standalone, and it doesn't need to handle signals and act on them.
1 parent 40e319d commit ff58c4d

27 files changed

Lines changed: 797 additions & 353 deletions

lib/puma/plugin/solid_queue.rb

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ def start_forked(launcher)
3636
end
3737
end
3838

39-
launcher.events.on_stopped { stop_solid_queue }
40-
launcher.events.on_restart { stop_solid_queue }
39+
launcher.events.on_stopped { stop_solid_queue_fork }
40+
launcher.events.on_restart { stop_solid_queue_fork }
4141
else
4242
launcher.events.after_booted do
4343
@solid_queue_pid = fork do
@@ -46,24 +46,38 @@ def start_forked(launcher)
4646
end
4747
end
4848

49-
launcher.events.after_stopped { stop_solid_queue }
50-
launcher.events.before_restart { stop_solid_queue }
49+
launcher.events.after_stopped { stop_solid_queue_fork }
50+
launcher.events.before_restart { stop_solid_queue_fork }
5151
end
5252
end
5353

5454
def start_async(launcher)
5555
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
56-
launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
57-
launcher.events.on_stopped { solid_queue_supervisor&.stop }
58-
launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
56+
launcher.events.on_booted do
57+
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
58+
end
59+
60+
launcher.events.on_stopped { @solid_queue_supervisor&.stop }
61+
62+
launcher.events.on_restart do
63+
solid_queue_supervisor&.stop
64+
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
65+
end
5966
else
60-
launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
61-
launcher.events.after_stopped { solid_queue_supervisor&.stop }
62-
launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) }
67+
launcher.events.after_booted do
68+
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
69+
end
70+
71+
launcher.events.after_stopped { @solid_queue_supervisor&.stop }
72+
73+
launcher.events.before_restart do
74+
solid_queue_supervisor&.stop
75+
@solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false)
76+
end
6377
end
6478
end
6579

66-
def stop_solid_queue
80+
def stop_solid_queue_fork
6781
Process.waitpid(solid_queue_pid, Process::WNOHANG)
6882
log "Stopping Solid Queue..."
6983
Process.kill(:INT, solid_queue_pid) if solid_queue_pid
@@ -76,7 +90,7 @@ def monitor_puma
7690
end
7791

7892
def monitor_solid_queue
79-
monitor(:solid_queue_dead?, "Detected Solid Queue has gone away, stopping Puma...")
93+
monitor(:solid_queue_fork_dead?, "Detected Solid Queue has gone away, stopping Puma...")
8094
end
8195

8296
def monitor(process_dead, message)
@@ -90,7 +104,7 @@ def monitor(process_dead, message)
90104
end
91105
end
92106

93-
def solid_queue_dead?
107+
def solid_queue_fork_dead?
94108
if solid_queue_started?
95109
Process.waitpid(solid_queue_pid, Process::WNOHANG)
96110
end

lib/solid_queue/async_supervisor.rb

Lines changed: 23 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -3,96 +3,43 @@
33
module SolidQueue
44
class AsyncSupervisor < Supervisor
55
private
6-
attr_reader :threads
7-
8-
def start_processes
9-
@threads = {}
10-
11-
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
6+
def check_and_replace_terminated_processes
7+
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
8+
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
129
end
1310

14-
def start_process(configured_process)
15-
process_instance = configured_process.instantiate.tap do |instance|
16-
instance.supervised_by process
17-
instance.mode = :async
18-
end
11+
def replace_thread(thread_id, instance)
12+
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
13+
payload[:thread] = instance
14+
handle_claimed_jobs_by(instance, thread_id) # use this method's own arguments; `terminated_instance`/`thread` are not defined in this scope
1915

20-
thread = Thread.new do
21-
begin
22-
process_instance.start
23-
rescue Exception => e
24-
puts "Error in thread: #{e.message}"
25-
puts e.backtrace
26-
end
16+
start_process(configured_processes.delete(thread_id))
2717
end
28-
threads[thread] = [ process_instance, configured_process ]
2918
end
3019

31-
def terminate_gracefully
32-
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
33-
processes.each(&:stop)
34-
35-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do
36-
# No-op, we just wait
37-
end
38-
39-
unless all_threads_terminated?
40-
payload[:shutdown_timeout_exceeded] = true
41-
terminate_immediately
42-
end
43-
end
44-
end
20+
def perform_graceful_termination
21+
process_instances.values.each(&:stop)
4522

46-
def terminate_immediately
47-
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
48-
threads.keys.each(&:kill)
49-
end
23+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
5024
end
5125

52-
def supervised_processes
53-
processes.map(&:to_s)
26+
def perform_immediate_termination
27+
exit!
5428
end
5529

56-
def reap_and_replace_terminated_forks
57-
# No-op in async mode, we'll check for dead threads in the supervise loop
30+
def all_processes_terminated?
31+
process_instances.values.none?(&:alive?)
5832
end
5933

60-
def all_threads_terminated?
61-
threads.keys.all? { |thread| !thread.alive? }
62-
end
63-
64-
def supervise
65-
loop do
66-
break if stopped?
67-
68-
set_procline
69-
process_signal_queue
70-
71-
unless stopped?
72-
check_and_replace_terminated_threads
73-
interruptible_sleep(1.second)
34+
# When a supervised thread terminates unexpectedly, mark all executions
35+
# it had claimed as failed so they can be retried by another worker.
36+
def handle_claimed_jobs_by(terminated_instance, thread)
37+
wrap_in_app_executor do
38+
if registered_process = SolidQueue::Process.find_by(name: terminated_instance.name)
39+
error = Processes::ThreadTerminatedError.new(terminated_instance.name)
40+
registered_process.fail_all_claimed_executions_with(error)
7441
end
7542
end
76-
ensure
77-
shutdown
78-
end
79-
80-
def check_and_replace_terminated_threads
81-
terminated_threads = {}
82-
threads.each do |thread, (process, configured_process)|
83-
unless thread.alive?
84-
terminated_threads[thread] = configured_process
85-
end
86-
end
87-
88-
terminated_threads.each do |thread, configured_process|
89-
threads.delete(thread)
90-
start_process(configured_process)
91-
end
92-
end
93-
94-
def processes
95-
threads.values.map(&:first)
9643
end
9744
end
98-
end
45+
end

lib/solid_queue/configuration.rb

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,8 @@ def instantiate
3131
DEFAULT_CONFIG_FILE_PATH = "config/queue.yml"
3232
DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml"
3333

34-
attr_reader :mode
35-
3634
def initialize(**options)
3735
@options = options.with_defaults(default_options)
38-
@mode = @options[:mode].to_s.inquiry
3936
end
4037

4138
def configured_processes
@@ -59,6 +56,14 @@ def error_messages
5956
end
6057
end
6158

59+
def mode
60+
@options[:mode].to_s.inquiry
61+
end
62+
63+
def standalone?
64+
mode.fork? || @options[:standalone]
65+
end
66+
6267
private
6368
attr_reader :options
6469

@@ -88,6 +93,7 @@ def ensure_correctly_sized_thread_pool
8893
def default_options
8994
{
9095
mode: :fork,
96+
standalone: true,
9197
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
9298
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
9399
only_work: false,

lib/solid_queue/fork_supervisor.rb

Lines changed: 50 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -3,111 +3,76 @@
33
module SolidQueue
44
class ForkSupervisor < Supervisor
55
private
6-
attr_reader :forks, :configured_processes
76

8-
def start_processes
9-
@forks = {}
10-
@configured_processes = {}
7+
def perform_graceful_termination
8+
term_forks
119

12-
configuration.configured_processes.each { |configured_process| start_process(configured_process) }
10+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
11+
reap_terminated_forks
1312
end
13+
end
1414

15-
def start_process(configured_process)
16-
process_instance = configured_process.instantiate.tap do |instance|
17-
instance.supervised_by process
18-
instance.mode = :fork
19-
end
20-
21-
pid = fork do
22-
process_instance.start
23-
end
24-
25-
configured_processes[pid] = configured_process
26-
forks[pid] = process_instance
27-
end
28-
29-
def terminate_gracefully
30-
SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload|
31-
term_forks
32-
33-
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do
34-
reap_terminated_forks
35-
end
36-
37-
unless all_forks_terminated?
38-
payload[:shutdown_timeout_exceeded] = true
39-
terminate_immediately
40-
end
41-
end
42-
end
15+
def perform_immediate_termination
16+
quit_forks
17+
end
4318

44-
def terminate_immediately
45-
SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do
46-
quit_forks
47-
end
48-
end
19+
def term_forks
20+
signal_processes(process_instances.keys, :TERM)
21+
end
4922

50-
def supervised_processes
51-
forks.keys
52-
end
23+
def quit_forks
24+
signal_processes(process_instances.keys, :QUIT)
25+
end
5326

54-
def term_forks
55-
signal_processes(forks.keys, :TERM)
56-
end
27+
def check_and_replace_terminated_processes
28+
loop do
29+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
30+
break unless pid
5731

58-
def quit_forks
59-
signal_processes(forks.keys, :QUIT)
32+
replace_fork(pid, status)
6033
end
34+
end
6135

62-
def reap_and_replace_terminated_forks
63-
loop do
64-
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
65-
break unless pid
36+
def reap_terminated_forks
37+
loop do
38+
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
39+
break unless pid
6640

67-
replace_fork(pid, status)
41+
if (terminated_fork = process_instances.delete(pid)) && !status.exited? || status.exitstatus > 0
42+
handle_claimed_jobs_by(terminated_fork, status)
6843
end
69-
end
7044

71-
def reap_terminated_forks
72-
loop do
73-
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
74-
break unless pid
75-
76-
if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0)
77-
handle_claimed_jobs_by(terminated_fork, status)
78-
end
79-
80-
configured_processes.delete(pid)
81-
end
82-
rescue SystemCallError
83-
# All children already reaped
45+
configured_processes.delete(pid)
8446
end
47+
rescue SystemCallError
48+
# All children already reaped
49+
end
8550

86-
def replace_fork(pid, status)
87-
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
88-
if terminated_fork = forks.delete(pid)
89-
payload[:fork] = terminated_fork
90-
handle_claimed_jobs_by(terminated_fork, status)
51+
def replace_fork(pid, status)
52+
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
53+
if terminated_fork = process_instances.delete(pid)
54+
payload[:fork] = terminated_fork
55+
handle_claimed_jobs_by(terminated_fork, status)
9156

92-
start_process(configured_processes.delete(pid))
93-
end
57+
start_process(configured_processes.delete(pid))
9458
end
9559
end
96-
97-
# When a supervised fork crashes or exits we need to mark all the
98-
# executions it had claimed as failed so that they can be retried
99-
# by some other worker.
100-
def handle_claimed_jobs_by(terminated_fork, status)
101-
wrap_in_app_executor do
102-
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
103-
error = Processes::ProcessExitError.new(status)
104-
registered_process.fail_all_claimed_executions_with(error)
105-
end
60+
end
61+
62+
# When a supervised fork crashes or exits we need to mark all the
63+
# executions it had claimed as failed so that they can be retried
64+
# by some other worker.
65+
def handle_claimed_jobs_by(terminated_fork, status)
66+
wrap_in_app_executor do
67+
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
68+
error = Processes::ProcessExitError.new(status)
69+
registered_process.fail_all_claimed_executions_with(error)
10670
end
10771
end
72+
end
10873

109-
def all_forks_terminated?
110-
forks.empty?
111-
end
74+
def all_processes_terminated?
75+
process_instances.empty?
76+
end
11277
end
11378
end

0 commit comments

Comments
 (0)