33require 'concurrent/concern/logging'
44require 'concurrent/executor/ruby_executor_service'
55require 'concurrent/utility/monotonic_time'
6+ require 'concurrent/collection/timeout_queue'
67
78module Concurrent
89
910 # @!macro thread_pool_executor
1011 # @!macro thread_pool_options
1112 # @!visibility private
1213 class RubyThreadPoolExecutor < RubyExecutorService
14+ include Concern ::Deprecation
1315
1416 # @!macro thread_pool_executor_constant_default_max_pool_size
1517 DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
@@ -95,8 +97,16 @@ def remaining_capacity
9597 end
9698
9799 # @!visibility private
98- def remove_busy_worker ( worker )
99- synchronize { ns_remove_busy_worker worker }
100+ def prunable_capacity
101+ synchronize { ns_prunable_capacity }
102+ end
103+
104+ # @!visibility private
105+ def remove_worker ( worker )
106+ synchronize do
107+ ns_remove_ready_worker worker
108+ ns_remove_busy_worker worker
109+ end
100110 end
101111
102112 # @!visibility private
@@ -116,7 +126,7 @@ def worker_task_completed
116126
117127 # @!macro thread_pool_executor_method_prune_pool
118128 def prune_pool
119- synchronize { ns_prune_pool }
129+ deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
120130 end
121131
122132 private
@@ -146,9 +156,6 @@ def ns_initialize(opts)
146156 @largest_length = 0
147157 @workers_counter = 0
148158 @ruby_pid = $$ # detects if Ruby has forked
149-
150- @gc_interval = opts . fetch ( :gc_interval , @idletime / 2.0 ) . to_i # undocumented
151- @next_gc_time = Concurrent . monotonic_time + @gc_interval
152159 end
153160
154161 # @!visibility private
@@ -162,12 +169,10 @@ def ns_execute(*args, &task)
162169
163170 if ns_assign_worker ( *args , &task ) || ns_enqueue ( *args , &task )
164171 @scheduled_task_count += 1
172+ nil
165173 else
166- return fallback_action ( *args , &task )
174+ fallback_action ( *args , &task )
167175 end
168-
169- ns_prune_pool if @next_gc_time < Concurrent . monotonic_time
170- nil
171176 end
172177
173178 # @!visibility private
@@ -218,7 +223,7 @@ def ns_assign_worker(*args, &task)
218223 # @!visibility private
219224 def ns_enqueue ( *args , &task )
220225 return false if @synchronous
221-
226+
222227 if !ns_limited_queue? || @queue . size < @max_queue
223228 @queue << [ task , args ]
224229 true
@@ -265,7 +270,7 @@ def ns_ready_worker(worker, last_message, success = true)
265270 end
266271 end
267272
268- # removes a worker which is not in not tracked in @ready
273+ # removes a worker which is not tracked in @ready
269274 #
270275 # @!visibility private
271276 def ns_remove_busy_worker ( worker )
@@ -274,25 +279,27 @@ def ns_remove_busy_worker(worker)
274279 true
275280 end
276281
277- # try oldest worker if it is idle for enough time, it's returned back at the start
278- #
279282 # @!visibility private
280- def ns_prune_pool
281- now = Concurrent . monotonic_time
282- stopped_workers = 0
283- while !@ready . empty? && ( @pool . size - stopped_workers > @min_length )
284- worker , last_message = @ready . first
285- if now - last_message > self . idletime
286- stopped_workers += 1
287- @ready . shift
288- worker << :stop
289- else break
290- end
283+ def ns_remove_ready_worker ( worker )
284+ if index = @ready . index { |rw , _ | rw == worker }
285+ @ready . delete_at ( index )
291286 end
287+ true
288+ end
292289
293- @next_gc_time = Concurrent . monotonic_time + @gc_interval
290+ # @return [Integer] number of excess idle workers which can be removed without
291+ # going below min_length, or all workers if not running
292+ #
293+ # @!visibility private
294+ def ns_prunable_capacity
295+ if running?
296+ [ @pool . size - @min_length , @ready . size ] . min
297+ else
298+ @pool . size
299+ end
294300 end
295301
302+ # @!visibility private
296303 def ns_reset_if_forked
297304 if $$ != @ruby_pid
298305 @queue . clear
@@ -312,7 +319,7 @@ class Worker
312319
313320 def initialize ( pool , id )
314321 # instance variables accessed only under pool's lock so no need to sync here again
315- @queue = Queue . new
322+ @queue = Collection :: TimeoutQueue . new
316323 @pool = pool
317324 @thread = create_worker @queue , pool , pool . idletime
318325
@@ -338,17 +345,26 @@ def kill
338345 def create_worker ( queue , pool , idletime )
339346 Thread . new ( queue , pool , idletime ) do |my_queue , my_pool , my_idletime |
340347 catch ( :stop ) do
341- loop do
348+ prunable = true
342349
343- case message = my_queue . pop
350+ loop do
351+ timeout = prunable && my_pool . running? ? my_idletime : nil
352+ case message = my_queue . pop ( timeout : timeout )
353+ when nil
354+ if my_pool . prunable_capacity > 0
355+ my_pool . remove_worker ( self )
356+ throw :stop
357+ end
358+
359+ prunable = false
344360 when :stop
345- my_pool . remove_busy_worker ( self )
361+ my_pool . remove_worker ( self )
346362 throw :stop
347-
348363 else
349364 task , args = message
350365 run_task my_pool , task , args
351366 my_pool . ready_worker ( self , Concurrent . monotonic_time )
367+ prunable = true
352368 end
353369 end
354370 end
0 commit comments