Skip to content

Commit ad60335

Browse files
committed
fix(libev): guard handle_read/handle_write against close() race condition
When close() is called from one thread, it sets is_closed=True and closes the socket immediately. However, libev watchers are stopped asynchronously in _loop_will_run(), so handle_read()/handle_write() can still fire on the now-closed fd, causing EBADF errors that surface as ConnectionShutdown('Bad file descriptor') and prevent reconnection. So, fix it applying following changes: - Early-return guards at the top of handle_read() and handle_write() that check is_closed/is_defunct before touching the socket - Secondary is_closed/is_defunct checks in error handlers to catch the race when close() happens between watcher dispatch and syscall - Peer disconnect detection (EBADF, ECONNRESET, ENOTCONN, etc.) that calls close() cleanly instead of defunct() - last_error preservation in close() when connected_event is unset, preventing factory() from returning dead connections Fixes: #614
1 parent 44bc95a commit ad60335

2 files changed

Lines changed: 301 additions & 1 deletion

File tree

cassandra/io/libevreactor.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
import atexit
1515
from collections import deque
16+
import errno
1617
import logging
1718
import os
1819
import socket
@@ -38,6 +39,14 @@
3839

3940
log = logging.getLogger(__name__)
4041

42+
# Errno values that indicate the remote peer has disconnected or the
43+
# socket is no longer usable. When we see these during shutdown we
44+
# treat them as a clean close instead of defuncting the connection.
45+
_PEER_DISCONNECT_ERRNOS = frozenset((
46+
errno.EBADF, errno.ENOTCONN, errno.ESHUTDOWN,
47+
errno.ECONNRESET, errno.ECONNABORTED,
48+
))
49+
4150

4251
def _cleanup(loop):
4352
if loop:
@@ -316,10 +325,18 @@ def close(self):
316325
msg = "Connection to %s was closed" % self.endpoint
317326
if self.last_error:
318327
msg += ": %s" % (self.last_error,)
319-
self.error_all_requests(ConnectionShutdown(msg))
328+
shutdown_exc = ConnectionShutdown(msg)
329+
self.error_all_requests(shutdown_exc)
330+
# Preserve the error for factory() to detect dead connections
331+
# that died before connected_event was set.
332+
if not self.connected_event.is_set():
333+
self.last_error = shutdown_exc
320334
self.connected_event.set()
321335

322336
def handle_write(self, watcher, revents, errno=None):
337+
if self.is_closed or self.is_defunct:
338+
return
339+
323340
if revents & libev.EV_ERROR:
324341
if errno:
325342
exc = IOError(errno, os.strerror(errno))
@@ -347,6 +364,16 @@ def handle_write(self, watcher, revents, errno=None):
347364
self._socket_writable = False
348365
with self._deque_lock:
349366
self.deque.appendleft(next_msg)
367+
elif self.is_closed or self.is_defunct:
368+
# Socket was closed by another thread between the
369+
# watcher firing and us calling send(). This is the
370+
# race described in scylladb/python-driver#614.
371+
return
372+
elif getattr(err, 'errno', err.args[0]) in _PEER_DISCONNECT_ERRNOS:
373+
# Peer disconnected — treat as clean close.
374+
log.debug("Connection %s closed by peer during write: %s",
375+
self, err)
376+
self.close()
350377
else:
351378
self.defunct(err)
352379
return
@@ -361,6 +388,9 @@ def handle_write(self, watcher, revents, errno=None):
361388
return
362389

363390
def handle_read(self, watcher, revents, errno=None):
391+
if self.is_closed or self.is_defunct:
392+
return
393+
364394
if revents & libev.EV_ERROR:
365395
if errno:
366396
exc = IOError(errno, os.strerror(errno))
@@ -381,11 +411,26 @@ def handle_read(self, watcher, revents, errno=None):
381411
if not self._iobuf.tell():
382412
return
383413
else:
414+
if self.is_closed or self.is_defunct:
415+
return
384416
self.defunct(err)
385417
return
386418
elif err.args[0] in NONBLOCKING:
387419
if not self._iobuf.tell():
388420
return
421+
elif self.is_closed or self.is_defunct:
422+
# Socket was closed by another thread between the watcher
423+
# firing and us calling recv(). This is the race described
424+
# in scylladb/python-driver#614.
425+
return
426+
elif getattr(err, 'errno', err.args[0]) in _PEER_DISCONNECT_ERRNOS:
427+
# Peer disconnected — treat as clean close.
428+
log.debug("Connection %s closed by peer during read: %s",
429+
self, err)
430+
self.last_error = ConnectionShutdown(
431+
"Connection to %s was closed by peer" % self.endpoint)
432+
self.close()
433+
return
389434
else:
390435
self.defunct(err)
391436
return
@@ -394,6 +439,8 @@ def handle_read(self, watcher, revents, errno=None):
394439
self.process_io_buffer()
395440
else:
396441
log.debug("Connection %s closed by server", self)
442+
self.last_error = ConnectionShutdown(
443+
"Connection to %s was closed by server" % self.endpoint)
397444
self.close()
398445

399446
def push(self, data):
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
# Copyright ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Tests for the libev close() race condition fix (scylladb/python-driver#614).
17+
18+
The race: close() sets is_closed=True and calls _socket.close() immediately,
19+
but libev watchers are stopped asynchronously in _loop_will_run(). Between
20+
socket close and watcher stop, handle_read()/handle_write() can fire on the
21+
closed fd, causing EBADF -> ConnectionShutdown.
22+
23+
The fix adds early-return guards at the top of handle_read() and handle_write()
24+
that check is_closed/is_defunct before touching the socket.
25+
"""
26+
27+
import errno
28+
import socket
29+
import unittest
30+
from unittest.mock import patch, MagicMock, PropertyMock
31+
32+
from cassandra import DependencyException
33+
34+
try:
35+
from cassandra.io.libevreactor import LibevConnection, _PEER_DISCONNECT_ERRNOS
36+
except (ImportError, DependencyException):
37+
LibevConnection = None
38+
_PEER_DISCONNECT_ERRNOS = None
39+
40+
from tests import is_monkey_patched
41+
42+
43+
def _skip_if_unavailable(test_case):
44+
if is_monkey_patched():
45+
raise unittest.SkipTest("Can't test libev with monkey patching")
46+
if LibevConnection is None:
47+
raise unittest.SkipTest('libev does not appear to be installed correctly')
48+
49+
50+
class LibevCloseRaceTest(unittest.TestCase):
51+
"""
52+
Tests for the close-race fix in LibevConnection (issue #614).
53+
54+
Each test simulates the scenario where a watcher fires after close()
55+
has already been called, verifying that the handler exits gracefully
56+
without calling defunct() or raising.
57+
"""
58+
59+
def setUp(self):
60+
_skip_if_unavailable(self)
61+
LibevConnection.initialize_reactor()
62+
63+
self.patchers = [
64+
patch('socket.socket'),
65+
patch('cassandra.io.libevwrapper.IO'),
66+
patch('cassandra.io.libevreactor.LibevLoop.maybe_start'),
67+
]
68+
for p in self.patchers:
69+
p.start()
70+
71+
def tearDown(self):
72+
for p in self.patchers:
73+
p.stop()
74+
75+
def _make_connection(self):
76+
from cassandra.connection import DefaultEndPoint
77+
c = LibevConnection(DefaultEndPoint('1.2.3.4'), cql_version='3.0.1', connect_timeout=5)
78+
mock_socket = MagicMock()
79+
mock_socket.send.side_effect = lambda x: len(x)
80+
mock_socket.recv.return_value = b''
81+
c._socket = mock_socket
82+
return c
83+
84+
# ------------------------------------------------------------------
85+
# handle_write guards
86+
# ------------------------------------------------------------------
87+
88+
def test_handle_write_returns_immediately_when_closed(self):
89+
"""
90+
handle_write() must be a no-op if is_closed is already True.
91+
This prevents EBADF when the watcher fires after close().
92+
"""
93+
c = self._make_connection()
94+
c.is_closed = True
95+
c.deque.append(b"data")
96+
97+
# Should not raise and should not call send()
98+
c.handle_write(None, 0)
99+
c._socket.send.assert_not_called()
100+
101+
def test_handle_write_returns_immediately_when_defunct(self):
102+
"""
103+
handle_write() must be a no-op if is_defunct is already True.
104+
"""
105+
c = self._make_connection()
106+
c.is_defunct = True
107+
c.deque.append(b"data")
108+
109+
c.handle_write(None, 0)
110+
c._socket.send.assert_not_called()
111+
112+
def test_handle_write_ebadf_after_close_does_not_defunct(self):
113+
"""
114+
If close() races with handle_write() and send() raises EBADF,
115+
the handler should return without calling defunct().
116+
"""
117+
c = self._make_connection()
118+
c.deque.append(b"data")
119+
120+
# Simulate close() racing: send() raises EBADF, and is_closed is set
121+
c._socket.send.side_effect = socket.error(errno.EBADF, "Bad file descriptor")
122+
c.is_closed = True
123+
124+
with patch.object(c, 'defunct') as mock_defunct:
125+
c.handle_write(None, 0)
126+
mock_defunct.assert_not_called()
127+
128+
def test_handle_write_peer_disconnect_closes_cleanly(self):
129+
"""
130+
If send() raises ECONNRESET (peer disconnect), handle_write()
131+
should call close() instead of defunct().
132+
"""
133+
c = self._make_connection()
134+
c.deque.append(b"data")
135+
c._socket.send.side_effect = socket.error(errno.ECONNRESET, "Connection reset by peer")
136+
137+
with patch.object(c, 'defunct') as mock_defunct, \
138+
patch.object(c, 'close') as mock_close:
139+
c.handle_write(None, 0)
140+
mock_defunct.assert_not_called()
141+
mock_close.assert_called_once()
142+
143+
# ------------------------------------------------------------------
144+
# handle_read guards
145+
# ------------------------------------------------------------------
146+
147+
def test_handle_read_returns_immediately_when_closed(self):
148+
"""
149+
handle_read() must be a no-op if is_closed is already True.
150+
This prevents EBADF when the watcher fires after close().
151+
"""
152+
c = self._make_connection()
153+
c.is_closed = True
154+
155+
# Should not raise and should not call recv()
156+
c.handle_read(None, 0)
157+
c._socket.recv.assert_not_called()
158+
159+
def test_handle_read_returns_immediately_when_defunct(self):
160+
"""
161+
handle_read() must be a no-op if is_defunct is already True.
162+
"""
163+
c = self._make_connection()
164+
c.is_defunct = True
165+
166+
c.handle_read(None, 0)
167+
c._socket.recv.assert_not_called()
168+
169+
def test_handle_read_ebadf_after_close_does_not_defunct(self):
170+
"""
171+
If close() races with handle_read() and recv() raises EBADF,
172+
the handler should return without calling defunct().
173+
"""
174+
c = self._make_connection()
175+
c._socket.recv.side_effect = socket.error(errno.EBADF, "Bad file descriptor")
176+
c.is_closed = True
177+
178+
with patch.object(c, 'defunct') as mock_defunct:
179+
c.handle_read(None, 0)
180+
mock_defunct.assert_not_called()
181+
182+
def test_handle_read_peer_disconnect_closes_cleanly(self):
183+
"""
184+
If recv() raises ECONNRESET (peer disconnect), handle_read()
185+
should set last_error and call close() instead of defunct().
186+
"""
187+
c = self._make_connection()
188+
c._socket.recv.side_effect = socket.error(errno.ECONNRESET, "Connection reset by peer")
189+
190+
with patch.object(c, 'defunct') as mock_defunct, \
191+
patch.object(c, 'close') as mock_close:
192+
c.handle_read(None, 0)
193+
mock_defunct.assert_not_called()
194+
mock_close.assert_called_once()
195+
# last_error should be set before close()
196+
self.assertIsNotNone(c.last_error)
197+
self.assertIn("closed by peer", str(c.last_error))
198+
199+
def test_handle_read_eof_sets_last_error_before_close(self):
200+
"""
201+
When recv() returns empty bytes (EOF / server closed connection),
202+
last_error must be set before close() is called so that
203+
factory() can detect the dead connection.
204+
"""
205+
c = self._make_connection()
206+
c._socket.recv.return_value = b''
207+
208+
with patch.object(c, 'close') as mock_close:
209+
c.handle_read(None, 0)
210+
mock_close.assert_called_once()
211+
self.assertIsNotNone(c.last_error)
212+
self.assertIn("closed by server", str(c.last_error))
213+
214+
# ------------------------------------------------------------------
215+
# close() preserves last_error for factory()
216+
# ------------------------------------------------------------------
217+
218+
def test_close_sets_last_error_when_connected_event_not_set(self):
219+
"""
220+
When close() is called before connected_event is set (i.e. the
221+
connection was never fully established), last_error must be
222+
populated so factory() doesn't return a dead connection.
223+
"""
224+
c = self._make_connection()
225+
# connected_event is not set by default in a fresh connection
226+
c.connected_event.clear()
227+
228+
c.close()
229+
230+
self.assertIsNotNone(c.last_error)
231+
self.assertIn("was closed", str(c.last_error))
232+
233+
234+
class LibevPeerDisconnectErrnos(unittest.TestCase):
235+
"""Verify _PEER_DISCONNECT_ERRNOS contains expected values."""
236+
237+
def setUp(self):
238+
_skip_if_unavailable(self)
239+
240+
def test_contains_ebadf(self):
241+
self.assertIn(errno.EBADF, _PEER_DISCONNECT_ERRNOS)
242+
243+
def test_contains_econnreset(self):
244+
self.assertIn(errno.ECONNRESET, _PEER_DISCONNECT_ERRNOS)
245+
246+
def test_contains_econnaborted(self):
247+
self.assertIn(errno.ECONNABORTED, _PEER_DISCONNECT_ERRNOS)
248+
249+
def test_contains_enotconn(self):
250+
self.assertIn(errno.ENOTCONN, _PEER_DISCONNECT_ERRNOS)
251+
252+
def test_contains_eshutdown(self):
253+
self.assertIn(errno.ESHUTDOWN, _PEER_DISCONNECT_ERRNOS)

0 commit comments

Comments
 (0)