Skip to content

Commit abbbbb4

Browse files
committed
Gate async workers on fiber isolation
1 parent ae0efbd commit abbbbb4

8 files changed

Lines changed: 247 additions & 90 deletions

File tree

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ Here's an overview of the different options:
278278
Check the sections below on [how queue order behaves combined with priorities](#queue-order-and-priorities), and [how the way you specify the queues per worker might affect performance](#queues-specification-and-performance).
279279

280280
- `execution_mode`: controls how a worker executes claimed jobs. `thread` is the default and uses the existing thread pool behavior. `async` executes jobs as fibers on a single reactor thread. `fiber` is accepted as an alias for `async`.
281+
Async worker execution requires fiber-scoped isolated execution state. In Rails apps, set `config.active_support.isolation_level = :fiber` before using `execution_mode: async`. Solid Queue refuses to boot async workers when isolation remains thread-scoped.
281282
- `threads`: this is the execution capacity for a worker in `thread` mode. It is the max size of the thread pool. By default, this is `3`. Only workers have this setting.
282283
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker uses connections for polling and heartbeat and thread mode may use additional connections for job execution.
283284
- `capacity`: an alias for worker execution capacity. This is the clearer name when `execution_mode: async`, because it refers to in-flight execution capacity rather than operating system threads.
@@ -377,6 +378,8 @@ By default, workers in Solid Queue use a thread pool to run work in multiple thr
377378

378379
Async worker execution is best suited for cooperative, mostly I/O-bound jobs. Blocking or CPU-heavy work still blocks the single reactor thread, so it should not be expected to outperform thread mode for every workload.
379380

381+
Because async workers run multiple fibers on a single thread, Rails must also isolate execution state per fiber rather than per thread. If your app keeps the default thread-scoped isolation level, Solid Queue will raise a boot-time error instead of running async workers with shared Active Record state.
382+
380383
The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode:
381384
- `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit.
382385
- `QUIT`: starts immediate termination. The supervisor will send a `QUIT` signal to its supervised processes, causing them to exit immediately.

lib/solid_queue/configuration.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class Configuration
88
validate :ensure_valid_recurring_tasks
99
validate :ensure_correctly_sized_thread_pool
1010
validate :ensure_valid_worker_execution_modes
11+
validate :ensure_async_workers_use_supported_isolation_level
1112

1213
class Process < Struct.new(:kind, :attributes)
1314
def instantiate
@@ -105,6 +106,14 @@ def ensure_valid_worker_execution_modes
105106
end
106107
end
107108

109+
def ensure_async_workers_use_supported_isolation_level
110+
return unless workers_options.any? { |options| async_worker?(options) }
111+
112+
SolidQueue::ExecutionPools::AsyncPool.ensure_supported_isolation_level!
113+
rescue ArgumentError => error
114+
errors.add(:base, error.message)
115+
end
116+
108117
def default_options
109118
{
110119
mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork,

lib/solid_queue/execution_pools/async_pool.rb

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,29 @@ def initialize(error)
1515
end
1616
end
1717

18+
class UnsupportedIsolationLevelError < ArgumentError
19+
def initialize(level)
20+
super(
21+
"Async execution mode requires fiber-scoped isolated execution state. " \
22+
"Set `ActiveSupport::IsolatedExecutionState.isolation_level = :fiber` " \
23+
"(or `config.active_support.isolation_level = :fiber` in Rails). " \
24+
"Current isolation level: #{level.inspect}"
25+
)
26+
end
27+
end
28+
29+
class << self
30+
def ensure_supported_isolation_level!
31+
return if supported_isolation_level?
32+
33+
raise UnsupportedIsolationLevelError.new(ActiveSupport::IsolatedExecutionState.isolation_level)
34+
end
35+
36+
def supported_isolation_level?
37+
ActiveSupport::IsolatedExecutionState.isolation_level == :fiber
38+
end
39+
end
40+
1841
attr_reader :size
1942

2043
def initialize(size, on_state_change: nil)
@@ -28,6 +51,7 @@ def initialize(size, on_state_change: nil)
2851
@boot_queue = Thread::Queue.new
2952

3053
load_dependency!
54+
self.class.ensure_supported_isolation_level!
3155

3256
@queue = Async::Queue.new
3357
@reactor_thread = start_reactor

lib/solid_queue/worker.rb

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ def initialize(**options)
1717

1818
# Ensure that the queues array is deep frozen to prevent accidental modification
1919
@queues = Array(options[:queues]).map(&:freeze).freeze
20+
@metadata_state_mutex = Mutex.new
21+
@metadata_dirty = false
2022

2123
@pool_options = {
2224
mode: options[:execution_mode],
2325
size: options[:threads],
24-
on_state_change: -> { wake_up }
26+
on_state_change: -> { mark_metadata_dirty; wake_up }
2527
}
2628

2729
super(**options)
@@ -38,7 +40,7 @@ def poll
3840
pool.post(execution)
3941
end
4042

41-
reload_metadata if executions.any?
43+
reload_metadata_if_needed(executions.any?)
4244

4345
pool.idle? ? polling_interval : 10.minutes
4446
end
@@ -78,5 +80,23 @@ def set_procline
7880
def build_pool
7981
@pool ||= ExecutionPools.build(**@pool_options)
8082
end
83+
84+
def mark_metadata_dirty
85+
metadata_state_mutex.synchronize { @metadata_dirty = true }
86+
end
87+
88+
def metadata_state_mutex
89+
@metadata_state_mutex
90+
end
91+
92+
def reload_metadata_if_needed(executions_claimed)
93+
needs_reload = metadata_state_mutex.synchronize do
94+
claimed_or_dirty = executions_claimed || @metadata_dirty
95+
@metadata_dirty = false
96+
claimed_or_dirty
97+
end
98+
99+
reload_metadata if needs_reload
100+
end
81101
end
82102
end

test/test_helper.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ def write(...)
2929
Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)
3030
class ExpectedTestError < RuntimeError; end
3131

32+
module ExecutionIsolationTestHelper
33+
def with_execution_isolation(level)
34+
previous_level = ActiveSupport::IsolatedExecutionState.isolation_level
35+
ActiveSupport::IsolatedExecutionState.isolation_level = level
36+
yield
37+
ensure
38+
ActiveSupport::IsolatedExecutionState.isolation_level = previous_level
39+
end
40+
end
41+
42+
class Minitest::Test
43+
include ExecutionIsolationTestHelper
44+
end
3245

3346
class ActiveSupport::TestCase
3447
include ConfigurationTestHelper, ProcessesTestHelper, JobsTestHelper

test/unit/configuration_test.rb

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,27 +62,31 @@ class ConfigurationTest < ActiveSupport::TestCase
6262
end
6363

6464
test "normalize worker execution modes and capacity aliases" do
65-
configuration = SolidQueue::Configuration.new(
66-
workers: [
67-
{ queues: "llm*", execution_mode: :async, capacity: 10 },
68-
{ queues: "*", execution_mode: :fiber, fibers: 3 }
69-
],
70-
dispatchers: [],
71-
skip_recurring: true
72-
)
65+
with_execution_isolation(:fiber) do
66+
configuration = SolidQueue::Configuration.new(
67+
workers: [
68+
{ queues: "llm*", execution_mode: :async, capacity: 10 },
69+
{ queues: "*", execution_mode: :fiber, fibers: 3 }
70+
],
71+
dispatchers: [],
72+
skip_recurring: true
73+
)
7374

74-
assert configuration.valid?
75-
assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ]
75+
assert configuration.valid?
76+
assert_processes configuration, :worker, 2, execution_mode: [ :async, :async ], capacity: [ 10, 3 ], threads: [ nil, nil ]
77+
end
7678
end
7779

7880
test "async worker capacity does not inflate required database pool size" do
79-
configuration = SolidQueue::Configuration.new(
80-
workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ],
81-
dispatchers: [],
82-
skip_recurring: true
83-
)
81+
with_execution_isolation(:fiber) do
82+
configuration = SolidQueue::Configuration.new(
83+
workers: [ { queues: "llm*", execution_mode: :async, capacity: 1000 } ],
84+
dispatchers: [],
85+
skip_recurring: true
86+
)
8487

85-
assert configuration.valid?
88+
assert configuration.valid?
89+
end
8690
end
8791

8892
test "mulitple workers with the same configuration" do
@@ -193,6 +197,10 @@ class ConfigurationTest < ActiveSupport::TestCase
193197
assert_not configuration.valid?
194198
assert_match /Unknown execution mode/, configuration.errors.full_messages.first
195199

200+
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ])
201+
assert_not configuration.valid?
202+
assert_match /requires fiber-scoped isolated execution state/, configuration.errors.full_messages.first
203+
196204
# Not enough DB connections
197205
configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
198206
assert_not configuration.valid?

test/unit/execution_pools/async_pool_test.rb

Lines changed: 58 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,57 +36,74 @@ def test_build_treats_fiber_as_an_alias_for_async
3636
assert_equal pool, SolidQueue::ExecutionPools.build(mode: :fiber, size: 5)
3737
end
3838

39-
def test_executes_jobs_as_fibers_on_a_single_reactor_thread
40-
pool = SolidQueue::ExecutionPools::AsyncPool.new(2)
41-
results = Thread::Queue.new
42-
43-
pool.post Execution.new(nil, results, 0.05)
44-
pool.post Execution.new(nil, results, 0.05)
39+
def test_raises_a_clear_error_when_isolation_level_is_not_fiber
40+
error = assert_raises SolidQueue::ExecutionPools::AsyncPool::UnsupportedIsolationLevelError do
41+
SolidQueue::ExecutionPools::AsyncPool.new(3)
42+
end
4543

46-
entries = 2.times.map { Timeout.timeout(1.second) { results.pop } }
44+
assert_match /isolation_level = :fiber/, error.message
45+
end
4746

48-
assert_equal 1, entries.map(&:first).uniq.count
49-
assert_equal 2, entries.map(&:last).uniq.count
50-
assert_equal 2, pool.available_capacity
51-
assert_equal 0, pool.metadata[:inflight]
52-
ensure
53-
pool&.shutdown
54-
pool&.wait_for_termination(1.second)
47+
def test_executes_jobs_as_fibers_on_a_single_reactor_thread
48+
with_execution_isolation(:fiber) do
49+
pool = SolidQueue::ExecutionPools::AsyncPool.new(2)
50+
results = Thread::Queue.new
51+
52+
pool.post Execution.new(nil, results, 0.05)
53+
pool.post Execution.new(nil, results, 0.05)
54+
55+
entries = 2.times.map { Timeout.timeout(1.second) { results.pop } }
56+
57+
assert_equal 1, entries.map(&:first).uniq.count
58+
assert_equal 2, entries.map(&:last).uniq.count
59+
assert_equal 2, pool.available_capacity
60+
assert_equal 0, pool.metadata[:inflight]
61+
ensure
62+
pool&.shutdown
63+
pool&.wait_for_termination(1.second)
64+
end
5565
end
5666

5767
def test_waits_for_in_flight_executions_during_shutdown
58-
pool = SolidQueue::ExecutionPools::AsyncPool.new(1)
59-
started = Thread::Queue.new
68+
with_execution_isolation(:fiber) do
69+
pool = SolidQueue::ExecutionPools::AsyncPool.new(1)
70+
started = Thread::Queue.new
6071

61-
pool.post Execution.new(started, nil, 0.1)
62-
Timeout.timeout(1.second) { started.pop }
72+
pool.post Execution.new(started, nil, 0.1)
73+
Timeout.timeout(1.second) { started.pop }
6374

64-
pool.shutdown
75+
pool.shutdown
6576

66-
assert_nil pool.wait_for_termination(0.01)
67-
assert pool.wait_for_termination(1.second)
77+
assert_nil pool.wait_for_termination(0.01)
78+
assert pool.wait_for_termination(1.second)
79+
ensure
80+
pool&.shutdown
81+
pool&.wait_for_termination(1.second)
82+
end
6883
end
6984

7085
def test_marks_the_pool_as_fatal_when_an_execution_is_cancelled
71-
notifications = Thread::Queue.new
72-
started = Thread::Queue.new
73-
reported_errors = []
74-
original_on_thread_error = SolidQueue.on_thread_error
75-
SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name }
76-
77-
pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed })
78-
79-
pool.post CancelledExecution.new(started)
80-
Timeout.timeout(1.second) { started.pop }
81-
Timeout.timeout(1.second) { notifications.pop }
82-
83-
error = assert_raises(Async::Cancel) { pool.available_capacity }
84-
assert_equal "Task was cancelled", error.message
85-
assert_equal [ "Async::Cancel" ], reported_errors
86-
assert_raises(Async::Cancel) { pool.metadata }
87-
ensure
88-
SolidQueue.on_thread_error = original_on_thread_error
89-
pool&.shutdown
90-
pool&.wait_for_termination(1.second)
86+
with_execution_isolation(:fiber) do
87+
notifications = Thread::Queue.new
88+
started = Thread::Queue.new
89+
reported_errors = []
90+
original_on_thread_error = SolidQueue.on_thread_error
91+
SolidQueue.on_thread_error = ->(error) { reported_errors << error.class.name }
92+
93+
pool = SolidQueue::ExecutionPools::AsyncPool.new(1, on_state_change: -> { notifications << :changed })
94+
95+
pool.post CancelledExecution.new(started)
96+
Timeout.timeout(1.second) { started.pop }
97+
Timeout.timeout(1.second) { notifications.pop }
98+
99+
error = assert_raises(Async::Cancel) { pool.available_capacity }
100+
assert_equal "Task was cancelled", error.message
101+
assert_equal [ "Async::Cancel" ], reported_errors
102+
assert_raises(Async::Cancel) { pool.metadata }
103+
ensure
104+
SolidQueue.on_thread_error = original_on_thread_error
105+
pool&.shutdown
106+
pool&.wait_for_termination(1.second)
107+
end
91108
end
92109
end

0 commit comments

Comments
 (0)