RUBY-3364 fix connection pool thread starvation under oversubscription #3028
Changes from all commits
aab04fc
f807391
6e1e48e
00dbe76
dcf8be4
b7b50f2
fa55ebd
0c69a8a
```diff
@@ -141,6 +141,13 @@ def initialize(server, options = {})
       @pending_connections = Set.new
       @interrupt_connections = []
 
+      # RUBY-3364: count threads currently blocked on size_cv /
+      # max_connecting_cv. When non-zero, a newly-arriving thread must
+      # enter the wait queue even if the gate predicate is currently
+      # satisfied, to prevent barging past existing waiters.
+      @size_waiters = 0
+      @max_connecting_waiters = 0
+
       # Mutex used for synchronizing access to @available_connections and
       # @checked_out_connections. The pool object is thread-safe, thus
       # all methods that retrieve or modify instance variables generally
@@ -1301,13 +1308,27 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
       connection = nil
 
       @lock.synchronize do
         # The first gate to checking out a connection. Make sure the number of
         # unavailable connections is less than the max pool size.
-        until max_size == 0 || unavailable_connections < max_size
+        # RUBY-3364: if any thread is already waiting for a size slot,
+        # join the queue even when the gate predicate is currently
+        # satisfied. Without this, re-entering threads (those that just
+        # checked a connection back in) barge past existing waiters and
+        # the 195 blocked threads in a 200:5 scenario never wake.
+        # Skip the gate for unlimited pools (max_size == 0) where there
+        # is no size constraint to wait on.
+        must_wait = max_size != 0 && @size_waiters > 0
+        until (max_size == 0 || unavailable_connections < max_size) && !must_wait
           wait = deadline - Utils.monotonic_time
           raise_check_out_timeout!(connection_global_id) if wait <= 0
-          @size_cv.wait(wait)
+          @size_waiters += 1
+          begin
+            @size_cv.wait(wait)
+          ensure
+            @size_waiters -= 1
+          end
           raise_if_not_ready!
+          # After one wait cycle we have served our "queue tax" and
+          # compete for the slot on the next predicate check.
+          must_wait = false
         end
         @connection_requests += 1
         connection = wait_for_connection(connection_global_id, deadline)
@@ -1319,8 +1340,17 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
         @checked_out_connections << connection
         @pending_connections.delete(connection) if @pending_connections.include?(connection)
         @max_connecting_cv.signal
-        # no need to signal size_cv here since the number of unavailable
-        # connections is unchanged.
+        # RUBY-3364: hand off the baton. A waiter that arrived during
+        # our wake-up window (seeing our stale @size_waiters > 0) may be
+        # parked on @size_cv with capacity already available. The
+        # regular check-in path is the only other place that signals
+        # @size_cv, so we wake the next waiter only when the predicate
+        # is actually satisfied for them. Signaling unconditionally
+        # would re-queue a waiter at the back of the FIFO and break
+        # ordering.
+        if @size_waiters > 0 && (max_size == 0 || unavailable_connections < max_size)
+          @size_cv.signal
+        end
       end
 
       connection
@@ -1342,9 +1372,12 @@ def retrieve_and_connect_connection(connection_global_id, context = nil)
     def wait_for_connection(connection_global_id, deadline)
       connection = nil
       while connection.nil?
+        # RUBY-3364: as above, yield to any thread already queued for
+        # a max_connecting slot before competing ourselves.
+        must_wait = @max_connecting_waiters > 0
         # The second gate to checking out a connection. Make sure 1) there
         # exists an available connection and 2) we are under max_connecting.
-        until @available_connections.any? || @pending_connections.length < @max_connecting
+        until (@available_connections.any? || @pending_connections.length < @max_connecting) && !must_wait
           wait = deadline - Utils.monotonic_time
           if wait <= 0
             # We are going to raise a timeout error, so the connection
@@ -1353,10 +1386,16 @@ def wait_for_connection(connection_global_id, deadline)
             decrement_connection_requests_and_signal
             raise_check_out_timeout!(connection_global_id)
           end
-          @max_connecting_cv.wait(wait)
+          @max_connecting_waiters += 1
+          begin
+            @max_connecting_cv.wait(wait)
+          ensure
+            @max_connecting_waiters -= 1
+          end
           # We do not need to decrement the connection_requests counter
           # or signal here because the pool is not ready yet.
           raise_if_not_ready!
+          must_wait = false
         end
 
         connection = get_connection(Process.pid, connection_global_id)
@@ -1370,6 +1409,14 @@ def wait_for_connection(connection_global_id, deadline)
           raise_check_out_timeout!(connection_global_id)
         end
 
+        # RUBY-3364: hand off the baton for max_connecting_cv. Signal
+        # only if the gate predicate is satisfied for the next waiter, to
+        # avoid re-queuing a waiter at the back of the FIFO.
+        if @max_connecting_waiters > 0 &&
+            (@available_connections.any? || @pending_connections.length < @max_connecting)
+          @max_connecting_cv.signal
+        end
+
         connection
       end
```
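The pattern this diff adds generalizes beyond the driver's pool. Below is a minimal, self-contained Ruby sketch of the same two ideas on a toy bounded resource: a one-cycle "queue tax" whenever waiters exist, and a conditional baton handoff on a successful acquire. `FairGate`, `acquire`, and `release` are illustrative names, not driver API.

```ruby
require 'monitor'

# Toy bounded gate: at most `capacity` holders at once.
class FairGate
  def initialize(capacity)
    @capacity = capacity
    @in_use = 0
    @waiters = 0
    @lock = Monitor.new
    @cv = @lock.new_cond
  end

  def acquire
    @lock.synchronize do
      # Queue tax: if anyone is already waiting, defer one wait cycle even
      # when capacity is free, so re-entering threads cannot barge.
      must_wait = @waiters > 0
      until @in_use < @capacity && !must_wait
        @waiters += 1
        begin
          @cv.wait
        ensure
          @waiters -= 1
        end
        must_wait = false # tax paid; compete normally from here on
      end
      @in_use += 1
      # Baton handoff: a newcomer that saw our stale @waiters > 0 may be
      # parked with capacity free; wake it only if it can actually proceed.
      @cv.signal if @waiters > 0 && @in_use < @capacity
    end
  end

  def release
    @lock.synchronize do
      @in_use -= 1
      @cv.signal
    end
  end
end

# Quick smoke test: 8 threads contending for 2 slots.
gate = FairGate.new(2)
8.times.map { Thread.new { 50.times { gate.acquire; gate.release } } }.each(&:join)
```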
@@ -0,0 +1,172 @@ (new file: profile/connection_pool_fairness.rb)

```ruby
# frozen_string_literal: true

# Connection pool fairness harness.
#
# Drives the pool under heavy over-subscription and measures per-thread
# service counts and wait-time distribution. Originally written to diagnose
# RUBY-3364 ("Mongo Connection Pool should serve queries in FIFO manner"),
# and kept as a permanent regression/fairness check.
#
# A fair pool, given N threads and a pool size of M (N >> M), should serve
# every thread roughly the same number of times over a long enough run.
# With the pre-fix code, 5 "lucky" threads held the connections for the
# entire run and the other 195 threads starved and hit the wait_timeout
# exactly once each. A healthy pool shows min/median/max per-thread counts
# within ~1% of each other.
#
# Usage:
#   MONGODB_URI="mongodb://..." bundle exec ruby profile/connection_pool_fairness.rb
#
# Tunable via environment:
#   MONGODB_URI   cluster URI (default: local replica set on 27017-9)
#   POOL_SIZE     max_pool_size (default: 5)
#   THREADS       concurrent worker threads (default: 200)
#   DURATION_SEC  how long to run (default: 30)
#   WAIT_TIMEOUT  pool wait_queue_timeout in seconds (default: 10)
#
# Memory note: this script records every operation into an in-memory Queue
# and drains it at the end. At default settings (~4000 ops/sec × 30s ≈ 120k
# rows, roughly 24 MB) that is well within normal memory budgets. For very
# long runs (tens of minutes or more) switch to online aggregation
# (e.g., per-thread counters and a histogram) to avoid unbounded memory
# growth.

require 'mongo'
require 'logger'

MONGO_URI = ENV.fetch('MONGODB_URI',
  'mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=replset')
POOL_SIZE = Integer(ENV.fetch('POOL_SIZE', '5'))
THREADS = Integer(ENV.fetch('THREADS', '200'))
DURATION_SEC = Integer(ENV.fetch('DURATION_SEC', '30'))
WAIT_TIMEOUT = Float(ENV.fetch('WAIT_TIMEOUT', '10'))
DB_NAME = 'ruby_3364'
COLL_NAME = 'probe'

Mongo::Logger.logger = Logger.new(File::NULL)
Mongo::Logger.logger.level = Logger::FATAL

def now_us
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1_000_000).to_i
end

client = Mongo::Client.new(
  MONGO_URI,
  database: DB_NAME,
  max_pool_size: POOL_SIZE,
  min_pool_size: POOL_SIZE,
  wait_queue_timeout: WAIT_TIMEOUT,
  logger: Mongo::Logger.logger
)

# Seed the collection so first() has something to return.
client[COLL_NAME].drop
client[COLL_NAME].insert_one(x: 1)

results = Queue.new
stop_at = now_us + (DURATION_SEC * 1_000_000)
start_barrier = Queue.new

threads = Array.new(THREADS) do |i|
  Thread.new do
    start_barrier.pop
    while now_us < stop_at
      t1 = now_us
      err = nil
      begin
        Mongo::QueryCache.uncached do
          client[COLL_NAME].find.first
        end
      rescue StandardError => e
        err = e.class.name
      end
      t2 = now_us
      results << [ i, t1, t2, t2 - t1, err ]
    end
  end
end

# Release all threads at once to get maximum contention.
THREADS.times { start_barrier << :go }

threads.each(&:join)
client.close

# Drain queue into array.
rows = []
rows << results.pop until results.empty?

total = rows.size
errors = rows.count { |r| r[4] }
timeouts = rows.count { |r| r[4] == 'Mongo::Error::ConnectionCheckOutTimeout' }
puts "Total ops: #{total}"
puts "Errors (any): #{errors} (#{format('%.4f', 100.0 * errors / total)}%)"
puts "Checkout timeouts: #{timeouts} (#{format('%.4f', 100.0 * timeouts / total)}%)"

# Wait-time band histogram: buckets of 1s up to 11s.
buckets = Array.new(12, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 1_000_000, 11 ].min
  buckets[idx] += 1
end
puts
puts 'Wait-time band histogram (seconds):'
buckets.each_with_index do |n, i|
  label = (i == 11) ? '>10s' : "#{i}-#{i + 1}s"
  bar = '#' * [ (n.to_f / total * 400).to_i, 80 ].min
  puts "  #{label.rjust(6)}: #{n.to_s.rjust(8)} #{bar}"
end

# Fine-grained banding in the 0-10s range (by 500ms buckets) to detect the
# reporter's 2s/4s/6s/8s pattern.
puts
puts 'Fine band histogram (500ms buckets, 0-10s):'
fine = Array.new(21, 0)
rows.each do |_, _, _, dur, _|
  idx = [ dur / 500_000, 20 ].min
  fine[idx] += 1
end
fine.each_with_index do |n, i|
  lo = i * 0.5
  hi = (i + 1) * 0.5
  bar = '#' * [ (n.to_f / total * 400).to_i, 80 ].min
  puts "  #{format('%.1f-%.1fs', lo, hi).rjust(10)}: #{n.to_s.rjust(8)} #{bar}"
end

# Per-thread op counts: are any threads starved?
per_thread = Hash.new(0)
per_thread_errors = Hash.new(0)
rows.each do |r|
  per_thread[r[0]] += 1
  per_thread_errors[r[0]] += 1 if r[4]
end

# Make sure every thread id appears (some may have zero completions).
# Note: with Hash.new(0), `per_thread[i] ||= 0` would never store the key,
# because the default 0 is truthy in Ruby; insert explicitly instead.
THREADS.times do |i|
  per_thread[i] = 0 unless per_thread.key?(i)
  per_thread_errors[i] = 0 unless per_thread_errors.key?(i)
end

counts = per_thread.values.sort
puts
puts 'Per-thread op count distribution:'
puts "  min=#{counts.first}, p10=#{counts[counts.size / 10]}, " \
  "median=#{counts[counts.size / 2]}, p90=#{counts[counts.size * 9 / 10]}, " \
  "max=#{counts.last}"
puts "  threads with 0 ops: #{counts.count(&:zero?)}"
puts "  threads with <10 ops: #{counts.count { |c| c < 10 }}"
puts "  threads with >100 ops: #{counts.count { |c| c > 100 }}"
puts "  threads with >1000 ops:#{counts.count { |c| c > 1000 }}"
puts

# Top 10 and bottom 10 threads by count.
ranked = per_thread.sort_by { |_, v| v }
puts 'Bottom 10 threads by op count:'
ranked.first(10).each do |tid, n|
  puts "  thread #{tid.to_s.rjust(3)}: #{n.to_s.rjust(6)} ops, #{per_thread_errors[tid]} timeouts"
end
puts
puts 'Top 10 threads by op count:'
ranked.last(10).each do |tid, n|
  puts "  thread #{tid.to_s.rjust(3)}: #{n.to_s.rjust(6)} ops, #{per_thread_errors[tid]} timeouts"
end
```
Forcing a `@size_cv.wait` solely because `@size_waiters > 0` can create a lost-wakeup/spurious-timeout scenario: a thread may enter the wait even though `unavailable_connections < max_size` is already true, and if no further `@size_cv.signal` occurs (e.g., multiple check-ins happened before this thread began waiting, leaving capacity available but no subsequent signals), it will sleep until the deadline and raise a checkout timeout despite capacity being available. This fairness mechanism likely needs a handoff/ticket-style turnstile (or some form of waiter-to-waiter signaling) so a "queue tax" wait does not depend on an external signal when the predicate is already satisfied.
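For reference, a toy sketch of the ticket-style turnstile suggested here (illustrative only, not the driver's implementation): admission is strictly by ticket order, so a waiter's turn never depends on catching a signal while the predicate happens to hold.

```ruby
require 'monitor'

class TicketGate
  def initialize(capacity)
    @capacity = capacity
    @in_use = 0
    @next_ticket = 0
    @now_serving = 0
    @lock = Monitor.new
    @cv = @lock.new_cond
  end

  def acquire
    @lock.synchronize do
      ticket = @next_ticket
      @next_ticket += 1
      # Wait until it is our turn AND a slot is free; a thread whose
      # predicate is already true cannot barge past earlier tickets.
      @cv.wait until ticket == @now_serving && @in_use < @capacity
      @now_serving += 1
      @in_use += 1
      @cv.broadcast # let the next ticket holder re-check the predicate
    end
  end

  def release
    @lock.synchronize do
      @in_use -= 1
      @cv.broadcast
    end
  end
end
```

The trade-off is that every state change must broadcast, waking all waiters to re-check the predicate, whereas the signal-based handoff wakes exactly one.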
Good catch. This is a real lost-wakeup: a newcomer entering during the wake-up window (observing a stale `@size_waiters > 0` before the waking waiter has decremented it) can be parked on `@size_cv` with capacity available, and no further signal is guaranteed.

Fixed in b7b50f2: after a successful checkout updates `@checked_out_connections`, signal `@size_cv` if there is a waiter AND the gate predicate is satisfied for them. The predicate guard matters: an unconditional signal would wake a waiter whose predicate still fails, causing it to re-queue itself at the back of the FIFO and break the `wait-queue-fairness` CMAP spec test.

Reproduction test added in fa55ebd (white-box, instruments the pool CV to force the race window open). Verified it fails on the prior commit and passes after the fix.