Skip to content

Commit 9834e7e

Browse files
committed
Simplify code for how callbacks are serialized/deserialized
* Raise specified error (AlreadyFinished) if the batch is attempting to run after being finished * Naming improvements - remove bang from check_completion (since there is no companion method). Change limit(1).exists to simpler `any?`. Make callback method names more consistent * Simplify serialize_callback * perform_completion_job -> enqueue_callback_job, remove unused attrs, and use .enqueue instead of unecessary enqueue_all * We now tell users how to use queue_as to switch the EmptyJob queue and remove the maintenance queue setup
1 parent fcae152 commit 9834e7e

5 files changed

Lines changed: 35 additions & 47 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ By default, this jobs run on the `default` queue. You can specify an alternative
657657

658658
```rb
659659
Rails.application.config.after_initialize do # or to_prepare
660-
SolidQueue::Batch.maintenance_queue_name = "my_batch_queue"
660+
SolidQueue::Batch::EmptyJob.queue_as "my_batch_queue"
661661
end
662662
```
663663

app/models/solid_queue/batch.rb

Lines changed: 28 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@
22

33
module SolidQueue
44
class Batch < Record
5+
class AlreadyFinished < StandardError; end
6+
57
include Trackable
68

79
has_many :jobs
810
has_many :batch_executions, class_name: "SolidQueue::BatchExecution", dependent: :destroy
911

10-
serialize :on_finish, coder: JSON
11-
serialize :on_success, coder: JSON
12-
serialize :on_failure, coder: JSON
1312
serialize :metadata, coder: JSON
13+
%w[ finish success failure ].each do |callback_type|
14+
serialize "on_#{callback_type}", coder: JSON
15+
16+
define_method("on_#{callback_type}=") do |callback|
17+
super serialize_callback(callback)
18+
end
19+
end
1420

1521
after_initialize :set_active_job_batch_id
1622
after_commit :start_batch, on: :create, unless: -> { ActiveRecord.respond_to?(:after_all_transactions_commit) }
1723

18-
mattr_accessor :maintenance_queue_name
19-
self.maintenance_queue_name = "default"
20-
2124
def enqueue(&block)
22-
raise "You cannot enqueue a batch that is already finished" if finished?
25+
raise AlreadyFinished, "You cannot enqueue a batch that is already finished" if finished?
2326

2427
transaction do
2528
save! if new_record?
@@ -36,26 +39,13 @@ def enqueue(&block)
3639
end
3740
end
3841

39-
def on_success=(value)
40-
super(serialize_callback(value))
41-
end
42-
43-
def on_failure=(value)
44-
super(serialize_callback(value))
45-
end
46-
47-
def on_finish=(value)
48-
super(serialize_callback(value))
49-
end
50-
5142
def metadata
5243
(super || {}).with_indifferent_access
5344
end
5445

55-
def check_completion!
56-
return if finished? || !ready?
57-
return if batch_executions.limit(1).exists?
58-
46+
def check_completion
47+
return if finished? || !enqueued?
48+
return if batch_executions.any?
5949
rows = Batch
6050
.where(id: id)
6151
.unfinished
@@ -74,7 +64,7 @@ def check_completion!
7464
finished_attributes[:completed_jobs] = total_jobs - failed
7565

7666
update!(finished_attributes)
77-
execute_callbacks
67+
enqueue_callback_jobs
7868
end
7969
end
8070

@@ -89,36 +79,34 @@ def as_active_job(active_job_klass)
8979
end
9080

9181
def serialize_callback(value)
92-
return value if value.blank?
93-
active_job = as_active_job(value)
94-
# We can pick up batch ids from context, but callbacks should never be considered a part of the batch
95-
active_job.batch_id = nil
96-
active_job.serialize
82+
if value.present?
83+
active_job = value.is_a?(ActiveJob::Base) ? value : value.new
84+
# We can pick up batch ids from context, but callbacks should never be considered a part of the batch
85+
active_job.batch_id = nil
86+
active_job.serialize
87+
end
9788
end
9889

99-
def perform_completion_job(job_field, attrs)
100-
active_job = ActiveJob::Base.deserialize(send(job_field))
90+
def enqueue_callback_job(callback_name)
91+
active_job = ActiveJob::Base.deserialize(send(callback_name))
10192
active_job.send(:deserialize_arguments_if_needed)
10293
active_job.arguments = [ self ] + Array.wrap(active_job.arguments)
103-
SolidQueue::Job.enqueue_all([ active_job ])
104-
105-
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
106-
attrs[job_field] = active_job.serialize
94+
active_job.enqueue
10795
end
10896

109-
def execute_callbacks
97+
def enqueue_callback_jobs
11098
if failed_at?
111-
perform_completion_job(:on_failure, {}) if on_failure.present?
99+
enqueue_callback_job(:on_failure) if on_failure.present?
112100
else
113-
perform_completion_job(:on_success, {}) if on_success.present?
101+
enqueue_callback_job(:on_success) if on_success.present?
114102
end
115103

116-
perform_completion_job(:on_finish, {}) if on_finish.present?
104+
enqueue_callback_job(:on_finish) if on_finish.present?
117105
end
118106

119107
def enqueue_empty_job
120108
Batch.wrap_in_batch_context(id) do
121-
EmptyJob.set(queue: self.class.maintenance_queue_name || "default").perform_later
109+
EmptyJob.perform_later
122110
end
123111
end
124112

app/models/solid_queue/batch/trackable.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ module Trackable
2424
def status
2525
if finished?
2626
failed? ? "failed" : "completed"
27-
elsif enqueued_at.present?
28-
"processing"
27+
elsif enqueued?
28+
"enqueued"
2929
else
3030
"pending"
3131
end
@@ -43,7 +43,7 @@ def finished?
4343
finished_at.present?
4444
end
4545

46-
def ready?
46+
def enqueued?
4747
enqueued_at.present?
4848
end
4949

app/models/solid_queue/batch_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class BatchExecution < Record
1010
private
1111
def check_completion
1212
batch = Batch.find_by(id: batch_id)
13-
batch.check_completion! if batch.present?
13+
batch.check_completion if batch.present?
1414
end
1515

1616
class << self

test/integration/batch_lifecycle_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class BatchLifecycleTest < ActiveSupport::TestCase
1010
SolidQueue.on_thread_error = silent_on_thread_error_for([ FailingJobError ], @_on_thread_error)
1111
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
1212
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
13-
SolidQueue::Batch.maintenance_queue_name = "background"
13+
SolidQueue::Batch::EmptyJob.queue_as "background"
1414
end
1515

1616
teardown do
@@ -25,7 +25,7 @@ class BatchLifecycleTest < ActiveSupport::TestCase
2525

2626
ApplicationJob.enqueue_after_transaction_commit = false if defined?(ApplicationJob.enqueue_after_transaction_commit)
2727
SolidQueue.preserve_finished_jobs = true
28-
SolidQueue::Batch.maintenance_queue_name = nil
28+
SolidQueue::Batch::EmptyJob.queue_as "default"
2929
end
3030

3131
class BatchOnSuccessJob < ApplicationJob

0 commit comments

Comments
 (0)