forked from mongodb/mongo-ruby-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmonitor.rb
More file actions
382 lines (345 loc) · 12.1 KB
/
monitor.rb
File metadata and controls
382 lines (345 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Server
# Responsible for periodically polling a server via hello commands to
# keep the server's status up to date.
#
# Does all work in a background thread so as to not interfere with other
# operations performed by the driver.
#
# @since 2.0.0
# @api private
class Monitor
include Loggable
extend Forwardable
include Event::Publisher
include BackgroundThread
# The default interval between server status refreshes is 10 seconds.
#
# @since 2.0.0
DEFAULT_HEARTBEAT_INTERVAL = 10.freeze
# The minimum time between forced server scans. Is
# minHeartbeatFrequencyMS in the SDAM spec.
#
# @since 2.0.0
MIN_SCAN_INTERVAL = 0.5.freeze
# The weighting factor (alpha) for calculating the average moving round trip time.
#
# @since 2.0.0
# @deprecated Will be removed in version 3.0.
RTT_WEIGHT_FACTOR = 0.2.freeze
# Create the new server monitor.
#
# @example Create the server monitor.
# Mongo::Server::Monitor.new(address, listeners, monitoring)
#
# @note Monitor must never be directly instantiated outside of a Server.
#
# @param [ Server ] server The server to monitor.
# @param [ Event::Listeners ] event_listeners The event listeners.
# @param [ Monitoring ] monitoring The monitoring..
# @param [ Hash ] options The options.
#
# @option options [ Float ] :connect_timeout The timeout, in seconds, to
# use when establishing the monitoring connection.
# @option options [ Float ] :heartbeat_interval The interval between
# regular server checks.
# @option options [ Logger ] :logger A custom logger to use.
# @option options [ Mongo::Server::Monitor::AppMetadata ] :monitor_app_metadata
# The metadata to use for regular monitoring connection.
# @option options [ Mongo::Server::Monitor::AppMetadata ] :push_monitor_app_metadata
# The metadata to use for push monitor's connection.
# @option options [ Float ] :socket_timeout The timeout, in seconds, to
# execute operations on the monitoring connection.
#
# @since 2.0.0
# @api private
def initialize(server, event_listeners, monitoring, options = {})
unless monitoring.is_a?(Monitoring)
raise ArgumentError, "Wrong monitoring type: #{monitoring.inspect}"
end
unless options[:app_metadata]
raise ArgumentError, 'App metadata is required'
end
unless options[:push_monitor_app_metadata]
raise ArgumentError, 'Push monitor app metadata is required'
end
@server = server
@event_listeners = event_listeners
@monitoring = monitoring
@options = options.freeze
@mutex = Mutex.new
@sdam_mutex = Mutex.new
@next_earliest_scan = @next_wanted_scan = Time.now
@update_mutex = Mutex.new
end
# @return [ Server ] server The server that this monitor is monitoring.
# @api private
attr_reader :server
# @return [ Mongo::Server::Monitor::Connection ] connection The connection to use.
attr_reader :connection
# @return [ Hash ] options The server options.
attr_reader :options
# The interval between regular server checks.
#
# @return [ Float ] The heartbeat interval, in seconds.
def heartbeat_interval
options[:heartbeat_interval] || DEFAULT_HEARTBEAT_INTERVAL
end
# @deprecated
def_delegators :server, :last_scan
# The compressor is determined during the handshake, so it must be an
# attribute of the connection.
#
# @deprecated
def_delegators :connection, :compressor
# @return [ Monitoring ] monitoring The monitoring.
attr_reader :monitoring
# @return [ Server::PushMonitor | nil ] The push monitor, if one is being
# used.
def push_monitor
@update_mutex.synchronize do
@push_monitor
end
end
# Perform a check of the server.
#
# @since 2.0.0
def do_work
scan!
# @next_wanted_scan may be updated by the push monitor.
# However we need to check for termination flag so that the monitor
# thread exits when requested.
loop do
delta = @next_wanted_scan - Time.now
if delta > 0
signaled = server.scan_semaphore.wait(delta)
if signaled || @stop_requested
break
end
else
break
end
end
end
# Stop the background thread and wait for it to terminate for a
# reasonable amount of time.
#
# @return [ true | false ] Whether the thread was terminated.
#
# @api public for backwards compatibility only
def stop!
stop_push_monitor!
# Forward super's return value
super.tap do
# Important: disconnect should happen after the background thread
# terminates.
connection&.disconnect!
end
end
def create_push_monitor!(topology_version)
@update_mutex.synchronize do
if @push_monitor && !@push_monitor.running?
@push_monitor = nil
end
@push_monitor ||= PushMonitor.new(
self,
topology_version,
monitoring,
**Utils.shallow_symbolize_keys(options.merge(
socket_timeout: heartbeat_interval + connection.socket_timeout,
app_metadata: options[:push_monitor_app_metadata],
check_document: @connection.check_document
)),
)
end
end
def stop_push_monitor!
@update_mutex.synchronize do
if @push_monitor
@push_monitor.stop!
@push_monitor = nil
end
end
end
# Perform a check of the server with throttling, and update
# the server's description and average round trip time.
#
# If the server was checked less than MIN_SCAN_INTERVAL seconds
# ago, sleep until MIN_SCAN_INTERVAL seconds have passed since the last
# check. Then perform the check which involves running hello
# on the server being monitored and updating the server description
# as a result.
#
# @note If the system clock moves backwards, this method can sleep
# for a very long time.
#
# @note The return value of this method is deprecated. In version 3.0.0
# this method will not have a return value.
#
# @return [ Description ] The updated description.
#
# @since 2.0.0
def scan!
# Ordinarily the background thread would invoke this method.
# But it is also possible to invoke scan! directly on a monitor.
# Allow only one scan to be performed at a time.
@mutex.synchronize do
throttle_scan_frequency!
begin
result = do_scan
rescue => e
run_sdam_flow({}, scan_error: e)
else
run_sdam_flow(result)
end
end
end
def run_sdam_flow(result, awaited: false, scan_error: nil)
@sdam_mutex.synchronize do
old_description = server.description
new_description = Description.new(
server.address,
result,
average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time,
minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time
)
server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error)
server.description.tap do |new_description|
unless awaited
if new_description.unknown? && !old_description.unknown?
@next_earliest_scan = @next_wanted_scan = Time.now
else
@next_earliest_scan = Time.now + MIN_SCAN_INTERVAL
@next_wanted_scan = Time.now + heartbeat_interval
end
end
end
end
end
# Restarts the server monitor unless the current thread is alive.
#
# @example Restart the monitor.
# monitor.restart!
#
# @return [ Thread ] The thread the monitor runs on.
#
# @since 2.1.0
def restart!
if @thread && @thread.alive?
@thread
else
run!
end
end
def to_s
"#<#{self.class.name}:#{object_id} #{server.address}>"
end
private
def pre_stop
server.scan_semaphore.signal
end
def do_scan
begin
monitoring.publish_heartbeat(server) do
check
end
rescue => exc
msg = "Error checking #{server.address}"
Utils.warn_bg_exception(msg, exc,
logger: options[:logger],
log_prefix: options[:log_prefix],
bg_error_backtrace: options[:bg_error_backtrace],
)
raise exc
end
end
def check
if @connection && @connection.pid != Process.pid
log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
@connection.disconnect!
@connection = nil
end
if @connection
result = server.round_trip_time_calculator.measure do
begin
doc = @connection.check_document
cmd = Protocol::Query.new(
Database::ADMIN, Database::COMMAND, doc, :limit => -1
)
message = @connection.dispatch_bytes(cmd.serialize.to_s)
message.documents.first
rescue Mongo::Error
@connection.disconnect!
@connection = nil
raise
end
end
else
connection = Connection.new(server.address, options)
connection.connect!
result = server.round_trip_time_calculator.measure do
connection.handshake!
end
@connection = connection
if tv_doc = result['topologyVersion']
if streaming_enabled?
create_push_monitor!(TopologyVersion.new(tv_doc))
push_monitor.run!
else
stop_push_monitor!
end
else
# Failed response or pre-4.4 server
stop_push_monitor!
end
result
end
result
end
# @note If the system clock is set to a time in the past, this method
# can sleep for a very long time.
def throttle_scan_frequency!
delta = @next_earliest_scan - Time.now
if delta > 0
sleep(delta)
end
end
# Returns whether the streaming protocol is enabled, based on the
# serverMonitoringMode option. Default mode is :auto.
#
# - :stream - always use streaming when server supports it
# - :poll - never use streaming
# - :auto - use polling on FaaS platforms, streaming otherwise
#
# @return [ true | false ] Whether streaming is enabled.
def streaming_enabled?
mode = options[:server_monitoring_mode] || :auto
case mode
when :poll
false
when :stream
true
when :auto
!Server::AppMetadata::Environment.new.faas?
end
end
end
end
end
require 'mongo/server/monitor/connection'
require 'mongo/server/monitor/app_metadata'