Skip to content

Commit 94ff46e

Browse files
committed
add flag for batch enqueue delayed
1 parent b88d6a3 commit 94ff46e

11 files changed

Lines changed: 214 additions & 90 deletions

File tree

.github/workflows/ruby.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ jobs:
3838
- "3.5"
3939
- "3.6"
4040
redis-version:
41+
- "~> 3.x"
4142
- "~> 4.x"
4243
- "~> 5.x"
4344
exclude:

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ nbproject
1616
.env
1717
.env.*
1818
/nul
19-
vendor/
19+
vendor/
20+
.vscode

.rubocop.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ Style/DoubleNegation:
1515
Metrics/PerceivedComplexity:
1616
Enabled: false
1717
Metrics/ClassLength:
18-
Max: 110
18+
Max: 130

.rubocop_todo.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ Metrics/MethodLength:
3939
# Offense count: 2
4040
# Configuration parameters: CountComments.
4141
Metrics/ModuleLength:
42-
Max: 364
42+
Max: 373
4343

4444
# Offense count: 1
4545
Style/CaseEquality:

README.md

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,18 @@ Both the Rake task and standalone executable support the following
129129
environment variables:
130130

131131
* `APP_NAME` - Application name used in procline (`$0`) (default empty)
132-
* `BACKGROUND` - [Run in the background](#running-in-the-background) if
133-
non-empty (via `Process.daemon`, if supported) (default `false`)
134-
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules)
135-
if non-empty (default `false`)
136-
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
137-
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before*
138-
requiring `resque` and `resque/scheduler` (default empty).
139-
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a
140-
scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
132+
* `BACKGROUND` - [Run in the background](#running-in-the-background) if non-empty (via `Process.daemon`, if supported) (default `false`)
133+
* `DELAYED_REQUEUE_BATCH_SIZE` - Set the delayed job batch size if enabled (default `100`). If `<= 1`, this disables batching.
134+
* `DISABLE_DELAYED_REQUEUE_BATCH` - Disable batched delayed job queuing (default `false`) - [See section below on consequences](#batched-delayed-job-and-resque-enqueue-hooks)
135+
* `DYNAMIC_SCHEDULE` - Enables [dynamic scheduling](#dynamic-schedules) if non-empty (default `false`)
136+
* `INITIALIZER_PATH` - Path to a Ruby file that will be loaded *before* requiring `resque` and `resque/scheduler` (default empty).
141137
* `LOGFILE` - Log file name (default empty, meaning `$stdout`)
142-
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`,
143-
default `'text'`)
138+
* `LOGFORMAT` - Log output format to use (either `'text'`, `'json'` or `'logfmt'`, default `'text'`)
144139
* `PIDFILE` - If non-empty, write process PID to file (default empty)
145-
* `QUIET` - Silence most output if non-empty (equivalent to a level of
146-
`MonoLogger::FATAL`, default `false`)
147-
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level
148-
of `MonoLogger::DEBUG`, default `false`)
149-
140+
* `QUIET` - Silence most output if non-empty (equivalent to a level of `MonoLogger::FATAL`, default `false`)
141+
* `RAILS_ENV` - Environment to use in procline (`$0`) (default empty)
142+
* `RESQUE_SCHEDULER_INTERVAL` - Interval in seconds for checking if a scheduled job must run (coerced with `Kernel#Float()`) (default `5`)
143+
* `VERBOSE` - Maximize log verbosity if non-empty (equivalent to a level of `MonoLogger::DEBUG`, default `false`)
150144

151145
### Resque Pool integration
152146

@@ -755,6 +749,24 @@ This table explains the version requirements for rufus-scheduler
755749
| `~> 4.0` | `~> 3.0` |
756750
| `< 4.0` | `~> 2.0` |
757751

752+
##### Batched delayed job and resque enqueue hooks
753+
754+
Batching delayed job queuing can speed up when per-second job counts grows,
755+
avoiding situations that may cause delayed enqueues to fall behind. This
756+
batching wraps enqueues in a `multi` pipeline, making far fewer roundtrips to
757+
the server.
758+
759+
However, in `redis` gem `>= 4.0`, any operations to redis within the `multi`
760+
block must use the multi handle so that the actions are captured. Resque's hooks
761+
do not currently have a way to pass this around, and so compatibility with other
762+
resque plugins or hooks which access redis at enqueue time is impacted with
763+
batch mode. In these cases, you should consider disabling the batching by setting
764+
the `DISABLE_DELAYED_REQUEUE_BATCH` environment variable to `true`.
765+
766+
Detecting when this occurs can be tricky, you must watch for logs
767+
emitted by your `resque-scheduler` process such as `Redis::CommandError: ERR
768+
MULTI calls can not be nested` or `NoMethodError: undefined method nil? for
769+
<Redis::Future`, and delayed jobs you expect would not be enqueued.
758770

759771
### Contributing
760772

lib/resque/scheduler.rb

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,25 +204,36 @@ def enqueue_next_item(timestamp)
204204
item
205205
end
206206

207+
def batch_delayed_items?
208+
!disable_delayed_requeue_batches && delayed_requeue_batch_size > 1
209+
end
210+
207211
# Enqueues all delayed jobs for a timestamp
208212
def enqueue_delayed_items_for_timestamp(timestamp)
209213
count = 0
210-
batch_size = delayed_requeue_batch_size
211-
actual_batch_size = nil
214+
batch_size = batch_delayed_items? ? delayed_requeue_batch_size : 1
212215

213-
log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"
216+
message = "Processing delayed items for timestamp #{timestamp}"
217+
message += ", in batches of #{batch_size}" if batch_delayed_items?
218+
log message
214219

215220
loop do
221+
actual_batch_size = 0
222+
216223
handle_shutdown do
217224
# Continually check that it is still the master
218225
if am_master
219-
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
220-
batch_size)
226+
if batch_delayed_items?
227+
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
228+
log "queued batch of #{actual_batch_size} jobs" if actual_batch_size != -1
229+
else
230+
item = enqueue_next_item(timestamp)
231+
actual_batch_size = item.nil? ? 0 : 1
232+
end
221233
end
222234
end
223235

224236
count += actual_batch_size
225-
log "queued #{count} jobs" if actual_batch_size != -1
226237

227238
# continue processing until there are no more items in this
228239
# timestamp. If we don't have a full batch, this is the last one.
@@ -231,7 +242,7 @@ def enqueue_delayed_items_for_timestamp(timestamp)
231242
break if actual_batch_size < batch_size
232243
end
233244

234-
log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
245+
log "finished queueing #{count} total jobs for timestamp #{timestamp}"
235246
end
236247

237248
def timestamp_key(timestamp)

lib/resque/scheduler/cli.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ module Scheduler
1616
pidfile: 'PIDFILE',
1717
poll_sleep_amount: 'RESQUE_SCHEDULER_INTERVAL',
1818
verbose: 'VERBOSE',
19-
lock_timeout: 'LOCK_TIMEOUT'
19+
lock_timeout: 'LOCK_TIMEOUT',
20+
delayed_requeue_batch_size: 'DELAYED_REQUEUE_BATCH_SIZE',
21+
disable_delayed_requeue_batches: 'DISABLE_DELAYED_REQUEUE_BATCHES'
2022
}.freeze
2123

2224
class Cli
@@ -74,6 +76,24 @@ class Cli
7476
{
7577
args: ['-v', '--verbose', 'Run with verbose output [VERBOSE]'],
7678
callback: ->(options) { ->(v) { options[:verbose] = v } }
79+
},
80+
{
81+
args: ['--lock-timeout [LOCK_TIMEOUT]', 'Lock timeout'],
82+
callback: ->(options) { ->(t) { options[:lock_timeout] = t } }
83+
},
84+
{
85+
args: [
86+
'--delayed-requeue-batch-size [DELAYED_REQUEUE_BATCH_SIZE]',
87+
'Delayed requeue batch size'
88+
],
89+
callback: ->(options) { ->(d) { options[:delayed_requeue_batch_size] = d } }
90+
},
91+
{
92+
args: [
93+
'--disable-delayed-requeue-batches [DISABLE_DELAYED_REQUEUE_BATCHES]',
94+
'Disable delayed requeue batches'
95+
],
96+
callback: ->(options) { ->(d) { options[:disable_delayed_requeue_batches] = d } }
7797
}
7898
].freeze
7999

lib/resque/scheduler/configuration.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,21 @@ def app_name
6565
@app_name ||= environment['APP_NAME']
6666
end
6767

68+
attr_writer :delayed_requeue_batch_size
69+
6870
def delayed_requeue_batch_size
6971
@delayed_requeue_batch_size ||= \
7072
ENV['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE']
7173
@delayed_requeue_batch_size ||= 100
7274
end
7375

76+
attr_writer :disable_delayed_requeue_batches
77+
78+
def disable_delayed_requeue_batches
79+
@disable_delayed_requeue_batches ||= \
80+
to_bool(environment['DISABLE_DELAYED_REQUEUE_BATCH'])
81+
end
82+
7483
# Amount of time in seconds to sleep between polls of the delayed
7584
# queue. Defaults to 5
7685
attr_writer :poll_sleep_amount

lib/resque/scheduler/env.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,21 @@ def setup_scheduler_configuration
6767

6868
c.logformat = options[:logformat] if options.key?(:logformat)
6969

70-
c.lock_timeout = options[:lock_timeout] if options.key?(:lock_timeout)
70+
c.lock_timeout = options[:lock_timeout].to_i if options.key?(:lock_timeout)
7171

7272
if (psleep = options[:poll_sleep_amount]) && !psleep.nil?
7373
c.poll_sleep_amount = Float(psleep)
7474
end
7575

7676
c.verbose = !!options[:verbose] if options.key?(:verbose)
77+
78+
if options.key?(:delayed_requeue_batch_size)
79+
c.delayed_requeue_batch_size = options[:delayed_requeue_batch_size].to_i
80+
end
81+
82+
if options.key?(:disable_delayed_requeue_batches)
83+
c.disable_delayed_requeue_batches = !!options[:disable_delayed_requeue_batches]
84+
end
7785
end
7886
end
7987
# rubocop:enable Metrics/AbcSize

0 commit comments

Comments
 (0)