-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcommand.rb
More file actions
74 lines (57 loc) · 2 KB
/
command.rb
File metadata and controls
74 lines (57 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# TODO: implement pluggable metrics collection. implement separate statsd impl
module Hystrix
class ExecutorPoolFullError < StandardError; end
class CircuitClosedError < StandardError; end
class Command
include Celluloid
attr_accessor :executor_pool, :circuit
@default_pool_size = 10
def self.default_pool_size
@default_pool_size
end
def initialize(*args)
self.executor_pool = CommandExecutorPools.instance.get_pool(executor_pool_name, self.class.default_pool_size)
self.circuit = self.executor_pool.circuit_supervisor.actors.first
end
# Run the command synchronously
def execute
raise 'No executor pool found! Did you forget to call super in your initialize method?' unless executor_pool
executor = nil
start_time = Time.now
begin
raise CircuitClosedError unless self.circuit.is_closed?
executor = executor_pool.take
result = executor.run(self)
duration = Time.now - start_time
Configuration.notify_success({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration})
rescue Exception => main_error
duration = Time.now - start_time
begin
result = fallback(main_error)
Configuration.notify_fallback({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error})
rescue NotImplementedError => fallback_error
Configuration.notify_failure({command_name: self.class.name, executor_pool_name: executor_pool_name, duration: duration, error: main_error, fallback_error: fallback_error})
raise main_error
end
ensure
executor.unlock if executor
self.terminate
end
return result
end
# Commands which share the value of executor_pool_name will use the same pool
def executor_pool_name
@executor_pool_name || self.class.name
end
# Run the command asynchronously
def queue
future.execute
end
def fallback(error)
raise NotImplementedError
end
def self.pool_size(size)
@default_pool_size = size
end
end
end