Skip to content

Commit fe62a83

Browse files
committed
fix(libev): preserve last_error in close() and guard handlers against closed state
When 'close()' is called before 'connected_event' is set (connection not yet fully established), the original code signals waiters but leaves 'last_error' unset, allowing 'factory()' to potentially return a dead connection that appears valid. Fix by: - Preserving 'last_error' in 'close()' when 'connected_event' is unset, so 'factory()' can detect dead connections. - Adding early-return guards at the top of 'handle_read()'/'handle_write()' to skip unnecessary work when watchers fire after 'close()' (the watchers are stopped asynchronously in '_loop_will_run()') - Setting 'last_error' before 'close()' on EOF so the shutdown reason is propagated to in-flight requests. Fixes: #614
1 parent 44bc95a commit fe62a83

2 files changed

Lines changed: 190 additions & 1 deletion

File tree

cassandra/io/libevreactor.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,18 @@ def close(self):
316316
msg = "Connection to %s was closed" % self.endpoint
317317
if self.last_error:
318318
msg += ": %s" % (self.last_error,)
319-
self.error_all_requests(ConnectionShutdown(msg))
319+
shutdown_exc = ConnectionShutdown(msg)
320+
self.error_all_requests(shutdown_exc)
321+
# Preserve the error for factory() to detect dead connections
322+
# that died before connected_event was set.
323+
if not self.connected_event.is_set():
324+
self.last_error = shutdown_exc
320325
self.connected_event.set()
321326

322327
def handle_write(self, watcher, revents, errno=None):
328+
if self.is_closed or self.is_defunct:
329+
return
330+
323331
if revents & libev.EV_ERROR:
324332
if errno:
325333
exc = IOError(errno, os.strerror(errno))
@@ -361,6 +369,9 @@ def handle_write(self, watcher, revents, errno=None):
361369
return
362370

363371
def handle_read(self, watcher, revents, errno=None):
372+
if self.is_closed or self.is_defunct:
373+
return
374+
364375
if revents & libev.EV_ERROR:
365376
if errno:
366377
exc = IOError(errno, os.strerror(errno))
@@ -394,6 +405,8 @@ def handle_read(self, watcher, revents, errno=None):
394405
self.process_io_buffer()
395406
else:
396407
log.debug("Connection %s closed by server", self)
408+
self.last_error = ConnectionShutdown(
409+
"Connection to %s was closed by server" % self.endpoint)
397410
self.close()
398411

399412
def push(self, data):
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Tests for the libev close() race condition fix (scylladb/python-driver#614).
17+
18+
The race: close() sets is_closed=True and calls _socket.close() immediately,
19+
but libev watchers are stopped asynchronously in _loop_will_run(). Between
20+
socket close and watcher stop, handle_read()/handle_write() can fire on the
21+
closed fd.
22+
23+
The fix adds early-return guards at the top of handle_read() and handle_write()
24+
that check is_closed/is_defunct before touching the socket, and preserves
25+
last_error in close() when connected_event is unset.
26+
"""
27+
28+
import unittest
29+
from unittest.mock import patch, MagicMock
30+
31+
from cassandra import DependencyException
32+
33+
try:
34+
from cassandra.io.libevreactor import LibevConnection
35+
except (ImportError, DependencyException):
36+
LibevConnection = None
37+
38+
from tests import is_monkey_patched
39+
40+
41+
def _skip_if_unavailable(test_case):
42+
if is_monkey_patched():
43+
raise unittest.SkipTest("Can't test libev with monkey patching")
44+
if LibevConnection is None:
45+
raise unittest.SkipTest('libev does not appear to be installed correctly')
46+
47+
48+
class LibevCloseRaceTest(unittest.TestCase):
49+
"""
50+
Tests for the close-race fix in LibevConnection (issue #614).
51+
52+
Each test simulates the scenario where a watcher fires after close()
53+
has already been called, verifying that the handler exits gracefully
54+
without calling defunct() or raising.
55+
"""
56+
57+
def setUp(self):
58+
_skip_if_unavailable(self)
59+
LibevConnection.initialize_reactor()
60+
61+
self.patchers = [
62+
patch('socket.socket'),
63+
patch('cassandra.io.libevwrapper.IO'),
64+
patch('cassandra.io.libevreactor.LibevLoop.maybe_start'),
65+
]
66+
for p in self.patchers:
67+
p.start()
68+
self._connections = []
69+
70+
def tearDown(self):
71+
for c in self._connections:
72+
c.close()
73+
for p in self.patchers:
74+
p.stop()
75+
76+
def _make_connection(self):
77+
from cassandra.connection import DefaultEndPoint
78+
c = LibevConnection(DefaultEndPoint('1.2.3.4'), cql_version='3.0.1', connect_timeout=5)
79+
mock_socket = MagicMock()
80+
mock_socket.send.side_effect = lambda x: len(x)
81+
mock_socket.recv.return_value = b''
82+
c._socket = mock_socket
83+
self._connections.append(c)
84+
return c
85+
86+
# ------------------------------------------------------------------
87+
# handle_write guards
88+
# ------------------------------------------------------------------
89+
90+
def test_handle_write_returns_immediately_when_closed(self):
91+
"""
92+
handle_write() must be a no-op if is_closed is already True.
93+
This prevents EBADF when the watcher fires after close().
94+
"""
95+
c = self._make_connection()
96+
c.is_closed = True
97+
c.deque.append(b"data")
98+
99+
# Should not raise and should not call send()
100+
c.handle_write(None, 0)
101+
c._socket.send.assert_not_called()
102+
103+
def test_handle_write_returns_immediately_when_defunct(self):
104+
"""
105+
handle_write() must be a no-op if is_defunct is already True.
106+
"""
107+
c = self._make_connection()
108+
c.is_defunct = True
109+
c.deque.append(b"data")
110+
111+
c.handle_write(None, 0)
112+
c._socket.send.assert_not_called()
113+
114+
# ------------------------------------------------------------------
115+
# handle_read guards
116+
# ------------------------------------------------------------------
117+
118+
def test_handle_read_returns_immediately_when_closed(self):
119+
"""
120+
handle_read() must be a no-op if is_closed is already True.
121+
This prevents EBADF when the watcher fires after close().
122+
"""
123+
c = self._make_connection()
124+
c.is_closed = True
125+
126+
# Should not raise and should not call recv()
127+
c.handle_read(None, 0)
128+
c._socket.recv.assert_not_called()
129+
130+
def test_handle_read_returns_immediately_when_defunct(self):
131+
"""
132+
handle_read() must be a no-op if is_defunct is already True.
133+
"""
134+
c = self._make_connection()
135+
c.is_defunct = True
136+
137+
c.handle_read(None, 0)
138+
c._socket.recv.assert_not_called()
139+
140+
# ------------------------------------------------------------------
141+
# handle_read EOF sets last_error
142+
# ------------------------------------------------------------------
143+
144+
def test_handle_read_eof_sets_last_error_before_close(self):
145+
"""
146+
When recv() returns empty bytes (EOF / server closed connection),
147+
last_error must be set before close() is called so that
148+
factory() can detect the dead connection.
149+
"""
150+
c = self._make_connection()
151+
c._socket.recv.return_value = b''
152+
153+
with patch.object(c, 'close') as mock_close:
154+
c.handle_read(None, 0)
155+
mock_close.assert_called_once()
156+
self.assertIsNotNone(c.last_error)
157+
self.assertIn("closed by server", str(c.last_error))
158+
159+
# ------------------------------------------------------------------
160+
# close() preserves last_error for factory()
161+
# ------------------------------------------------------------------
162+
163+
def test_close_sets_last_error_when_connected_event_not_set(self):
164+
"""
165+
When close() is called before connected_event is set (i.e. the
166+
connection was never fully established), last_error must be
167+
populated so factory() doesn't return a dead connection.
168+
"""
169+
c = self._make_connection()
170+
# connected_event is not set by default in a fresh connection
171+
c.connected_event.clear()
172+
173+
c.close()
174+
175+
self.assertIsNotNone(c.last_error)
176+
self.assertIn("was closed", str(c.last_error))

0 commit comments

Comments
 (0)