-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathexecutor_pool.rb
More file actions
124 lines (101 loc) · 2.46 KB
/
executor_pool.rb
File metadata and controls
124 lines (101 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
require 'singleton'
require 'securerandom'
module Hystrix
# Process-wide (Singleton) registry of executor pools, keyed by pool name.
#
# Pools are created lazily by #get_pool. A single mutex serializes pool
# lookup/creation and #shutdown.
class CommandExecutorPools
  include Singleton

  # Executor count given to a newly created pool when the caller supplies no size.
  DEFAULT_POOL_SIZE = 10

  attr_accessor :pools, :lock

  def initialize
    self.lock = Mutex.new
    self.pools = {}
  end

  # Returns the pool registered under +pool_name+, creating it on first use.
  #
  # @param pool_name [Object] hash key identifying the pool
  # @param size [Integer, nil] requested executor count; nil means
  #   "DEFAULT_POOL_SIZE for a new pool, leave an existing pool alone"
  # @return [CommandExecutorPool] the (possibly newly created) pool
  def get_pool(pool_name, size = nil)
    lock.synchronize do
      pool = pools[pool_name]
      if pool.nil?
        pool = pools[pool_name] = CommandExecutorPool.new(pool_name, size || DEFAULT_POOL_SIZE)
      elsif size
        # Only an explicit size may resize an existing pool; a plain lookup
        # must not push it back to the default.
        pool.set_size(size)
      end
      pool
    end
  end

  # Calls #shutdown on every registered pool while holding the registry lock.
  # The pools stay registered afterwards.
  def shutdown
    lock.synchronize do
      pools.each_value(&:shutdown)
    end
  end
end
# A named pool of CommandExecutor objects.
#
# Idle executors live in +executors+ and checked-out ones in
# +locked_executors+; both hashes are keyed by the executor's uuid.
# Callers check an executor out with #take and hand it back with #release.
class CommandExecutorPool
  attr_accessor :name, :size
  attr_accessor :executors, :locked_executors, :lock
  attr_accessor :circuit_supervisor
  attr_reader :uuid

  # @param name [Object] pool name; also passed to Circuit.supervise
  # @param size [Integer] number of executors to create up front
  def initialize(name, size)
    @uuid = SecureRandom.uuid
    self.name = name
    self.size = size
    self.executors = {}
    self.locked_executors = {}
    self.lock = Mutex.new
    # Stores whatever Circuit.supervise returns for this pool name
    # (Circuit is defined outside this file).
    self.circuit_supervisor = Circuit.supervise(self.name)
    size.times { add_executor }
  end

  # Grows the pool to +size+ executors. The pool never shrinks: a request
  # that is not larger than the current size is ignored, so +size+ keeps
  # matching the number of executors that were actually created.
  #
  # @param size [Integer] requested executor count
  # @return [Integer, nil] the new size when the pool grew, otherwise nil
  def set_size(size)
    # Compare against the current size *before* overwriting it; assigning
    # first would make the comparison always false and the pool could never grow.
    return unless size > self.size

    (size - self.size).times { add_executor }
    self.size = size
  end

  # Checks out an idle executor, marks it as locked and returns it.
  #
  # @return [CommandExecutor]
  # @raise [ExecutorPoolFullError] when no idle executor is available
  def take
    # Cheap check without the mutex so an exhausted pool fails fast; it is
    # repeated under the mutex because another thread may take the last
    # executor in between.
    raise pool_full_error if executors.empty?

    lock.synchronize do
      raise pool_full_error if executors.empty?

      _uuid, executor = executors.first
      executor.lock
      executors.delete(executor.uuid)
      locked_executors[executor.uuid] = executor
      executor
    end
  end

  # Moves +executor+ from the locked set back to the idle set.
  #
  # Deliberately does not take the pool mutex: #shutdown holds that mutex
  # while it waits for locked executors to be released, so locking here
  # would keep #shutdown from ever finishing.
  #
  # @param executor [CommandExecutor]
  def release(executor)
    locked_executors.delete(executor.uuid)
    executors[executor.uuid] = executor
  end

  # Discards all idle executors and blocks (polling every 0.1s) until every
  # checked-out executor has been released and discarded as well. The pool
  # mutex is held for the whole wait.
  def shutdown
    lock.synchronize do
      loop do
        # Released executors land in +executors+ again, so keep clearing it.
        self.executors = {}
        break if executors.empty? && locked_executors.empty?

        sleep 0.1
      end
    end
  end

  private

  # Creates one executor owned by this pool and registers it as idle.
  def add_executor
    executor = CommandExecutor.new(self)
    executors[executor.uuid] = executor
  end

  # Builds the error raised by #take when no idle executor is left.
  def pool_full_error
    ExecutorPoolFullError.new("Unable to get executor from #{name} pool. [#{locked_executors.size} locked] [#{@uuid}]")
  end
end
# A single execution slot belonging to an executor pool.
#
# While checked out, +owner+ holds the thread that locked the executor;
# it is nil when the executor is idle.
class CommandExecutor
  attr_reader :uuid, :pool
  attr_accessor :owner

  # @param pool [Object, nil] the pool to notify on #unlock; may be nil
  def initialize(pool)
    @pool = pool
    @uuid = SecureRandom.uuid
    self.owner = nil
  end

  # Marks the calling thread as the owner of this executor.
  def lock
    self.owner = Thread.current
  end

  # Clears the owner and, when a pool is attached, hands this executor
  # back to it via the pool's #release.
  def unlock
    self.owner = nil
    return unless pool

    pool.release(self)
  end

  # @return [Boolean] true while some thread owns this executor
  def locked?
    !owner.nil?
  end

  # Runs +command+ by delegating to its #run and returns the result.
  def run(command)
    command.run
  end
end
end