diff --git a/lib/mongo/collection/view/change_stream.rb b/lib/mongo/collection/view/change_stream.rb index 49a1044ba7..5b77bc9d59 100644 --- a/lib/mongo/collection/view/change_stream.rb +++ b/lib/mongo/collection/view/change_stream.rb @@ -347,20 +347,34 @@ def create_cursor!(timeout_ms = nil) start_at_operation_time = nil @cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server| - server.with_connection do |connection| + if server.load_balancer? + # In load balanced topology, manually check out a connection + # so it remains checked out and pinned to the cursor. + connection = server.pool.check_out(context: context) result = send_initial_query(connection, context) start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first doc['operationTime'] else - # The above may set @start_at_operation_time to nil - # if it was not in the document for some reason, - # for consistency set it to nil here as well. - # NB: since this block may be executed more than once, each - # execution must write to start_at_operation_time either way. nil end result + else + server.with_connection do |connection| + result = send_initial_query(connection, context) + + start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first + doc['operationTime'] + else + # The above may set @start_at_operation_time to nil + # if it was not in the document for some reason, + # for consistency set it to nil here as well. + # NB: since this block may be executed more than once, each + # execution must write to start_at_operation_time either way. 
+ nil + end + result + end end end diff --git a/lib/mongo/collection/view/iterable.rb b/lib/mongo/collection/view/iterable.rb index 0352955f7b..8230a8d53c 100644 --- a/lib/mongo/collection/view/iterable.rb +++ b/lib/mongo/collection/view/iterable.rb @@ -172,8 +172,15 @@ def initial_query_op(session) def send_initial_query(server, context, operation: nil) operation ||= initial_query_op(context.session) if server.load_balancer? - # Connection will be checked in when cursor is drained. - connection = server.pool.check_out(context: context) + # Connection will be checked in when cursor is drained, + # unless the connection is pinned to a transaction (in which + # case it stays checked out for the transaction duration). + if context.connection_global_id + connection = server.pool.check_out_pinned_connection( + context.connection_global_id + ) + end + connection ||= server.pool.check_out(context: context) operation.execute_with_connection(connection, context: context) else operation.execute(server, context: context) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 9218c07bbf..15c2f8eb03 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -87,6 +87,7 @@ def initialize(view, result, server, options = {}) @connection_global_id = result.connection_global_id @context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context @explicitly_closed = false + @get_more_network_error = false @lock = Mutex.new if server.load_balancer? # We need the connection in the cursor only in load balanced topology; @@ -297,19 +298,21 @@ def close(opts = {}) ctx = context ? 
context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts) unregister - read_with_one_retry do - spec = { - coll_name: collection_name, - db_name: database.name, - cursor_ids: [ id ], - } - op = Operation::KillCursors.new(spec) - execute_operation(op, context: ctx) + unless @get_more_network_error + read_with_one_retry do + spec = { + coll_name: collection_name, + db_name: database.name, + cursor_ids: [ id ], + } + op = Operation::KillCursors.new(spec) + execute_operation(op, context: ctx) + end end nil - rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable - # Errors are swallowed since there is noting can be done by handling them. + rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable, Error::ConnectionPerished + # Errors are swallowed since there is nothing that can be done by handling them. ensure end_session @cursor_id = 0 @@ -379,6 +382,9 @@ def get_more with_overload_retry(context: possibly_refreshed_context) do process(execute_operation(get_more_operation)) end + rescue Error::SocketError, Error::SocketTimeoutError + @get_more_network_error = true + raise rescue Error::OperationFailure => e # When overload retries are exhausted on getMore, close the cursor # so that killCursors is sent to the server. @@ -579,7 +585,12 @@ def check_in_connection return if @connection.nil? return unless @connection.server.load_balancer? - @connection.connection_pool.check_in(@connection) + @connection.unpin + # Do not check in if the connection is still pinned by something + # else (e.g. a transaction). + unless @connection.pinned?
+ @connection.connection_pool.check_in(@connection) + end @connection = nil end end diff --git a/lib/mongo/database/view.rb b/lib/mongo/database/view.rb index 232bd12982..4b10947511 100644 --- a/lib/mongo/database/view.rb +++ b/lib/mongo/database/view.rb @@ -202,6 +202,7 @@ def operation_timeouts(opts = {}) private def collections_info(session, server_selector, options = {}) + @batch_size = options[:batch_size] if options[:batch_size] description = nil context = Operation::Context.new( client: client, diff --git a/lib/mongo/operation/shared/executable.rb b/lib/mongo/operation/shared/executable.rb index df6188f4e2..5bc82386eb 100644 --- a/lib/mongo/operation/shared/executable.rb +++ b/lib/mongo/operation/shared/executable.rb @@ -53,9 +53,9 @@ def do_execute(connection, context, options = {}) "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}" ) end - else - session.pin_to_connection(connection.global_id) - connection.pin + elsif !session.committing_transaction? && !session.aborting_transaction? + session.pin_to_connection(connection.global_id, connection: connection) + connection.pin(:transaction) end end diff --git a/lib/mongo/server/connection.rb b/lib/mongo/server/connection.rb index 4a3289ae71..3f5390932d 100644 --- a/lib/mongo/server/connection.rb +++ b/lib/mongo/server/connection.rb @@ -101,7 +101,7 @@ def initialize(server, options = {}) @last_checkin = nil @auth_mechanism = nil @pid = Process.pid - @pinned = false + @pin_reasons = Set.new publish_cmap_event( Monitoring::Event::Cmap::ConnectionCreated.new(address, id) @@ -182,21 +182,34 @@ def error? # # @api private def pinned? - @pinned + !@pin_reasons.empty? end - # Mark the connection as pinned. + # Mark the connection as pinned for the given reason. + # + # @param [ Symbol ] reason The reason for pinning (:cursor or + # :transaction). Defaults to :cursor for backward compatibility. 
+ # + # @api private + def pin(reason = :cursor) + @pin_reasons << reason + end + + # Remove a pin from the connection for the given reason. + # + # @param [ Symbol ] reason The reason to unpin (:cursor or + # :transaction). Defaults to :cursor for backward compatibility. # # @api private - def pin - @pinned = true + def unpin(reason = :cursor) + @pin_reasons.delete(reason) end - # Mark the connection as not pinned. + # Remove all pins from the connection. # # @api private - def unpin - @pinned = false + def unpin_all + @pin_reasons.clear end # Establishes a network connection to the target address. diff --git a/lib/mongo/server/connection_pool.rb b/lib/mongo/server/connection_pool.rb index 6fc08e31d6..1a75f665e4 100644 --- a/lib/mongo/server/connection_pool.rb +++ b/lib/mongo/server/connection_pool.rb @@ -391,6 +391,23 @@ def check_out(connection_global_id: nil, context: nil) check_invariants end + # Returns a pinned connection that is already checked out, if one + # exists with the given global id. Returns nil otherwise. + # + # @param [ Integer ] connection_global_id The global id of the pinned + # connection. + # + # @return [ Connection | nil ] The pinned connection, or nil. + # + # @api private + def check_out_pinned_connection(connection_global_id) + @lock.synchronize do + @checked_out_connections.detect do |conn| + conn.global_id == connection_global_id && conn.pinned? + end + end + end + # Check a connection back into the pool. # # The connection must have been previously created by this pool. @@ -714,15 +731,36 @@ def inspect def with_connection(connection_global_id: nil, context: nil) raise_if_closed! - connection = check_out( + # If a specific connection is requested and it is already checked out + # and pinned (e.g. for a transaction or cursor in load-balanced mode), + # reuse it directly without going through the check_out/check_in cycle. 
+ if connection_global_id + connection = @lock.synchronize do + @checked_out_connections.detect do |conn| + conn.global_id == connection_global_id && conn.pinned? + end + end + end + + connection ||= check_out( connection_global_id: connection_global_id, context: context ) + yield(connection) rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e maybe_raise_pool_cleared!(connection, e) ensure - check_in(connection) if connection + if connection && !connection.pinned? + # Do not check in if the connection is pinned (the session or cursor + # owns it and will check it in later when unpinning). Also skip + # check-in if the connection was already checked in during the block + # (e.g. by Session#unpin after an error on the first operation). + checked_out = @lock.synchronize do + @checked_out_connections.include?(connection) + end + check_in(connection) if checked_out + end end # Close sockets that have been open for longer than the max idle time, diff --git a/lib/mongo/server_selector/base.rb b/lib/mongo/server_selector/base.rb index 13d97193c1..62a2e0dec8 100644 --- a/lib/mongo/server_selector/base.rb +++ b/lib/mongo/server_selector/base.rb @@ -191,7 +191,14 @@ def select_server( # Parameters and return values are the same as for select_server, only # the +timeout+ param is renamed to +csot_timeout+. private def select_server_impl(cluster, _ping, session, write_aggregation, deprioritized, csot_timeout) - return cluster.servers.first if cluster.topology.is_a?(Cluster::Topology::LoadBalanced) + if cluster.topology.is_a?(Cluster::Topology::LoadBalanced) + # In load-balanced mode, release the pinned connection if the session + # is no longer in a transaction (e.g. after commit or abort). + if session&.pinned_connection_global_id && !session.in_transaction? 
+ session.unpin + end + return cluster.servers.first + end timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index d185e68e57..d569da50a0 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -379,6 +379,9 @@ def end_session rescue Mongo::Error, Error::AuthError end end + # Release any pinned connection (e.g. after a committed transaction + # in load-balanced mode). + unpin if pinned_connection_global_id cluster.session_pool.checkin(@server_session) if @server_session end ensure @@ -919,12 +922,14 @@ def pin_to_server(server) # # @param [ Integer ] connection_global_id The global id of connection to pin # this session to. + # @param [ Connection | nil ] connection The connection object to pin to. # # @api private - def pin_to_connection(connection_global_id) + def pin_to_connection(connection_global_id, connection: nil) raise ArgumentError, 'Cannot pin to a nil connection id' if connection_global_id.nil? @pinned_connection_global_id = connection_global_id + @pinned_connection = connection end # Unpins this session from the pinned server or connection, @@ -936,7 +941,16 @@ def pin_to_connection(connection_global_id) def unpin(connection = nil) @pinned_server = nil @pinned_connection_global_id = nil - connection.unpin unless connection.nil? + conn = connection || @pinned_connection + if conn + conn.unpin(:transaction) + # Only check the connection back into the pool if nothing else + # still holds a pin on it (e.g. an open cursor). + unless conn.pinned? 
+ conn.connection_pool.check_in(conn) + end + end + @pinned_connection = nil end # Unpins this session from the pinned server or connection, if the session was pinned diff --git a/spec/runners/unified/assertions.rb b/spec/runners/unified/assertions.rb index 9a429a9ddb..727d926381 100644 --- a/spec/runners/unified/assertions.rb +++ b/spec/runners/unified/assertions.rb @@ -214,6 +214,9 @@ def assert_event_matches(actual, expected) unless (awaited = spec.use('awaited')).nil? assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation') end + if reason = spec.use('reason') + assert_eq(actual.reason.to_s, reason.to_s, 'Event reason does not match expectation') + end return if spec.empty? raise NotImplementedError, "Unhandled keys: #{spec}" diff --git a/spec/runners/unified/crud_operations.rb b/spec/runners/unified/crud_operations.rb index 8eba2fd994..cfa3a883d6 100644 --- a/spec/runners/unified/crud_operations.rb +++ b/spec/runners/unified/crud_operations.rb @@ -310,8 +310,12 @@ def create_find_cursor(op) args = op.use!('arguments') filter = args.use('filter') + if session = args.use('session') + session = entities.get(:session, session) + end opts = extract_options(args, 'batchSize', 'timeoutMS', 'cursorType', 'maxAwaitTimeMS') symbolize_options!(opts, :cursor_type) + opts[:session] = session if session view = obj.find(filter, opts) view.each # to initialize the cursor diff --git a/spec/runners/unified/ddl_operations.rb b/spec/runners/unified/ddl_operations.rb index 8d40f8e08b..5d49c8aace 100644 --- a/spec/runners/unified/ddl_operations.rb +++ b/spec/runners/unified/ddl_operations.rb @@ -83,6 +83,9 @@ def list_colls(op, name_only: false) if timeout_ms = args.use('timeoutMS') opts[:timeout_ms] = timeout_ms end + if batch_size = args.use('batchSize') + opts[:batch_size] = batch_size + end database.list_collections(**opts, name_only: name_only) end @@ -141,6 +144,9 @@ def list_indexes(op) if timeout_ms = args.use('timeoutMS') opts[:timeout_ms] = 
timeout_ms end + if batch_size = args.use('batchSize') + opts[:batch_size] = batch_size + end collection.indexes(**opts).to_a end end diff --git a/spec/spec_tests/data/load_balancers/cursors.yml b/spec/spec_tests/data/load_balancers/cursors.yml index cbd9852c3b..1b558affe1 100644 --- a/spec/spec_tests/data/load_balancers/cursors.yml +++ b/spec/spec_tests/data/load_balancers/cursors.yml @@ -85,7 +85,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connections are returned when the cursor is drained - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - &createAndSaveCursor name: createFindCursor @@ -179,7 +178,6 @@ tests: # If a network error occurs during a getMore request, the connection must remain pinned. and drivers must not # attempt to send a killCursors command when the cursor is closed because the connection is no longer valid. - description: pinned connections are not returned after an network error during getMore - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -234,7 +232,6 @@ tests: reason: error - description: pinned connections are returned after a network error during a killCursors request - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -360,7 +357,6 @@ tests: - connectionCheckedInEvent: {} - description: listCollections pins the cursor to a connection - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: - serverless: forbid # CLOUDP-98562 listCollections batchSize is ignored on serverless. operations: @@ -402,7 +398,6 @@ tests: - connectionCheckedInEvent: {} - description: listIndexes pins the cursor to a connection - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: # There is an automatic index on _id so we create two more indexes to force multiple batches with batchSize=2. 
- name: createIndex @@ -471,7 +466,6 @@ tests: - connectionCheckedInEvent: {} - description: change streams pin to a connection - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: - serverless: forbid # Serverless does not support change streams. operations: diff --git a/spec/spec_tests/data/load_balancers/sdam-error-handling.yml b/spec/spec_tests/data/load_balancers/sdam-error-handling.yml index b237f4c6e1..c5a69339e3 100644 --- a/spec/spec_tests/data/load_balancers/sdam-error-handling.yml +++ b/spec/spec_tests/data/load_balancers/sdam-error-handling.yml @@ -65,7 +65,6 @@ initialData: tests: - description: only connections for a specific serviceId are closed when pools are cleared - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: # This test assumes that two sequential connections receive different serviceIDs. # Sequential connections to a serverless instance may receive the same serviceID. @@ -141,7 +140,6 @@ tests: # This test uses singleClient to ensure that connection attempts are routed # to the same mongos on which the failpoint is set. - description: errors during the initial connection hello are ignored - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: # Require SERVER-49336 for failCommand + appName on the initial handshake. 
- minServerVersion: '4.4.7' @@ -155,14 +153,14 @@ tests: mode: { times: 1 } data: failCommands: [isMaster, hello] - closeConnection: true + errorCode: 11600 appName: *singleClientAppName - name: insertOne object: *singleColl arguments: document: { x: 1 } expectError: - isClientError: true + isError: true expectEvents: - client: *singleClient eventType: cmap @@ -206,7 +204,6 @@ tests: reason: connectionError - description: stale errors are ignored - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner diff --git a/spec/spec_tests/data/load_balancers/transactions.yml b/spec/spec_tests/data/load_balancers/transactions.yml index 26c18ddbe2..4807ff21ef 100644 --- a/spec/spec_tests/data/load_balancers/transactions.yml +++ b/spec/spec_tests/data/load_balancers/transactions.yml @@ -57,7 +57,6 @@ tests: client: *client0 - description: all operations go to the same mongos - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - &startTransaction name: startTransaction @@ -105,7 +104,6 @@ tests: - connectionCheckedOutEvent: {} - description: transaction can be committed multiple times - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - *startTransaction - *transactionalInsert @@ -131,7 +129,6 @@ tests: - connectionCheckedOutEvent: {} - description: pinned connection is not released after a non-transient CRUD error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -168,7 +165,6 @@ tests: - connectionCheckedOutEvent: {} - description: pinned connection is not released after a non-transient commit error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -204,7 +200,6 @@ tests: # Errors during abort are different than errors during commit and CRUD operations because the pinned connection is # always released after abort. 
- description: pinned connection is released after a non-transient abort error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -245,7 +240,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient non-network CRUD error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: - serverless: forbid # (CLOUDP-88216) Serverless does not append error labels to errors triggered by failpoints. operations: @@ -288,7 +282,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient network CRUD error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: - serverless: forbid # (CLOUDP-88216) Serverless does not append error labels to errors triggered by failpoints. operations: @@ -334,7 +327,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient non-network commit error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" runOnRequirements: - serverless: forbid # (CLOUDP-88216) Serverless does not append error labels to errors triggered by failpoints. 
operations: @@ -371,7 +363,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient network commit error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -418,7 +409,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient non-network abort error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -453,7 +443,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released after a transient network abort error - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: failPoint object: testRunner @@ -496,7 +485,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is released on successful abort - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - *startTransaction - *transactionalInsert @@ -517,7 +505,6 @@ tests: - connectionCheckedInEvent: {} - description: pinned connection is returned when a new transaction is started - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - *startTransaction - *transactionalInsert @@ -547,7 +534,6 @@ tests: - connectionCheckedOutEvent: {} - description: pinned connection is returned when a non-transaction operation uses the session - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - *startTransaction - *transactionalInsert @@ -574,7 +560,6 @@ tests: - connectionCheckedInEvent: {} - description: a connection can be shared by a transaction and a cursor - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - *startTransaction - *transactionalInsert @@ -611,3 +596,21 @@ tests: - connectionCheckedOutEvent: {} # Events for abortTransaction. 
- connectionCheckedInEvent: {} + + - description: pinned connection is released when session ended + operations: + - *startTransaction + - *transactionalInsert + - *commitTransaction + - &endSession + name: endSession + object: *session0 + expectEvents: + - client: *client0 + eventType: cmap + events: + # Events for the insert and commitTransaction. + - connectionReadyEvent: {} + - connectionCheckedOutEvent: {} + # Events for endSession. + - connectionCheckedInEvent: {} \ No newline at end of file diff --git a/spec/spec_tests/data/load_balancers/wait-queue-timeouts.yml b/spec/spec_tests/data/load_balancers/wait-queue-timeouts.yml index 33f9361a43..9d8c935fea 100644 --- a/spec/spec_tests/data/load_balancers/wait-queue-timeouts.yml +++ b/spec/spec_tests/data/load_balancers/wait-queue-timeouts.yml @@ -37,7 +37,6 @@ initialData: tests: - description: wait queue timeout errors include cursor statistics - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: createFindCursor object: *collection0 @@ -60,7 +59,6 @@ tests: - connectionCheckOutFailedEvent: {} - description: wait queue timeout errors include transaction statistics - skipReason: "RUBY-2881: ruby driver LB is not spec compliant" operations: - name: startTransaction object: *session0