Skip to content

Commit 7e7e15d

Browse files
RUBY-3241 URI option for monitoring type (#2989)
1 parent 4582ab9 commit 7e7e15d

10 files changed

Lines changed: 233 additions & 4 deletions

File tree

lib/mongo/client.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class Client
9292
:scan,
9393
:sdam_proc,
9494
:server_api,
95+
:server_monitoring_mode,
9596
:server_selection_timeout,
9697
:socket_timeout,
9798
:srv_max_hosts,

lib/mongo/server/monitor.rb

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,12 @@ def check
331331
end
332332
@connection = connection
333333
if tv_doc = result['topologyVersion']
334-
# Successful response, server 4.4+
335-
create_push_monitor!(TopologyVersion.new(tv_doc))
336-
push_monitor.run!
334+
if streaming_enabled?
335+
create_push_monitor!(TopologyVersion.new(tv_doc))
336+
push_monitor.run!
337+
else
338+
stop_push_monitor!
339+
end
337340
else
338341
# Failed response or pre-4.4 server
339342
stop_push_monitor!
@@ -351,6 +354,26 @@ def throttle_scan_frequency!
351354
sleep(delta)
352355
end
353356
end
357+
358+
# Returns whether the streaming protocol is enabled, based on the
359+
# serverMonitoringMode option. Default mode is :auto.
360+
#
361+
# - :stream - always use streaming when server supports it
362+
# - :poll - never use streaming
363+
# - :auto - use polling on FaaS platforms, streaming otherwise
364+
#
365+
# @return [ true | false ] Whether streaming is enabled.
366+
def streaming_enabled?
367+
mode = options[:server_monitoring_mode] || :auto
368+
case mode
369+
when :poll
370+
false
371+
when :stream
372+
true
373+
when :auto
374+
!Server::AppMetadata::Environment.new.faas?
375+
end
376+
end
354377
end
355378
end
356379
end

lib/mongo/uri.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ class URI
204204
'SCRAM-SHA-256' => :scram256,
205205
}.freeze
206206

207+
# Valid values for the serverMonitoringMode URI option.
208+
SERVER_MONITORING_MODES = %w(stream poll auto).freeze
209+
207210
# Options that are allowed to appear more than once in the uri.
208211
#
209212
# In order to follow the URI options spec requirement that all instances

lib/mongo/uri/options_mapper.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,9 @@ def self.uri_option(uri_key, name, **extra)
325325
uri_option 'retryWrites', :retry_writes, type: :bool
326326
uri_option 'zlibCompressionLevel', :zlib_compression_level, type: :zlib_compression_level
327327

328+
# Monitoring Options
329+
uri_option 'serverMonitoringMode', :server_monitoring_mode, type: :server_monitoring_mode
330+
328331
# Converts +value+ to a boolean.
329332
#
330333
# Returns true for 'true', false for 'false', otherwise nil.
@@ -713,6 +716,31 @@ def revert_read_mode(value)
713716
end
714717
alias :stringify_read_mode :revert_read_mode
715718

719+
# Server monitoring mode transformation.
720+
#
721+
# @param [ String ] name Name of the URI option being processed.
722+
# @param [ String ] value The server monitoring mode string value.
723+
#
724+
# @return [ Symbol | nil ] The server monitoring mode symbol.
725+
def convert_server_monitoring_mode(name, value)
726+
mode = value.downcase
727+
if SERVER_MONITORING_MODES.include?(mode)
728+
mode.to_sym
729+
else
730+
log_warn("#{value} is not a valid server monitoring mode")
731+
nil
732+
end
733+
end
734+
735+
# Stringifies server monitoring mode.
736+
#
737+
# @param [ Symbol ] value The server monitoring mode.
738+
#
739+
# @return [ String ] The server monitoring mode as a string.
740+
def stringify_server_monitoring_mode(value)
741+
value.to_s
742+
end
743+
716744
# Read preference tags transformation.
717745
#
718746
# @param [ String ] name Name of the URI option being processed.

spec/mongo/server/monitor_spec.rb

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,4 +323,84 @@
323323
end
324324
end
325325
end
326+
327+
describe '#streaming_enabled?' do
328+
context 'when server_monitoring_mode is :stream' do
329+
let(:monitor_options) do
330+
{ server_monitoring_mode: :stream }
331+
end
332+
333+
it 'returns true' do
334+
expect(monitor.send(:streaming_enabled?)).to be true
335+
end
336+
end
337+
338+
context 'when server_monitoring_mode is :poll' do
339+
let(:monitor_options) do
340+
{ server_monitoring_mode: :poll }
341+
end
342+
343+
it 'returns false' do
344+
expect(monitor.send(:streaming_enabled?)).to be false
345+
end
346+
end
347+
348+
context 'when server_monitoring_mode is :auto' do
349+
let(:monitor_options) do
350+
{ server_monitoring_mode: :auto }
351+
end
352+
353+
context 'when not in a FaaS environment' do
354+
local_env(
355+
'AWS_EXECUTION_ENV' => nil,
356+
'AWS_LAMBDA_RUNTIME_API' => nil,
357+
'FUNCTIONS_WORKER_RUNTIME' => nil,
358+
'K_SERVICE' => nil,
359+
'FUNCTION_NAME' => nil,
360+
'VERCEL' => nil,
361+
)
362+
363+
it 'returns true' do
364+
expect(monitor.send(:streaming_enabled?)).to be true
365+
end
366+
end
367+
368+
context 'when in a FaaS environment' do
369+
local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby')
370+
371+
it 'returns false' do
372+
expect(monitor.send(:streaming_enabled?)).to be false
373+
end
374+
end
375+
end
376+
377+
context 'when server_monitoring_mode is not set' do
378+
let(:monitor_options) do
379+
{}
380+
end
381+
382+
context 'when not in a FaaS environment' do
383+
local_env(
384+
'AWS_EXECUTION_ENV' => nil,
385+
'AWS_LAMBDA_RUNTIME_API' => nil,
386+
'FUNCTIONS_WORKER_RUNTIME' => nil,
387+
'K_SERVICE' => nil,
388+
'FUNCTION_NAME' => nil,
389+
'VERCEL' => nil,
390+
)
391+
392+
it 'defaults to auto and returns true' do
393+
expect(monitor.send(:streaming_enabled?)).to be true
394+
end
395+
end
396+
397+
context 'when in a FaaS environment' do
398+
local_env('FUNCTIONS_WORKER_RUNTIME' => 'ruby')
399+
400+
it 'defaults to auto and returns false' do
401+
expect(monitor.send(:streaming_enabled?)).to be false
402+
end
403+
end
404+
end
405+
end
326406
end

spec/mongo/uri_spec.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,5 +1439,69 @@
14391439

14401440
include_examples "roundtrips string"
14411441
end
1442+
1443+
context 'when a serverMonitoringMode option is provided' do
1444+
context 'stream' do
1445+
let(:options) { 'serverMonitoringMode=stream' }
1446+
1447+
it 'sets the server monitoring mode in uri options' do
1448+
expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream)
1449+
end
1450+
1451+
it 'sets the option on a client created with the uri' do
1452+
client = new_local_client_nmio(string)
1453+
expect(client.options[:server_monitoring_mode]).to eq(:stream)
1454+
end
1455+
1456+
include_examples "roundtrips string"
1457+
end
1458+
1459+
context 'poll' do
1460+
let(:options) { 'serverMonitoringMode=poll' }
1461+
1462+
it 'sets the server monitoring mode in uri options' do
1463+
expect(uri.uri_options[:server_monitoring_mode]).to eq(:poll)
1464+
end
1465+
1466+
it 'sets the option on a client created with the uri' do
1467+
client = new_local_client_nmio(string)
1468+
expect(client.options[:server_monitoring_mode]).to eq(:poll)
1469+
end
1470+
1471+
include_examples "roundtrips string"
1472+
end
1473+
1474+
context 'auto' do
1475+
let(:options) { 'serverMonitoringMode=auto' }
1476+
1477+
it 'sets the server monitoring mode in uri options' do
1478+
expect(uri.uri_options[:server_monitoring_mode]).to eq(:auto)
1479+
end
1480+
1481+
it 'sets the option on a client created with the uri' do
1482+
client = new_local_client_nmio(string)
1483+
expect(client.options[:server_monitoring_mode]).to eq(:auto)
1484+
end
1485+
1486+
include_examples "roundtrips string"
1487+
end
1488+
1489+
context 'case insensitive' do
1490+
let(:options) { 'serverMonitoringMode=Stream' }
1491+
1492+
it 'sets the server monitoring mode in uri options' do
1493+
expect(uri.uri_options[:server_monitoring_mode]).to eq(:stream)
1494+
end
1495+
end
1496+
1497+
context 'invalid value' do
1498+
let(:options) { 'serverMonitoringMode=invalid' }
1499+
1500+
it 'warns and does not set the option' do
1501+
expect(Mongo::Logger.logger).to receive(:warn).with(/invalid is not a valid server monitoring mode/)
1502+
expect(uri.uri_options[:server_monitoring_mode]).to be_nil
1503+
end
1504+
end
1505+
end
14421506
end
14431507
end

spec/runners/unified.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def define_unified_spec_tests(base_path, paths, expect_failure: false)
9696
test.assert_outcome
9797
test.assert_events
9898
test.assert_tracing_messages
99+
ensure
99100
test.cleanup
100101
end
101102
end

spec/runners/unified/assertions.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ def assert_events
175175
actual_events.select! do |event|
176176
event.class.name.sub(/.*::/, '') =~ /^(?:Pool|Connection)/
177177
end
178+
when 'sdam'
179+
actual_events.select! do |event|
180+
event.class.name.sub(/.*::/, '') =~ /^(?:Server|Topology)/
181+
end
178182
end
179183

180184
if (!ignore_extra_events && actual_events.length != expected_events.length) ||
@@ -217,6 +221,9 @@ def assert_event_matches(actual, expected)
217221
if interrupt_in_use_connections = spec.use('interruptInUseConnections')
218222
assert_matches(actual.options[:interrupt_in_use_connections], interrupt_in_use_connections, 'Command interrupt_in_use_connections does not match expectation')
219223
end
224+
unless (awaited = spec.use('awaited')).nil?
225+
assert_eq(actual.awaited?, awaited, 'Event awaited does not match expectation')
226+
end
220227
unless spec.empty?
221228
raise NotImplementedError, "Unhandled keys: #{spec}"
222229
end

spec/runners/unified/support_operations.rb

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,19 @@ def assert_event_count(op)
286286
end
287287
end
288288

289+
SDAM_SERVER_TYPE_MAP = {
290+
'Standalone' => :standalone,
291+
'RSPrimary' => :primary,
292+
'RSSecondary' => :secondary,
293+
'RSArbiter' => :arbiter,
294+
'Mongos' => :sharded,
295+
'Unknown' => :unknown,
296+
'PossiblePrimary' => :unknown,
297+
'RSGhost' => :ghost,
298+
'RSOther' => :other,
299+
'LoadBalancer' => :load_balancer,
300+
}.freeze
301+
289302
def select_events(subscriber, event)
290303
expected_name, opts = event.first
291304
expected_name = expected_name.sub(/Event$/, '').sub(/^(.)/) { $1.upcase }
@@ -295,7 +308,8 @@ def select_events(subscriber, event)
295308
result = true
296309
if new_desc = spec.use('newDescription')
297310
if type = new_desc.use('type')
298-
result &&= wevent.new_description.server_type == type.downcase.to_sym
311+
expected_type = SDAM_SERVER_TYPE_MAP[type] || type.downcase.to_sym
312+
result &&= wevent.new_description.server_type == expected_type
299313
end
300314
end
301315
unless spec.empty?

spec/runners/unified/test.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ def generate_entities(es)
191191
end
192192
kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym
193193
subscriber.add_wanted_events(kind)
194+
when 'serverHeartbeatStartedEvent', 'serverHeartbeatSucceededEvent', 'serverHeartbeatFailedEvent'
195+
unless client.send(:monitoring).subscribers[Mongo::Monitoring::SERVER_HEARTBEAT]&.include?(subscriber)
196+
client.subscribe(Mongo::Monitoring::SERVER_HEARTBEAT, subscriber)
197+
end
198+
kind = event.sub('Event', '').gsub(/([A-Z])/) { "_#{$1}" }.downcase.to_sym
199+
subscriber.add_wanted_events(kind)
194200
else
195201
raise NotImplementedError, "Unknown event #{event}"
196202
end
@@ -402,6 +408,8 @@ def cleanup
402408
$kill_transactions = nil
403409
end
404410

411+
disable_fail_points
412+
405413
entities[:client]&.each do |id, client|
406414
client.close
407415
end

0 commit comments

Comments
 (0)