This repository was archived by the owner on Jun 21, 2023. It is now read-only.
forked from reinh/statsd
-
Notifications
You must be signed in to change notification settings - Fork 44
Expand file tree
/
Copy pathstatsd.rb
More file actions
256 lines (216 loc) · 6.76 KB
/
Copy pathstatsd.rb
File metadata and controls
256 lines (216 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
require 'socket'
require 'zlib'
# = Statsd: A Statsd client (https://github.com/etsy/statsd)
#
# @example Set up a global Statsd client for a server on localhost:8125
# $statsd = Statsd.simple 'localhost', 8125
# @example Send some stats
# $statsd.increment 'garets'
# $statsd.timing 'glork', 320
# @example Use {#time} to time the execution of a block
# $statsd.time('account.activate') { @account.activate! }
# @example Create a namespaced statsd client and increment 'account.activate'
# statsd = Statsd.simple('localhost', 8125).tap { |sd| sd.namespace = 'account' }
# statsd.increment 'activate'
class Statsd
  # Transport that writes each message to one statsd endpoint over a
  # connected UDP socket.
  class UDPClient
    # Matches "[host]" and "[host]:port", the bracketed form needed to attach
    # a port to an IPv6 literal.
    BRACKETED_ENDPOINT = /\A\[(.+)\](?::(\d+))?\z/.freeze

    attr_reader :sock

    # Splits an endpoint description into a host and a port.
    #
    # Accepted forms:
    # * "host:port"   - exactly one colon; the embedded port wins over +port+
    # * "[host]:port" - bracketed form, required to embed a port next to an
    #                   IPv6 literal
    # * "[host]"      - brackets are stripped, +port+ is used
    # * anything else - returned untouched together with +port+; this keeps
    #                   bare IPv6 literals such as "::1" intact instead of
    #                   splitting them on their colons
    #
    # @param [String] address endpoint description
    # @param [Integer, String, nil] port port used when +address+ embeds none
    # @return [Array] two elements: host and port (an embedded port is a String)
    def self.parse_endpoint(address, port = nil)
      bracketed = BRACKETED_ENDPOINT.match(address)
      return [bracketed[1], bracketed[2] || port] if bracketed

      if address.count(':') == 1
        host, embedded_port = address.split(':')
        [host, embedded_port]
      else
        [address, port]
      end
    end

    # @param [String] address see {UDPClient.parse_endpoint} for accepted forms
    # @param [Integer, String, nil] port port used when +address+ embeds none
    def initialize(address, port = nil)
      host, port = UDPClient.parse_endpoint(address, port)
      addrinfo = Addrinfo.ip(host)
      # Open the socket in the address family of the resolved host.
      @sock = UDPSocket.new(addrinfo.pfamily)
      @sock.connect(addrinfo.ip_address, port)
    end

    # Writes +msg+ to the socket. Delivery is best-effort: any
    # SystemCallError is swallowed and nil is returned instead.
    #
    # @param [String] msg payload to write
    def send(msg)
      sock.write(msg)
    rescue SystemCallError
      nil
    end
  end

  # UDP transport that signs every payload with HMAC-SHA256.
  #
  # Wire format: signature, then timestamp, then nonce, then the message.
  # The signature covers everything that follows it.
  class SecureUDPClient < UDPClient
    # @param [String] address (see UDPClient#initialize)
    # @param [Integer, String, nil] port (see UDPClient#initialize)
    # @param [String] key HMAC key
    def initialize(address, port, key)
      super(address, port)
      @key = key
    end

    # Signs +msg+ and writes the signed payload to the socket.
    def send(msg)
      super(signed_payload(msg))
    end

    # defer loading openssl and securerandom unless needed. this shaves ~10ms off
    # of baseline require load time for environments that don't require message signing.
    #
    # Defined above +private+ on purpose: +private+ does not apply to
    # singleton methods, so this has always been a public class method.
    #
    # @return [OpenSSL::Digest::SHA256] memoized digest used for signing
    def self.setup_openssl
      @sha256 ||= begin
        require 'securerandom'
        require 'openssl'
        OpenSSL::Digest::SHA256.new
      end
    end

    private

    # Builds signature + timestamp + nonce + message.
    def signed_payload(message)
      sha256 = SecureUDPClient.setup_openssl
      # timestamp and nonce are binary strings; take a binary copy of the
      # message so that a non-ASCII (e.g. UTF-8) stat name cannot raise
      # Encoding::CompatibilityError during concatenation.
      payload = timestamp + nonce + message.b
      signature = OpenSSL::HMAC.digest(sha256, @key, payload)
      signature + payload
    end

    # Current time in whole seconds packed as a 64-bit little-endian integer.
    def timestamp
      [Time.now.to_i].pack("Q<")
    end

    # Four random bytes.
    def nonce
      SecureRandom.random_bytes(4)
    end
  end

  # A namespace to prepend to all statsd calls.
  attr_reader :namespace

  # Sets the namespace and precomputes the "namespace." prefix put in front
  # of every stat name. nil or an empty namespace means no prefix (an empty
  # namespace would otherwise produce stat names starting with a bare ".").
  def namespace=(namespace)
    @namespace = namespace
    @prefix = if namespace && !namespace.to_s.empty?
                "#{namespace}."
              else
                "".freeze
              end
  end

  # All the endpoints where StatsD will report metrics
  attr_reader :shards

  # characters that will be replaced with _ in stat names
  RESERVED_CHARS_REGEX = /[\:\|\@]/

  COUNTER_TYPE = "c".freeze
  TIMING_TYPE = "ms".freeze
  GAUGE_TYPE = "g".freeze
  HISTOGRAM_TYPE = "h".freeze

  # @param [Class, nil] client_class class instantiated by {#add_shard};
  #   defaults to UDPClient. Instances must respond to +send(msg)+.
  def initialize(client_class = nil)
    @shards = []
    @client_class = client_class || UDPClient
    @buffering = false
    @buffer_size = nil
    self.namespace = nil
  end

  # Builds a client with a single shard of the default client class.
  #
  # @param [String] addr (see UDPClient#initialize)
  # @param [Integer, String, nil] port (see UDPClient#initialize)
  # @return [Statsd]
  def self.simple(addr, port = nil)
    new.add_shard(addr, port)
  end

  # Adds an endpoint. The arguments are forwarded to the client class
  # constructor. While buffering is enabled the new client is wrapped in a
  # Buffer like the existing shards, so that {#flush_all} and
  # {#disable_buffering} can treat every shard uniformly.
  #
  # @return [Statsd] self, so calls can be chained
  def add_shard(*args)
    client = @client_class.new(*args)
    client = Buffer.new(client, @buffer_size) if @buffering
    @shards << client
    self
  end

  # Wraps every shard in a Buffer so that messages are accumulated and sent
  # in batches. Does nothing when buffering is already enabled.
  #
  # @param [Integer, nil] buffer_size buffer capacity in bytes; nil selects
  #   Buffer::DEFAULT_BUFFER_CAP
  def enable_buffering(buffer_size = nil)
    return if @buffering
    # Remembered so shards added later get the same capacity.
    @buffer_size = buffer_size
    @shards.map! { |client| Buffer.new(client, buffer_size) }
    @buffering = true
  end

  # Flushes pending messages and unwraps every shard back to its base
  # client. Does nothing when buffering is not enabled.
  def disable_buffering
    return unless @buffering
    flush_all
    @shards.map! { |client| client.base_client }
    @buffering = false
  end

  # Flushes the buffer of every shard. Does nothing when buffering is not
  # enabled.
  def flush_all
    return unless @buffering
    @shards.each { |client| client.flush }
  end

  # Sends an increment (count = 1) for the given stat to the statsd server.
  #
  # @param stat (see #count)
  # @param sample_rate (see #count)
  # @see #count
  def increment(stat, sample_rate=1)
    count stat, 1, sample_rate
  end

  # Sends a decrement (count = -1) for the given stat to the statsd server.
  #
  # @param stat (see #count)
  # @param sample_rate (see #count)
  # @see #count
  def decrement(stat, sample_rate=1)
    count stat, -1, sample_rate
  end

  # Sends an arbitrary count for the given stat to the statsd server.
  #
  # @param [String] stat stat name
  # @param [Integer] count count
  # @param [Numeric] sample_rate sample rate, 1 for always
  def count(stat, count, sample_rate=1)
    send stat, count, COUNTER_TYPE, sample_rate
  end

  # Sends an arbitrary gauge value for the given stat to the statsd server.
  #
  # @param [String] stat stat name.
  # @param [Numeric] value gauge value.
  # @example Report the current user count:
  # $statsd.gauge('user.count', User.count)
  def gauge(stat, value)
    send stat, value, GAUGE_TYPE
  end

  # Sends a timing (in ms) for the given stat to the statsd server. The
  # sample_rate determines what percentage of the time this report is sent. The
  # statsd server then uses the sample_rate to correctly track the average
  # timing for the stat.
  #
  # @param stat stat name
  # @param [Numeric] ms timing in milliseconds
  # @param [Numeric] sample_rate sample rate, 1 for always
  def timing(stat, ms, sample_rate=1)
    send stat, ms, TIMING_TYPE, sample_rate
  end

  # Reports execution time of the provided block using {#timing}.
  #
  # The duration is measured on the monotonic clock so that wall-clock
  # adjustments cannot produce skewed or negative timings. If the block
  # raises, no timing is reported.
  #
  # @param stat (see #timing)
  # @param sample_rate (see #timing)
  # @yield The operation to be timed
  # @return the value returned by the block
  # @see #timing
  # @example Report the time (in ms) taken to activate an account
  # $statsd.time('account.activate') { @account.activate! }
  def time(stat, sample_rate=1)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    result = yield
    elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000
    timing(stat, elapsed_ms.round(5), sample_rate)
    result
  end

  # Sends a histogram measurement for the given stat to the statsd server. The
  # sample_rate determines what percentage of the time this report is sent. The
  # statsd server then uses the sample_rate to correctly track the average
  # for the stat.
  def histogram(stat, value, sample_rate=1)
    send stat, value, HISTOGRAM_TYPE, sample_rate
  end

  # Provided for backwards compatibility to systems that expect a batch method.
  # It only yields self; buffering is transparently used if it's enabled.
  def batch
    yield self
  end

  private

  # Yields always when sample_rate is 1 or more, otherwise yields when a
  # random draw from [0, 1) does not exceed sample_rate.
  def sampled(sample_rate)
    yield unless sample_rate < 1 && rand > sample_rate
  end

  # Formats "<prefix><stat>:<delta>|<type>[|@<sample_rate>]" and hands it to
  # the shard selected for the stat.
  #
  # NOTE: this intentionally shadows Object#send on Statsd instances; use
  # __send__ for reflective dispatch.
  def send(stat, delta, type, sample_rate=1)
    # Nothing to report to; without this guard shard selection would raise
    # ZeroDivisionError.
    return if @shards.empty?

    sampled(sample_rate) do
      stat = stat.to_s.dup
      # Ruby constant paths ("Foo::Bar") become dotted stat names.
      stat.gsub!(/::/, ".".freeze)
      stat.gsub!(RESERVED_CHARS_REGEX, "_".freeze)

      msg = String.new
      msg << @prefix
      msg << stat
      msg << ":".freeze
      msg << delta.to_s
      msg << "|".freeze
      msg << type
      if sample_rate < 1
        msg << "|@".freeze
        msg << sample_rate.to_s
      end

      select_shard(stat).send(msg)
    end
  end

  # Picks the shard for a stat. With several shards the choice is the CRC32
  # of the sanitized, un-prefixed stat name modulo the shard count, so a
  # given stat maps to the same shard index as long as the shard list is
  # unchanged.
  def select_shard(stat)
    if @shards.size == 1
      @shards.first
    else
      @shards[Zlib.crc32(stat) % @shards.size]
    end
  end

  # Wraps a client and accumulates newline-terminated messages, forwarding
  # them to the wrapped client in one payload.
  class Buffer
    DEFAULT_BUFFER_CAP = 512

    attr_reader :base_client
    attr_accessor :flush_count

    # @param client wrapped client; must respond to +send(msg)+
    # @param [Integer, nil] buffer_cap capacity in bytes; nil selects
    #   DEFAULT_BUFFER_CAP
    def initialize(client, buffer_cap = nil)
      @base_client = client
      @buffer = String.new
      @buffer_cap = buffer_cap || DEFAULT_BUFFER_CAP
      @flush_count = 0
    end

    # Sends the accumulated messages to the wrapped client and counts the
    # flush. Does nothing when the buffer is empty.
    def flush
      return unless @buffer.bytesize > 0
      # Hand the payload over and start a fresh buffer before sending: a
      # client that keeps a reference to the payload is not affected by
      # later appends, and a client that raises cannot make the buffer grow
      # without bound.
      payload = @buffer
      @buffer = String.new
      @base_client.send(payload)
      @flush_count += 1
    end

    # Appends +msg+ followed by a newline, flushing first when adding it
    # would reach the capacity.
    #
    # @return [nil]
    def send(msg)
      flush if @buffer.bytesize + msg.bytesize >= @buffer_cap
      @buffer << msg
      @buffer << "\n".freeze
      nil
    end
  end
end