Skip to content

Commit 0419fa5

Browse files
RUBY-3616 Fix load balanced implementation (#3013)
1 parent e5fb27c commit 0419fa5

16 files changed

Lines changed: 173 additions & 63 deletions

File tree

lib/mongo/collection/view/change_stream.rb

Lines changed: 20 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -347,20 +347,34 @@ def create_cursor!(timeout_ms = nil)
347347
start_at_operation_time = nil
348348

349349
@cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
350-
server.with_connection do |connection|
350+
if server.load_balancer?
351+
# In load balanced topology, manually check out a connection
352+
# so it remains checked out and pinned to the cursor.
353+
connection = server.pool.check_out(context: context)
351354
result = send_initial_query(connection, context)
352355

353356
start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
354357
doc['operationTime']
355358
else
356-
# The above may set @start_at_operation_time to nil
357-
# if it was not in the document for some reason,
358-
# for consistency set it to nil here as well.
359-
# NB: since this block may be executed more than once, each
360-
# execution must write to start_at_operation_time either way.
361359
nil
362360
end
363361
result
362+
else
363+
server.with_connection do |connection|
364+
result = send_initial_query(connection, context)
365+
366+
start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
367+
doc['operationTime']
368+
else
369+
# The above may set @start_at_operation_time to nil
370+
# if it was not in the document for some reason,
371+
# for consistency set it to nil here as well.
372+
# NB: since this block may be executed more than once, each
373+
# execution must write to start_at_operation_time either way.
374+
nil
375+
end
376+
result
377+
end
364378
end
365379
end
366380

lib/mongo/collection/view/iterable.rb

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -172,8 +172,15 @@ def initial_query_op(session)
172172
def send_initial_query(server, context, operation: nil)
173173
operation ||= initial_query_op(context.session)
174174
if server.load_balancer?
175-
# Connection will be checked in when cursor is drained.
176-
connection = server.pool.check_out(context: context)
175+
# Connection will be checked in when cursor is drained,
176+
# unless the connection is pinned to a transaction (in which
177+
# case it stays checked out for the transaction duration).
178+
if context.connection_global_id
179+
connection = server.pool.check_out_pinned_connection(
180+
context.connection_global_id
181+
)
182+
end
183+
connection ||= server.pool.check_out(context: context)
177184
operation.execute_with_connection(connection, context: context)
178185
else
179186
operation.execute(server, context: context)

lib/mongo/cursor.rb

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,7 @@ def initialize(view, result, server, options = {})
8787
@connection_global_id = result.connection_global_id
8888
@context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context
8989
@explicitly_closed = false
90+
@get_more_network_error = false
9091
@lock = Mutex.new
9192
if server.load_balancer?
9293
# We need the connection in the cursor only in load balanced topology;
@@ -297,19 +298,21 @@ def close(opts = {})
297298
ctx = context ? context.refresh(timeout_ms: opts[:timeout_ms]) : fresh_context(opts)
298299

299300
unregister
300-
read_with_one_retry do
301-
spec = {
302-
coll_name: collection_name,
303-
db_name: database.name,
304-
cursor_ids: [ id ],
305-
}
306-
op = Operation::KillCursors.new(spec)
307-
execute_operation(op, context: ctx)
301+
unless @get_more_network_error
302+
read_with_one_retry do
303+
spec = {
304+
coll_name: collection_name,
305+
db_name: database.name,
306+
cursor_ids: [ id ],
307+
}
308+
op = Operation::KillCursors.new(spec)
309+
execute_operation(op, context: ctx)
310+
end
308311
end
309312

310313
nil
311-
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable
312-
# Errors are swallowed since there is noting can be done by handling them.
314+
rescue Error::OperationFailure::Family, Error::SocketError, Error::SocketTimeoutError, Error::ServerNotUsable, Error::ConnectionPerished
315+
# Errors are swallowed since there is nothing that can be done by handling them.
313316
ensure
314317
end_session
315318
@cursor_id = 0
@@ -379,6 +382,9 @@ def get_more
379382
with_overload_retry(context: possibly_refreshed_context) do
380383
process(execute_operation(get_more_operation))
381384
end
385+
rescue Error::SocketError, Error::SocketTimeoutError
386+
@get_more_network_error = true
387+
raise
382388
rescue Error::OperationFailure => e
383389
# When overload retries are exhausted on getMore, close the cursor
384390
# so that killCursors is sent to the server.
@@ -579,7 +585,12 @@ def check_in_connection
579585
return if @connection.nil?
580586
return unless @connection.server.load_balancer?
581587

582-
@connection.connection_pool.check_in(@connection)
588+
@connection.unpin
589+
# Do not check in if the connection is still pinned by something
590+
# else (e.g. a transaction).
591+
unless @connection.pinned?
592+
@connection.connection_pool.check_in(@connection)
593+
end
583594
@connection = nil
584595
end
585596
end

lib/mongo/database/view.rb

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,7 @@ def operation_timeouts(opts = {})
202202
private
203203

204204
def collections_info(session, server_selector, options = {})
205+
@batch_size = options[:batch_size] if options[:batch_size]
205206
description = nil
206207
context = Operation::Context.new(
207208
client: client,

lib/mongo/operation/shared/executable.rb

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -53,9 +53,9 @@ def do_execute(connection, context, options = {})
5353
"Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
5454
)
5555
end
56-
else
57-
session.pin_to_connection(connection.global_id)
58-
connection.pin
56+
elsif !session.committing_transaction? && !session.aborting_transaction?
57+
session.pin_to_connection(connection.global_id, connection: connection)
58+
connection.pin(:transaction)
5959
end
6060
end
6161

lib/mongo/server/connection.rb

Lines changed: 21 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -101,7 +101,7 @@ def initialize(server, options = {})
101101
@last_checkin = nil
102102
@auth_mechanism = nil
103103
@pid = Process.pid
104-
@pinned = false
104+
@pin_reasons = Set.new
105105

106106
publish_cmap_event(
107107
Monitoring::Event::Cmap::ConnectionCreated.new(address, id)
@@ -182,21 +182,34 @@ def error?
182182
#
183183
# @api private
184184
def pinned?
185-
@pinned
185+
!@pin_reasons.empty?
186186
end
187187

188-
# Mark the connection as pinned.
188+
# Mark the connection as pinned for the given reason.
189+
#
190+
# @param [ Symbol ] reason The reason for pinning (:cursor or
191+
# :transaction). Defaults to :cursor for backward compatibility.
192+
#
193+
# @api private
194+
def pin(reason = :cursor)
195+
@pin_reasons << reason
196+
end
197+
198+
# Remove a pin from the connection for the given reason.
199+
#
200+
# @param [ Symbol ] reason The reason to unpin (:cursor or
201+
# :transaction). Defaults to :cursor for backward compatibility.
189202
#
190203
# @api private
191-
def pin
192-
@pinned = true
204+
def unpin(reason = :cursor)
205+
@pin_reasons.delete(reason)
193206
end
194207

195-
# Mark the connection as not pinned.
208+
# Remove all pins from the connection.
196209
#
197210
# @api private
198-
def unpin
199-
@pinned = false
211+
def unpin_all
212+
@pin_reasons.clear
200213
end
201214

202215
# Establishes a network connection to the target address.

lib/mongo/server/connection_pool.rb

Lines changed: 40 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -391,6 +391,23 @@ def check_out(connection_global_id: nil, context: nil)
391391
check_invariants
392392
end
393393

394+
# Returns a pinned connection that is already checked out, if one
395+
# exists with the given global id. Returns nil otherwise.
396+
#
397+
# @param [ Integer ] connection_global_id The global id of the pinned
398+
# connection.
399+
#
400+
# @return [ Connection | nil ] The pinned connection, or nil.
401+
#
402+
# @api private
403+
def check_out_pinned_connection(connection_global_id)
404+
@lock.synchronize do
405+
@checked_out_connections.detect do |conn|
406+
conn.global_id == connection_global_id && conn.pinned?
407+
end
408+
end
409+
end
410+
394411
# Check a connection back into the pool.
395412
#
396413
# The connection must have been previously created by this pool.
@@ -714,15 +731,36 @@ def inspect
714731
def with_connection(connection_global_id: nil, context: nil)
715732
raise_if_closed!
716733

717-
connection = check_out(
734+
# If a specific connection is requested and it is already checked out
735+
# and pinned (e.g. for a transaction or cursor in load-balanced mode),
736+
# reuse it directly without going through the check_out/check_in cycle.
737+
if connection_global_id
738+
connection = @lock.synchronize do
739+
@checked_out_connections.detect do |conn|
740+
conn.global_id == connection_global_id && conn.pinned?
741+
end
742+
end
743+
end
744+
745+
connection ||= check_out(
718746
connection_global_id: connection_global_id,
719747
context: context
720748
)
749+
721750
yield(connection)
722751
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished => e
723752
maybe_raise_pool_cleared!(connection, e)
724753
ensure
725-
check_in(connection) if connection
754+
if connection && !connection.pinned?
755+
# Do not check in if the connection is pinned (the session or cursor
756+
# owns it and will check it in later when unpinning). Also skip
757+
# check-in if the connection was already checked in during the block
758+
# (e.g. by Session#unpin after an error on the first operation).
759+
checked_out = @lock.synchronize do
760+
@checked_out_connections.include?(connection)
761+
end
762+
check_in(connection) if checked_out
763+
end
726764
end
727765

728766
# Close sockets that have been open for longer than the max idle time,

lib/mongo/server_selector/base.rb

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -191,7 +191,14 @@ def select_server(
191191
# Parameters and return values are the same as for select_server, only
192192
# the +timeout+ param is renamed to +csot_timeout+.
193193
private def select_server_impl(cluster, _ping, session, write_aggregation, deprioritized, csot_timeout)
194-
return cluster.servers.first if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
194+
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
195+
# In load-balanced mode, release the pinned connection if the session
196+
# is no longer in a transaction (e.g. after commit or abort).
197+
if session&.pinned_connection_global_id && !session.in_transaction?
198+
session.unpin
199+
end
200+
return cluster.servers.first
201+
end
195202

196203
timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT
197204

lib/mongo/session.rb

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -379,6 +379,9 @@ def end_session
379379
rescue Mongo::Error, Error::AuthError
380380
end
381381
end
382+
# Release any pinned connection (e.g. after a committed transaction
383+
# in load-balanced mode).
384+
unpin if pinned_connection_global_id
382385
cluster.session_pool.checkin(@server_session) if @server_session
383386
end
384387
ensure
@@ -919,12 +922,14 @@ def pin_to_server(server)
919922
#
920923
# @param [ Integer ] connection_global_id The global id of connection to pin
921924
# this session to.
925+
# @param [ Connection | nil ] connection The connection object to pin to.
922926
#
923927
# @api private
924-
def pin_to_connection(connection_global_id)
928+
def pin_to_connection(connection_global_id, connection: nil)
925929
raise ArgumentError, 'Cannot pin to a nil connection id' if connection_global_id.nil?
926930

927931
@pinned_connection_global_id = connection_global_id
932+
@pinned_connection = connection
928933
end
929934

930935
# Unpins this session from the pinned server or connection,
@@ -936,7 +941,16 @@ def pin_to_connection(connection_global_id)
936941
def unpin(connection = nil)
937942
@pinned_server = nil
938943
@pinned_connection_global_id = nil
939-
connection.unpin unless connection.nil?
944+
conn = connection || @pinned_connection
945+
if conn
946+
conn.unpin(:transaction)
947+
# Only check the connection back into the pool if nothing else
948+
# still holds a pin on it (e.g. an open cursor).
949+
unless conn.pinned?
950+
conn.connection_pool.check_in(conn)
951+
end
952+
end
953+
@pinned_connection = nil
940954
end
941955

942956
# Unpins this session from the pinned server or connection, if the session was pinned

spec/runners/unified/assertions.rb

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -214,6 +214,9 @@ def assert_event_matches(actual, expected)
214214
unless (awaited = spec.use('awaited')).nil?
215215
assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation')
216216
end
217+
if reason = spec.use('reason')
218+
assert_eq(actual.reason.to_s, reason.to_s, 'Event reason does not match expectation')
219+
end
217220
return if spec.empty?
218221

219222
raise NotImplementedError, "Unhandled keys: #{spec}"

0 commit comments

Comments
 (0)