Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class Client
#
# @since 2.1.2
VALID_OPTIONS = %i[
adaptive_retries
app_name
auth_mech
auth_mech_properties
Expand All @@ -62,6 +61,7 @@ class Client
cleanup
compressors
direct_connection
enable_overload_retargeting
connect
connect_timeout
database
Expand All @@ -71,6 +71,7 @@ class Client
local_threshold
logger
log_prefix
max_adaptive_retries
max_connecting
Comment thread
comandeo-mongo marked this conversation as resolved.
max_idle_time
max_pool_size
Expand Down Expand Up @@ -587,7 +588,7 @@ def initialize(addresses_or_uri, options = nil)

@connect_lock = Mutex.new
@retry_policy = Retryable::RetryPolicy.new(
adaptive_retries: !!@options[:adaptive_retries]
max_retries: @options[:max_adaptive_retries] || Retryable::Backpressure::DEFAULT_MAX_RETRIES
)
Comment thread
comandeo-mongo marked this conversation as resolved.
@connect_lock.synchronize do
@cluster = Cluster.new(
Expand Down
8 changes: 4 additions & 4 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.

require 'mongo/retryable/backpressure'
require 'mongo/retryable/token_bucket'
require 'mongo/retryable/retry_policy'
require 'mongo/retryable/read_worker'
require 'mongo/retryable/write_worker'
Expand Down Expand Up @@ -67,10 +66,11 @@ def select_server(cluster, server_selector, session, failed_server = nil, error:
# Whether the failed server should be deprioritized during server
# selection for a retry attempt. For sharded and load-balanced
# topologies, servers are always deprioritized on any retryable error.
# For replica sets, servers are only deprioritized when the error
# carries the SystemOverloadedError label.
# For replica sets, servers are deprioritized on overload errors only
# when enableOverloadRetargeting is enabled.
def deprioritize_server?(cluster, error)
  if cluster.sharded? || cluster.load_balanced?
    # Sharded and load-balanced topologies deprioritize unconditionally.
    true
  elsif !client.options[:enable_overload_retargeting]
    # Any other topology: retargeting is off unless the client option is set.
    false
  else
    # Option is set: deprioritize only for errors labelled as overload.
    error.respond_to?(:label?) && error.label?('SystemOverloadedError')
  end
end
Expand Down Expand Up @@ -120,7 +120,7 @@ def with_overload_retry(context: nil, retry_enabled: true)
error_count = 0
loop do
result = yield
client.retry_policy.record_success(is_retry: error_count > 0)

return result
rescue Error::TimeoutError
raise
Expand Down
10 changes: 2 additions & 8 deletions lib/mongo/retryable/backpressure.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@ module Backpressure
# Maximum backoff delay in seconds.
MAX_BACKOFF = 10

# Maximum number of retries for overload errors.
MAX_RETRIES = 5

# Rate at which tokens are returned to the bucket on success.
RETRY_TOKEN_RETURN_RATE = 0.1

# Default capacity of the retry token bucket.
DEFAULT_RETRY_TOKEN_CAPACITY = 1000
# Default maximum number of retries for overload errors.
DEFAULT_MAX_RETRIES = 2

Comment thread
comandeo-mongo marked this conversation as resolved.
# Calculate the backoff delay for a given retry attempt.
#
Expand Down
10 changes: 2 additions & 8 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,7 @@ def modern_read_with_retry(session, server_selector, context, &block)
session,
timeout: context&.remaining_timeout_sec
)
result = yield server
retry_policy.record_success(is_retry: false)
result
yield server
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
raise e if session.in_transaction? && !retryable_overload_error?(e)
Expand Down Expand Up @@ -295,9 +293,7 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
begin
context&.check_timeout!
attempt = attempt ? attempt + 1 : 2
result = yield server, true
retry_policy.record_success(is_retry: true)
result
yield server, true
rescue Error::TimeoutError
raise
rescue *retryable_exceptions => e
Expand Down Expand Up @@ -358,7 +354,6 @@ def overload_read_retry(last_error, session, server_selector, context, failed_se
begin
context&.check_timeout!
result = yield server, true
retry_policy.record_success(is_retry: true)
return result
rescue Error::TimeoutError
raise
Expand All @@ -368,7 +363,6 @@ def overload_read_retry(last_error, session, server_selector, context, failed_se
is_overload = retryable_overload_error?(e)
raise e unless is_overload || is_retryable_exception?(e) || e.write_retryable?

retry_policy.record_non_overload_retry_failure unless is_overload
failed_server = server
last_error = e
end
Expand Down
49 changes: 11 additions & 38 deletions lib/mongo/retryable/retry_policy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@

module Mongo
module Retryable
# Encapsulates the retry policy for client backpressure, combining
# exponential backoff with jitter and an optional token bucket for
# adaptive retries.
# Encapsulates the retry policy for client backpressure with
# exponential backoff and jitter.
#
# One instance is created per Client and shared across all operations
# on that client.
#
# @api private
class RetryPolicy
# @return [ Integer ] The maximum number of overload retries.
attr_reader :max_retries

# Create a new retry policy.
#
# @param [ true | false ] adaptive_retries Whether the adaptive
# retry token bucket is enabled.
def initialize(adaptive_retries: false)
@token_bucket = adaptive_retries ? TokenBucket.new : nil
# @param [ Integer ] max_retries The maximum number of overload
# retry attempts. Defaults to Backpressure::DEFAULT_MAX_RETRIES.
def initialize(max_retries: Backpressure::DEFAULT_MAX_RETRIES)
@max_retries = max_retries
end

# @return [ TokenBucket | nil ] The token bucket, if adaptive
# retries are enabled.
attr_reader :token_bucket

# Calculate the backoff delay for a given retry attempt.
#
# @param [ Integer ] attempt The retry attempt number (1-indexed).
Expand All @@ -35,46 +33,21 @@ def backoff_delay(attempt, jitter: rand)

# Determine whether an overload retry should be attempted.
#
# Checks that the attempt number does not exceed MAX_RETRIES,
# that the backoff delay would not exceed the CSOT deadline (if set),
# and that a token is available (if adaptive retries are enabled).
#
# @param [ Integer ] attempt The retry attempt number (1-indexed).
# @param [ Float ] delay The backoff delay in seconds.
# @param [ Mongo::CsotTimeoutHolder | nil ] context The operation
# @param [ Mongo::Operation::Context | nil ] context The operation
# context (for CSOT deadline checking).
#
# @return [ true | false ] Whether the retry should proceed.
def should_retry_overload?(attempt, delay, context: nil)
return false if attempt > Backpressure::MAX_RETRIES
return false if attempt > @max_retries
return false if exceeds_deadline?(delay, context)
return false if @token_bucket && !@token_bucket.consume(1)

true
end

# Record a successful operation by depositing tokens into the
# bucket.
#
# @param [ true | false ] is_retry Whether the success came from
# a retried attempt.
def record_success(is_retry:)
return unless @token_bucket

tokens = Backpressure::RETRY_TOKEN_RETURN_RATE
tokens += 1 if is_retry
@token_bucket.deposit(tokens)
end

# Record a non-overload failure during a retry attempt by
# depositing 1 token.
def record_non_overload_retry_failure
@token_bucket&.deposit(1)
end

private

# Check whether the backoff delay would exceed the CSOT deadline.
def exceeds_deadline?(delay, context)
return false unless context&.csot?

Expand Down
59 changes: 0 additions & 59 deletions lib/mongo/retryable/token_bucket.rb

This file was deleted.

14 changes: 3 additions & 11 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,9 @@ def nro_write_with_retry(_write_concern, context:, &block)
error_count = 0
error_to_raise = nil
begin
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield connection, nil, context
end
retry_policy.record_success(is_retry: error_count > 0) if error_count > 0
result
rescue Error::TimeoutError
raise
rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => e
Expand Down Expand Up @@ -254,7 +252,7 @@ def modern_write_with_retry(session, server, context, &block)
connection_succeeded = false
was_starting = false

result = server.with_connection(
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
Expand All @@ -268,8 +266,6 @@ def modern_write_with_retry(session, server, context, &block)
# it later for the retry as well.
yield connection, txn_num, context.dup
end
retry_policy.record_success(is_retry: false)
result
rescue *retryable_exceptions, Error::PoolError, Auth::Unauthorized, Error::OperationFailure::Family => e
e.add_notes('modern retry', 'attempt 1')

Expand Down Expand Up @@ -344,11 +340,9 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)

attempt = attempt ? attempt + 1 : 2
log_retry(original_error, message: 'Write retry')
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield(connection, txn_num, context)
end
retry_policy.record_success(is_retry: true)
result
rescue *retryable_exceptions, Error::PoolError => e
if retryable_overload_error?(e)
e.add_notes('modern retry', "attempt #{attempt}")
Expand Down Expand Up @@ -414,7 +408,6 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield connection, txn_num, context
end
retry_policy.record_success(is_retry: true)
return result
rescue Error::TimeoutError
raise
Expand All @@ -430,7 +423,6 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
unless e.respond_to?(:label?) && e.label?('NoWritesPerformed')
error_to_raise = e
end
retry_policy.record_non_overload_retry_failure unless is_overload
context = context.with(overload_only_retry: false) unless is_overload
failed_server = server
last_error = e
Expand Down
5 changes: 1 addition & 4 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def with_transaction(options = nil)
overload_error_count = 0
overload_encountered = false

loop do # rubocop:disable Metrics/BlockLength
loop do
if transaction_attempt > 0
Comment thread
comandeo-mongo marked this conversation as resolved.
if overload_encountered
delay = @client.retry_policy.backoff_delay(overload_error_count)
Expand Down Expand Up @@ -512,7 +512,6 @@ def with_transaction(options = nil)
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end
next
end
Expand Down Expand Up @@ -553,7 +552,6 @@ def with_transaction(options = nil)
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end

if overload_encountered
Expand Down Expand Up @@ -590,7 +588,6 @@ def with_transaction(options = nil)
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end
@state = NO_TRANSACTION_STATE
next
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/uri/options_mapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ def self.uri_option(uri_key, name, **extra)
uri_option 'readConcernLevel', :level, group: :read_concern, type: :symbol
uri_option 'retryReads', :retry_reads, type: :bool
uri_option 'retryWrites', :retry_writes, type: :bool
uri_option 'adaptiveRetries', :adaptive_retries, type: :bool
uri_option 'enableOverloadRetargeting', :enable_overload_retargeting, type: :bool
uri_option 'maxAdaptiveRetries', :max_adaptive_retries, type: :integer
uri_option 'zlibCompressionLevel', :zlib_compression_level, type: :zlib_compression_level

# Monitoring Options
Expand Down
Loading
Loading