Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions lib/mongo/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,36 @@ module Retryable
# @api private
#
# @return [ Mongo::Server ] A server matching the server preference.
def select_server(cluster, server_selector, session, failed_server = nil, timeout: nil)
def select_server(cluster, server_selector, session, failed_server = nil, error: nil, timeout: nil)
deprioritized = if failed_server && deprioritize_server?(cluster, error)
[failed_server]
else
[]
end
server_selector.select_server(
cluster,
nil,
session,
deprioritized: [failed_server].compact,
deprioritized: deprioritized,
timeout: timeout
)
end

private

# Whether the failed server should be deprioritized during server
# selection for a retry attempt. For sharded and load-balanced
# topologies, servers are always deprioritized on any retryable error.
# For replica sets, servers are only deprioritized when the error
# carries the SystemOverloadedError label.
def deprioritize_server?(cluster, error)
return true if cluster.sharded? || cluster.load_balanced?

error.respond_to?(:label?) && error.label?('SystemOverloadedError')
end

public

# Returns the read worker for handling retryable reads.
#
# @api private
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ def select_server_for_retry(original_error, session, server_selector, context, f
server_selector,
session,
failed_server,
error: original_error,
timeout: context&.remaining_timeout_sec
)
rescue Error, Error::AuthError => e
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ def modern_write_with_retry(session, server, context, &block)
#
# @return [ Result ] The result of the operation.
def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
failed_error = failed_error || original_error
Comment thread
comandeo-mongo marked this conversation as resolved.
context&.check_timeout!

session = context.session
Expand All @@ -286,6 +287,7 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
ServerSelector.primary,
session,
failed_server,
error: failed_error,
timeout: context.remaining_timeout_sec
)

Expand All @@ -311,10 +313,12 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
rescue *retryable_exceptions, Error::PoolError => e
maybe_fail_on_retryable(e, original_error, context, attempt)
failed_server = server
failed_error = e
retry
rescue Error::OperationFailure::Family => e
maybe_fail_on_operation_failure(e, original_error, context, attempt)
failed_server = server
failed_error = e
retry
rescue Mongo::Error::TimeoutError
raise
Expand Down
85 changes: 85 additions & 0 deletions spec/integration/retryable_reads_errors_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,89 @@
end
end
end

context 'Retries in a replica set' do
require_topology :replica_set
min_server_version '4.4'

let(:subscriber) { Mrss::EventSubscriber.new }

let(:find_failed_events) do
subscriber.failed_events.select { |e| e.command_name == 'find' }
end

let(:find_succeeded_events) do
subscriber.succeeded_events.select { |e| e.command_name == 'find' }
end

let(:options) { {} }

after do
authorized_client.use(:admin).command(
configureFailPoint: 'failCommand',
mode: 'off'
)
client.close
end

context 'when error has SystemOverloadedError label' do
let(:client) do
authorized_client.with(
retry_reads: true,
read: { mode: :primary_preferred }
)
end

before do
authorized_client.use(:admin).command(
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: %w(find),
errorCode: 6,
errorLabels: %w(RetryableError SystemOverloadedError)
}
)
end

it 'retries on a different server' do
client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
subscriber.clear_events!
expect { collection.find.first }.not_to raise_error
expect(find_failed_events.length).to eq(1)
expect(find_succeeded_events.length).to eq(1)
expect(find_failed_events.first.address).not_to eq(find_succeeded_events.first.address)
end
end

context 'when error does not have SystemOverloadedError label' do
let(:client) do
authorized_client.with(
retry_reads: true,
read: { mode: :primary_preferred }
)
end

before do
authorized_client.use(:admin).command(
configureFailPoint: 'failCommand',
mode: { times: 1 },
data: {
failCommands: %w(find),
errorCode: 6,
errorLabels: %w(RetryableError)
}
)
end

it 'retries on the same server' do
client.subscribe(Mongo::Monitoring::COMMAND, subscriber)
subscriber.clear_events!
expect { collection.find.first }.not_to raise_error
expect(find_failed_events.length).to eq(1)
expect(find_succeeded_events.length).to eq(1)
expect(find_failed_events.first.address).to eq(find_succeeded_events.first.address)
end
end
end
end
2 changes: 2 additions & 0 deletions spec/mongo/retryable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ def retry_write_allowed?(*args)
let(:cluster) do
double('cluster', next_primary: server).tap do |cluster|
allow(cluster).to receive(:replica_set?).and_return(true)
allow(cluster).to receive(:sharded?).and_return(false)
allow(cluster).to receive(:load_balanced?).and_return(false)
allow(cluster).to receive(:addresses).and_return(['x'])
end
end
Expand Down
Loading