Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ used to allow JVM based servers to warm-up slowly to prevent jolts in runtime pe

`PB_NATS_CLIENT_SUBSCRIPTION_POOL_SIZE` - If subscription pooling is desired for the request/response cycle then the pool size maximum should be set; the pool is lazy and therefore will only start new subscriptions as necessary (default: 0)

`PB_NATS_DISABLE_JNATS` - Disable the default jruby jnats client on the jruby platform, use the nats-pure.rb client instead (default: false).

`PROTOBUF_NATS_CONFIG_PATH` - Custom path to the config yaml (default: "config/protobuf_nats.yml").

### YAML Config
Expand Down
3 changes: 2 additions & 1 deletion lib/protobuf/nats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require "nats/io/client"

require "protobuf/nats/platform"
require "protobuf/nats/errors"
require "protobuf/nats/client"
require "protobuf/nats/server"
Expand All @@ -23,7 +24,7 @@ module Messages
NACK = "\2".freeze
end

NatsClient = if defined? JRUBY_VERSION
NatsClient = if jruby?
require "protobuf/nats/jnats"
::Protobuf::Nats::JNats
else
Expand Down
186 changes: 154 additions & 32 deletions lib/protobuf/nats/client.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,133 @@
require "connection_pool"
require "protobuf/nats"
require "protobuf/nats/platform"
require "protobuf/rpc/connectors/base"
require "monitor"

module Protobuf
module Nats
class ResponseMuxerRequest
def initialize(muxer, token)
@muxer = muxer
@token = token
end

def publish(subject, data)
@muxer.publish(subject, data, @token)
end

def next_message(timeout)
@muxer.next_message(@token, timeout)
end

def cleanup
@muxer.cleanup(@token)
end
end

class ResponseMuxer
LOCK = ::Mutex.new

def initialize
@resp_map = Hash.new { |h,k| h[k] = { } }
end

def cleanup(token)
@resp_sub.synchronize { @resp_map.delete(token) }
end

def next_message(token, timeout)
::NATS::MonotonicTime::with_nats_timeout(timeout) do
@resp_sub.synchronize do
break if @resp_map[token].key?(:response) &&
!@resp_map[token][:response].empty?

@resp_map[token][:signal].wait(timeout)
end
end

@resp_sub.synchronize { @resp_map[token][:response].shift }
end

def new_request
nats = Protobuf::Nats.client_nats_connection
token = nats.new_inbox.split('.').last
@resp_sub.synchronize do
@resp_map[token][:signal] = @resp_sub.new_cond
end

ResponseMuxerRequest.new(self, token)
end

def publish(subject, data, token)
nats = Protobuf::Nats.client_nats_connection
reply_to = "#{@resp_inbox_prefix}.#{token}"
nats.publish(subject, data, reply_to)
end

def restart
start unless started?

LOCK.synchronize do
@resp_handler&.kill
@started = false
end

start
end

def start
return if started?
LOCK.synchronize do
# We check this twice in case another thread was waiting for the lock to
# start this party.
return if started?

nats = ::Protobuf::Nats.client_nats_connection
return if nats.nil?

@resp_inbox_prefix = nats.new_inbox
@resp_sub = nats.subscribe("#{@resp_inbox_prefix}.*")
@started = true
end

@resp_handler = Thread.new do
begin
loop do
msg = @resp_sub.pending_queue.pop
next if msg.nil?
@resp_sub.synchronize do
# Decrease pending size since consumed already
@resp_sub.pending_size -= msg.data.size
end
token = msg.subject.split('.').last

@resp_sub.synchronize do
# Reject if the token is missing from the request map
break unless @resp_map.key?(token)

signal = @resp_map[token][:signal]
@resp_map[token][:response] ||= []
@resp_map[token][:response] << msg
signal.signal
end
rescue => error
::Protobuf::Nats.notify_error_callbacks(error)
LOCK.synchronize { @started = false }
end
end
end
end

def started?
!!@started
end
end

class Client < ::Protobuf::Rpc::Connectors::Base

RESPONSE_MUXER = ResponseMuxer.new

# Structure to hold subscription and inbox to use within pool
SubscriptionInbox = ::Struct.new(:subscription, :inbox) do
def swap(sub_inbox)
Expand Down Expand Up @@ -36,6 +158,9 @@ def initialize(options)

# This will ensure the client is started.
::Protobuf::Nats.start_client_nats_connection

# Ensure the response muxer is started
RESPONSE_MUXER.start
end

def new_subscription_inbox
Expand Down Expand Up @@ -195,7 +320,7 @@ def formatted_service_and_method_name
# The Java nats client offers better message queueing so we're going to use
# that over locking ourselves. This split in code isn't great, but we can
# refactor this later.
if defined? JRUBY_VERSION
if ::Protobuf::Nats.jruby?

# This is a request that expects two responses.
# 1. An ACK from the server. We use a shorter timeout.
Expand Down Expand Up @@ -253,52 +378,49 @@ def nats_request_with_two_responses(subject, data, opts)
else

def nats_request_with_two_responses(subject, data, opts)
# Wait for the ACK from the server
ack_timeout = opts[:ack_timeout] || 5
# Wait for the protobuf response
timeout = opts[:timeout] || 60

nats = Protobuf::Nats.client_nats_connection
inbox = nats.new_inbox
lock = ::Monitor.new
received = lock.new_cond
messages = []
first_message = nil
second_message = nil
response = nil

sid = nats.subscribe(inbox, :max => 2) do |message, _, _|
lock.synchronize do
messages << message
received.signal
end
end

lock.synchronize do
# Publish to server
nats.publish(subject, data, inbox)
# Publish message with the reply topic pointed at the response muxer.
req = RESPONSE_MUXER.new_request
req.publish(subject, data)

# Wait for the ACK from the server
ack_timeout = opts[:ack_timeout] || 5
received.wait(ack_timeout) if messages.empty?
first_message = messages.shift
# Receive the first message
begin
first_message = req.next_message(ack_timeout)
rescue ::NATS::Timeout => e
return :ack_timeout
end

return :ack_timeout if first_message.nil?
return :nack if first_message == ::Protobuf::Nats::Messages::NACK
# Check for a NACK
return :nack if first_message.data == ::Protobuf::Nats::Messages::NACK

# Wait for the protobuf response
timeout = opts[:timeout] || 60
received.wait(timeout) if messages.empty?
second_message = messages.shift
# Receive the second message
begin
second_message = req.next_message(timeout)
rescue ::NATS::Timeout
# ignore to raise a repsonse timeout below
end

# NOTE: This might be nil, so be careful checking the data value
second_message_data = second_message&.data

# Check messages
response = case ::Protobuf::Nats::Messages::ACK
when first_message then second_message
when second_message then first_message
when first_message.data then second_message_data
when second_message_data then first_message.data
else return :ack_timeout
end

fail(::Protobuf::Nats::Errors::ResponseTimeout, formatted_service_and_method_name) unless response

response
ensure
# Ensure we don't leave a subscription sitting around.
nats.unsubscribe(sid) if response.nil?
req.cleanup if req
end

end
Expand Down
14 changes: 14 additions & 0 deletions lib/protobuf/nats/platform.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module Protobuf
module Nats
def self.jruby?
return false if jnats_disabled?

defined? JRUBY_VERSION
end

def self.jnats_disabled?
!!ENV["PB_NATS_DISABLE_JNATS"]
end
end
end

65 changes: 55 additions & 10 deletions lib/protobuf/nats/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,58 @@

module Protobuf
module Nats
class SuperSubscriptionManager
def initialize(nats, &cb)
# Central queue used by all subscriptions
@pending_queue = ::SizedQueue.new(::NATS::IO::DEFAULT_SUB_PENDING_MSGS_LIMIT)
@subscriptions = []
@nats = nats
@callback = cb

# For MRI, reroute the pending queue to the callback
@pending_queue_handler = Thread.new do
loop do
msg = @pending_queue.pop
@callback.call(msg.data, msg.reply)
end
end
end

def queue_subscribe(name)
if ::Protobuf::Nats.jruby?
@subscriptions << @nats.subscribe(name, :queue => name) do |request_data, reply_id|
@callback.call(request_data, reply_id)
end
else
sub = @nats.subscribe(name, :queue => name)

# Create a subscription but reset the pending queue to use a central pending queue.
# NOTE: This is a potential race condition. Chances of the round-trip message to an
# existing queue before this queue swap happens seems extremely low, but possible.
sub.pending_queue = @pending_queue

@subscriptions << sub

sub
end
end

def unsubscribe_all
if ::Protobuf::Nats.jruby?
@subscriptions.each do |subscription_id|
@nats.unsubscribe(subscription_id)
end
else
@subscriptions.each { |sub| sub.unsubscribe }
end
end
end

class Server
include ::Protobuf::Rpc::Server
include ::Protobuf::Logging

attr_reader :nats, :thread_pool, :subscriptions
attr_reader :nats, :thread_pool, :subscription_manager

MILLISECOND = 1000

Expand All @@ -25,7 +72,11 @@ def initialize(options)

@thread_pool = ::Protobuf::Nats::ThreadPool.new(@options[:threads], :max_queue => max_queue_size)

@subscriptions = []
@subscription_manager = SuperSubscriptionManager.new(@nats) do |request_data, reply_id|
unless enqueue_request(request_data, reply_id)
logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" }
end
end
@server = options.fetch(:server, ::Socket.gethostname)
end

Expand Down Expand Up @@ -114,11 +165,7 @@ def print_subscription_keys

def subscribe_to_services_once
with_each_subscription_key do |subscription_key_and_queue|
subscriptions << nats.subscribe(subscription_key_and_queue, :queue => subscription_key_and_queue) do |request_data, reply_id, _subject|
unless enqueue_request(request_data, reply_id)
logger.error { "Thread pool is full! Dropping message for: #{subscription_key_and_queue}" }
end
end
subscription_manager.queue_subscribe(subscription_key_and_queue)
end
end

Expand Down Expand Up @@ -233,9 +280,7 @@ def subscribe

def unsubscribe
logger.info "Unsubscribing from rpc routes..."
subscriptions.each do |subscription_id|
nats.unsubscribe(subscription_id)
end
subscription_manager.unsubscribe_all
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/protobuf/nats/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
module Protobuf
module Nats
VERSION = "0.10.4"
VERSION = "0.12.0.pre0"
end
end
2 changes: 1 addition & 1 deletion protobuf-nats.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Gem::Specification.new do |spec|
spec.add_runtime_dependency "activesupport", ">= 3.2"
spec.add_runtime_dependency "connection_pool"
spec.add_runtime_dependency "protobuf", "~> 3.7", ">= 3.7.2"
spec.add_runtime_dependency "nats-pure", "~> 0.3", "< 0.4"
spec.add_runtime_dependency "nats-pure", "~> 2"

spec.add_development_dependency "bundler"
spec.add_development_dependency "rake", "~> 10.0"
Expand Down
Loading