diff --git a/README.md b/README.md index 7b60f9d..4fc4479 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ used to allow JVM based servers to warm-up slowly to prevent jolts in runtime pe `PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE` - If subscription pooling is desired for the request/response cycle then the pool size maximum should be set; the pool is lazy and therefore will only start new subscriptions as necessary (default: 0) +`PB_NATS_DISABLE_JNATS` - Disable the default jruby jnats client on the jruby platform, use the nats-pure.rb client instead (default: false). + `PROTOBUF_NATS_CONFIG_PATH` - Custom path to the config yaml (default: "config/protobuf_nats.yml"). ### YAML Config diff --git a/lib/protobuf/nats.rb b/lib/protobuf/nats.rb index 82c0e9c..84475e2 100644 --- a/lib/protobuf/nats.rb +++ b/lib/protobuf/nats.rb @@ -6,6 +6,7 @@ require "nats/io/client" +require "protobuf/nats/platform" require "protobuf/nats/errors" require "protobuf/nats/client" require "protobuf/nats/server" @@ -23,7 +24,7 @@ module Messages NACK = "\2".freeze end - NatsClient = if defined? JRUBY_VERSION + NatsClient = if jruby? require "protobuf/nats/jnats" ::Protobuf::Nats::JNats else diff --git a/lib/protobuf/nats/client.rb b/lib/protobuf/nats/client.rb index 4291618..c1a2a23 100644 --- a/lib/protobuf/nats/client.rb +++ b/lib/protobuf/nats/client.rb @@ -1,11 +1,133 @@ require "connection_pool" require "protobuf/nats" +require "protobuf/nats/platform" require "protobuf/rpc/connectors/base" require "monitor" module Protobuf module Nats + class ResponseMuxerRequest + def initialize(muxer, token) + @muxer = muxer + @token = token + end + + def publish(subject, data) + @muxer.publish(subject, data, @token) + end + + def next_message(timeout) + @muxer.next_message(@token, timeout) + end + + def cleanup + @muxer.cleanup(@token) + end + end + + class ResponseMuxer + LOCK = ::Mutex.new + + def initialize + @resp_map = Hash.new { |h,k| h[k] = { } } + end + + def cleanup(token) + @resp_sub.synchronize { @resp_map.delete(token) } + end + + def next_message(token, timeout) + ::NATS::MonotonicTime::with_nats_timeout(timeout) do + @resp_sub.synchronize do + break if @resp_map[token].key?(:response) && + !@resp_map[token][:response].empty? + + @resp_map[token][:signal].wait(timeout) + end + end + + @resp_sub.synchronize { @resp_map[token][:response].shift } + end + + def new_request + nats = Protobuf::Nats.client_nats_connection + token = nats.new_inbox.split('.').last + @resp_sub.synchronize do + @resp_map[token][:signal] = @resp_sub.new_cond + end + + ResponseMuxerRequest.new(self, token) + end + + def publish(subject, data, token) + nats = Protobuf::Nats.client_nats_connection + reply_to = "#{@resp_inbox_prefix}.#{token}" + nats.publish(subject, data, reply_to) + end + + def restart + start unless started? + + LOCK.synchronize do + @resp_handler&.kill + @started = false + end + + start + end + + def start + return if started? + LOCK.synchronize do + # We check this twice in case another thread was waiting for the lock to + # start this party. + return if started? + + nats = ::Protobuf::Nats.client_nats_connection + return if nats.nil? + + @resp_inbox_prefix = nats.new_inbox + @resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*") + @started = true + end + + @resp_handler = Thread.new do + begin + loop do + msg = @resp_sub.pending_queue.pop + next if msg.nil? + @resp_sub.synchronize do + # Decrease pending size since consumed already + @resp_sub.pending_size -= msg.data.size + end + token = msg.subject.split('.').last + + @resp_sub.synchronize do + # Reject if the token is missing from the request map + break unless @resp_map.key?(token) + + signal = @resp_map[token][:signal] + @resp_map[token][:response] ||= [] + @resp_map[token][:response] << msg + signal.signal + end + rescue => error + ::Protobuf::Nats.notify_error_callbacks(error) + LOCK.synchronize { @started = false } + end + end + end + end + + def started? + !!@started + end + end + class Client < ::Protobuf::Rpc::Connectors::Base + + RESPONSE_MUXER = ResponseMuxer.new + # Structure to hold subscription and inbox to use within pool SubscriptionInbox = ::Struct.new(:subscription, :inbox) do def swap(sub_inbox) @@ -36,6 +158,9 @@ def initialize(options) # This will ensure the client is started. ::Protobuf::Nats.start_client_nats_connection + + # Ensure the response muxer is started + RESPONSE_MUXER.start end def new_subscription_inbox @@ -195,7 +320,7 @@ def formatted_service_and_method_name # The Java nats client offers better message queueing so we're going to use # that over locking ourselves. This split in code isn't great, but we can # refactor this later. - if defined? JRUBY_VERSION + if ::Protobuf::Nats.jruby? # This is a request that expects two responses. # 1. An ACK from the server. We use a shorter timeout. @@ -253,43 +378,41 @@ def nats_request_with_two_responses(subject, data, opts) else def nats_request_with_two_responses(subject, data, opts) + # Wait for the ACK from the server + ack_timeout = opts[:ack_timeout] || 5 + # Wait for the protobuf response + timeout = opts[:timeout] || 60 + nats = Protobuf::Nats.client_nats_connection - inbox = nats.new_inbox - lock = ::Monitor.new - received = lock.new_cond - messages = [] - first_message = nil - second_message = nil - response = nil - - sid = nats.subscribe(inbox, :max => 2) do |message, _, _| - lock.synchronize do - messages << message - received.signal - end - end - lock.synchronize do - # Publish to server - nats.publish(subject, data, inbox) + # Publish message with the reply topic pointed at the response muxer. + req = RESPONSE_MUXER.new_request + req.publish(subject, data) - # Wait for the ACK from the server - ack_timeout = opts[:ack_timeout] || 5 - received.wait(ack_timeout) if messages.empty? - first_message = messages.shift + # Receive the first message + begin + first_message = req.next_message(ack_timeout) + rescue ::NATS::Timeout => e + return :ack_timeout + end - return :ack_timeout if first_message.nil? - return :nack if first_message == ::Protobuf::Nats::Messages::NACK + # Check for a NACK + return :nack if first_message.data == ::Protobuf::Nats::Messages::NACK - # Wait for the protobuf response - timeout = opts[:timeout] || 60 - received.wait(timeout) if messages.empty? - second_message = messages.shift + # Receive the second message + begin + second_message = req.next_message(timeout) + rescue ::NATS::Timeout + # ignore to raise a repsonse timeout below end + # NOTE: This might be nil, so be careful checking the data value + second_message_data = second_message&.data + + # Check messages response = case ::Protobuf::Nats::Messages::ACK - when first_message then second_message - when second_message then first_message + when first_message.data then second_message_data + when second_message_data then first_message.data else return :ack_timeout end @@ -297,8 +420,7 @@ def nats_request_with_two_responses(subject, data, opts) response ensure - # Ensure we don't leave a subscription sitting around. - nats.unsubscribe(sid) if response.nil? + req.cleanup if req end end diff --git a/lib/protobuf/nats/platform.rb b/lib/protobuf/nats/platform.rb new file mode 100644 index 0000000..c606c32 --- /dev/null +++ b/lib/protobuf/nats/platform.rb @@ -0,0 +1,14 @@ +module Protobuf + module Nats + def self.jruby? + return false if jnats_disabled? + + defined? JRUBY_VERSION + end + + def self.jnats_disabled? + !!ENV["PB_NATS_DISABLE_JNATS"] + end + end +end + diff --git a/lib/protobuf/nats/server.rb b/lib/protobuf/nats/server.rb index da231be..922bb8d 100644 --- a/lib/protobuf/nats/server.rb +++ b/lib/protobuf/nats/server.rb @@ -6,11 +6,58 @@ module Protobuf module Nats + class SuperSubscriptionManager + def initialize(nats, &cb) + # Central queue used by all subscriptions + @pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT) + @subscriptions = [] + @nats = nats + @callback = cb + + # For MRI, reroute the pending queue to the callback + @pending_queue_handler = Thread.new do + loop do + msg = @pending_queue.pop + @callback.call(msg.data, msg.reply) + end + end + end + + def queue_subscribe(name) + if ::Protobuf::Nats.jruby? + @subscriptions << @nats.subscribe(name, :queue => name) do |request_data, reply_id| + @callback.call(request_data, reply_id) + end + else + sub = @nats.subscribe(name, :queue => name) + + # Create a subscription but reset the pending queue to use a central pending queue. + # NOTE: This is a potential race condition. Chances of the round-trip message to an + # existing queue before this queue swap happens seems extremely low, but possible. + sub.pending_queue = @pending_queue + + @subscriptions << sub + + sub + end + end + + def unsubscribe_all + if ::Protobuf::Nats.jruby? + @subscriptions.each do |subscription_id| + @nats.unsubscribe(subscription_id) + end + else + @subscriptions.each { |sub| sub.unsubscribe } + end + end + end + class Server include ::Protobuf::Rpc::Server include ::Protobuf::Logging - attr_reader :nats, :thread_pool, :subscriptions + attr_reader :nats, :thread_pool, :subscription_manager MILLISECOND = 1000 @@ -25,7 +72,11 @@ def initialize(options) @thread_pool = ::Protobuf::Nats::ThreadPool.new(@options[:threads], :max_queue => max_queue_size) - @subscriptions = [] + @subscription_manager = SuperSubscriptionManager.new(@nats) do |request_data, reply_id| + unless enqueue_request(request_data, reply_id) + logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } + end + end @server = options.fetch(:server, ::Socket.gethostname) end @@ -114,11 +165,7 @@ def print_subscription_keys def subscribe_to_services_once with_each_subscription_key do |subscription_key_and_queue| - subscriptions << nats.subscribe(subscription_key_and_queue, :queue => subscription_key_and_queue) do |request_data, reply_id, _subject| - unless enqueue_request(request_data, reply_id) - logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" } - end - end + subscription_manager.queue_subscribe(subscription_key_and_queue) end end @@ -233,9 +280,7 @@ def subscribe def unsubscribe logger.info "Unsubscribing from rpc routes..." - subscriptions.each do |subscription_id| - nats.unsubscribe(subscription_id) - end + subscription_manager.unsubscribe_all end end end diff --git a/lib/protobuf/nats/version.rb b/lib/protobuf/nats/version.rb index 582b4f5..07d55b9 100644 --- a/lib/protobuf/nats/version.rb +++ b/lib/protobuf/nats/version.rb @@ -1,5 +1,5 @@ module Protobuf module Nats - VERSION = "0.10.4" + VERSION = "0.12.0.pre0" end end diff --git a/protobuf-nats.gemspec b/protobuf-nats.gemspec index 1fff921..61a4b9d 100644 --- a/protobuf-nats.gemspec +++ b/protobuf-nats.gemspec @@ -33,7 +33,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "activesupport", ">= 3.2" spec.add_runtime_dependency "connection_pool" spec.add_runtime_dependency "protobuf", "~> 3.7", ">= 3.7.2" - spec.add_runtime_dependency "nats-pure", "~> 0.3", "< 0.4" + spec.add_runtime_dependency "nats-pure", "~> 2" spec.add_development_dependency "bundler" spec.add_development_dependency "rake", "~> 10.0" diff --git a/spec/fake_nats_client.rb b/spec/fake_nats_client.rb index d514541..a7b390c 100644 --- a/spec/fake_nats_client.rb +++ b/spec/fake_nats_client.rb @@ -24,8 +24,13 @@ def publish(*) def flush end - def subscribe(subject, args, &block) - subscriptions[subject] = block + def subscribe(subject, args = {}, &block) + s = ::NATS::Subscription.new + s.pending_queue = ::SizedQueue.new(1024) + + subscriptions[subject] = {:block => block, :subscription => s } + + s end def unsubscribe(*) @@ -47,9 +52,15 @@ def schedule_messages(messages) Thread.new do begin sleep message.seconds_in_future - block = subscriptions[message.subject] + + sub = subscriptions[message.subject] || + subscriptions[message.subject.split(".").first + ".*"] + + block = sub[:block] block.call(message.data) if block @next_message = message + s = sub[:subscription] + s.pending_queue.push(message) if s.pending_queue rescue => error puts error end @@ -59,8 +70,21 @@ def schedule_messages(messages) end class FakeNackClient < FakeNatsClient - def subscribe(subject, args, &block) - Thread.new { block.call(::Protobuf::Nats::Messages::NACK) } + def publish(*) + subscriptions.each do |_key, sub| + s = sub[:subscription] + s.pending_queue.push(NATS::Msg.new(:data => ::Protobuf::Nats::Messages::NACK, :subject => "BASE.#{@inbox}")) + end + end + + def subscribe(subject, args = {}, &block) + s = super + + Thread.new do + block.call(::Protobuf::Nats::Messages::NACK) if block + end + + s end def next_message(_sub, _timeout) diff --git a/spec/protobuf/nats/jnats_spec.rb b/spec/protobuf/nats/jnats_spec.rb index 7a02e27..6d8579c 100644 --- a/spec/protobuf/nats/jnats_spec.rb +++ b/spec/protobuf/nats/jnats_spec.rb @@ -1,6 +1,6 @@ require "rspec" -if defined?(JRUBY_VERSION) +if ::Protobuf::Nats.jruby? require "protobuf/nats/jnats" describe ::Protobuf::Nats::JNats do diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6a0f2e3..40afe57 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -17,6 +17,8 @@ end config.before(:each) do - allow(Protobuf::Nats).to receive(:start_client_nats_connection) + allow(::Protobuf::Nats).to receive(:start_client_nats_connection) + + ::Protobuf::Nats::Client::RESPONSE_MUXER.restart end end