Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/mongo/operation/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ def retry?
!!@is_retry
end

# Reports whether all retries performed so far were caused solely by
# overload errors.
#
# @return [ true | false ] true when the overload-only flag is set to a
#   truthy value, false when it is unset, nil or false.
def overload_only_retry?
  @overload_only_retry ? true : false
end

# Returns a new context with the parameters changed as per the
# provided arguments.
#
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/retryable/read_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def modern_read_with_retry(session, server_selector, context, &block)
result
rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
raise e if session.in_transaction?
raise e if session.in_transaction? && !retryable_overload_error?(e)

if retryable_overload_error?(e)
overload_read_retry(e, session, server_selector, context, server, error_count: 1, &block)
Expand Down
17 changes: 13 additions & 4 deletions lib/mongo/retryable/write_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def legacy_write_with_retry(server = nil, context:)
def modern_write_with_retry(session, server, context, &block)
txn_num = nil
connection_succeeded = false
was_starting = false

result = server.with_connection(
connection_global_id: context.connection_global_id,
Expand All @@ -266,6 +267,7 @@ def modern_write_with_retry(session, server, context, &block)

session.materialize_if_needed
txn_num = session.in_transaction? ? session.txn_num : session.next_txn_num
was_starting = session.starting_transaction?

# The context needs to be duplicated here because we will be using
# it later for the retry as well.
Expand All @@ -288,7 +290,11 @@ def modern_write_with_retry(session, server, context, &block)
retry_context = context.with(is_retry: true)

if is_overload
overload_write_retry(e, session, txn_num, context: retry_context, failed_server: server, error_count: 1, &block)
overload_write_retry(e, session, txn_num,
context: retry_context.with(overload_only_retry: true),
failed_server: server, error_count: 1,
was_starting_transaction: was_starting,
&block)
else
# Context#with creates a new context, which is not necessary here
# but the API is less prone to misuse this way.
Expand Down Expand Up @@ -351,7 +357,7 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
rescue *retryable_exceptions, Error::PoolError => e
if retryable_overload_error?(e)
e.add_notes('modern retry', "attempt #{attempt}")
return overload_write_retry(e, context.session, txn_num, context: context, failed_server: server, error_count: attempt, &block)
return overload_write_retry(e, context.session, txn_num, context: context, failed_server: server, error_count: attempt, was_starting_transaction: false, &block)
end
maybe_fail_on_retryable(e, original_error, context, attempt)
failed_server = server
Expand All @@ -360,7 +366,7 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
rescue Error::OperationFailure::Family => e
if retryable_overload_error?(e)
e.add_notes('modern retry', "attempt #{attempt}")
return overload_write_retry(e, context.session, txn_num, context: context, failed_server: server, error_count: attempt, &block)
return overload_write_retry(e, context.session, txn_num, context: context, failed_server: server, error_count: attempt, was_starting_transaction: false, &block)
end
maybe_fail_on_operation_failure(e, original_error, context, attempt)
failed_server = server
Expand All @@ -375,7 +381,8 @@ def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
end

# Retry loop for overload write errors with exponential backoff.
def overload_write_retry(last_error, session, txn_num, context:, failed_server:, error_count:, &block)
def overload_write_retry(last_error, session, txn_num, context:, failed_server:, error_count:,
was_starting_transaction: false, &block)
loop do
delay = retry_policy.backoff_delay(error_count)
unless retry_policy.should_retry_overload?(error_count, delay, context: context)
Expand All @@ -401,6 +408,7 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
end

begin
session.revert_to_starting_transaction! if was_starting_transaction
context.check_timeout!
result = server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield connection, txn_num, context
Expand All @@ -423,6 +431,7 @@ def overload_write_retry(last_error, session, txn_num, context:, failed_server:,
end
end
retry_policy.record_non_overload_retry_failure unless is_overload
context = context.with(overload_only_retry: false) unless is_overload
failed_server = server
last_error = e
rescue Error, Error::AuthError => e
Expand Down
12 changes: 11 additions & 1 deletion lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ def commit_transaction(options=nil)
write_with_retry(write_concern, ending_transaction: true,
context: context,
) do |connection, txn_num, context|
if context.retry?
if context.retry? && !context.overload_only_retry?
if write_concern
wco = write_concern.options.merge(w: :majority)
wco[:wtimeout] ||= 10000
Expand Down Expand Up @@ -1126,6 +1126,16 @@ def validate_read_preference!(command)
end
end

# Puts the session back into STARTING_TRANSACTION_STATE when a transaction
# is currently in progress; in any other state the session is left untouched.
#
# Invoked ahead of a retry of a transaction's first command, so that the
# retried command still carries startTransaction: true.
#
# @api private
def revert_to_starting_transaction!
  return unless within_states?(TRANSACTION_IN_PROGRESS_STATE)

  @state = STARTING_TRANSACTION_STATE
end

# Update the state of the session due to a (non-commit and non-abort) operation being run.
#
# @since 2.6.0
Expand Down
1 change: 1 addition & 0 deletions spec/mongo/retryable/write_worker_overload_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
instance_double(Mongo::Session).tap do |s|
allow(s).to receive(:retry_writes?).and_return(true)
allow(s).to receive(:in_transaction?).and_return(false)
allow(s).to receive(:starting_transaction?).and_return(false)
allow(s).to receive(:materialize_if_needed)
allow(s).to receive(:txn_num).and_return(1)
allow(s).to receive(:next_txn_num).and_return(2)
Expand Down
10 changes: 6 additions & 4 deletions spec/runners/unified/support_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ def fail_point(op)
client = entities.get(:client, args.use!('client'))
client.command(fp = args.use('failPoint'))

# Use the client's actual primary address rather than the cached
# ClusterConfig value, which can become stale after a replSetStepDown.
primary_server = client.cluster.servers.find(&:primary?)
address = primary_server&.address || ClusterConfig.instance.primary_address

$disable_fail_points ||= []
$disable_fail_points << [
fp,
ClusterConfig.instance.primary_address,
]
$disable_fail_points << [fp, address]
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# Spec test: abortTransaction is retried when the failing command carries the
# RetryableError and SystemOverloadedError labels (see the tests below).
# NOTE(review): presumably a unified-test-format file — confirm against the runner.
description: backpressure-retryable-abort
schemaVersion: "1.3"
# Restricts the run to server 4.4 or newer on the topologies listed below.
runOnRequirements:
- minServerVersion: "4.4"
topologies:
- replicaset
- sharded
- load-balanced
createEntities:
-
client:
id: &client0 client0
useMultipleMongoses: false
observeEvents:
- commandStartedEvent
-
database:
id: &database0 database0
client: *client0
databaseName: &database_name transaction-tests
-
collection:
id: &collection0 collection0
database: *database0
collectionName: &collection_name test
-
session:
id: &session0 session0
client: *client0

initialData:
-
collectionName: *collection_name
databaseName: *database_name
documents: []
tests:
- description: abortTransaction retries if backpressure labels are added
operations:
- object: testRunner
name: failPoint
arguments:
client: *client0
failPoint:
configureFailPoint: failCommand
mode:
times: 2
data:
failCommands:
- abortTransaction
errorLabels:
- RetryableError
- SystemOverloadedError
errorCode: 112
- object: *session0
name: startTransaction
- object: *collection0
name: insertOne
arguments:
session: *session0
document:
_id: 1
expectResult:
$$unsetOrMatches:
insertedId:
$$unsetOrMatches: 1
- object: *session0
name: abortTransaction
expectEvents:
- client: *client0
events:
- commandStartedEvent:
command:
insert: test
documents:
- _id: 1
ordered: true
readConcern:
$$exists: false
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction: true
autocommit: false
writeConcern:
$$exists: false
commandName: insert
databaseName: *database_name
- commandStartedEvent:
command:
abortTransaction: 1
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction:
$$exists: false
autocommit: false
writeConcern:
$$exists: false
commandName: abortTransaction
databaseName: admin
- commandStartedEvent:
command:
abortTransaction: 1
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction:
$$exists: false
autocommit: false
writeConcern:
$$exists: false
commandName: abortTransaction
databaseName: admin
- commandStartedEvent:
command:
abortTransaction: 1
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction:
$$exists: false
autocommit: false
writeConcern:
$$exists: false
commandName: abortTransaction
databaseName: admin
outcome:
- collectionName: *collection_name
databaseName: *database_name
documents: []
- description: abortTransaction is retried maxAttempts=5 times if backpressure labels are added
operations:
- object: testRunner
name: failPoint
arguments:
client: *client0
failPoint:
configureFailPoint: failCommand
mode: alwaysOn
data:
failCommands:
- abortTransaction
errorLabels:
- RetryableError
- SystemOverloadedError
errorCode: 112
- object: *session0
name: startTransaction
- object: *collection0
name: insertOne
arguments:
session: *session0
document:
_id: 1
expectResult:
$$unsetOrMatches:
insertedId:
$$unsetOrMatches: 1
- object: *session0
name: abortTransaction
expectEvents:
- client: *client0
events:
- commandStartedEvent:
command:
insert: test
documents:
- _id: 1
ordered: true
readConcern:
$$exists: false
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction: true
autocommit: false
writeConcern:
$$exists: false
commandName: insert
databaseName: *database_name
- commandStartedEvent:
command:
abortTransaction: 1
lsid:
$$sessionLsid: *session0
txnNumber:
$numberLong: "1"
startTransaction:
$$exists: false
autocommit: false
writeConcern:
$$exists: false
commandName: abortTransaction
databaseName: admin
- commandStartedEvent:
commandName: abortTransaction
- commandStartedEvent:
commandName: abortTransaction
- commandStartedEvent:
commandName: abortTransaction
- commandStartedEvent:
commandName: abortTransaction
- commandStartedEvent:
commandName: abortTransaction
outcome:
- collectionName: *collection_name
databaseName: *database_name
documents: []
Loading
Loading