Skip to content

Commit 9a4a085

Browse files
RUBY-3794 Finalize client backpressure (mongodb#3018)
1 parent 55b5b7f commit 9a4a085

30 files changed

+362
-1236
lines changed

lib/mongo/client.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class Client
3434
:write, :write_concern,
3535
:retry_reads, :max_read_retries, :read_retry_interval,
3636
:retry_writes, :max_write_retries,
37+
:max_adaptive_retries, :enable_overload_retargeting,
3738
:timeout_ms,
3839

3940
# Options which cannot currently be here:
@@ -52,7 +53,6 @@ class Client
5253
#
5354
# @since 2.1.2
5455
VALID_OPTIONS = %i[
55-
adaptive_retries
5656
app_name
5757
auth_mech
5858
auth_mech_properties
@@ -62,6 +62,7 @@ class Client
6262
cleanup
6363
compressors
6464
direct_connection
65+
enable_overload_retargeting
6566
connect
6667
connect_timeout
6768
database
@@ -71,6 +72,7 @@ class Client
7172
local_threshold
7273
logger
7374
log_prefix
75+
max_adaptive_retries
7476
max_connecting
7577
max_idle_time
7678
max_pool_size
@@ -587,7 +589,7 @@ def initialize(addresses_or_uri, options = nil)
587589

588590
@connect_lock = Mutex.new
589591
@retry_policy = Retryable::RetryPolicy.new(
590-
adaptive_retries: !!@options[:adaptive_retries]
592+
max_retries: @options[:max_adaptive_retries] || Retryable::Backpressure::DEFAULT_MAX_RETRIES
591593
)
592594
@connect_lock.synchronize do
593595
@cluster = Cluster.new(

lib/mongo/retryable.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# limitations under the License.
1616

1717
require 'mongo/retryable/backpressure'
18-
require 'mongo/retryable/token_bucket'
1918
require 'mongo/retryable/retry_policy'
2019
require 'mongo/retryable/read_worker'
2120
require 'mongo/retryable/write_worker'
@@ -67,10 +66,11 @@ def select_server(cluster, server_selector, session, failed_server = nil, error:
6766
# Whether the failed server should be deprioritized during server
6867
# selection for a retry attempt. For sharded and load-balanced
6968
# topologies, servers are always deprioritized on any retryable error.
70-
# For replica sets, servers are only deprioritized when the error
71-
# carries the SystemOverloadedError label.
69+
# For replica sets, servers are deprioritized on overload errors only
70+
# when enableOverloadRetargeting is enabled.
7271
def deprioritize_server?(cluster, error)
7372
return true if cluster.sharded? || cluster.load_balanced?
73+
return false unless client.options[:enable_overload_retargeting]
7474

7575
error.respond_to?(:label?) && error.label?('SystemOverloadedError')
7676
end
@@ -120,7 +120,7 @@ def with_overload_retry(context: nil, retry_enabled: true)
120120
error_count = 0
121121
loop do
122122
result = yield
123-
client.retry_policy.record_success(is_retry: error_count > 0)
123+
124124
return result
125125
rescue Error::TimeoutError
126126
raise

lib/mongo/retryable/backpressure.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,8 @@ module Backpressure
1313
# Maximum backoff delay in seconds.
1414
MAX_BACKOFF = 10
1515

16-
# Maximum number of retries for overload errors.
17-
MAX_RETRIES = 5
18-
19-
# Rate at which tokens are returned to the bucket on success.
20-
RETRY_TOKEN_RETURN_RATE = 0.1
21-
22-
# Default capacity of the retry token bucket.
23-
DEFAULT_RETRY_TOKEN_CAPACITY = 1000
16+
# Default maximum number of retries for overload errors.
17+
DEFAULT_MAX_RETRIES = 2
2418

2519
# Calculate the backoff delay for a given retry attempt.
2620
#

lib/mongo/retryable/read_worker.rb

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,7 @@ def modern_read_with_retry(session, server_selector, context, &block)
200200
session,
201201
timeout: context&.remaining_timeout_sec
202202
)
203-
result = yield server
204-
retry_policy.record_success(is_retry: false)
205-
result
203+
yield server
206204
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
207205
e.add_notes('modern retry', 'attempt 1')
208206
raise e if session.in_transaction? && !retryable_overload_error?(e)
@@ -295,9 +293,7 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
295293
begin
296294
context&.check_timeout!
297295
attempt = attempt ? attempt + 1 : 2
298-
result = yield server, true
299-
retry_policy.record_success(is_retry: true)
300-
result
296+
yield server, true
301297
rescue Error::TimeoutError
302298
raise
303299
rescue *retryable_exceptions => e
@@ -358,7 +354,6 @@ def overload_read_retry(last_error, session, server_selector, context, failed_se
358354
begin
359355
context&.check_timeout!
360356
result = yield server, true
361-
retry_policy.record_success(is_retry: true)
362357
return result
363358
rescue Error::TimeoutError
364359
raise
@@ -368,7 +363,6 @@ def overload_read_retry(last_error, session, server_selector, context, failed_se
368363
is_overload = retryable_overload_error?(e)
369364
raise e unless is_overload || is_retryable_exception?(e) || e.write_retryable?
370365

371-
retry_policy.record_non_overload_retry_failure unless is_overload
372366
failed_server = server
373367
last_error = e
374368
end

lib/mongo/retryable/retry_policy.rb

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,27 +2,25 @@
22

33
module Mongo
44
module Retryable
5-
# Encapsulates the retry policy for client backpressure, combining
6-
# exponential backoff with jitter and an optional token bucket for
7-
# adaptive retries.
5+
# Encapsulates the retry policy for client backpressure with
6+
# exponential backoff and jitter.
87
#
98
# One instance is created per Client and shared across all operations
109
# on that client.
1110
#
1211
# @api private
1312
class RetryPolicy
13+
# @return [ Integer ] The maximum number of overload retries.
14+
attr_reader :max_retries
15+
1416
# Create a new retry policy.
1517
#
16-
# @param [ true | false ] adaptive_retries Whether the adaptive
17-
# retry token bucket is enabled.
18-
def initialize(adaptive_retries: false)
19-
@token_bucket = adaptive_retries ? TokenBucket.new : nil
18+
# @param [ Integer ] max_retries The maximum number of overload
19+
# retry attempts. Defaults to Backpressure::DEFAULT_MAX_RETRIES.
20+
def initialize(max_retries: Backpressure::DEFAULT_MAX_RETRIES)
21+
@max_retries = max_retries
2022
end
2123

22-
# @return [ TokenBucket | nil ] The token bucket, if adaptive
23-
# retries are enabled.
24-
attr_reader :token_bucket
25-
2624
# Calculate the backoff delay for a given retry attempt.
2725
#
2826
# @param [ Integer ] attempt The retry attempt number (1-indexed).
@@ -35,46 +33,21 @@ def backoff_delay(attempt, jitter: rand)
3533

3634
# Determine whether an overload retry should be attempted.
3735
#
38-
# Checks that the attempt number does not exceed MAX_RETRIES,
39-
# that the backoff delay would not exceed the CSOT deadline (if set),
40-
# and that a token is available (if adaptive retries are enabled).
41-
#
4236
# @param [ Integer ] attempt The retry attempt number (1-indexed).
4337
# @param [ Float ] delay The backoff delay in seconds.
44-
# @param [ Mongo::CsotTimeoutHolder | nil ] context The operation
38+
# @param [ Mongo::Operation::Context | nil ] context The operation
4539
# context (for CSOT deadline checking).
4640
#
4741
# @return [ true | false ] Whether the retry should proceed.
4842
def should_retry_overload?(attempt, delay, context: nil)
49-
return false if attempt > Backpressure::MAX_RETRIES
43+
return false if attempt > @max_retries
5044
return false if exceeds_deadline?(delay, context)
51-
return false if @token_bucket && !@token_bucket.consume(1)
5245

5346
true
5447
end
5548

56-
# Record a successful operation by depositing tokens into the
57-
# bucket.
58-
#
59-
# @param [ true | false ] is_retry Whether the success came from
60-
# a retried attempt.
61-
def record_success(is_retry:)
62-
return unless @token_bucket
63-
64-
tokens = Backpressure::RETRY_TOKEN_RETURN_RATE
65-
tokens += 1 if is_retry
66-
@token_bucket.deposit(tokens)
67-
end
68-
69-
# Record a non-overload failure during a retry attempt by
70-
# depositing 1 token.
71-
def record_non_overload_retry_failure
72-
@token_bucket&.deposit(1)
73-
end
74-
7549
private
7650

77-
# Check whether the backoff delay would exceed the CSOT deadline.
7851
def exceeds_deadline?(delay, context)
7952
return false unless context&.csot?
8053

lib/mongo/retryable/token_bucket.rb

Lines changed: 0 additions & 59 deletions
This file was deleted.

lib/mongo/retryable/write_worker.rb

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,9 @@ def nro_write_with_retry(_write_concern, context:, &block)
111111
error_count = 0
112112
error_to_raise = nil
113113
begin
114-
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
114+
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
115115
yield connection, nil, context
116116
end
117-
retry_policy.record_success(is_retry: error_count > 0) if error_count > 0
118-
result
119117
rescue Error::TimeoutError
120118
raise
121119
rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => e
@@ -254,7 +252,7 @@ def modern_write_with_retry(session, server, context, &block)
254252
connection_succeeded = false
255253
was_starting = false
256254

257-
result = server.with_connection(
255+
server.with_connection(
258256
connection_global_id: context.connection_global_id,
259257
context: context
260258
) do |connection|
@@ -268,8 +266,6 @@ def modern_write_with_retry(session, server, context, &block)
268266
# it later for the retry as well.
269267
yield connection, txn_num, context.dup
270268
end
271-
retry_policy.record_success(is_retry: false)
272-
result
273269
rescue *retryable_exceptions, Error::PoolError, Auth::Unauthorized, Error::OperationFailure::Family => e
274270
e.add_notes('modern retry', 'attempt 1')
275271

@@ -344,11 +340,9 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
344340

345341
attempt = attempt ? attempt + 1 : 2
346342
log_retry(original_error, message: 'Write retry')
347-
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
343+
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
348344
yield(connection, txn_num, context)
349345
end
350-
retry_policy.record_success(is_retry: true)
351-
result
352346
rescue *retryable_exceptions, Error::PoolError => e
353347
if retryable_overload_error?(e)
354348
e.add_notes('modern retry', "attempt #{attempt}")
@@ -414,7 +408,6 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
414408
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
415409
yield connection, txn_num, context
416410
end
417-
retry_policy.record_success(is_retry: true)
418411
return result
419412
rescue Error::TimeoutError
420413
raise
@@ -430,7 +423,6 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
430423
unless e.respond_to?(:label?) && e.label?('NoWritesPerformed')
431424
error_to_raise = e
432425
end
433-
retry_policy.record_non_overload_retry_failure unless is_overload
434426
context = context.with(overload_only_retry: false) unless is_overload
435427
failed_server = server
436428
last_error = e

lib/mongo/session.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ def with_transaction(options = nil)
461461
overload_error_count = 0
462462
overload_encountered = false
463463

464-
loop do # rubocop:disable Metrics/BlockLength
464+
loop do
465465
if transaction_attempt > 0
466466
if overload_encountered
467467
delay = @client.retry_policy.backoff_delay(overload_error_count)
@@ -512,7 +512,6 @@ def with_transaction(options = nil)
512512
overload_error_count += 1
513513
elsif overload_encountered
514514
overload_error_count += 1
515-
@client.retry_policy.record_non_overload_retry_failure
516515
end
517516
next
518517
end
@@ -553,7 +552,6 @@ def with_transaction(options = nil)
553552
overload_error_count += 1
554553
elsif overload_encountered
555554
overload_error_count += 1
556-
@client.retry_policy.record_non_overload_retry_failure
557555
end
558556

559557
if overload_encountered
@@ -590,7 +588,6 @@ def with_transaction(options = nil)
590588
overload_error_count += 1
591589
elsif overload_encountered
592590
overload_error_count += 1
593-
@client.retry_policy.record_non_overload_retry_failure
594591
end
595592
@state = NO_TRANSACTION_STATE
596593
next

lib/mongo/uri/options_mapper.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ def merge_uri_option(target, value, name)
307307
uri_option 'readConcernLevel', :level, group: :read_concern, type: :symbol
308308
uri_option 'retryReads', :retry_reads, type: :bool
309309
uri_option 'retryWrites', :retry_writes, type: :bool
310-
uri_option 'adaptiveRetries', :adaptive_retries, type: :bool
310+
uri_option 'enableOverloadRetargeting', :enable_overload_retargeting, type: :bool
311+
uri_option 'maxAdaptiveRetries', :max_adaptive_retries, type: :integer
311312
uri_option 'zlibCompressionLevel', :zlib_compression_level, type: :zlib_compression_level
312313

313314
# Monitoring Options

spec/integration/retryable_reads_errors_spec.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,8 @@
297297
let(:client) do
298298
authorized_client.with(
299299
retry_reads: true,
300-
read: { mode: :primary_preferred }
300+
read: { mode: :primary_preferred },
301+
enable_overload_retargeting: true
301302
)
302303
end
303304

0 commit comments

Comments
 (0)