diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index 629af3e800..eee8f2dc42 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -475,7 +475,7 @@ def with_transaction(options = nil) if overload_encountered delay = @client.retry_policy.backoff_delay(overload_error_count) if backoff_would_exceed_deadline?(deadline, delay) - raise Mongo::Error::TimeoutError, 'CSOT timeout expired waiting to retry withTransaction' + make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction') end unless @client.retry_policy.should_retry_overload?(overload_error_count, delay) raise(last_error) @@ -484,7 +484,7 @@ def with_transaction(options = nil) else backoff = backoff_seconds_for_retry(transaction_attempt) if backoff_would_exceed_deadline?(deadline, backoff) - raise Mongo::Error::TimeoutError, 'CSOT timeout expired waiting to retry withTransaction' + make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction') end sleep(backoff) end @@ -513,7 +513,7 @@ def with_transaction(options = nil) if deadline_expired?(deadline) transaction_in_progress = false - raise + make_timeout_error_from(e, 'CSOT timeout expired during withTransaction callback') end if e.is_a?(Mongo::Error) && e.label?('TransientTransactionError') @@ -554,7 +554,11 @@ def with_transaction(options = nil) e.is_a?(Error::OperationFailure::Family) && e.max_time_ms_expired? 
then transaction_in_progress = false - raise + if @with_transaction_timeout_ms && deadline_expired?(deadline) + make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') + else + raise + end end if e.label?('SystemOverloadedError') @@ -569,7 +573,7 @@ def with_transaction(options = nil) delay = @client.retry_policy.backoff_delay(overload_error_count) if backoff_would_exceed_deadline?(deadline, delay) transaction_in_progress = false - raise + make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') end unless @client.retry_policy.should_retry_overload?(overload_error_count, delay) transaction_in_progress = false @@ -591,7 +595,7 @@ def with_transaction(options = nil) elsif e.label?('TransientTransactionError') if Utils.monotonic_time >= deadline transaction_in_progress = false - raise + make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') end last_error = e if e.label?('SystemOverloadedError') @@ -1436,5 +1440,17 @@ def backoff_would_exceed_deadline?(deadline, backoff_seconds) Utils.monotonic_time + backoff_seconds >= deadline end + + # Implements makeTimeoutError(lastError) from the transactions-convenient-api spec. + # In CSOT mode raises TimeoutError with last_error's message included as a substring. + # In non-CSOT mode re-raises last_error directly. 
# Implements makeTimeoutError(lastError) from the transactions-convenient-api
# spec: turns the most recent error seen by with_transaction into the error
# that is surfaced once the retry deadline can no longer be met.
#
# When a withTransaction timeout is configured (@with_transaction_timeout_ms
# is set, i.e. CSOT mode) a Mongo::Error::TimeoutError is raised whose message
# contains last_error's message and whose #cause is last_error. Without such
# a timeout (non-CSOT mode) last_error itself is re-raised unchanged.
#
# @param [ Exception, nil ] last_error The most recent error observed.
# @param [ String ] timeout_message Describes which deadline expired.
#
# @raise [ Mongo::Error::TimeoutError ] In CSOT mode, or when there is no
#   last error available to re-raise.
# @raise [ Exception ] last_error itself, in non-CSOT mode.
def make_timeout_error_from(last_error, timeout_message)
  # With no underlying error there is nothing to wrap or re-raise
  # (`raise nil` would be a TypeError), so report the bare timeout.
  raise Mongo::Error::TimeoutError, timeout_message if last_error.nil?

  raise last_error unless @with_transaction_timeout_ms

  # Pass the cause explicitly: Ruby only fills Exception#cause from $!,
  # which is unset when this helper is called outside a rescue clause
  # with an error that was stored earlier.
  raise Mongo::Error::TimeoutError,
        "#{timeout_message}: #{last_error.message}",
        cause: last_error
end
+ admin_db.command( + setParameter: 1, + ingressConnectionEstablishmentRateLimiterEnabled: false, + ) + end + end + + it 'generates checkout failures when the ingress connection rate limiter is active' do + admin_db = root_authorized_client.use('admin').database + + # Capture current ingress connection establishment parameters so they can be restored. + current_params = admin_db.command( + getParameter: 1, + ingressConnectionEstablishmentRateLimiterEnabled: 1, + ingressConnectionEstablishmentRatePerSec: 1, + ingressConnectionEstablishmentBurstCapacitySecs: 1, + ingressConnectionEstablishmentMaxQueueDepth: 1, + ).first + + @prev_ingressConnectionEstablishmentRateLimiterEnabled = + current_params['ingressConnectionEstablishmentRateLimiterEnabled'] + @prev_ingressConnectionEstablishmentRatePerSec = + current_params['ingressConnectionEstablishmentRatePerSec'] + @prev_ingressConnectionEstablishmentBurstCapacitySecs = + current_params['ingressConnectionEstablishmentBurstCapacitySecs'] + @prev_ingressConnectionEstablishmentMaxQueueDepth = + current_params['ingressConnectionEstablishmentMaxQueueDepth'] + + # Enable the ingress rate limiter with test-specific values. + admin_db.command( + setParameter: 1, + ingressConnectionEstablishmentRateLimiterEnabled: true, + ) + admin_db.command( + setParameter: 1, + ingressConnectionEstablishmentRatePerSec: 20, + ) + admin_db.command( + setParameter: 1, + ingressConnectionEstablishmentBurstCapacitySecs: 1, + ) + admin_db.command( + setParameter: 1, + ingressConnectionEstablishmentMaxQueueDepth: 1, + ) + + # Add a document so $where has something to process. + client.use('test')['test'].delete_many + client.use('test')['test'].insert_one({}) + + # Run 100 parallel find_one operations that contend for connections. 
+ threads = 100.times.map do + Thread.new do + begin + client.use('test')['test'].find( + '$where' => 'function() { sleep(2000); return true; }' + ).first + rescue StandardError + # Ignore connection errors (including checkout timeouts). + end + end + end + threads.each(&:join) + + checkout_failed = subscriber.select_published_events( + Mongo::Monitoring::Event::Cmap::ConnectionCheckOutFailed + ) + + expect(checkout_failed.length).to be >= 10 + end + end end diff --git a/spec/integration/secondary_reads_spec.rb b/spec/integration/secondary_reads_spec.rb index 53813081ad..12d8bdc235 100644 --- a/spec/integration/secondary_reads_spec.rb +++ b/spec/integration/secondary_reads_spec.rb @@ -28,7 +28,7 @@ end_stats = get_read_counters - end_stats[:secondary].should be_within(10).of(start_stats[:secondary]) + end_stats[:secondary].should be_within(50).of(start_stats[:secondary]) end_stats[:primary].should >= start_stats[:primary] + 30 end end @@ -50,7 +50,7 @@ end_stats = get_read_counters - end_stats[:primary].should be_within(10).of(start_stats[:primary]) + end_stats[:primary].should be_within(50).of(start_stats[:primary]) end_stats[:secondary].should >= start_stats[:secondary] + 30 end end diff --git a/spec/mongo/retryable/token_bucket_spec.rb b/spec/mongo/retryable/token_bucket_spec.rb index 175008e258..7ddf431b7d 100644 --- a/spec/mongo/retryable/token_bucket_spec.rb +++ b/spec/mongo/retryable/token_bucket_spec.rb @@ -67,7 +67,16 @@ end describe 'thread safety' do - let(:bucket) { described_class.new(capacity: 1000) } + # Use capacity 2000, start at 1000 tokens. + # With 500 consumes and 500 deposits, floor/ceiling cannot be hit: + # min possible = 1000 - 500 = 500 > 0 (all consumes succeed) + # max possible = 1000 + 500 = 1500 < 2000 (all deposits effective) + # So the net change is guaranteed to be 0, making the assertion reliable. 
+ let(:bucket) do + b = described_class.new(capacity: 2000) + b.consume(1000) + b + end def run_concurrent_operations(bucket) threads = [] diff --git a/spec/mongo/session/with_transaction_timeout_spec.rb b/spec/mongo/session/with_transaction_timeout_spec.rb new file mode 100644 index 0000000000..694a4c6028 --- /dev/null +++ b/spec/mongo/session/with_transaction_timeout_spec.rb @@ -0,0 +1,244 @@ +# frozen_string_literal: true + +require 'spec_helper' + +# Prose tests for the "Retry Timeout is Enforced" and "Backoff Deadline is +# Enforced" sections of the transactions-convenient-api spec README. +# +# specifications/source/transactions-convenient-api/tests/README.md +# +# Note 1 from spec: "The error SHOULD be propagated as a timeout error if +# the language allows to expose the underlying error as a cause of a timeout +# error." Ruby supports this via Exception#cause. +describe 'Mongo::Session#with_transaction timeout enforcement' do + let(:retry_policy) { Mongo::Retryable::RetryPolicy.new(adaptive_retries: false) } + + let(:client) do + instance_double(Mongo::Client).tap do |c| + allow(c).to receive(:retry_policy).and_return(retry_policy) + allow(c).to receive(:timeout_ms).and_return(nil) + end + end + + let(:session) do + sess = Mongo::Session.allocate + sess.instance_variable_set(:@client, client) + sess.instance_variable_set(:@options, {}) + sess.instance_variable_set(:@state, Mongo::Session::NO_TRANSACTION_STATE) + sess.instance_variable_set(:@lock, Mutex.new) + allow(sess).to receive(:check_transactions_supported!).and_return(true) + allow(sess).to receive(:check_if_ended!) 
+ allow(sess).to receive(:log_warn) + allow(sess).to receive(:session_id).and_return(BSON::Document.new('id' => 'test')) + sess + end + + before do + allow(session).to receive(:start_transaction) do |*_args| + session.instance_variable_set(:@state, Mongo::Session::STARTING_TRANSACTION_STATE) + end + allow(session).to receive(:abort_transaction) do + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_ABORTED_STATE) + end + allow(session).to receive(:commit_transaction) do + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_COMMITTED_STATE) + end + allow(session).to receive(:sleep) + end + + # Stubs Mongo::Utils.monotonic_time: first `initial_calls` invocations + # return 100.0 (deadline ≈ 100.001 s with timeout_ms: 1), all subsequent + # calls return 200.0, making every deadline check return "expired". + def with_expired_deadline_after(initial_calls:) + call_count = 0 + allow(Mongo::Utils).to receive(:monotonic_time) do + call_count += 1 + (call_count <= initial_calls) ? 100.0 : 200.0 + end + yield + end + + # CSOT time control: monotonic_time always 100.0. + # With timeout_ms: 1, deadline = 100.001. + # Backoffs (0.005 s, 0.1 s) exceed that deadline; deadline_expired? stays false. + def with_csot_backoff_time_control + allow(Mongo::Utils).to receive(:monotonic_time).and_return(100.0) + allow(Random).to receive(:rand).and_return(1.0) + yield + end + + # non-CSOT time control: first call → 100.0 (deadline = 220.0), + # subsequent calls → 219.996. + # deadline_expired? = false; backoffs (0.005, 0.1) exceed the 220.0 deadline. + def with_non_csot_backoff_time_control + call_count = 0 + allow(Mongo::Utils).to receive(:monotonic_time) do + call_count += 1 + (call_count == 1) ? 
100.0 : 219.996 + end + allow(Random).to receive(:rand).and_return(1.0) + yield + end + + def make_transient_error + Mongo::Error::OperationFailure.new('transient').tap do |e| + e.add_label('TransientTransactionError') + end + end + + def make_commit_unknown_error + Mongo::Error::OperationFailure.new('commit unknown').tap do |e| + e.add_label('UnknownTransactionCommitResult') + end + end + + def make_commit_transient_error + Mongo::Error::OperationFailure.new('commit transient').tap do |e| + e.add_label('TransientTransactionError') + end + end + + def make_transient_overload_error + Mongo::Error::OperationFailure.new('transient overload').tap do |e| + e.add_label('TransientTransactionError') + e.add_label('SystemOverloadedError') + end + end + + def make_commit_overload_error + Mongo::Error::OperationFailure.new('commit overload').tap do |e| + e.add_label('UnknownTransactionCommitResult') + e.add_label('SystemOverloadedError') + end + end + + # --------------------------------------------------------------------------- + # "Retry Timeout is Enforced" — three sub-cases from the spec README + # --------------------------------------------------------------------------- + + describe '"Retry Timeout is Enforced" prose tests' do + context 'when callback raises TransientTransactionError and retry timeout is exceeded' do + let(:transient_error) { make_transient_error } + + it 'propagates the error as TimeoutError including the transient error message' do + with_expired_deadline_after(initial_calls: 1) do + ex = expect { session.with_transaction(timeout_ms: 1) { raise transient_error } } + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(transient_error.message) } + end + end + end + + context 'when commit raises UnknownTransactionCommitResult and retry timeout is exceeded' do + let(:commit_error) { make_commit_unknown_error } + + before { allow(session).to receive(:commit_transaction) { raise commit_error } } + + it 'propagates the error as 
TimeoutError including the commit error message' do + with_expired_deadline_after(initial_calls: 2) do + ex = expect do + session.with_transaction(timeout_ms: 1) do + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_IN_PROGRESS_STATE) + end + end + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(commit_error.message) } + end + end + end + + context 'when commit raises TransientTransactionError and retry timeout is exceeded' do + let(:commit_error) { make_commit_transient_error } + + before { allow(session).to receive(:commit_transaction) { raise commit_error } } + + it 'propagates the error as TimeoutError including the commit error message' do + with_expired_deadline_after(initial_calls: 2) do + ex = expect do + session.with_transaction(timeout_ms: 1) do + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_IN_PROGRESS_STATE) + end + end + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(commit_error.message) } + end + end + end + end + + # --------------------------------------------------------------------------- + # "Backoff Deadline is Enforced" — backoff-would-exceed-deadline paths + # --------------------------------------------------------------------------- + + describe '"Backoff Deadline is Enforced" prose tests' do + before do + allow(retry_policy).to receive(:backoff_delay).and_wrap_original do |m, attempt, **_| + m.call(attempt, jitter: 1.0) + end + end + + context 'when regular backoff would exceed CSOT deadline' do + let(:last_error) { make_transient_error } + + it 'raises TimeoutError including last_error message' do + with_csot_backoff_time_control do + ex = expect { session.with_transaction(timeout_ms: 1) { raise last_error } } + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(last_error.message) } + end + end + end + + context 'when regular backoff would exceed the 120 s deadline (non-CSOT)' do + let(:last_error) { 
make_transient_error } + + it 'raises last_error directly (not TimeoutError)' do + with_non_csot_backoff_time_control do + ex = expect { session.with_transaction { raise last_error } } + ex.to raise_error(Mongo::Error::OperationFailure) do |e| + expect(e).to eq(last_error) + expect(e).not_to be_a(Mongo::Error::TimeoutError) + end + end + end + end + + context 'when overload backoff would exceed CSOT deadline' do + let(:last_error) { make_transient_overload_error } + + it 'raises TimeoutError including last_error message' do + with_csot_backoff_time_control do + ex = expect { session.with_transaction(timeout_ms: 1) { raise last_error } } + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(last_error.message) } + end + end + end + + context 'when overload backoff would exceed the 120 s deadline (non-CSOT)' do + let(:last_error) { make_transient_overload_error } + + it 'raises last_error directly (not TimeoutError)' do + with_non_csot_backoff_time_control do + ex = expect { session.with_transaction { raise last_error } } + ex.to raise_error(Mongo::Error::OperationFailure) do |e| + expect(e).to eq(last_error) + expect(e).not_to be_a(Mongo::Error::TimeoutError) + end + end + end + end + + context 'when commit overload backoff would exceed CSOT deadline' do + let(:commit_error) { make_commit_overload_error } + + before { allow(session).to receive(:commit_transaction) { raise commit_error } } + + it 'raises TimeoutError including the commit error message' do + with_csot_backoff_time_control do + ex = expect do + session.with_transaction(timeout_ms: 1) do + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_IN_PROGRESS_STATE) + end + end + ex.to raise_error(Mongo::Error::TimeoutError) { |e| expect(e.message).to include(commit_error.message) } + end + end + end + end +end diff --git a/spec/mongo/session_transaction_spec.rb b/spec/mongo/session_transaction_spec.rb index 6e153a7fd9..512841b1a3 100644 --- 
a/spec/mongo/session_transaction_spec.rb +++ b/spec/mongo/session_transaction_spec.rb @@ -136,7 +136,7 @@ class SessionTransactionSpecError < StandardError; end allow(session).to receive('check_transactions_supported!').and_return true expect do - session.with_transaction do + session.with_transaction(timeout_ms: 5000) do exc = Mongo::Error::OperationFailure.new('timeout test') exc.add_label('TransientTransactionError') raise exc diff --git a/spec/spec_tests/data/sdam_unified/minPoolSize-error.yml b/spec/spec_tests/data/sdam_unified/minPoolSize-error.yml index 110e647c62..1bbc0c376e 100644 --- a/spec/spec_tests/data/sdam_unified/minPoolSize-error.yml +++ b/spec/spec_tests/data/sdam_unified/minPoolSize-error.yml @@ -21,7 +21,7 @@ initialData: &initialData documents: [] tests: - - description: Network error on minPoolSize background creation + - description: Server error on minPoolSize background creation operations: # Configure the initial monitor handshake to succeed but the # first or second background minPoolSize establishments to fail. 
@@ -38,7 +38,7 @@ tests: - hello - isMaster appName: SDAMminPoolSizeError - closeConnection: true + errorCode: 91 - name: createEntities object: testRunner arguments: @@ -54,6 +54,7 @@ tests: heartbeatFrequencyMS: 10000 appname: SDAMminPoolSizeError minPoolSize: 10 + serverMonitoringMode: poll serverSelectionTimeoutMS: 1000 - database: id: &database database diff --git a/spec/spec_tests/data/sdam_unified/pool-clear-min-pool-size-error.yml b/spec/spec_tests/data/sdam_unified/pool-clear-min-pool-size-error.yml new file mode 100644 index 0000000000..2c8e32a410 --- /dev/null +++ b/spec/spec_tests/data/sdam_unified/pool-clear-min-pool-size-error.yml @@ -0,0 +1,132 @@ +--- +description: pool-cleared-on-min-pool-size-population-error + +schemaVersion: "1.4" + +runOnRequirements: + # failCommand appName requirements + - minServerVersion: "4.4" + serverless: forbid + topologies: [ single ] + +createEntities: + - client: + id: &setupClient setupClient + useMultipleMongoses: false + +tests: + - description: Pool is cleared on authentication error during minPoolSize population + runOnRequirements: + # failCommand appName requirements + - auth: true + operations: + - name: failPoint + object: testRunner + arguments: + client: *setupClient + failPoint: + configureFailPoint: failCommand + mode: + times: 1 + data: + failCommands: + - saslContinue + appName: authErrorTest + errorCode: 18 + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + observeEvents: + - poolReadyEvent + - poolClearedEvent + - connectionClosedEvent + uriOptions: + appname: authErrorTest + minPoolSize: 1 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolClearedEvent: {} + count: 1 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + connectionClosedEvent: {} + count: 1 
+ + - description: Pool is not cleared on handshake error during minPoolSize population + operations: + - name: failPoint + object: testRunner + arguments: + client: *setupClient + failPoint: + configureFailPoint: failCommand + mode: + skip: 1 # skip one to let monitoring thread to move pool to ready state + data: + failCommands: + - hello + - isMaster + appName: authErrorTest + closeConnection: true + + - name: createEntities + object: testRunner + arguments: + entities: + - client: + id: &client client + observeEvents: + - poolReadyEvent + - poolClearedEvent + - connectionClosedEvent + uriOptions: + appname: authErrorTest + minPoolSize: 5 + maxConnecting: 1 + # ensure that once we've connected to the server, the failCommand won't + # be triggered by monitors and will only be triggered by handshakes + serverMonitoringMode: poll + heartbeatFrequencyMS: 1000000 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + poolReadyEvent: {} + count: 1 + + - name: waitForEvent + object: testRunner + arguments: + client: *client + event: + connectionClosedEvent: {} + count: 1 + + - name: assertEventCount + object: testRunner + arguments: + client: *client + event: + poolClearedEvent: {} + count: 0