Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/iruby/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ def close

@closed = true
begin
close_sockets
@adapter.shutdown_heartbeat(@hb_socket) if @hb_socket
ensure
begin
@adapter.close
ensure
stop_heartbeat
ensure
begin
close_sockets
ensure
@adapter.close
end
end
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/iruby/session_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def close_socket(socket)
socket.close if socket.respond_to?(:close)
end

def shutdown_heartbeat(socket)
close_socket(socket)
end

# Override in adapters that need cleanup.
def close
end
Expand Down
66 changes: 59 additions & 7 deletions lib/iruby/session_adapter/ffirzmq_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,50 @@ def recv(sock)
end

def heartbeat_loop(sock)
@heartbeat_device = ZMQ::Device.new(sock, sock)
poller = ZMQ::Poller.new
poller.register_readable(sock)

loop do
break unless sock.socket

rc = poller.poll
if rc == -1
errno = ZMQ::Util.errno
next if zmq_errno?(:EINTR, errno)
break if heartbeat_closed_errno?(errno)

ZMQ::Util.error_check('zmq_poll', rc)
end

poller.readables.each do |readable|
message = []
rc = readable.recv_strings(message, ZMQ::DONTWAIT)
if rc == -1
errno = ZMQ::Util.errno
next if zmq_errno?(:EAGAIN, errno) || zmq_errno?(:EINTR, errno)
return if heartbeat_closed_errno?(errno)

ZMQ::Util.error_check('zmq_msg_recv', rc)
end

rc = readable.send_strings(message)
if rc == -1
errno = ZMQ::Util.errno
next if zmq_errno?(:EINTR, errno)
return if heartbeat_closed_errno?(errno)

ZMQ::Util.error_check('zmq_msg_send', rc)
end
end
end
end

def shutdown_heartbeat(sock)
if @zmq_context&.context && LibZMQ.respond_to?(:zmq_ctx_shutdown)
LibZMQ.zmq_ctx_shutdown(@zmq_context.context)
else
close_socket(sock)
end
end

def close
Expand All @@ -36,25 +79,34 @@ def close

private

def make_socket(type, protocol, host, port)
case type
def make_socket(type_symbol, protocol, host, port)
case type_symbol
when :ROUTER, :PUB, :REP
type = ZMQ.const_get(type)
type = ZMQ.const_get(type_symbol)
else
if ZMQ.const_defined?(type)
raise ArgumentError, "Unsupported ZMQ socket type: #{type}"
if ZMQ.const_defined?(type_symbol)
raise ArgumentError, "Unsupported ZMQ socket type: #{type_symbol}"
else
raise ArgumentError, "Invalid ZMQ socket type: #{type}"
raise ArgumentError, "Invalid ZMQ socket type: #{type_symbol}"
end
end
zmq_context.socket(type).tap do |sock|
sock.setsockopt(ZMQ::LINGER, 0) if type_symbol == :REP
sock.bind("#{protocol}://#{host}:#{port}")
end
end

def zmq_context
@zmq_context ||= ZMQ::Context.new
end

def heartbeat_closed_errno?(errno)
zmq_errno?(:ETERM, errno) || zmq_errno?(:ENOTSOCK, errno)
end

def zmq_errno?(name, errno)
ZMQ.const_defined?(name) && ZMQ.const_get(name) == errno
end
end
end
end
7 changes: 6 additions & 1 deletion lib/iruby/session_adapter/test_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ def heartbeat_loop(sock)
end

def close_socket(sock)
@closed_sockets << sock
@closed_sockets << sock unless @closed_sockets.any? { |closed| closed.equal?(sock) }
end

def shutdown_heartbeat(sock)
@closed = true
close_socket(sock)
end

def close
Expand Down
Loading