Skip to content

Commit 85688c4

Browse files
committed
THRIFT-5945: Incomplete cleanup in NonblockingServer leaks sockets
1 parent 6574255 commit 85688c4

File tree

2 files changed

+111
-19
lines changed

2 files changed

+111
-19
lines changed

lib/rb/lib/thrift/server/nonblocking_server.rb

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,12 @@ def shutdown(timeout = 0)
138138

139139
def ensure_closed
140140
kill_worker_threads if @worker_threads
141-
@iom_thread.kill
141+
if @iom_thread&.alive?
142+
@iom_thread.kill
143+
@iom_thread.join
144+
end
145+
close_connections
146+
close_signal_pipes
142147
end
143148

144149
private
@@ -246,6 +251,26 @@ def kill_worker_threads
246251
@worker_threads.clear
247252
end
248253

254+
def close_connections
255+
@connections.each do |fd|
256+
begin
257+
fd.close
258+
rescue IOError, SystemCallError, TransportException
259+
end
260+
end
261+
@connections.clear
262+
@buffers.clear
263+
end
264+
265+
def close_signal_pipes
266+
@signal_pipes.each do |pipe|
267+
begin
268+
pipe.close unless pipe.closed?
269+
rescue IOError
270+
end
271+
end
272+
end
273+
249274
def slice_frame!(buf)
250275
if buf.length >= 4
251276
size = buf.unpack('N').first

lib/rb/spec/nonblocking_server_spec.rb

Lines changed: 85 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def listen
101101

102102
describe Thrift::NonblockingServer do
103103
before(:each) do
104-
@port = 43251
104+
@port = available_port
105105
handler = Handler.new
106106
processor = SpecNamespace::NonblockingService::Processor.new(handler)
107107
queue = Queue.new
@@ -121,16 +121,19 @@ def listen
121121
end
122122
end
123123
queue.pop
124+
wait_until_listening(@transport, @server_thread)
124125

125126
@clients = []
126127
@catch_exceptions = false
127128
end
128129

129130
after(:each) do
130131
@clients.each { |client, trans| trans.close }
131-
# @server.shutdown(1)
132-
@server_thread.kill
133-
@transport.close
132+
@server.shutdown(1, false) if @server
133+
@server_thread.join(2) if @server_thread
134+
@server_thread.kill if @server_thread && @server_thread.alive?
135+
@server_thread.join(2) if @server_thread
136+
@transport.close if @transport
134137
end
135138

136139
def setup_client(queue = nil)
@@ -261,6 +264,70 @@ def setup_client_thread(result)
261264
end
262265
end
263266

267+
describe Thrift::NonblockingServer::IOManager do
268+
def build_io_manager
269+
logger = Logger.new(IO::NULL)
270+
logger.level = Logger::FATAL
271+
Thrift::NonblockingServer::IOManager.new(
272+
double('processor'),
273+
double('server_transport'),
274+
Thrift::BaseTransportFactory.new,
275+
Thrift::BinaryProtocolFactory.new,
276+
1,
277+
logger
278+
)
279+
end
280+
281+
it "closes tracked connections and signal pipes during forced cleanup" do
282+
io_manager = build_io_manager
283+
connection = double('connection', :close => nil)
284+
pipe_a = double('pipe_a', :closed? => false, :close => nil)
285+
pipe_b = double('pipe_b', :closed? => false, :close => nil)
286+
287+
io_manager.instance_variable_set(:@connections, [connection])
288+
io_manager.instance_variable_set(:@buffers, { connection => 'frame' })
289+
io_manager.instance_variable_set(:@signal_pipes, [pipe_a, pipe_b])
290+
io_manager.instance_variable_set(:@worker_threads, [])
291+
292+
io_manager.ensure_closed
293+
294+
expect(connection).to have_received(:close)
295+
expect(pipe_a).to have_received(:close)
296+
expect(pipe_b).to have_received(:close)
297+
expect(io_manager.instance_variable_get(:@connections)).to be_empty
298+
expect(io_manager.instance_variable_get(:@buffers)).to be_empty
299+
end
300+
301+
it "continues closing remaining signal pipes when one close raises" do
302+
io_manager = build_io_manager
303+
pipe_a = double('pipe_a', :closed? => false)
304+
pipe_b = double('pipe_b', :closed? => false, :close => nil)
305+
306+
allow(pipe_a).to receive(:close).and_raise(IOError)
307+
308+
io_manager.instance_variable_set(:@signal_pipes, [pipe_a, pipe_b])
309+
io_manager.instance_variable_set(:@worker_threads, [])
310+
311+
io_manager.send(:close_signal_pipes)
312+
313+
expect(pipe_a).to have_received(:close)
314+
expect(pipe_b).to have_received(:close)
315+
end
316+
317+
it "drops removed connections from bookkeeping" do
318+
io_manager = build_io_manager
319+
connection = double('connection', :close => nil)
320+
321+
io_manager.instance_variable_set(:@connections, [connection])
322+
io_manager.instance_variable_set(:@buffers, { connection => 'frame' })
323+
324+
io_manager.send(:remove_connection, connection)
325+
326+
expect(io_manager.instance_variable_get(:@connections)).to be_empty
327+
expect(io_manager.instance_variable_get(:@buffers)).to be_empty
328+
end
329+
end
330+
264331
describe "#{Thrift::NonblockingServer} with TLS transport" do
265332
before(:each) do
266333
@port = available_port
@@ -282,7 +349,7 @@ def setup_client_thread(result)
282349
end
283350

284351
@clients = []
285-
wait_until_listening
352+
wait_until_listening(@transport, @server_thread)
286353
end
287354

288355
after(:each) do
@@ -313,19 +380,6 @@ def setup_tls_client
313380
client
314381
end
315382

316-
def wait_until_listening
317-
Timeout.timeout(2) do
318-
until @transport.handle
319-
raise "Server thread exited unexpectedly" unless @server_thread.alive?
320-
sleep 0.01
321-
end
322-
end
323-
end
324-
325-
def available_port
326-
TCPServer.open('localhost', 0) { |server| server.addr[1] }
327-
end
328-
329383
def ssl_keys_dir
330384
File.expand_path('../../../test/keys', __dir__)
331385
end
@@ -358,4 +412,17 @@ def create_client_ssl_context
358412
end
359413
end
360414
end
415+
416+
def wait_until_listening(server_transport, server_thread)
417+
Timeout.timeout(2) do
418+
until server_transport.handle
419+
raise "Server thread exited unexpectedly" unless server_thread.alive?
420+
sleep 0.01
421+
end
422+
end
423+
end
424+
425+
def available_port
426+
TCPServer.open('localhost', 0) { |server| server.addr[1] }
427+
end
361428
end

0 commit comments

Comments
 (0)