Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3166677
RUBY-3724 Fix backpressure tests
comandeo-mongo Mar 6, 2026
8bb216d
Apply suggestions from code review
comandeo-mongo Mar 26, 2026
680e19e
Fix rescue clause in backpressure prose test
comandeo-mongo Mar 26, 2026
f29174b
RUBY-3770 Implement makeTimeoutError semantics in withTransaction
comandeo-mongo Mar 27, 2026
bf72c2f
Fix rubocop offenses in with_transaction_timeout_spec.rb
comandeo-mongo Mar 27, 2026
c7a3ba5
Fix session_transaction_spec to use CSOT for timeout test
comandeo-mongo Mar 27, 2026
17d2b40
Simplify stuff
comandeo-mongo Mar 27, 2026
df7c2f7
Sync with master
comandeo-mongo Mar 27, 2026
9ac5b71
Prepare Session interface for WithTransactionRunner extraction
comandeo-mongo Mar 27, 2026
c158c6f
Add WithTransactionRunner skeleton with leaf methods
comandeo-mongo Mar 27, 2026
7721537
Add pre_retry_backoff to WithTransactionRunner
comandeo-mongo Mar 27, 2026
0f5ba05
Add execute_callback and helpers to WithTransactionRunner
comandeo-mongo Mar 27, 2026
754cb38
Add commit helpers and timeout_ms:0 regression test to WithTransactio…
comandeo-mongo Mar 27, 2026
71c9b70
Add run_attempt and run to WithTransactionRunner
comandeo-mongo Mar 27, 2026
33a978a
Wire with_transaction to WithTransactionRunner
comandeo-mongo Mar 27, 2026
81fac05
Remove moved private methods and constants from Session
comandeo-mongo Mar 27, 2026
0042e29
Fix rubocop offenses in spec: use Kernel.sleep stub, compact test
comandeo-mongo Mar 27, 2026
5e1af4c
Merge upstream/master: resolve conflicts, keep runner delegation and …
comandeo-mongo Mar 27, 2026
45237b8
Fix specs
comandeo-mongo Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 27 additions & 208 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'mongo/session/session_pool'
require 'mongo/session/server_session'
require 'mongo/session/with_transaction_runner'

module Mongo
# A logical session representing a set of sequential operations executed
Expand Down Expand Up @@ -445,175 +446,8 @@ def with_transaction(options = nil)
@inside_with_transaction = true
@with_transaction_timeout_ms = options&.dig(:timeout_ms) || @options[:default_timeout_ms] || @client.timeout_ms
@with_transaction_deadline = calculate_with_transaction_deadline(options)
deadline = if @with_transaction_deadline
# CSOT enabled, so we have a customer defined deadline.
@with_transaction_deadline
else
# CSOT not enabled, so we use the default deadline, 120 seconds.
Utils.monotonic_time + 120
end
transaction_in_progress = false
transaction_attempt = 0
last_error = nil
overload_error_count = 0
overload_encountered = false

loop do
if transaction_attempt > 0
if overload_encountered
delay = @client.retry_policy.backoff_delay(overload_error_count)
if backoff_would_exceed_deadline?(deadline, delay)
make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction')
end
raise(last_error) unless @client.retry_policy.should_retry_overload?(overload_error_count, delay)

sleep(delay)
else
backoff = backoff_seconds_for_retry(transaction_attempt)
if backoff_would_exceed_deadline?(deadline, backoff)
make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction')
end

sleep(backoff)
end
end

commit_options = {}
commit_options[:write_concern] = options[:write_concern] if options
start_transaction(options)
transaction_in_progress = true
transaction_attempt += 1

begin
rv = yield self
rescue Exception => e
if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
log_warn("Aborting transaction due to #{e.class}: #{e}")
# CSOT: if the deadline is already expired, clear it so that
# abort_transaction uses a fresh timeout (not the expired deadline).
# If the deadline is not yet expired, keep it so abort uses remaining time.
@with_transaction_deadline = nil if @with_transaction_deadline && deadline_expired?(deadline)
abort_transaction
transaction_in_progress = false
end

if deadline_expired?(deadline)
transaction_in_progress = false
make_timeout_error_from(e, 'CSOT timeout expired during withTransaction callback')
end

if e.is_a?(Mongo::Error) && e.label?('TransientTransactionError')
last_error = e
if e.label?('SystemOverloadedError')
overload_encountered = true
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end
next
end

raise
else
if within_states?(TRANSACTION_ABORTED_STATE, NO_TRANSACTION_STATE, TRANSACTION_COMMITTED_STATE)
transaction_in_progress = false
return rv
end

# CSOT: if the timeout has expired before we can commit, abort the
# transaction instead and raise a client-side timeout error.
if @with_transaction_deadline && deadline_expired?(deadline)
transaction_in_progress = false
@with_transaction_deadline = nil
abort_transaction
raise Mongo::Error::TimeoutError, 'CSOT timeout expired before transaction could be committed'
end

begin
commit_transaction(commit_options)
transaction_in_progress = false
return rv
rescue Mongo::Error => e
if e.label?('UnknownTransactionCommitResult')
if deadline_expired?(deadline) ||
(e.is_a?(Error::OperationFailure::Family) && e.max_time_ms_expired?)
transaction_in_progress = false
if @with_transaction_timeout_ms && deadline_expired?(deadline)
make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit')
else
raise
end
end

if e.label?('SystemOverloadedError')
overload_encountered = true
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end

if overload_encountered
delay = @client.retry_policy.backoff_delay(overload_error_count)
if backoff_would_exceed_deadline?(deadline, delay)
transaction_in_progress = false
make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit')
end
unless @client.retry_policy.should_retry_overload?(overload_error_count, delay)
transaction_in_progress = false
raise
end
sleep(delay)
end

wc_options = case v = commit_options[:write_concern]
when WriteConcern::Base
v.options
when nil
{}
else
v
end
commit_options[:write_concern] = wc_options.merge(w: :majority)
retry
elsif e.label?('TransientTransactionError')
if Utils.monotonic_time >= deadline
transaction_in_progress = false
make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit')
end
last_error = e
if e.label?('SystemOverloadedError')
overload_encountered = true
overload_error_count += 1
elsif overload_encountered
overload_error_count += 1
@client.retry_policy.record_non_overload_retry_failure
end
@state = NO_TRANSACTION_STATE
next
else
transaction_in_progress = false
raise
end
rescue Error::AuthError
transaction_in_progress = false
raise
end
end
end

# No official return value, but return true so that in interactive
# use the method hints that it succeeded.
true
WithTransactionRunner.new(self, options).run { yield self }
ensure
if transaction_in_progress
log_warn('with_transaction callback broke out of with_transaction loop, aborting transaction')
begin
abort_transaction
rescue Error::OperationFailure::Family, Error::InvalidTransactionOperation
end
end
@with_transaction_deadline = nil
@with_transaction_timeout_ms = nil
@inside_with_transaction = false
Expand Down Expand Up @@ -1269,6 +1103,31 @@ def inside_with_transaction?
@inside_with_transaction
end

# Nils the with_transaction deadline, allowing subsequent operations
# (e.g. abort_transaction) to use a fresh timeout rather than an
# already-expired one. Called by WithTransactionRunner.
def clear_with_transaction_deadline!
@with_transaction_deadline = nil
end

# Readable by WithTransactionRunner to detect CSOT mode.
# @api private
attr_reader :with_transaction_timeout_ms

# Resets transaction state to NO_TRANSACTION_STATE without calling
# abort_transaction. Used by WithTransactionRunner after a
# TransientTransactionError during commit — the server has already
# rolled back, so no server-side cleanup is needed.
# @api private
def reset_transaction_state!
@state = NO_TRANSACTION_STATE
end

# @api private
def within_states?(*states)
states.include?(@state)
end

private

# Get the read concern the session will use when starting a transaction.
Expand All @@ -1286,10 +1145,6 @@ def txn_read_concern
txn_options[:read_concern] || @client.read_concern
end

def within_states?(*states)
states.include?(@state)
end

def check_if_no_transaction!
return unless within_states?(NO_TRANSACTION_STATE)

Expand Down Expand Up @@ -1372,41 +1227,5 @@ def calculate_with_transaction_deadline(opts)
calc.call(@client.timeout_ms)
end
end

def deadline_expired?(deadline)
if deadline.zero?
false
else
Utils.monotonic_time >= deadline
end
end

# Exponential backoff settings for with_transaction retries.
BACKOFF_INITIAL = 0.005
BACKOFF_MAX = 0.5
private_constant :BACKOFF_INITIAL, :BACKOFF_MAX

def backoff_seconds_for_retry(transaction_attempt)
exponential = BACKOFF_INITIAL * (1.5**(transaction_attempt - 1))
Random.rand * [ exponential, BACKOFF_MAX ].min
end

def backoff_would_exceed_deadline?(deadline, backoff_seconds)
return false if deadline.zero?

Utils.monotonic_time + backoff_seconds >= deadline
end

# Implements makeTimeoutError(lastError) from the transactions-convenient-api spec.
# In CSOT mode raises TimeoutError with last_error's message included as a substring.
# In non-CSOT mode re-raises last_error directly.
def make_timeout_error_from(last_error, timeout_message)
if @with_transaction_timeout_ms
raise Mongo::Error::TimeoutError, "#{timeout_message}: #{last_error}"
else
raise last_error
end
end

end
end
Loading
Loading