Commit 839d627

Merge pull request #644 from joshleblanc/async-mode
Re-institute async mode
2 parents d330516 + f7feb48 commit 839d627

32 files changed

Lines changed: 893 additions & 267 deletions

README.md

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
1414
- [Dashboard UI Setup](#dashboard-ui-setup)
1515
- [Incremental adoption](#incremental-adoption)
1616
- [High performance requirements](#high-performance-requirements)
17+
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
18+
- [Fork vs. async mode](#fork-vs-async-mode)
1719
- [Configuration](#configuration)
18-
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
1920
- [Queue order and priorities](#queue-order-and-priorities)
2021
- [Queues specification and performance](#queues-specification-and-performance)
2122
- [Threads, processes, and signals](#threads-processes-and-signals)
@@ -179,9 +180,7 @@ end
179180

180181
Solid Queue was designed for the highest throughput when used with MySQL 8+, MariaDB 10.6+, or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications.
181182

182-
## Configuration
183-
184-
### Workers, dispatchers, and scheduler
183+
## Workers, dispatchers, and scheduler
185184

186185
We have several types of actors in Solid Queue:
187186

@@ -190,7 +189,19 @@ We have several types of actors in Solid Queue:
190189
- The _scheduler_ manages [recurring tasks](#recurring-tasks), enqueuing jobs for them when they're due.
191190
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.
192191

193-
Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher/scheduler.
192+
### Fork vs. async mode
193+
194+
By default, Solid Queue runs in `fork` mode. This means the supervisor will fork a separate process for each supervised worker/dispatcher/scheduler. This provides the best isolation and performance, but uses more memory and might not work with some Ruby implementations. As an alternative, you can run all workers, dispatchers and schedulers in the same process as the supervisor, in different threads, using `async` mode. You can choose this mode by running `bin/jobs` as:
195+
196+
```
197+
bin/jobs --mode async
198+
```
199+
200+
Alternatively, you can set the environment variable `SOLID_QUEUE_SUPERVISOR_MODE` to `async`. In `async` mode, the `processes` option in the configuration described below is ignored.
201+
202+
**The recommended and default mode is `fork`. Only use `async` if you know what you're doing and have strong reasons to.**
203+
204+
## Configuration
194205

195206
By default, Solid Queue will try to find your configuration under `config/queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this:
196207

@@ -254,7 +265,7 @@ Here's an overview of the different options:
254265

255266
- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from its queue(s) and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
256267
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat.
257-
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
268+
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode).
258269
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
259270

260271

@@ -334,7 +345,7 @@ queues: back*
334345

335346
Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling.
336347

337-
The supervisor is in charge of managing these processes, and it responds to the following signals:
348+
The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode:
338349
- `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit.
339350
- `QUIT`: starts immediate termination. The supervisor will send a `QUIT` signal to its supervised processes, causing them to exit immediately.
340351

@@ -603,6 +614,22 @@ that you set in production only. This is what Rails 8's default Puma config look
603614

604615
**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.
605616

617+
### Running as a fork or asynchronously
618+
619+
By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can have additional memory usage.
620+
621+
Alternatively, workers and dispatchers can be run within the same Puma process(es). To do so, configure the plugin as:
622+
623+
```ruby
624+
plugin :solid_queue
625+
solid_queue_mode :async
626+
```
627+
628+
Note that in this case, the `processes` configuration option will be ignored. See also [Fork vs. async mode](#fork-vs-async-mode).
629+
630+
**The recommended and default mode is `fork`. Only use `async` if you know what you're doing and have strong reasons to.**
631+
632+
606633
## Jobs and transactional integrity
607634
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured to use a different database from the main app.
608635

lib/puma/plugin/solid_queue.rb

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,87 @@
11
require "puma/plugin"
22

3+
module Puma
4+
class DSL
5+
def solid_queue_mode(mode = :fork)
6+
@options[:solid_queue_mode] = mode.to_sym
7+
end
8+
end
9+
end
10+
311
Puma::Plugin.create do
412
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor
513

614
def start(launcher)
715
@log_writer = launcher.log_writer
816
@puma_pid = $$
917

10-
in_background do
11-
monitor_solid_queue
18+
if launcher.options[:solid_queue_mode] == :async
19+
start_async(launcher)
20+
else
21+
start_forked(launcher)
1222
end
23+
end
1324

14-
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
15-
launcher.events.on_booted do
16-
@solid_queue_pid = fork do
17-
Thread.new { monitor_puma }
18-
SolidQueue::Supervisor.start
25+
private
26+
def start_forked(launcher)
27+
in_background do
28+
monitor_solid_queue
29+
end
30+
31+
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
32+
launcher.events.on_booted do
33+
@solid_queue_pid = fork do
34+
Thread.new { monitor_puma }
35+
SolidQueue::Supervisor.start(mode: :fork)
36+
end
1937
end
38+
39+
launcher.events.on_stopped { stop_solid_queue_fork }
40+
launcher.events.on_restart { stop_solid_queue_fork }
41+
else
42+
launcher.events.after_booted do
43+
@solid_queue_pid = fork do
44+
Thread.new { monitor_puma }
45+
start_solid_queue(mode: :fork)
46+
end
47+
end
48+
49+
launcher.events.after_stopped { stop_solid_queue_fork }
50+
launcher.events.before_restart { stop_solid_queue_fork }
2051
end
52+
end
2153

22-
launcher.events.on_stopped { stop_solid_queue }
23-
launcher.events.on_restart { stop_solid_queue }
24-
else
25-
launcher.events.after_booted do
26-
@solid_queue_pid = fork do
27-
Thread.new { monitor_puma }
28-
SolidQueue::Supervisor.start
54+
def start_async(launcher)
55+
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
56+
launcher.events.on_booted do
57+
start_solid_queue(mode: :async, standalone: false)
58+
end
59+
60+
launcher.events.on_stopped { solid_queue_supervisor&.stop }
61+
62+
launcher.events.on_restart do
63+
solid_queue_supervisor&.stop
64+
start_solid_queue(mode: :async, standalone: false)
65+
end
66+
else
67+
launcher.events.after_booted do
68+
start_solid_queue(mode: :async, standalone: false)
69+
end
70+
71+
launcher.events.after_stopped { solid_queue_supervisor&.stop }
72+
73+
launcher.events.before_restart do
74+
solid_queue_supervisor&.stop
75+
start_solid_queue(mode: :async, standalone: false)
2976
end
3077
end
78+
end
3179

32-
launcher.events.after_stopped { stop_solid_queue }
33-
launcher.events.before_restart { stop_solid_queue }
80+
def start_solid_queue(**options)
81+
@solid_queue_supervisor = SolidQueue::Supervisor.start(**options)
3482
end
35-
end
3683

37-
private
38-
def stop_solid_queue
84+
def stop_solid_queue_fork
3985
Process.waitpid(solid_queue_pid, Process::WNOHANG)
4086
log "Stopping Solid Queue..."
4187
Process.kill(:INT, solid_queue_pid) if solid_queue_pid
@@ -48,7 +94,7 @@ def monitor_puma
4894
end
4995

5096
def monitor_solid_queue
51-
monitor(:solid_queue_dead?, "Detected Solid Queue has gone away, stopping Puma...")
97+
monitor(:solid_queue_fork_dead?, "Detected Solid Queue has gone away, stopping Puma...")
5298
end
5399

54100
def monitor(process_dead, message)
@@ -62,7 +108,7 @@ def monitor(process_dead, message)
62108
end
63109
end
64110

65-
def solid_queue_dead?
111+
def solid_queue_fork_dead?
66112
if solid_queue_started?
67113
Process.waitpid(solid_queue_pid, Process::WNOHANG)
68114
end
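
Review note: both `start_forked` and `start_async` branch on the Puma version to pick event hook names (`on_booted`/`on_stopped`/`on_restart` before Puma 7, `after_booted`/`after_stopped`/`before_restart` from 7 on). A minimal sketch of that branching in isolation is below; `puma_hook_names` is a hypothetical helper for illustration, not something this commit defines.

```ruby
# Hypothetical helper (not part of the plugin): maps a Puma version string
# to the event hook names the diff above uses for that version.
def puma_hook_names(puma_version)
  if Gem::Version.new(puma_version) < Gem::Version.new("7")
    { booted: :on_booted, stopped: :on_stopped, restart: :on_restart }
  else
    { booted: :after_booted, stopped: :after_stopped, restart: :before_restart }
  end
end

puts puma_hook_names("6.4.2").inspect
puts puma_hook_names("7.0.0").inspect
```

Extracting the lookup like this would be one way to avoid duplicating the version check in both start methods; whether that is worth it for two call sites is a judgment call.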

lib/solid_queue/app_executor.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,15 @@ def handle_thread_error(error)
1717
SolidQueue.on_thread_error.call(error)
1818
end
1919
end
20+
21+
def create_thread(&block)
22+
Thread.new do
23+
Thread.current.name = name
24+
block.call
25+
rescue Exception => exception
26+
handle_thread_error(exception)
27+
raise
28+
end
29+
end
2030
end
2131
end
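
Review note: `create_thread` names the thread, reports any exception through `handle_thread_error`, and then re-raises so the thread still terminates with that error. The standalone sketch below shows the same shape outside Solid Queue; `create_named_thread` and its `on_error:` callback are assumptions made for illustration only.

```ruby
# Hypothetical helper mirroring the shape of create_thread in the diff above.
# on_error stands in for handle_thread_error; it is an assumption, not Solid Queue API.
def create_named_thread(name, on_error:, &block)
  Thread.new do
    Thread.current.name = name
    block.call
  rescue Exception => exception # rescue everything, as the diff does
    on_error.call(exception)    # report first...
    raise                       # ...then re-raise so the thread still dies with the error
  end
end

# Keep the demo quiet; Ruby would otherwise also log the failure to stderr.
Thread.report_on_exception = false

errors = []
thread = create_named_thread("demo-worker", on_error: ->(e) { errors << e }) do
  raise ArgumentError, "boom"
end

begin
  thread.join # join re-raises the thread's exception in the caller
rescue ArgumentError => e
  puts "joined thread failed with: #{e.message}"
end
```

Because the exception is re-raised, anything that later joins the thread (as `AsyncSupervisor#stop` does with `@thread&.join`) will see the original error rather than a silent exit.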
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class AsyncSupervisor < Supervisor
5+
after_shutdown :terminate_gracefully, unless: :standalone?
6+
7+
def stop
8+
super
9+
@thread&.join
10+
end
11+
12+
private
13+
def supervise
14+
if standalone? then super
15+
else
16+
@thread = create_thread { super }
17+
end
18+
end
19+
20+
def check_and_replace_terminated_processes
21+
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
22+
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
23+
end
24+
25+
def replace_thread(thread_id, instance)
26+
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
27+
payload[:thread] = instance
28+
29+
error = Processes::ThreadTerminatedError.new(instance.name)
30+
release_claimed_jobs_by(instance, with_error: error)
31+
32+
start_process(configured_processes.delete(thread_id))
33+
end
34+
end
35+
36+
def perform_graceful_termination
37+
process_instances.values.each(&:stop)
38+
39+
Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
40+
end
41+
42+
def perform_immediate_termination
43+
exit!
44+
end
45+
46+
def all_processes_terminated?
47+
process_instances.values.none?(&:alive?)
48+
end
49+
end
50+
end
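
Review note: in async mode the supervisor tracks thread-backed instances instead of forked PIDs, and `check_and_replace_terminated_processes` restarts any that are no longer alive. The sketch below shows that check-and-replace idea with plain Ruby threads. `ThreadSupervisorSketch` is a hypothetical, simplified stand-in: it skips releasing claimed jobs and instrumentation, which the real class handles.

```ruby
# Hypothetical, simplified stand-in for the replace-dead-threads idea in the diff above.
class ThreadSupervisorSketch
  def initialize
    @instances = {} # thread object_id => { name:, thread:, work: }
  end

  # Start a named thread running the given block and track it.
  def start(name, &work)
    thread = Thread.new(&work)
    thread.name = name
    @instances[thread.object_id] = { name: name, thread: thread, work: work }
    thread
  end

  # Replace every tracked thread that is no longer alive; returns the names replaced.
  def replace_terminated
    dead = @instances.reject { |_id, entry| entry[:thread].alive? }
    dead.map do |id, entry|
      @instances.delete(id)
      start(entry[:name], &entry[:work])
      entry[:name]
    end
  end

  def size
    @instances.size
  end
end

supervisor = ThreadSupervisorSketch.new
gate = Queue.new

short = supervisor.start("short-lived") { :done } # exits immediately
supervisor.start("long-lived") { gate.pop }        # blocks until released
short.join

replaced = supervisor.replace_terminated
puts "replaced: #{replaced.inspect}"
gate << :release
```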

lib/solid_queue/cli.rb

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,10 @@ class Cli < Thor
88
desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).",
99
banner: "SOLID_QUEUE_CONFIG"
1010

11+
class_option :mode, type: :string, default: "fork", enum: %w[ fork async ],
12+
desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async) (default: fork).",
13+
banner: "SOLID_QUEUE_SUPERVISOR_MODE"
14+
1115
class_option :recurring_schedule_file, type: :string,
1216
desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).",
1317
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"

lib/solid_queue/configuration.rb

Lines changed: 16 additions & 1 deletion
@@ -56,6 +56,14 @@ def error_messages
5656
end
5757
end
5858

59+
def mode
60+
@options[:mode].to_s.inquiry
61+
end
62+
63+
def standalone?
64+
mode.fork? || @options[:standalone]
65+
end
66+
5967
private
6068
attr_reader :options
6169

@@ -84,6 +92,8 @@ def ensure_correctly_sized_thread_pool
8492

8593
def default_options
8694
{
95+
mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork,
96+
standalone: true,
8797
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
8898
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
8999
only_work: false,
@@ -110,7 +120,12 @@ def skip_recurring_tasks?
110120

111121
def workers
112122
workers_options.flat_map do |worker_options|
113-
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
123+
processes = if mode.fork?
124+
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
125+
else
126+
1
127+
end
128+
114129
processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
115130
end
116131
end
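
Review note: the configuration changes boil down to three rules: the mode defaults to `fork` unless `SOLID_QUEUE_SUPERVISOR_MODE` or an explicit option says otherwise, `standalone?` is always true in fork mode, and `processes` is forced to `1` outside fork mode. The sketch below restates those rules without Rails or ActiveSupport. `ModeSketch` and `worker_count` are hypothetical names, and a plain string comparison stands in for `String#inquiry`.

```ruby
# Hypothetical stand-in for the mode handling in the diff above; not Solid Queue's Configuration.
class ModeSketch
  WORKER_DEFAULTS = { processes: 1 }.freeze

  def initialize(mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork, standalone: true)
    @mode = mode.to_s
    @standalone = standalone
  end

  def fork?
    @mode == "fork"
  end

  # Fork mode is always standalone; async mode is standalone only when asked to be.
  def standalone?
    fork? || @standalone
  end

  # The configured process count applies only in fork mode; otherwise one instance per worker.
  def worker_count(worker_options)
    fork? ? worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) : 1
  end
end

forked = ModeSketch.new(mode: :fork)
embedded = ModeSketch.new(mode: "async", standalone: false)

puts forked.worker_count(processes: 3)
puts embedded.worker_count(processes: 3)
```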

lib/solid_queue/dispatcher.rb

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
33
module SolidQueue
44
class Dispatcher < Processes::Poller
55
include LifecycleHooks
6+
67
attr_reader :batch_size
78

89
after_boot :run_start_hooks
