Skip to content
Merged
63 changes: 55 additions & 8 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ def initialize(server, options = {})
@pending_connections = Set.new
@interrupt_connections = []

# RUBY-3364: count threads currently blocked on size_cv /
# max_connecting_cv. When non-zero, a newly-arriving thread must
# enter the wait queue even if the gate predicate is currently
# satisfied, to prevent barging past existing waiters.
@size_waiters = 0
@max_connecting_waiters = 0

# Mutex used for synchronizing access to @available_connections and
# @checked_out_connections. The pool object is thread-safe, thus
# all methods that retrieve or modify instance variables generally
Expand Down Expand Up @@ -1301,13 +1308,27 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
connection = nil

@lock.synchronize do
# The first gate to checking out a connection. Make sure the number of
# unavailable connections is less than the max pool size.
until max_size == 0 || unavailable_connections < max_size
# RUBY-3364: if any thread is already waiting for a size slot,
# join the queue even when the gate predicate is currently
# satisfied. Without this, re-entering threads (those that just
# checked a connection back in) barge past existing waiters and
# the 195 blocked threads in a 200:5 scenario never wake.
# Skip the gate for unlimited pools (max_size == 0) where there
# is no size constraint to wait on.
must_wait = max_size != 0 && @size_waiters > 0
until (max_size == 0 || unavailable_connections < max_size) && !must_wait
wait = deadline - Utils.monotonic_time
raise_check_out_timeout!(connection_global_id) if wait <= 0
@size_cv.wait(wait)
@size_waiters += 1
begin
@size_cv.wait(wait)
ensure
Comment on lines +1319 to +1325
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forcing a @size_cv.wait solely because @size_waiters > 0 can create a lost-wakeup/spurious-timeout scenario: a thread may enter the wait even though unavailable_connections < max_size is already true, and if no further @size_cv.signal occurs (e.g., multiple check-ins happened before this thread began waiting, leaving capacity available but no subsequent signals), it will sleep until deadline and raise a checkout timeout despite capacity being available. This fairness mechanism likely needs a handoff/ticket-style turnstile (or some form of waiter-to-waiter signaling) so a “queue tax” wait does not depend on an external signal when the predicate is already satisfied.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This is a real lost-wakeup: a newcomer entering during the wake-up window (observing a stale @size_waiters > 0 before the waking waiter has decremented it) can be parked on @size_cv with capacity available, and no further signal is guaranteed.

Fixed in b7b50f2: after a successful checkout updates @checked_out_connections, signal @size_cv if there is a waiter AND the gate predicate is satisfied for them. The predicate guard matters — an unconditional signal would wake a waiter whose predicate still fails, causing it to re-queue itself at the back of the FIFO and break the wait-queue-fairness CMAP spec test.

Reproduction test added in fa55ebd (white-box, instruments the pool CV to force the race window open). Verified it fails on the prior commit and passes after the fix.

@size_waiters -= 1
end
raise_if_not_ready!
# After one wait cycle we have served our "queue tax" and
# compete for the slot on the next predicate check.
must_wait = false
end
@connection_requests += 1
connection = wait_for_connection(connection_global_id, deadline)
Expand All @@ -1319,8 +1340,17 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
@checked_out_connections << connection
@pending_connections.delete(connection) if @pending_connections.include?(connection)
@max_connecting_cv.signal
# no need to signal size_cv here since the number of unavailable
# connections is unchanged.
# RUBY-3364: hand off the baton. A waiter that arrived during
# our wake-up window (seeing our stale @size_waiters > 0) may be
# parked on @size_cv with capacity already available. The
# regular check-in path is the only other place that signals
# @size_cv, so we wake the next waiter only when the predicate
# is actually satisfied for them. Signaling unconditionally
# would re-queue a waiter at the back of the FIFO and break
# ordering.
if @size_waiters > 0 && (max_size == 0 || unavailable_connections < max_size)
@size_cv.signal
end
end

connection
Expand All @@ -1342,9 +1372,12 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
def wait_for_connection(connection_global_id, deadline)
connection = nil
while connection.nil?
# RUBY-3364: as above, yield to any thread already queued for
# a max_connecting slot before competing ourselves.
must_wait = @max_connecting_waiters > 0
# The second gate to checking out a connection. Make sure 1) there
# exists an available connection and 2) we are under max_connecting.
until @available_connections.any? || @pending_connections.length < @max_connecting
until (@available_connections.any? || @pending_connections.length < @max_connecting) && !must_wait
wait = deadline - Utils.monotonic_time
if wait <= 0
# We are going to raise a timeout error, so the connection
Expand All @@ -1353,10 +1386,16 @@ def wait_for_connection(connection_global_id, deadline)
decrement_connection_requests_and_signal
raise_check_out_timeout!(connection_global_id)
end
@max_connecting_cv.wait(wait)
@max_connecting_waiters += 1
begin
Comment on lines 1375 to +1390
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same must_wait pattern on @max_connecting_cv can block even when a connection is already available (@available_connections.any?), and can similarly lead to a spurious timeout if the thread starts waiting after the last relevant signal and no further signals occur. Consider ensuring the fairness mechanism does not require an additional CV signal when the gate predicate is already satisfied (e.g., via an explicit FIFO/turnstile handoff among waiters).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same fix applied to @max_connecting_cv in wait_for_connection. Commit b7b50f2.

Predicate-guarded signal (@available_connections.any? || @pending_connections.length < @max_connecting) avoids the FIFO-reordering side effect that would otherwise surface on the CMAP wait-queue-fairness spec test.

@max_connecting_cv.wait(wait)
ensure
@max_connecting_waiters -= 1
end
# We do not need to decrement the connection_requests counter
# or signal here because the pool is not ready yet.
raise_if_not_ready!
must_wait = false
end

connection = get_connection(Process.pid, connection_global_id)
Expand All @@ -1370,6 +1409,14 @@ def wait_for_connection(connection_global_id, deadline)
raise_check_out_timeout!(connection_global_id)
end

# RUBY-3364: hand off the baton for max_connecting_cv. Signal
# only if the gate predicate is satisfied for the next waiter, to
# avoid re-queuing a waiter at the back of the FIFO.
if @max_connecting_waiters > 0 &&
(@available_connections.any? || @pending_connections.length < @max_connecting)
@max_connecting_cv.signal
end

connection
end

Expand Down
172 changes: 172 additions & 0 deletions profile/connection_pool_fairness.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# frozen_string_literal: true

# Connection pool fairness harness.
#
# Drives the pool under heavy over-subscription and measures per-thread
# service counts and wait-time distribution. Originally written to diagnose
# RUBY-3364 ("Mongo Connection Pool should serve queries in FIFO manner"),
# and kept as a permanent regression/fairness check.
#
# A fair pool, given N threads and a pool size of M (N >> M), should serve
# every thread roughly the same number of times over a long enough run.
# With the pre-fix code, 5 "lucky" threads held the connections for the
# entire run and the other 195 threads starved and hit the wait_timeout
# exactly once each. A healthy pool shows min/median/max per-thread counts
# within ~1% of each other.
#
# Usage:
# MONGODB_URI="mongodb://..." bundle exec ruby profile/connection_pool_fairness.rb
#
# Tunable via environment:
# MONGODB_URI cluster URI (default: local replica set on 27017-9)
# POOL_SIZE max_pool_size (default: 5)
# THREADS concurrent worker threads (default: 200)
# DURATION_SEC how long to run (default: 30)
# WAIT_TIMEOUT pool wait_queue_timeout in seconds (default: 10)
#
# Memory note: this script records every operation into an in-memory Queue
# and drains it at the end. At default settings (~4000 ops/sec × 30s ≈ 120k
# rows, roughly 24 MB) that is well within normal memory budgets. For very
# long runs (tens of minutes or more) switch to online aggregation —
# e.g., per-thread counters and a histogram — to avoid unbounded memory
# growth.

require 'mongo'
require 'logger'

# Cluster / workload configuration, all overridable via the environment.
MONGO_URI = ENV.fetch('MONGODB_URI') do
  'mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replset'
end
POOL_SIZE    = Integer(ENV.fetch('POOL_SIZE', '5'))
THREADS      = Integer(ENV.fetch('THREADS', '200'))
DURATION_SEC = Integer(ENV.fetch('DURATION_SEC', '30'))
WAIT_TIMEOUT = Float(ENV.fetch('WAIT_TIMEOUT', '10'))
DB_NAME      = 'ruby_3364'
COLL_NAME    = 'probe'

# Silence driver logging entirely so it cannot perturb the timing data.
Mongo::Logger.logger = Logger.new(File::NULL, level: Logger::FATAL)

# Current monotonic time as integer microseconds. The monotonic clock is
# used so elapsed-time arithmetic is immune to wall-clock adjustments.
def now_us
  seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  (seconds * 1_000_000).to_i
end

# One shared client; pool sizing and the wait-queue timeout come from the
# env-tunable constants so the contention profile is easy to vary.
client_options = {
  database: DB_NAME,
  max_pool_size: POOL_SIZE,
  min_pool_size: POOL_SIZE,
  wait_queue_timeout: WAIT_TIMEOUT,
  logger: Mongo::Logger.logger,
}
client = Mongo::Client.new(MONGO_URI, **client_options)

# Seed the collection so first() has something to return.
client[COLL_NAME].drop
client[COLL_NAME].insert_one(x: 1)

results = Queue.new                            # one [tid, t1, t2, dur, err] row per op
stop_at = now_us + (DURATION_SEC * 1_000_000)  # run deadline, microseconds
start_barrier = Queue.new                      # crude barrier: pop blocks until :go arrives

# Worker threads. Each blocks on the barrier, then issues single-document
# reads against the shared client until the deadline, recording one
# [tid, start, finish, duration, error_class_name] row per attempt.
threads = (0...THREADS).map do |tid|
  Thread.new do
    start_barrier.pop
    loop do
      break unless now_us < stop_at
      t_start = now_us
      failure = nil
      begin
        Mongo::QueryCache.uncached { client[COLL_NAME].find.first }
      rescue StandardError => e
        failure = e.class.name
      end
      t_end = now_us
      results << [ tid, t_start, t_end, t_end - t_start, failure ]
    end
  end
end

# Release every worker at once to get maximum contention.
THREADS.times { start_barrier << :go }

threads.each(&:join)
client.close

# Drain the results queue into an array. All producers have joined, so the
# queue size is stable and pop never blocks; FIFO order is preserved.
rows = Array.new(results.size) { results.pop }

Comment on lines +95 to +98
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This harness enqueues a full per-operation row [tid, t1, t2, dur, err] into results for the entire run, then drains it into rows. Under fast operations (or longer durations), this can grow to a very large number of Ruby objects and can OOM the process, making the diagnostic unreliable. Consider aggregating statistics online (counts/histograms per thread) and only optionally storing full rows behind an env flag or sampling rate.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. Deferring to a follow-up. At the default workload (200 threads × 30s ≈ 120k rows, ~24 MB of Ruby objects) the harness is fine; it would need online aggregation for multi-minute runs. Since this script lives under profile/ and is run by humans for diagnostic purposes rather than in CI, the memory ceiling is acceptable for now. I will add a header note.

# Headline counts. Guard the percentage denominator: on a zero-op run
# (e.g. DURATION_SEC=0) `100.0 * x / 0` is Float NaN and the report would
# print "NaN%"; clamping to 1 prints 0.0000% instead.
total = rows.size
errors = rows.count { |r| r[4] }
timeouts = rows.count { |r| r[4] == 'Mongo::Error::ConnectionCheckOutTimeout' }
denom = [ total, 1 ].max
puts "Total ops: #{total}"
puts "Errors (any): #{errors} (#{format('%.4f', 100.0 * errors / denom)}%)"
puts "Checkout timeouts: #{timeouts} (#{format('%.4f', 100.0 * timeouts / denom)}%)"

# Wait-time band histogram: twelve 1s buckets, everything over 10s
# collapsed into the final ">10s" bucket.
buckets = Array.new(12, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 1_000_000, 11 ].min
  buckets[idx] += 1
end
puts
puts 'Wait-time band histogram (seconds):'
# Guard the bar-scaling denominator: with zero rows `n.to_f / total` is
# NaN and NaN.to_i raises FloatDomainError, crashing the report.
denom = [ total, 1 ].max
buckets.each_with_index do |n, i|
  label = (i == 11) ? '>10s' : "#{i}-#{i + 1}s"
  bar = '#' * [ (n.to_f / denom * 400).to_i, 80 ].min
  puts " #{label.rjust(6)}: #{n.to_s.rjust(8)} #{bar}"
end

# Fine-grained banding in the 0-10s range (by 500ms buckets) to detect the
# reporter's 2s/4s/6s/8s pattern.
puts
puts 'Fine band histogram (500ms buckets, 0-10s):'
fine = Array.new(21, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 500_000, 20 ].min
  fine[idx] += 1
end
# Clamp the denominator: on an empty run NaN.to_i raises FloatDomainError.
denom = [ total, 1 ].max
fine.each_with_index do |n, i|
  lo = i * 0.5
  hi = (i + 1) * 0.5
  bar = '#' * [ (n.to_f / denom * 400).to_i, 80 ].min
  puts " #{format('%.1f-%.1fs', lo, hi).rjust(10)}: #{n.to_s.rjust(8)} #{bar}"
end

# Per-thread op counts — are any threads starved?
per_thread = Hash.new(0)
per_thread_errors = Hash.new(0)
rows.each do |row|
  tid = row[0]
  per_thread[tid] += 1
  per_thread_errors[tid] += 1 if row[4]
end

# Make sure every thread id appears (some may have zero completions).
# NOTE: `per_thread[i] ||= 0` is a no-op on a Hash.new(0): reading a
# missing key returns the (truthy) default 0 WITHOUT inserting the key, so
# ||= never assigns and starved threads would be invisible to the
# distribution stats and rankings below — defeating the whole harness.
THREADS.times do |i|
  per_thread[i] = 0 unless per_thread.key?(i)
  per_thread_errors[i] = 0 unless per_thread_errors.key?(i)
end

# Summarize the per-thread service distribution; a fair pool keeps these
# percentiles tightly clustered.
counts = per_thread.values.sort
p10    = counts[counts.size / 10]
median = counts[counts.size / 2]
p90    = counts[counts.size * 9 / 10]
puts
puts 'Per-thread op count distribution:'
puts " min=#{counts.first}, p10=#{p10}, " \
     "median=#{median}, p90=#{p90}, " \
     "max=#{counts.last}"
puts " threads with 0 ops: #{counts.count(&:zero?)}"
puts " threads with <10 ops: #{counts.count { |c| c < 10 }}"
puts " threads with >100 ops: #{counts.count { |c| c > 100 }}"
puts " threads with >1000 ops:#{counts.count { |c| c > 1000 }}"
puts

# Rank threads by completed ops; show the 10 most-starved and 10 busiest.
ranked = per_thread.sort_by { |_, v| v }
report_line = lambda do |tid, n|
  puts " thread #{tid.to_s.rjust(3)}: #{n.to_s.rjust(6)} ops, #{per_thread_errors[tid]} timeouts"
end
puts 'Bottom 10 threads by op count:'
ranked.first(10).each { |tid, n| report_line.call(tid, n) }
puts
puts 'Top 10 threads by op count:'
ranked.last(10).each { |tid, n| report_line.call(tid, n) }
Loading
Loading