33require 'concurrent/concern/logging'
44require 'concurrent/executor/ruby_executor_service'
55require 'concurrent/utility/monotonic_time'
6+ require 'concurrent/collection/timeout_queue'
67
78module Concurrent
89
910 # @!macro thread_pool_executor
1011 # @!macro thread_pool_options
1112 # @!visibility private
1213 class RubyThreadPoolExecutor < RubyExecutorService
14+ include Concern ::Deprecation
1315
1416 # @!macro thread_pool_executor_constant_default_max_pool_size
1517 DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
@@ -94,9 +96,28 @@ def remaining_capacity
9496 end
9597 end
9698
99+ # removes the worker if it can be pruned
100+ #
101+ # @return [true, false] if the worker was pruned
102+ #
97103 # @!visibility private
98- def remove_busy_worker ( worker )
99- synchronize { ns_remove_busy_worker worker }
104+ def prune_worker ( worker )
105+ synchronize do
106+ if ns_prunable_capacity > 0
107+ remove_worker worker
108+ true
109+ else
110+ false
111+ end
112+ end
113+ end
114+
115+ # @!visibility private
116+ def remove_worker ( worker )
117+ synchronize do
118+ ns_remove_ready_worker worker
119+ ns_remove_busy_worker worker
120+ end
100121 end
101122
102123 # @!visibility private
@@ -116,7 +137,7 @@ def worker_task_completed
116137
117138 # @!macro thread_pool_executor_method_prune_pool
118139 def prune_pool
119- synchronize { ns_prune_pool }
140+ deprecated "#prune_pool has no effect and will be removed in next the release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
120141 end
121142
122143 private
@@ -146,9 +167,6 @@ def ns_initialize(opts)
146167 @largest_length = 0
147168 @workers_counter = 0
148169 @ruby_pid = $$ # detects if Ruby has forked
149-
150- @gc_interval = opts . fetch ( :gc_interval , @idletime / 2.0 ) . to_i # undocumented
151- @next_gc_time = Concurrent . monotonic_time + @gc_interval
152170 end
153171
154172 # @!visibility private
@@ -162,12 +180,10 @@ def ns_execute(*args, &task)
162180
163181 if ns_assign_worker ( *args , &task ) || ns_enqueue ( *args , &task )
164182 @scheduled_task_count += 1
183+ nil
165184 else
166- return fallback_action ( *args , &task )
185+ fallback_action ( *args , &task )
167186 end
168-
169- ns_prune_pool if @next_gc_time < Concurrent . monotonic_time
170- nil
171187 end
172188
173189 # @!visibility private
@@ -218,7 +234,7 @@ def ns_assign_worker(*args, &task)
218234 # @!visibility private
219235 def ns_enqueue ( *args , &task )
220236 return false if @synchronous
221-
237+
222238 if !ns_limited_queue? || @queue . size < @max_queue
223239 @queue << [ task , args ]
224240 true
@@ -265,7 +281,7 @@ def ns_ready_worker(worker, last_message, success = true)
265281 end
266282 end
267283
268- # removes a worker which is not in not tracked in @ready
284+ # removes a worker which is not tracked in @ready
269285 #
270286 # @!visibility private
271287 def ns_remove_busy_worker ( worker )
@@ -274,25 +290,27 @@ def ns_remove_busy_worker(worker)
274290 true
275291 end
276292
277- # try oldest worker if it is idle for enough time, it's returned back at the start
278- #
279293 # @!visibility private
280- def ns_prune_pool
281- now = Concurrent . monotonic_time
282- stopped_workers = 0
283- while !@ready . empty? && ( @pool . size - stopped_workers > @min_length )
284- worker , last_message = @ready . first
285- if now - last_message > self . idletime
286- stopped_workers += 1
287- @ready . shift
288- worker << :stop
289- else break
290- end
294+ def ns_remove_ready_worker ( worker )
295+ if index = @ready . index { |rw , _ | rw == worker }
296+ @ready . delete_at ( index )
291297 end
298+ true
299+ end
292300
293- @next_gc_time = Concurrent . monotonic_time + @gc_interval
301+ # @return [Integer] number of excess idle workers which can be removed without
302+ # going below min_length, or all workers if not running
303+ #
304+ # @!visibility private
305+ def ns_prunable_capacity
306+ if running?
307+ [ @pool . size - @min_length , @ready . size ] . min
308+ else
309+ @pool . size
310+ end
294311 end
295312
313+ # @!visibility private
296314 def ns_reset_if_forked
297315 if $$ != @ruby_pid
298316 @queue . clear
@@ -312,7 +330,7 @@ class Worker
312330
313331 def initialize ( pool , id )
314332 # instance variables accessed only under pool's lock so no need to sync here again
315- @queue = Queue . new
333+ @queue = Collection :: TimeoutQueue . new
316334 @pool = pool
317335 @thread = create_worker @queue , pool , pool . idletime
318336
@@ -338,17 +356,22 @@ def kill
338356 def create_worker ( queue , pool , idletime )
339357 Thread . new ( queue , pool , idletime ) do |my_queue , my_pool , my_idletime |
340358 catch ( :stop ) do
341- loop do
359+ prunable = true
342360
343- case message = my_queue . pop
361+ loop do
362+ timeout = prunable && my_pool . running? ? my_idletime : nil
363+ case message = my_queue . pop ( timeout : timeout )
364+ when nil
365+ throw :stop if my_pool . prune_worker ( self )
366+ prunable = false
344367 when :stop
345- my_pool . remove_busy_worker ( self )
368+ my_pool . remove_worker ( self )
346369 throw :stop
347-
348370 else
349371 task , args = message
350372 run_task my_pool , task , args
351373 my_pool . ready_worker ( self , Concurrent . monotonic_time )
374+ prunable = true
352375 end
353376 end
354377 end
0 commit comments