Skip to content

Commit 88ead99

Browse files
RUBY-3364 Fix connection pool thread starvation (#3028)
1 parent ecbb8f8 commit 88ead99

3 files changed

Lines changed: 430 additions & 8 deletions

File tree

lib/mongo/server/connection_pool.rb

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ def initialize(server, options = {})
141141
@pending_connections = Set.new
142142
@interrupt_connections = []
143143

144+
# RUBY-3364: count threads currently blocked on size_cv /
145+
# max_connecting_cv. When non-zero, a newly-arriving thread must
146+
# enter the wait queue even if the gate predicate is currently
147+
# satisfied, to prevent barging past existing waiters.
148+
@size_waiters = 0
149+
@max_connecting_waiters = 0
150+
144151
# Mutex used for synchronizing access to @available_connections and
145152
# @checked_out_connections. The pool object is thread-safe, thus
146153
# all methods that retrieve or modify instance variables generally
@@ -1301,13 +1308,27 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
13011308
connection = nil
13021309

13031310
@lock.synchronize do
1304-
# The first gate to checking out a connection. Make sure the number of
1305-
# unavailable connections is less than the max pool size.
1306-
until max_size == 0 || unavailable_connections < max_size
1311+
# RUBY-3364: if any thread is already waiting for a size slot,
1312+
# join the queue even when the gate predicate is currently
1313+
# satisfied. Without this, re-entering threads (those that just
1314+
# checked a connection back in) barge past existing waiters and
1315+
# the 195 blocked threads in a 200:5 scenario never wake.
1316+
# Skip the gate for unlimited pools (max_size == 0) where there
1317+
# is no size constraint to wait on.
1318+
must_wait = max_size != 0 && @size_waiters > 0
1319+
until (max_size == 0 || unavailable_connections < max_size) && !must_wait
13071320
wait = deadline - Utils.monotonic_time
13081321
raise_check_out_timeout!(connection_global_id) if wait <= 0
1309-
@size_cv.wait(wait)
1322+
@size_waiters += 1
1323+
begin
1324+
@size_cv.wait(wait)
1325+
ensure
1326+
@size_waiters -= 1
1327+
end
13101328
raise_if_not_ready!
1329+
# After one wait cycle we have served our "queue tax" and
1330+
# compete for the slot on the next predicate check.
1331+
must_wait = false
13111332
end
13121333
@connection_requests += 1
13131334
connection = wait_for_connection(connection_global_id, deadline)
@@ -1319,8 +1340,17 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
13191340
@checked_out_connections << connection
13201341
@pending_connections.delete(connection) if @pending_connections.include?(connection)
13211342
@max_connecting_cv.signal
1322-
# no need to signal size_cv here since the number of unavailable
1323-
# connections is unchanged.
1343+
# RUBY-3364: hand off the baton. A waiter that arrived during
1344+
# our wake-up window (seeing our stale @size_waiters > 0) may be
1345+
# parked on @size_cv with capacity already available. The
1346+
# regular check-in path is the only other place that signals
1347+
# @size_cv, so we wake the next waiter only when the predicate
1348+
# is actually satisfied for them. Signaling unconditionally
1349+
# would re-queue a waiter at the back of the FIFO and break
1350+
# ordering.
1351+
if @size_waiters > 0 && (max_size == 0 || unavailable_connections < max_size)
1352+
@size_cv.signal
1353+
end
13241354
end
13251355

13261356
connection
@@ -1342,9 +1372,12 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
13421372
def wait_for_connection(connection_global_id, deadline)
13431373
connection = nil
13441374
while connection.nil?
1375+
# RUBY-3364: as above, yield to any thread already queued for
1376+
# a max_connecting slot before competing ourselves.
1377+
must_wait = @max_connecting_waiters > 0
13451378
# The second gate to checking out a connection. Make sure 1) there
13461379
# exists an available connection and 2) we are under max_connecting.
1347-
until @available_connections.any? || @pending_connections.length < @max_connecting
1380+
until (@available_connections.any? || @pending_connections.length < @max_connecting) && !must_wait
13481381
wait = deadline - Utils.monotonic_time
13491382
if wait <= 0
13501383
# We are going to raise a timeout error, so the connection
@@ -1353,10 +1386,16 @@ def wait_for_connection(connection_global_id, deadline)
13531386
decrement_connection_requests_and_signal
13541387
raise_check_out_timeout!(connection_global_id)
13551388
end
1356-
@max_connecting_cv.wait(wait)
1389+
@max_connecting_waiters += 1
1390+
begin
1391+
@max_connecting_cv.wait(wait)
1392+
ensure
1393+
@max_connecting_waiters -= 1
1394+
end
13571395
# We do not need to decrement the connection_requests counter
13581396
# or signal here because the pool is not ready yet.
13591397
raise_if_not_ready!
1398+
must_wait = false
13601399
end
13611400

13621401
connection = get_connection(Process.pid, connection_global_id)
@@ -1370,6 +1409,14 @@ def wait_for_connection(connection_global_id, deadline)
13701409
raise_check_out_timeout!(connection_global_id)
13711410
end
13721411

1412+
# RUBY-3364: hand off the baton for max_connecting_cv. Signal
1413+
# only if the gate predicate is satisfied for the next waiter, to
1414+
# avoid re-queuing a waiter at the back of the FIFO.
1415+
if @max_connecting_waiters > 0 &&
1416+
(@available_connections.any? || @pending_connections.length < @max_connecting)
1417+
@max_connecting_cv.signal
1418+
end
1419+
13731420
connection
13741421
end
13751422

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
# frozen_string_literal: true

# Connection pool fairness harness.
#
# Drives the pool under heavy over-subscription and measures per-thread
# service counts and wait-time distribution. Originally written to diagnose
# RUBY-3364 ("Mongo Connection Pool should serve queries in FIFO manner"),
# and kept as a permanent regression/fairness check.
#
# A fair pool, given N threads and a pool size of M (N >> M), should serve
# every thread roughly the same number of times over a long enough run.
# With the pre-fix code, 5 "lucky" threads held the connections for the
# entire run and the other 195 threads starved and hit the wait_timeout
# exactly once each. A healthy pool shows min/median/max per-thread counts
# within ~1% of each other.
#
# Usage:
#   MONGODB_URI="mongodb://..." bundle exec ruby profile/connection_pool_fairness.rb
#
# Tunable via environment:
#   MONGODB_URI   cluster URI (default: local replica set on 27017-9)
#   POOL_SIZE     max_pool_size (default: 5)
#   THREADS       concurrent worker threads (default: 200)
#   DURATION_SEC  how long to run (default: 30)
#   WAIT_TIMEOUT  pool wait_queue_timeout in seconds (default: 10)
#
# Memory note: this script records every operation into an in-memory Queue
# and drains it at the end. At default settings (~4000 ops/sec × 30s ≈ 120k
# rows, roughly 24 MB) that is well within normal memory budgets. For very
# long runs (tens of minutes or more) switch to online aggregation —
# e.g., per-thread counters and a histogram — to avoid unbounded memory
# growth.

require 'mongo'
require 'logger'

MONGO_URI = ENV.fetch('MONGODB_URI',
                      'mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replset')
POOL_SIZE = Integer(ENV.fetch('POOL_SIZE', '5'))
THREADS = Integer(ENV.fetch('THREADS', '200'))
DURATION_SEC = Integer(ENV.fetch('DURATION_SEC', '30'))
WAIT_TIMEOUT = Float(ENV.fetch('WAIT_TIMEOUT', '10'))
DB_NAME = 'ruby_3364'
COLL_NAME = 'probe'

# Silence driver logging so output is only the report.
Mongo::Logger.logger = Logger.new(File::NULL)
Mongo::Logger.logger.level = Logger::FATAL

# Monotonic clock in integer microseconds — immune to wall-clock jumps,
# suitable for measuring durations.
def now_us
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000_000).to_i
end

client = Mongo::Client.new(
  MONGO_URI,
  database: DB_NAME,
  max_pool_size: POOL_SIZE,
  min_pool_size: POOL_SIZE,
  wait_queue_timeout: WAIT_TIMEOUT,
  logger: Mongo::Logger.logger
)

# Seed the collection so first() has something to return.
client[COLL_NAME].drop
client[COLL_NAME].insert_one(x: 1)

# Each completed (or failed) op pushes [thread_id, start_us, end_us,
# duration_us, error_class_name_or_nil]. Queue is thread-safe.
results = Queue.new
stop_at = now_us + (DURATION_SEC * 1_000_000)
start_barrier = Queue.new

threads = Array.new(THREADS) do |i|
  Thread.new do
    # Block until the main thread releases everyone at once.
    start_barrier.pop
    while now_us < stop_at
      t1 = now_us
      err = nil
      begin
        # uncached so every iteration does a real round trip through the pool.
        Mongo::QueryCache.uncached do
          client[COLL_NAME].find.first
        end
      rescue StandardError => e
        err = e.class.name
      end
      t2 = now_us
      results << [ i, t1, t2, t2 - t1, err ]
    end
  end
end

# Release all threads at once to get maximum contention.
THREADS.times { start_barrier << :go }

threads.each(&:join)
client.close

# Drain queue into array.
rows = []
rows << results.pop until results.empty?

total = rows.size
# Guard the degenerate run (no completed ops): every percentage below would
# print NaN and the histogram scaling (`NaN.to_i`) raises FloatDomainError.
abort 'No operations completed; nothing to report.' if total.zero?

errors = rows.count { |r| r[4] }
timeouts = rows.count { |r| r[4] == 'Mongo::Error::ConnectionCheckOutTimeout' }
puts "Total ops: #{total}"
puts "Errors (any): #{errors} (#{format('%.4f', 100.0 * errors / total)}%)"
puts "Checkout timeouts: #{timeouts} (#{format('%.4f', 100.0 * timeouts / total)}%)"

# Wait-time band histogram: buckets of 1s up to 11s.
buckets = Array.new(12, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 1_000_000, 11 ].min
  buckets[idx] += 1
end
puts
puts 'Wait-time band histogram (seconds):'
buckets.each_with_index do |n, i|
  label = (i == 11) ? '>10s' : "#{i}-#{i + 1}s"
  bar = '#' * [ (n.to_f / total * 400).to_i, 80 ].min
  puts " #{label.rjust(6)}: #{n.to_s.rjust(8)} #{bar}"
end

# Fine-grained banding in the 0-10s range (by 500ms buckets) to detect the
# reporter's 2s/4s/6s/8s pattern.
puts
puts 'Fine band histogram (500ms buckets, 0-10s):'
fine = Array.new(21, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 500_000, 20 ].min
  fine[idx] += 1
end
fine.each_with_index do |n, i|
  lo = i * 0.5
  hi = (i + 1) * 0.5
  bar = '#' * [ (n.to_f / total * 400).to_i, 80 ].min
  puts " #{format('%.1f-%.1fs', lo, hi).rjust(10)}: #{n.to_s.rjust(8)} #{bar}"
end

# Per-thread op counts — are any threads starved?
per_thread = Hash.new(0)
per_thread_errors = Hash.new(0)
rows.each do |r|
  per_thread[r[0]] += 1
  per_thread_errors[r[0]] += 1 if r[4]
end

# Make sure every thread id appears (some may have zero completions).
# FIX: the original used `per_thread[i] ||= 0`, which is a no-op against
# Hash.new(0): the default 0 is truthy in Ruby and reading a default never
# stores the key — so fully starved threads were silently missing from the
# distribution stats (exactly the threads this harness exists to find).
THREADS.times do |i|
  per_thread[i] = 0 unless per_thread.key?(i)
  per_thread_errors[i] = 0 unless per_thread_errors.key?(i)
end

counts = per_thread.values.sort
puts
puts 'Per-thread op count distribution:'
puts " min=#{counts.first}, p10=#{counts[counts.size / 10]}, " \
     "median=#{counts[counts.size / 2]}, p90=#{counts[counts.size * 9 / 10]}, " \
     "max=#{counts.last}"
puts " threads with 0 ops: #{counts.count(&:zero?)}"
puts " threads with <10 ops: #{counts.count { |c| c < 10 }}"
puts " threads with >100 ops: #{counts.count { |c| c > 100 }}"
puts " threads with >1000 ops:#{counts.count { |c| c > 1000 }}"
puts

# Bottom 10 and top 10 threads by count (the original comment said "5",
# which did not match the code).
ranked = per_thread.sort_by { |_, v| v }
puts 'Bottom 10 threads by op count:'
ranked.first(10).each do |tid, n|
  puts " thread #{tid.to_s.rjust(3)}: #{n.to_s.rjust(6)} ops, #{per_thread_errors[tid]} timeouts"
end
puts
puts 'Top 10 threads by op count:'
ranked.last(10).each do |tid, n|
  puts " thread #{tid.to_s.rjust(3)}: #{n.to_s.rjust(6)} ops, #{per_thread_errors[tid]} timeouts"
end

0 commit comments

Comments
 (0)