Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions lib/mongo/collection/view/change_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,20 +347,34 @@ def create_cursor!(timeout_ms = nil)
start_at_operation_time = nil

@cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
server.with_connection do |connection|
if server.load_balancer?
# In load balanced topology, manually check out a connection
# so it remains checked out and pinned to the cursor.
connection = server.pool.check_out(context: context)
result = send_initial_query(connection, context)

start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
doc['operationTime']
Comment on lines +350 to 357
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In load-balanced mode this code checks out a connection manually but does not guarantee it is checked back into the pool if send_initial_query (or later logic) raises. That can leak checked-out connections and eventually exhaust the pool. Consider using server.with_connection again now that ConnectionPool#with_connection skips check-in for pinned connections, or add an ensure that checks the connection in (when it is not pinned/owned by the created cursor).

Copilot uses AI. Check for mistakes.
else
# The above may set @start_at_operation_time to nil
# if it was not in the document for some reason,
# for consistency set it to nil here as well.
# NB: since this block may be executed more than once, each
# execution must write to start_at_operation_time either way.
nil
end
result
else
server.with_connection do |connection|
result = send_initial_query(connection, context)

start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
doc['operationTime']
else
# The above may set @start_at_operation_time to nil
# if it was not in the document for some reason,
# for consistency set it to nil here as well.
# NB: since this block may be executed more than once, each
# execution must write to start_at_operation_time either way.
nil
end
result
end
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/mongo/collection/view/iterable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,15 @@ def initial_query_op(session)
def send_initial_query(server, context, operation: nil)
operation ||= initial_query_op(context.session)
if server.load_balancer?
# Connection will be checked in when cursor is drained.
connection = server.pool.check_out(context: context)
# Connection will be checked in when cursor is drained,
# unless the connection is pinned to a transaction (in which
# case it stays checked out for the transaction duration).
if context.connection_global_id
connection = server.pool.check_out_pinned_connection(
context.connection_global_id
)
end
connection ||= server.pool.check_out(context: context)
operation.execute_with_connection(connection, context: context)
else
operation.execute(server, context: context)
Expand Down
33 changes: 22 additions & 11 deletions lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def initialize(view, result, server, options = {})
@connection_global_id = result.connection_global_id
@context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context
@explicitly_closed = false
@get_more_network_error = false
@lock = Mutex.new
if server.load_balancer?
# We need the connection in the cursor only in load balanced topology;
Expand Down Expand Up @@ -297,19 +298,21 @@ def close(opts = {})
ctx = context ? context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts)

unregister
read_with_one_retry do
spec = {
coll_name: collection_name,
db_name: database.name,
cursor_ids: [ id ],
}
op = Operation::KillCursors.new(spec)
execute_operation(op, context: ctx)
unless @get_more_network_error
read_with_one_retry do
spec = {
coll_name: collection_name,
db_name: database.name,
cursor_ids: [ id ],
}
op = Operation::KillCursors.new(spec)
execute_operation(op, context: ctx)
end
end

nil
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable
# Errors are swallowed since there is noting can be done by handling them.
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable, Error::ConnectionPerished
# Errors are swallowed since nothing can be done by handling them.
ensure
end_session
@cursor_id = 0
Expand Down Expand Up @@ -379,6 +382,9 @@ def get_more
with_overload_retry(context: possibly_refreshed_context) do
process(execute_operation(get_more_operation))
end
rescue Error::SocketError, Error::SocketTimeoutError
Copy link

Copilot AI Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@get_more_network_error is only set for SocketError and SocketTimeoutError, but Error::ConnectionPerished also represents a network error (and is raised when a perished connection is reused). If get_more raises ConnectionPerished, close will still attempt to send killCursors, which the load balancer cursor spec says should not be attempted after a network error during getMore. Consider including Error::ConnectionPerished in this rescue that sets @get_more_network_error.

Suggested change
rescue Error::SocketError, Error::SocketTimeoutError
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished

Copilot uses AI. Check for mistakes.
@get_more_network_error = true
raise
rescue Error::OperationFailure => e
# When overload retries are exhausted on getMore, close the cursor
# so that killCursors is sent to the server.
Expand Down Expand Up @@ -579,7 +585,12 @@ def check_in_connection
return if @connection.nil?
return unless @connection.server.load_balancer?

@connection.connection_pool.check_in(@connection)
@connection.unpin
# Do not check in if the connection is still pinned by something
# else (e.g. a transaction).
unless @connection.pinned?
@connection.connection_pool.check_in(@connection)
end
@connection = nil
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/database/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def operation_timeouts(opts = {})
private

def collections_info(session, server_selector, options = {})
@batch_size = options[:batch_size] if options[:batch_size]
description = nil
context = Operation::Context.new(
client: client,
Expand Down
6 changes: 3 additions & 3 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ def do_execute(connection, context, options = {})
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
)
end
else
session.pin_to_connection(connection.global_id)
connection.pin
elsif !session.committing_transaction? && !session.aborting_transaction?
session.pin_to_connection(connection.global_id, connection: connection)
connection.pin(:transaction)
end
end

Expand Down
29 changes: 21 additions & 8 deletions lib/mongo/server/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def initialize(server, options = {})
@last_checkin = nil
@auth_mechanism = nil
@pid = Process.pid
@pinned = false
@pin_reasons = Set.new

publish_cmap_event(
Monitoring::Event::Cmap::ConnectionCreated.new(address, id)
Expand Down Expand Up @@ -182,21 +182,34 @@ def error?
#
# @api private
def pinned?
@pinned
!@pin_reasons.empty?
end

# Mark the connection as pinned.
# Mark the connection as pinned for the given reason.
#
# @param [ Symbol ] reason The reason for pinning (:cursor or
# :transaction). Defaults to :cursor for backward compatibility.
#
# @api private
def pin(reason = :cursor)
# Record the reason in the set of pin reasons; the connection reports
# itself as pinned for as long as that set is non-empty, so pinning
# twice for the same reason has no additional effect.
@pin_reasons << reason
end

# Remove a pin from the connection for the given reason.
#
# @param [ Symbol ] reason The reason to unpin (:cursor or
# :transaction). Defaults to :cursor for backward compatibility.
#
# @api private
def pin
@pinned = true
def unpin(reason = :cursor)
# Drop only this reason; pins held for other reasons are left in place,
# so the connection may still report itself as pinned afterwards.
@pin_reasons.delete(reason)
end

# Mark the connection as not pinned.
# Remove all pins from the connection.
#
# @api private
def unpin
@pinned = false
def unpin_all
# Empty the set of pin reasons so the connection no longer reports
# itself as pinned, regardless of why it was pinned.
@pin_reasons.clear
end

# Establishes a network connection to the target address.
Expand Down
42 changes: 40 additions & 2 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,23 @@ def check_out(connection_global_id: nil, context: nil)
check_invariants
end

# Looks up a connection that is currently checked out of this pool, has the
# given global id, and is pinned. The search runs while holding the pool
# lock and only reads the set of checked-out connections.
#
# @param [ Integer ] connection_global_id The global id of the pinned
#   connection to look for.
#
# @return [ Connection | nil ] The matching pinned connection, or nil when
#   no checked-out connection has that id or the match is not pinned.
#
# @api private
def check_out_pinned_connection(connection_global_id)
  @lock.synchronize do
    @checked_out_connections.find do |candidate|
      # Compare ids first; only a connection with the requested id is
      # asked whether it is pinned.
      next false unless candidate.global_id == connection_global_id

      candidate.pinned?
    end
  end
end

# Check a connection back into the pool.
#
# The connection must have been previously created by this pool.
Expand Down Expand Up @@ -714,15 +731,36 @@ def inspect
def with_connection(connection_global_id: nil, context: nil)
raise_if_closed!

connection = check_out(
# If a specific connection is requested and it is already checked out
# and pinned (e.g. for a transaction or cursor in load-balanced mode),
# reuse it directly without going through the check_out/check_in cycle.
if connection_global_id
connection = @lock.synchronize do
@checked_out_connections.detect do |conn|
conn.global_id == connection_global_id && conn.pinned?
end
end
end

connection ||= check_out(
connection_global_id: connection_global_id,
context: context
)

yield(connection)
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e
maybe_raise_pool_cleared!(connection, e)
ensure
check_in(connection) if connection
if connection && !connection.pinned?
# Do not check in if the connection is pinned (the session or cursor
# owns it and will check it in later when unpinning). Also skip
# check-in if the connection was already checked in during the block
# (e.g. by Session#unpin after an error on the first operation).
checked_out = @lock.synchronize do
@checked_out_connections.include?(connection)
end
check_in(connection) if checked_out
end
end

# Close sockets that have been open for longer than the max idle time,
Expand Down
9 changes: 8 additions & 1 deletion lib/mongo/server_selector/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,14 @@ def select_server(
# Parameters and return values are the same as for select_server, only
# the +timeout+ param is renamed to +csot_timeout+.
private def select_server_impl(cluster, _ping, session, write_aggregation, deprioritized, csot_timeout)
return cluster.servers.first if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
# In load-balanced mode, release the pinned connection if the session
# is no longer in a transaction (e.g. after commit or abort).
if session&.pinned_connection_global_id && !session.in_transaction?
session.unpin
end
return cluster.servers.first
end

timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT

Expand Down
18 changes: 16 additions & 2 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ def end_session
rescue Mongo::Error, Error::AuthError
end
end
# Release any pinned connection (e.g. after a committed transaction
# in load-balanced mode).
unpin if pinned_connection_global_id
cluster.session_pool.checkin(@server_session) if @server_session
end
ensure
Expand Down Expand Up @@ -919,12 +922,14 @@ def pin_to_server(server)
#
# @param [ Integer ] connection_global_id The global id of connection to pin
# this session to.
# @param [ Connection | nil ] connection The connection object to pin to.
#
# @api private
def pin_to_connection(connection_global_id)
def pin_to_connection(connection_global_id, connection: nil)
raise ArgumentError, 'Cannot pin to a nil connection id' if connection_global_id.nil?

@pinned_connection_global_id = connection_global_id
@pinned_connection = connection
end

# Unpins this session from the pinned server or connection,
Expand All @@ -936,7 +941,16 @@ def pin_to_connection(connection_global_id)
def unpin(connection = nil)
@pinned_server = nil
@pinned_connection_global_id = nil
connection.unpin unless connection.nil?
conn = connection || @pinned_connection
if conn
conn.unpin(:transaction)
# Only check the connection back into the pool if nothing else
# still holds a pin on it (e.g. an open cursor).
unless conn.pinned?
conn.connection_pool.check_in(conn)
end
end
@pinned_connection = nil
end

# Unpins this session from the pinned server or connection, if the session was pinned
Expand Down
3 changes: 3 additions & 0 deletions spec/runners/unified/assertions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def assert_event_matches(actual, expected)
unless (awaited = spec.use('awaited')).nil?
assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation')
end
if reason = spec.use('reason')
assert_eq(actual.reason.to_s, reason.to_s, 'Event reason does not match expectation')
end
return if spec.empty?

raise NotImplementedError, "Unhandled keys: #{spec}"
Expand Down
4 changes: 4 additions & 0 deletions spec/runners/unified/crud_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,12 @@ def create_find_cursor(op)
args = op.use!('arguments')

filter = args.use('filter')
if session = args.use('session')
session = entities.get(:session, session)
end
opts = extract_options(args, 'batchSize', 'timeoutMS', 'cursorType', 'maxAwaitTimeMS')
symbolize_options!(opts, :cursor_type)
opts[:session] = session if session

view = obj.find(filter, opts)
view.each # to initialize the cursor
Expand Down
6 changes: 6 additions & 0 deletions spec/runners/unified/ddl_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def list_colls(op, name_only: false)
if timeout_ms = args.use('timeoutMS')
opts[:timeout_ms] = timeout_ms
end
if batch_size = args.use('batchSize')
opts[:batch_size] = batch_size
end

database.list_collections(**opts, name_only: name_only)
end
Expand Down Expand Up @@ -141,6 +144,9 @@ def list_indexes(op)
if timeout_ms = args.use('timeoutMS')
opts[:timeout_ms] = timeout_ms
end
if batch_size = args.use('batchSize')
opts[:batch_size] = batch_size
end
collection.indexes(**opts).to_a
end
end
Expand Down
Loading
Loading