Skip to content

Commit 74e443e

Browse files
committed
Fail fast for async worker config errors
1 parent abbbbb4 commit 74e443e

4 files changed

Lines changed: 54 additions & 11 deletions

File tree

lib/solid_queue/configuration.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ class Configuration
88
validate :ensure_valid_recurring_tasks
99
validate :ensure_correctly_sized_thread_pool
1010
validate :ensure_valid_worker_execution_modes
11+
validate :ensure_async_workers_use_capacity_aliases
12+
validate :ensure_async_workers_have_required_dependency
1113
validate :ensure_async_workers_use_supported_isolation_level
1214

1315
class Process < Struct.new(:kind, :attributes)
@@ -106,6 +108,22 @@ def ensure_valid_worker_execution_modes
106108
end
107109
end
108110

111+
def ensure_async_workers_use_capacity_aliases
112+
workers_options.each do |options|
113+
if async_worker?(options) && options.key?(:threads)
114+
errors.add(:base, "Async workers do not accept `threads`. Use `capacity` or `fibers` instead.")
115+
end
116+
end
117+
end
118+
119+
def ensure_async_workers_have_required_dependency
120+
return unless workers_options.any? { |options| async_worker?(options) }
121+
122+
SolidQueue::ExecutionPools::AsyncPool.ensure_dependency!
123+
rescue LoadError => error
124+
errors.add(:base, error.message)
125+
end
126+
109127
def ensure_async_workers_use_supported_isolation_level
110128
return unless workers_options.any? { |options| async_worker?(options) }
111129

@@ -281,7 +299,9 @@ def async_worker?(options)
281299

282300
def worker_defaults_for(options)
283301
if async_worker?(options)
284-
WORKER_DEFAULTS.except(:threads).merge(capacity: WORKER_DEFAULTS[:threads])
302+
WORKER_DEFAULTS.except(:threads).tap do |defaults|
303+
defaults[:capacity] = WORKER_DEFAULTS[:threads] unless options.key?(:threads)
304+
end
285305
else
286306
WORKER_DEFAULTS
287307
end

lib/solid_queue/execution_pools/async_pool.rb

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ def initialize(level)
2727
end
2828

2929
class << self
30+
def ensure_dependency!
31+
require "async"
32+
require "async/queue"
33+
require "async/semaphore"
34+
rescue LoadError => error
35+
raise MissingDependencyError.new(error)
36+
end
37+
3038
def ensure_supported_isolation_level!
3139
return if supported_isolation_level?
3240

@@ -50,7 +58,7 @@ def initialize(size, on_state_change: nil)
5058
@fatal_error = nil
5159
@boot_queue = Thread::Queue.new
5260

53-
load_dependency!
61+
self.class.ensure_dependency!
5462
self.class.ensure_supported_isolation_level!
5563

5664
@queue = Async::Queue.new
@@ -115,14 +123,6 @@ def name
115123
@name ||= "solid_queue-async-pool-#{object_id}"
116124
end
117125

118-
def load_dependency!
119-
require "async"
120-
require "async/queue"
121-
require "async/semaphore"
122-
rescue LoadError => error
123-
raise MissingDependencyError.new(error)
124-
end
125-
126126
def start_reactor
127127
create_thread do
128128
Async do |task|

test/unit/configuration_test.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,19 @@ class ConfigurationTest < ActiveSupport::TestCase
7777
end
7878
end
7979

80+
test "async workers reject threads in favor of capacity aliases" do
81+
with_execution_isolation(:fiber) do
82+
configuration = SolidQueue::Configuration.new(
83+
workers: [ { queues: "llm*", execution_mode: :async, threads: 10 } ],
84+
dispatchers: [],
85+
skip_recurring: true
86+
)
87+
88+
assert_not configuration.valid?
89+
assert_match /Async workers do not accept `threads`/, configuration.errors.full_messages.first
90+
end
91+
end
92+
8093
test "async worker capacity does not inflate required database pool size" do
8194
with_execution_isolation(:fiber) do
8295
configuration = SolidQueue::Configuration.new(
@@ -201,6 +214,16 @@ class ConfigurationTest < ActiveSupport::TestCase
201214
assert_not configuration.valid?
202215
assert_match /requires fiber-scoped isolated execution state/, configuration.errors.full_messages.first
203216

217+
with_execution_isolation(:fiber) do
218+
load_error = LoadError.new("cannot load such file -- async")
219+
missing_dependency_error = SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError.new(load_error)
220+
SolidQueue::ExecutionPools::AsyncPool.expects(:ensure_dependency!).raises(missing_dependency_error)
221+
222+
configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: [ { execution_mode: :async } ])
223+
assert_not configuration.valid?
224+
assert_match /gem "async"/, configuration.errors.full_messages.first
225+
end
226+
204227
# Not enough DB connections
205228
configuration = SolidQueue::Configuration.new(workers: [ { queues: "background", threads: 50, polling_interval: 10 } ])
206229
assert_not configuration.valid?

test/unit/execution_pools/async_pool_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def perform
1919
def test_raises_a_clear_error_when_the_async_gem_is_unavailable
2020
load_error = LoadError.new("cannot load such file -- async")
2121

22-
SolidQueue::ExecutionPools::AsyncPool.any_instance.expects(:require).with("async").raises(load_error)
22+
SolidQueue::ExecutionPools::AsyncPool.expects(:require).with("async").raises(load_error)
2323

2424
error = assert_raises SolidQueue::ExecutionPools::AsyncPool::MissingDependencyError do
2525
SolidQueue::ExecutionPools::AsyncPool.new(3)

0 commit comments

Comments
 (0)