diff --git a/lib/iruby/session.rb b/lib/iruby/session.rb index ee86b80..d92a6d3 100644 --- a/lib/iruby/session.rb +++ b/lib/iruby/session.rb @@ -30,12 +30,16 @@ def close @closed = true begin - close_sockets + @adapter.shutdown_heartbeat(@hb_socket) if @hb_socket ensure begin - @adapter.close - ensure stop_heartbeat + ensure + begin + close_sockets + ensure + @adapter.close + end end end end diff --git a/lib/iruby/session_adapter.rb b/lib/iruby/session_adapter.rb index f48a350..2e1dab3 100644 --- a/lib/iruby/session_adapter.rb +++ b/lib/iruby/session_adapter.rb @@ -41,6 +41,10 @@ def close_socket(socket) socket.close if socket.respond_to?(:close) end + def shutdown_heartbeat(socket) + close_socket(socket) + end + # Override in adapters that need cleanup. def close end diff --git a/lib/iruby/session_adapter/ffirzmq_adapter.rb b/lib/iruby/session_adapter/ffirzmq_adapter.rb index 024171c..c888c17 100644 --- a/lib/iruby/session_adapter/ffirzmq_adapter.rb +++ b/lib/iruby/session_adapter/ffirzmq_adapter.rb @@ -26,7 +26,50 @@ def recv(sock) end def heartbeat_loop(sock) - @heartbeat_device = ZMQ::Device.new(sock, sock) + poller = ZMQ::Poller.new + poller.register_readable(sock) + + loop do + break unless sock.socket + + rc = poller.poll + if rc == -1 + errno = ZMQ::Util.errno + next if zmq_errno?(:EINTR, errno) + break if heartbeat_closed_errno?(errno) + + ZMQ::Util.error_check('zmq_poll', rc) + end + + poller.readables.each do |readable| + message = [] + rc = readable.recv_strings(message, ZMQ::DONTWAIT) + if rc == -1 + errno = ZMQ::Util.errno + next if zmq_errno?(:EAGAIN, errno) || zmq_errno?(:EINTR, errno) + return if heartbeat_closed_errno?(errno) + + ZMQ::Util.error_check('zmq_msg_recv', rc) + end + + rc = readable.send_strings(message) + if rc == -1 + errno = ZMQ::Util.errno + next if zmq_errno?(:EINTR, errno) + return if heartbeat_closed_errno?(errno) + + ZMQ::Util.error_check('zmq_msg_send', rc) + end + end + end + end + + def shutdown_heartbeat(sock) + if @zmq_context&.context && LibZMQ.respond_to?(:zmq_ctx_shutdown) + LibZMQ.zmq_ctx_shutdown(@zmq_context.context) + else + close_socket(sock) + end end def close @@ -36,18 +79,19 @@ def close private - def make_socket(type, protocol, host, port) - case type + def make_socket(type_symbol, protocol, host, port) + case type_symbol when :ROUTER, :PUB, :REP - type = ZMQ.const_get(type) + type = ZMQ.const_get(type_symbol) else - if ZMQ.const_defined?(type) - raise ArgumentError, "Unsupported ZMQ socket type: #{type}" + if ZMQ.const_defined?(type_symbol) + raise ArgumentError, "Unsupported ZMQ socket type: #{type_symbol}" else - raise ArgumentError, "Invalid ZMQ socket type: #{type}" + raise ArgumentError, "Invalid ZMQ socket type: #{type_symbol}" end end zmq_context.socket(type).tap do |sock| + sock.setsockopt(ZMQ::LINGER, 0) if type_symbol == :REP sock.bind("#{protocol}://#{host}:#{port}") end end @@ -55,6 +99,14 @@ def make_socket(type, protocol, host, port) def zmq_context @zmq_context ||= ZMQ::Context.new end + + def heartbeat_closed_errno?(errno) + zmq_errno?(:ETERM, errno) || zmq_errno?(:ENOTSOCK, errno) + end + + def zmq_errno?(name, errno) + ZMQ.const_defined?(name) && ZMQ.const_get(name) == errno + end end end end diff --git a/lib/iruby/session_adapter/test_adapter.rb b/lib/iruby/session_adapter/test_adapter.rb index 6deabaa..7128c2e 100644 --- a/lib/iruby/session_adapter/test_adapter.rb +++ b/lib/iruby/session_adapter/test_adapter.rb @@ -48,7 +48,12 @@ def heartbeat_loop(sock) end def close_socket(sock) - @closed_sockets << sock + @closed_sockets << sock unless @closed_sockets.any? { |closed| closed.equal?(sock) } + end + + def shutdown_heartbeat(sock) + @closed = true + close_socket(sock) end def close