Skip to content

Commit 6391c2a

Browse files
RUBY-3706 Exponential backoff and jitter in retry (#2998)
1 parent 62f2b6d commit 6391c2a

30 files changed

+6091
-40
lines changed

lib/mongo/client.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ class Client
5353
#
5454
# @since 2.1.2
5555
VALID_OPTIONS = [
56+
:adaptive_retries,
5657
:app_name,
5758
:auth_mech,
5859
:auth_mech_properties,
@@ -150,6 +151,11 @@ class Client
150151
# auto-encryption behavior
151152
attr_reader :encrypter
152153

154+
# @return [ Mongo::Retryable::RetryPolicy ] The retry policy for
155+
# backpressure and adaptive retries.
156+
# @api private
157+
attr_reader :retry_policy
158+
153159
# Delegate command and collections execution to the current database.
154160
def_delegators :@database, :command, :collections
155161

@@ -589,6 +595,9 @@ def initialize(addresses_or_uri, options = nil)
589595
end
590596

591597
@connect_lock = Mutex.new
598+
@retry_policy = Retryable::RetryPolicy.new(
599+
adaptive_retries: !!@options[:adaptive_retries]
600+
)
592601
@connect_lock.synchronize do
593602
@cluster = Cluster.new(
594603
addresses,

lib/mongo/collection/view/iterable.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,16 @@ def select_cursor(session)
9292
op = initial_query_op(session)
9393
tracer.trace_operation(op, context) do
9494
if respond_to?(:write?, true) && write?
95-
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
96-
result = send_initial_query(server, context, operation: op)
97-
98-
if use_query_cache?
99-
CachingCursor.new(view, result, server, session: session, context: context)
100-
else
101-
Cursor.new(view, result, server, session: session, context: context)
95+
retry_enabled = collection.client.options[:retry_writes] != false
96+
with_overload_retry(context: context, retry_enabled: retry_enabled) do
97+
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
98+
result = send_initial_query(server, context, operation: op)
99+
100+
if use_query_cache?
101+
CachingCursor.new(view, result, server, session: session, context: context)
102+
else
103+
Cursor.new(view, result, server, session: session, context: context)
104+
end
102105
end
103106
else
104107
read_with_retry_cursor(session, server_selector, view, context: context) do |server|

lib/mongo/cursor.rb

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,18 @@ def get_more
383383
# doing so may result in silent data loss, the driver no longer retries
384384
# getMore operations in any circumstance.
385385
# https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md#qa
386-
process(execute_operation(get_more_operation))
386+
#
387+
# However, overload errors (SystemOverloadedError + RetryableError) are
388+
# retried with exponential backoff since the server never processed
389+
# the request.
390+
with_overload_retry(context: possibly_refreshed_context) do
391+
process(execute_operation(get_more_operation))
392+
end
393+
rescue Error::OperationFailure => e
394+
# When overload retries are exhausted on getMore, close the cursor
395+
# so that killCursors is sent to the server.
396+
close if e.label?('RetryableError') && e.label?('SystemOverloadedError')
397+
raise
387398
end
388399

389400
# @api private

lib/mongo/database.rb

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -241,21 +241,24 @@ def command(operation, opts = {})
241241
selector = ServerSelector.get(txn_read_pref)
242242

243243
client.with_session(opts) do |session|
244-
server = selector.select_server(cluster, nil, session)
244+
context = Operation::Context.new(
245+
client: client,
246+
session: session,
247+
operation_timeouts: operation_timeouts(opts)
248+
)
245249
op = Operation::Command.new(
246250
:selector => operation,
247251
:db_name => name,
248252
:read => selector,
249253
:session => session
250254
)
251255

252-
op.execute(server,
253-
context: Operation::Context.new(
254-
client: client,
255-
session: session,
256-
operation_timeouts: operation_timeouts(opts)
257-
),
258-
options: execution_opts)
256+
retry_enabled = client.options[:retry_reads] != false &&
257+
client.options[:retry_writes] != false
258+
with_overload_retry(context: context, retry_enabled: retry_enabled) do
259+
server = selector.select_server(cluster, nil, session)
260+
op.execute(server, context: context, options: execution_opts)
261+
end
259262
end
260263
end
261264

lib/mongo/index/view.rb

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,11 @@ def create_many(*models)
242242
)
243243
operation = Operation::CreateIndex.new(spec)
244244
tracer.trace_operation(operation, context, op_name: 'createIndexes') do
245-
server = next_primary(nil, session)
246-
operation.execute(server, context: context)
245+
retry_enabled = collection.client.options[:retry_writes] != false
246+
with_overload_retry(context: context, retry_enabled: retry_enabled) do
247+
server = next_primary(nil, session)
248+
operation.execute(server, context: context)
249+
end
247250
end
248251
end
249252
end
@@ -369,8 +372,11 @@ def drop_by_name(name, opts = {})
369372
op = Operation::DropIndex.new(spec)
370373
op_name = name == Index::ALL ? 'dropIndexes' : 'dropIndex'
371374
tracer.trace_operation(op, context, op_name: op_name) do
372-
server = next_primary(nil, session)
373-
op.execute(server, context: context)
375+
retry_enabled = collection.client.options[:retry_writes] != false
376+
with_overload_retry(context: context, retry_enabled: retry_enabled) do
377+
server = next_primary(nil, session)
378+
op.execute(server, context: context)
379+
end
374380
end
375381
end
376382
end

lib/mongo/retryable.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717

18+
require 'mongo/retryable/backpressure'
19+
require 'mongo/retryable/token_bucket'
20+
require 'mongo/retryable/retry_policy'
1821
require 'mongo/retryable/read_worker'
1922
require 'mongo/retryable/write_worker'
2023

@@ -95,5 +98,50 @@ def read_worker
9598
def write_worker
9699
@write_worker ||= WriteWorker.new(self)
97100
end
101+
102+
# Wraps an operation with overload retry logic. On overload errors
103+
# (SystemOverloadedError + RetryableError), retries the block with
104+
# exponential backoff up to MAX_RETRIES times.
105+
#
106+
# The block should include server selection so it is re-done on retry.
107+
# For cursor operations (getMore), the same server is reused since the
108+
# cursor is pinned.
109+
#
110+
# @param [ Operation::Context | nil ] context The operation context
111+
# for CSOT deadline checking.
112+
# @param [ true | false ] retry_enabled Whether overload retries are
113+
# permitted. When false, overload errors are raised immediately
114+
# without retrying (used when retryReads/retryWrites is disabled).
115+
#
116+
# @return [ Object ] The result of the block.
117+
#
118+
# @api private
119+
def with_overload_retry(context: nil, retry_enabled: true)
120+
return yield unless retry_enabled
121+
122+
error_count = 0
123+
loop do
124+
begin
125+
result = yield
126+
client.retry_policy.record_success(is_retry: error_count > 0)
127+
return result
128+
rescue Error::TimeoutError
129+
raise
130+
rescue Error::OperationFailure::Family => e
131+
if e.label?('SystemOverloadedError') && e.label?('RetryableError')
132+
error_count += 1
133+
policy = client.retry_policy
134+
delay = policy.backoff_delay(error_count)
135+
unless policy.should_retry_overload?(error_count, delay, context: context)
136+
raise e
137+
end
138+
Logger.logger.warn("Overload retry due to: #{e.class.name}: #{e.message}")
139+
sleep(delay)
140+
else
141+
raise e
142+
end
143+
end
144+
end
145+
end
98146
end
99147
end
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# frozen_string_literal: true
2+
3+
module Mongo
4+
module Retryable
5+
# Constants and helpers for client backpressure (exponential backoff
6+
# and jitter in retry loops).
7+
#
8+
# @api private
9+
module Backpressure
10+
# Base backoff delay in seconds.
11+
BASE_BACKOFF = 0.1
12+
13+
# Maximum backoff delay in seconds.
14+
MAX_BACKOFF = 10
15+
16+
# Maximum number of retries for overload errors.
17+
MAX_RETRIES = 5
18+
19+
# Rate at which tokens are returned to the bucket on success.
20+
RETRY_TOKEN_RETURN_RATE = 0.1
21+
22+
# Default capacity of the retry token bucket.
23+
DEFAULT_RETRY_TOKEN_CAPACITY = 1000
24+
25+
# Calculate the backoff delay for a given retry attempt.
26+
#
27+
# @param [ Integer ] attempt The retry attempt number (1-indexed).
28+
# @param [ Float ] jitter A random float in [0.0, 1.0). Defaults to
29+
# a random value. Can be injected for deterministic testing.
30+
#
31+
# @return [ Float ] The backoff delay in seconds.
32+
def self.backoff_delay(attempt, jitter: rand)
33+
jitter * [ MAX_BACKOFF, BASE_BACKOFF * (2**(attempt - 1)) ].min
34+
end
35+
end
36+
end
37+
end

lib/mongo/retryable/base_worker.rb

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,35 @@ def log_retry(e, options = nil)
110110
message = (options || {}).fetch(:message, "Retry")
111111
Logger.logger.warn "#{message} due to: #{e.class.name}: #{e.message}"
112112
end
113+
114+
# Returns the retry policy from the client.
115+
#
116+
# @return [ Mongo::Retryable::RetryPolicy ] The retry policy.
117+
def retry_policy
118+
client.retry_policy
119+
end
120+
121+
# Whether the error indicates server overload.
122+
#
123+
# @param [ Exception ] e The error to check.
124+
#
125+
# @return [ true | false ] true if the error has the
126+
# SystemOverloadedError label.
127+
def overload_error?(e)
128+
e.respond_to?(:label?) && e.label?('SystemOverloadedError')
129+
end
130+
131+
# Whether the error is a retryable overload error. An error is
132+
# retryable overload when it has both the SystemOverloadedError and
133+
# RetryableError labels.
134+
#
135+
# @param [ Exception ] e The error to check.
136+
#
137+
# @return [ true | false ] true if the error has both labels.
138+
def retryable_overload_error?(e)
139+
overload_error?(e) &&
140+
e.respond_to?(:label?) && e.label?('RetryableError')
141+
end
113142
end
114143

115144
end

lib/mongo/retryable/read_worker.rb

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,19 @@ def modern_read_with_retry(session, server_selector, context, &block)
202202
session,
203203
timeout: context&.remaining_timeout_sec
204204
)
205-
yield server
205+
result = yield server
206+
retry_policy.record_success(is_retry: false)
207+
result
206208
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
207209
e.add_notes('modern retry', 'attempt 1')
208210
raise e if session.in_transaction?
209-
raise e if !is_retryable_exception?(e) && !e.write_retryable?
210-
retry_read(e, session, server_selector, context: context, failed_server: server, &block)
211+
212+
if retryable_overload_error?(e)
213+
overload_read_retry(e, session, server_selector, context, server, error_count: 1, &block)
214+
else
215+
raise e if !is_retryable_exception?(e) && !e.write_retryable?
216+
retry_read(e, session, server_selector, context: context, failed_server: server, &block)
217+
end
211218
end
212219

213220
# Attempts to do a "legacy" read with retry. The operation will be
@@ -289,11 +296,16 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
289296
begin
290297
context&.check_timeout!
291298
attempt = attempt ? attempt + 1 : 2
292-
yield server, true
299+
result = yield server, true
300+
retry_policy.record_success(is_retry: true)
301+
result
293302
rescue Error::TimeoutError
294303
raise
295304
rescue *retryable_exceptions => e
296305
e.add_notes('modern retry', "attempt #{attempt}")
306+
if retryable_overload_error?(e)
307+
return overload_read_retry(e, session, server_selector, context, server, error_count: attempt, &block)
308+
end
297309
if context&.csot?
298310
failed_server = server
299311
retry
@@ -302,6 +314,10 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
302314
end
303315
rescue Error::OperationFailure::Family, Error::PoolError => e
304316
e.add_note('modern retry')
317+
if retryable_overload_error?(e)
318+
e.add_note("attempt #{attempt}")
319+
return overload_read_retry(e, session, server_selector, context, server, error_count: attempt, &block)
320+
end
305321
if e.write_retryable?
306322
e.add_note("attempt #{attempt}")
307323
if context&.csot?
@@ -321,6 +337,51 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
321337
end
322338
end
323339

340+
# Retry loop for overload errors with exponential backoff.
341+
# Each retry sleeps with jittered backoff, respects MAX_RETRIES,
342+
# and consumes a token from the bucket when adaptive retries
343+
# are enabled.
344+
def overload_read_retry(last_error, session, server_selector, context, failed_server, error_count:, &block)
345+
loop do
346+
delay = retry_policy.backoff_delay(error_count)
347+
unless retry_policy.should_retry_overload?(error_count, delay, context: context)
348+
raise last_error
349+
end
350+
log_retry(last_error, message: 'Read retry (overload backoff)')
351+
sleep(delay)
352+
353+
begin
354+
server = select_server(
355+
cluster, server_selector, session, failed_server,
356+
error: last_error,
357+
timeout: context&.remaining_timeout_sec
358+
)
359+
rescue Error, Error::AuthError => e
360+
last_error.add_note("later retry failed: #{e.class}: #{e}")
361+
raise last_error
362+
end
363+
364+
begin
365+
context&.check_timeout!
366+
result = yield server, true
367+
retry_policy.record_success(is_retry: true)
368+
return result
369+
rescue Error::TimeoutError
370+
raise
371+
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
372+
error_count += 1
373+
e.add_notes('modern retry', "attempt #{error_count}")
374+
is_overload = retryable_overload_error?(e)
375+
unless is_overload || is_retryable_exception?(e) || e.write_retryable?
376+
raise e
377+
end
378+
retry_policy.record_non_overload_retry_failure unless is_overload
379+
failed_server = server
380+
last_error = e
381+
end
382+
end
383+
end
384+
324385
def select_server_for_retry(original_error, session, server_selector, context, failed_server)
325386
select_server(
326387
cluster,

0 commit comments

Comments
 (0)