Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Client
#
# @since 2.1.2
VALID_OPTIONS = [
:adaptive_retries,
:app_name,
:auth_mech,
:auth_mech_properties,
Expand Down Expand Up @@ -150,6 +151,11 @@ class Client
# auto-encryption behavior
attr_reader :encrypter

# @return [ Mongo::Retryable::RetryPolicy ] The retry policy for
# backpressure and adaptive retries.
# @api private
attr_reader :retry_policy

# Delegate command and collections execution to the current database.
def_delegators :@database, :command, :collections

Expand Down Expand Up @@ -589,6 +595,9 @@ def initialize(addresses_or_uri, options = nil)
end

@connect_lock = Mutex.new
@retry_policy = Retryable::RetryPolicy.new(
adaptive_retries: !!@options[:adaptive_retries]
)
@connect_lock.synchronize do
@cluster = Cluster.new(
addresses,
Expand Down
17 changes: 10 additions & 7 deletions lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ def select_cursor(session)
op = initial_query_op(session)
tracer.trace_operation(op, context) do
if respond_to?(:write?, true) && write?
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context, operation: op)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
else
Cursor.new(view, result, server, session: session, context: context)
retry_enabled = collection.client.options[:retry_writes] != false
with_overload_retry(context: context, retry_enabled: retry_enabled) do
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
result = send_initial_query(server, context, operation: op)

if use_query_cache?
CachingCursor.new(view, result, server, session: session, context: context)
else
Cursor.new(view, result, server, session: session, context: context)
end
end
else
read_with_retry_cursor(session, server_selector, view, context: context) do |server|
Expand Down
13 changes: 12 additions & 1 deletion lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,18 @@ def get_more
# doing so may result in silent data loss, the driver no longer retries
# getMore operations in any circumstance.
# https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md#qa
process(execute_operation(get_more_operation))
#
# However, overload errors (SystemOverloadedError + RetryableError) are
# retried with exponential backoff since the server never processed
# the request.
with_overload_retry(context: possibly_refreshed_context) do
process(execute_operation(get_more_operation))
end
rescue Error::OperationFailure => e
# When overload retries are exhausted on getMore, close the cursor
# so that killCursors is sent to the server.
close if e.label?('RetryableError') && e.label?('SystemOverloadedError')
raise
end

# @api private
Expand Down
19 changes: 11 additions & 8 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,24 @@ def command(operation, opts = {})
selector = ServerSelector.get(txn_read_pref)

client.with_session(opts) do |session|
server = selector.select_server(cluster, nil, session)
context = Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
)
op = Operation::Command.new(
:selector => operation,
:db_name => name,
:read => selector,
:session => session
)

op.execute(server,
context: Operation::Context.new(
client: client,
session: session,
operation_timeouts: operation_timeouts(opts)
),
options: execution_opts)
retry_enabled = client.options[:retry_reads] != false &&
client.options[:retry_writes] != false
with_overload_retry(context: context, retry_enabled: retry_enabled) do
server = selector.select_server(cluster, nil, session)
op.execute(server, context: context, options: execution_opts)
end
end
end

Expand Down
14 changes: 10 additions & 4 deletions lib/mongo/index/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,11 @@ def create_many(*models)
)
operation = Operation::CreateIndex.new(spec)
tracer.trace_operation(operation, context, op_name: 'createIndexes') do
server = next_primary(nil, session)
operation.execute(server, context: context)
retry_enabled = collection.client.options[:retry_writes] != false
with_overload_retry(context: context, retry_enabled: retry_enabled) do
server = next_primary(nil, session)
operation.execute(server, context: context)
end
end
end
end
Expand Down Expand Up @@ -369,8 +372,11 @@ def drop_by_name(name, opts = {})
op = Operation::DropIndex.new(spec)
op_name = name == Index::ALL ? 'dropIndexes' : 'dropIndex'
tracer.trace_operation(op, context, op_name: op_name) do
server = next_primary(nil, session)
op.execute(server, context: context)
retry_enabled = collection.client.options[:retry_writes] != false
with_overload_retry(context: context, retry_enabled: retry_enabled) do
server = next_primary(nil, session)
op.execute(server, context: context)
end
end
end
end
Expand Down
48 changes: 48 additions & 0 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/retryable/backpressure'
require 'mongo/retryable/token_bucket'
require 'mongo/retryable/retry_policy'
require 'mongo/retryable/read_worker'
require 'mongo/retryable/write_worker'

Expand Down Expand Up @@ -95,5 +98,50 @@ def read_worker
def write_worker
@write_worker ||= WriteWorker.new(self)
end

# Wraps an operation with overload retry logic. On overload errors
# (SystemOverloadedError + RetryableError), retries the block with
# exponential backoff up to MAX_RETRIES times.
#
# The block should include server selection so it is re-done on retry.
# For cursor operations (getMore), the same server is reused since the
# cursor is pinned.
#
# @param [ Operation::Context | nil ] context The operation context
#   for CSOT deadline checking.
# @param [ true | false ] retry_enabled Whether overload retries are
#   permitted. When false, overload errors are raised immediately
#   without retrying (used when retryReads/retryWrites is disabled).
#
# @return [ Object ] The result of the block.
#
# @api private
def with_overload_retry(context: nil, retry_enabled: true)
  return yield unless retry_enabled

  error_count = 0
  # Ruby allows an implicit rescue clause directly inside a do/end
  # block, so no explicit begin/end is needed within the loop.
  loop do
    result = yield
    client.retry_policy.record_success(is_retry: error_count > 0)
    return result
  rescue Error::TimeoutError
    # CSOT timeouts are never retried here; propagate immediately.
    raise
  rescue Error::OperationFailure::Family => e
    # Only overload errors explicitly marked retryable are backed off;
    # everything else propagates to the caller unchanged.
    raise e unless e.label?('SystemOverloadedError') && e.label?('RetryableError')

    error_count += 1
    policy = client.retry_policy
    delay = policy.backoff_delay(error_count)
    unless policy.should_retry_overload?(error_count, delay, context: context)
      raise e
    end
    Logger.logger.warn("Overload retry due to: #{e.class.name}: #{e.message}")
    sleep(delay)
  end
end
end
end
37 changes: 37 additions & 0 deletions lib/mongo/retryable/backpressure.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module Mongo
  module Retryable
    # Shared constants and helper methods for client backpressure:
    # jittered exponential backoff used by overload retry loops.
    #
    # @api private
    module Backpressure
      # Base backoff delay in seconds.
      BASE_BACKOFF = 0.1

      # Maximum backoff delay in seconds.
      MAX_BACKOFF = 10

      # Maximum number of retries for overload errors.
      MAX_RETRIES = 5

      # Rate at which tokens are returned to the bucket on success.
      RETRY_TOKEN_RETURN_RATE = 0.1

      # Default capacity of the retry token bucket.
      DEFAULT_RETRY_TOKEN_CAPACITY = 1000

      # Compute the jittered backoff delay for a retry attempt. The
      # exponential delay doubles with each attempt, is capped at
      # MAX_BACKOFF, and then is scaled by the jitter factor.
      #
      # @param [ Integer ] attempt The retry attempt number (1-indexed).
      # @param [ Float ] jitter A random float in [0.0, 1.0). Defaults to
      #   a random value. Can be injected for deterministic testing.
      #
      # @return [ Float ] The backoff delay in seconds.
      def self.backoff_delay(attempt, jitter: rand)
        uncapped = BASE_BACKOFF * (2**(attempt - 1))
        [ uncapped, MAX_BACKOFF ].min * jitter
      end
    end
  end
end
29 changes: 29 additions & 0 deletions lib/mongo/retryable/base_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ def log_retry(e, options = nil)
message = (options || {}).fetch(:message, "Retry")
Logger.logger.warn "#{message} due to: #{e.class.name}: #{e.message}"
end

# Returns the retry policy from the client.
#
# @return [ Mongo::Retryable::RetryPolicy ] The retry policy.
def retry_policy
  client.retry_policy
end

# Whether the error indicates server overload.
#
# @param [ Exception ] e The error to check.
#
# @return [ true | false ] true if the error has the
#   SystemOverloadedError label.
def overload_error?(e)
  e.respond_to?(:label?) && e.label?('SystemOverloadedError')
end

# Whether the error is a retryable overload error. An error is
# retryable overload when it has both the SystemOverloadedError and
# RetryableError labels.
#
# @param [ Exception ] e The error to check.
#
# @return [ true | false ] true if the error has both labels.
def retryable_overload_error?(e)
  # overload_error? has already verified that e responds to #label?,
  # so the respond_to? check is not repeated here.
  overload_error?(e) && e.label?('RetryableError')
end
end

end
Expand Down
69 changes: 65 additions & 4 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,19 @@ def modern_read_with_retry(session, server_selector, context, &block)
session,
timeout: context&.remaining_timeout_sec
)
yield server
result = yield server
retry_policy.record_success(is_retry: false)
result
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
raise e if session.in_transaction?
raise e if !is_retryable_exception?(e) && !e.write_retryable?
retry_read(e, session, server_selector, context: context, failed_server: server, &block)

if retryable_overload_error?(e)
overload_read_retry(e, session, server_selector, context, server, error_count: 1, &block)
else
raise e if !is_retryable_exception?(e) && !e.write_retryable?
retry_read(e, session, server_selector, context: context, failed_server: server, &block)
end
end

# Attempts to do a "legacy" read with retry. The operation will be
Expand Down Expand Up @@ -289,11 +296,16 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
begin
context&.check_timeout!
attempt = attempt ? attempt + 1 : 2
yield server, true
result = yield server, true
retry_policy.record_success(is_retry: true)
result
rescue Error::TimeoutError
raise
rescue *retryable_exceptions => e
e.add_notes('modern retry', "attempt #{attempt}")
if retryable_overload_error?(e)
return overload_read_retry(e, session, server_selector, context, server, error_count: attempt, &block)
end
if context&.csot?
failed_server = server
retry
Expand All @@ -302,6 +314,10 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
end
rescue Error::OperationFailure::Family, Error::PoolError => e
e.add_note('modern retry')
if retryable_overload_error?(e)
e.add_note("attempt #{attempt}")
return overload_read_retry(e, session, server_selector, context, server, error_count: attempt, &block)
end
if e.write_retryable?
e.add_note("attempt #{attempt}")
if context&.csot?
Expand All @@ -321,6 +337,51 @@ def retry_read(original_error, session, server_selector, context: nil, failed_se
end
end

# Retry loop for overload errors with exponential backoff.
# Each retry sleeps with jittered backoff, respects MAX_RETRIES,
# and consumes a token from the bucket when adaptive retries
# are enabled.
#
# @param [ Exception ] last_error The most recent retryable overload
#   error; re-raised when the policy refuses another attempt.
# @param [ Session ] session The session used for server selection.
# @param [ ServerSelector ] server_selector Selector used to re-select
#   a server before each attempt.
# @param [ Operation::Context | nil ] context Context used for CSOT
#   deadline checks (remaining_timeout_sec / check_timeout!).
# @param [ Server | nil ] failed_server The server that produced the
#   previous error; passed to select_server on the next attempt.
# @param [ Integer ] error_count Number of errors observed so far;
#   incremented on each failed attempt and drives the backoff delay.
#
# @return [ Object ] The result of the yielded block on success.
def overload_read_retry(last_error, session, server_selector, context, failed_server, error_count:, &block)
  loop do
    delay = retry_policy.backoff_delay(error_count)
    unless retry_policy.should_retry_overload?(error_count, delay, context: context)
      # Policy refused (retry budget, token bucket, or deadline):
      # surface the error that started this retry sequence.
      raise last_error
    end
    log_retry(last_error, message: 'Read retry (overload backoff)')
    sleep(delay)

    begin
      server = select_server(
        cluster, server_selector, session, failed_server,
        error: last_error,
        timeout: context&.remaining_timeout_sec
      )
    rescue Error, Error::AuthError => e
      # Selection failure is recorded as a note on the original overload
      # error so callers see the error that triggered the retries.
      last_error.add_note("later retry failed: #{e.class}: #{e}")
      raise last_error
    end

    begin
      context&.check_timeout!
      result = yield server, true
      retry_policy.record_success(is_retry: true)
      return result
    rescue Error::TimeoutError
      # CSOT timeout: never retried, propagate as-is.
      raise
    rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
      error_count += 1
      e.add_notes('modern retry', "attempt #{error_count}")
      is_overload = retryable_overload_error?(e)
      unless is_overload || is_retryable_exception?(e) || e.write_retryable?
        raise e
      end
      # Non-overload retryable failures are reported to the policy
      # (token-bucket accounting) before the loop tries again.
      retry_policy.record_non_overload_retry_failure unless is_overload
      failed_server = server
      last_error = e
    end
  end
end

def select_server_for_retry(original_error, session, server_selector, context, failed_server)
select_server(
cluster,
Expand Down
Loading
Loading