Skip to content

Commit 04eea80

Browse files
committed
fix(libev): guard handle_read/handle_write against close() race condition
When `close()` is called before the connection handshake completes, the original code signals waiters via `connected_event.set()` but doesn't populate `last_error`, allowing `factory()` to potentially return a dead connection. Additionally, peer disconnects (ECONNRESET, ENOTCONN, etc.) during I/O were routed through defunct(), which is heavier than necessary for a clean shutdown scenario. So, fix it applying following changes: - Preserve `last_error` in `close()` when `connected_event` is unset, so `factory()` can detect dead connections. - Peer disconnect detection (ECONNRESET, ENOTCONN, etc.) that calls `close()` cleanly instead of `defunct()`. - Defensive early-return guards in `handle_read()/handle_write()` to skip unnecessary work when connection is already closed/defunct. Fixes: #614
1 parent 44bc95a commit 04eea80

2 files changed

Lines changed: 322 additions & 1 deletion

File tree

cassandra/io/libevreactor.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
import atexit
1515
from collections import deque
16+
import errno
1617
import logging
1718
import os
1819
import socket
@@ -38,6 +39,14 @@
3839

3940
log = logging.getLogger(__name__)
4041

42+
# Errno values that indicate the remote peer has disconnected or the
43+
# socket is no longer usable. When we see these during shutdown we
44+
# treat them as a clean close instead of defuncting the connection.
45+
_PEER_DISCONNECT_ERRNOS = frozenset((
46+
errno.EBADF, errno.ENOTCONN, errno.ESHUTDOWN,
47+
errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
48+
))
49+
4150

4251
def _cleanup(loop):
4352
if loop:
@@ -316,10 +325,18 @@ def close(self):
316325
msg = "Connection to %s was closed" % self.endpoint
317326
if self.last_error:
318327
msg += ": %s" % (self.last_error,)
319-
self.error_all_requests(ConnectionShutdown(msg))
328+
shutdown_exc = ConnectionShutdown(msg)
329+
self.error_all_requests(shutdown_exc)
330+
# Preserve the error for factory() to detect dead connections
331+
# that died before connected_event was set.
332+
if not self.connected_event.is_set():
333+
self.last_error = shutdown_exc
320334
self.connected_event.set()
321335

322336
def handle_write(self, watcher, revents, errno=None):
337+
if self.is_closed or self.is_defunct:
338+
return
339+
323340
if revents & libev.EV_ERROR:
324341
if errno:
325342
exc = IOError(errno, os.strerror(errno))
@@ -347,6 +364,16 @@ def handle_write(self, watcher, revents, errno=None):
347364
self._socket_writable = False
348365
with self._deque_lock:
349366
self.deque.appendleft(next_msg)
367+
elif self.is_closed or self.is_defunct:
368+
# Socket was closed by another thread between the
369+
# watcher firing and us calling send(). This is the
370+
# race described in scylladb/python-driver#614.
371+
return
372+
elif getattr(err, 'errno', err.args[0]) in _PEER_DISCONNECT_ERRNOS:
373+
# Peer disconnected — treat as clean close.
374+
log.debug("Connection %s closed by peer during write: %s",
375+
self, err)
376+
self.close()
350377
else:
351378
self.defunct(err)
352379
return
@@ -361,6 +388,9 @@ def handle_write(self, watcher, revents, errno=None):
361388
return
362389

363390
def handle_read(self, watcher, revents, errno=None):
391+
if self.is_closed or self.is_defunct:
392+
return
393+
364394
if revents & libev.EV_ERROR:
365395
if errno:
366396
exc = IOError(errno, os.strerror(errno))
@@ -381,11 +411,26 @@ def handle_read(self, watcher, revents, errno=None):
381411
if not self._iobuf.tell():
382412
return
383413
else:
414+
if self.is_closed or self.is_defunct:
415+
return
384416
self.defunct(err)
385417
return
386418
elif err.args[0] in NONBLOCKING:
387419
if not self._iobuf.tell():
388420
return
421+
elif self.is_closed or self.is_defunct:
422+
# Socket was closed by another thread between the watcher
423+
# firing and us calling recv(). This is the race described
424+
# in scylladb/python-driver#614.
425+
return
426+
elif getattr(err, 'errno', err.args[0]) in _PEER_DISCONNECT_ERRNOS:
427+
# Peer disconnected — treat as clean close.
428+
log.debug("Connection %s closed by peer during read: %s",
429+
self, err)
430+
self.last_error = ConnectionShutdown(
431+
"Connection to %s was closed by peer" % self.endpoint)
432+
self.close()
433+
return
389434
else:
390435
self.defunct(err)
391436
return
@@ -394,6 +439,8 @@ def handle_read(self, watcher, revents, errno=None):
394439
self.process_io_buffer()
395440
else:
396441
log.debug("Connection %s closed by server", self)
442+
self.last_error = ConnectionShutdown(
443+
"Connection to %s was closed by server" % self.endpoint)
397444
self.close()
398445

399446
def push(self, data):
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
# Copyright ScyllaDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Tests for the libev close() race condition fix (scylladb/python-driver#614).
17+
18+
The race: close() sets is_closed=True and calls _socket.close() immediately,
19+
but libev watchers are stopped asynchronously in _loop_will_run(). Between
20+
socket close and watcher stop, handle_read()/handle_write() can fire on the
21+
closed fd, causing EBADF -> ConnectionShutdown.
22+
23+
The fix adds early-return guards at the top of handle_read() and handle_write()
24+
that check is_closed/is_defunct before touching the socket.
25+
"""
26+
27+
import errno
28+
import socket
29+
import unittest
30+
from unittest.mock import patch, MagicMock, PropertyMock
31+
32+
from cassandra import DependencyException
33+
34+
try:
35+
from cassandra.io.libevreactor import LibevConnection, _PEER_DISCONNECT_ERRNOS
36+
except (ImportError, DependencyException):
37+
LibevConnection = None
38+
_PEER_DISCONNECT_ERRNOS = None
39+
40+
from tests import is_monkey_patched
41+
42+
43+
def _skip_if_unavailable(test_case):
44+
if is_monkey_patched():
45+
raise unittest.SkipTest("Can't test libev with monkey patching")
46+
if LibevConnection is None:
47+
raise unittest.SkipTest('libev does not appear to be installed correctly')
48+
49+
50+
class LibevCloseRaceTest(unittest.TestCase):
51+
"""
52+
Tests for the close-race fix in LibevConnection (issue #614).
53+
54+
Each test simulates the scenario where a watcher fires after close()
55+
has already been called, verifying that the handler exits gracefully
56+
without calling defunct() or raising.
57+
"""
58+
59+
def setUp(self):
60+
_skip_if_unavailable(self)
61+
LibevConnection.initialize_reactor()
62+
63+
self.patchers = [
64+
patch('socket.socket'),
65+
patch('cassandra.io.libevwrapper.IO'),
66+
patch('cassandra.io.libevreactor.LibevLoop.maybe_start'),
67+
]
68+
for p in self.patchers:
69+
p.start()
70+
self._connections = []
71+
72+
def tearDown(self):
73+
for c in self._connections:
74+
c.close()
75+
for p in self.patchers:
76+
p.stop()
77+
78+
def _make_connection(self):
79+
from cassandra.connection import DefaultEndPoint
80+
c = LibevConnection(DefaultEndPoint('1.2.3.4'), cql_version='3.0.1', connect_timeout=5)
81+
mock_socket = MagicMock()
82+
mock_socket.send.side_effect = lambda x: len(x)
83+
mock_socket.recv.return_value = b''
84+
c._socket = mock_socket
85+
self._connections.append(c)
86+
return c
87+
88+
# ------------------------------------------------------------------
89+
# handle_write guards
90+
# ------------------------------------------------------------------
91+
92+
def test_handle_write_returns_immediately_when_closed(self):
93+
"""
94+
handle_write() must be a no-op if is_closed is already True.
95+
This prevents EBADF when the watcher fires after close().
96+
"""
97+
c = self._make_connection()
98+
c.is_closed = True
99+
c.deque.append(b"data")
100+
101+
# Should not raise and should not call send()
102+
c.handle_write(None, 0)
103+
c._socket.send.assert_not_called()
104+
105+
def test_handle_write_returns_immediately_when_defunct(self):
106+
"""
107+
handle_write() must be a no-op if is_defunct is already True.
108+
"""
109+
c = self._make_connection()
110+
c.is_defunct = True
111+
c.deque.append(b"data")
112+
113+
c.handle_write(None, 0)
114+
c._socket.send.assert_not_called()
115+
116+
def test_handle_write_ebadf_after_close_does_not_defunct(self):
117+
"""
118+
If close() races with handle_write() and send() raises EBADF,
119+
the handler should return without calling defunct().
120+
121+
Simulates close() happening between watcher dispatch and the
122+
error handler check: is_closed is False when the handler starts
123+
(passes top guard) but becomes True when send() is called.
124+
"""
125+
c = self._make_connection()
126+
c.deque.append(b"data")
127+
128+
def send_sets_closed_and_raises(data):
129+
c.is_closed = True
130+
raise socket.error(errno.EBADF, "Bad file descriptor")
131+
132+
c._socket.send.side_effect = send_sets_closed_and_raises
133+
134+
with patch.object(c, 'defunct') as mock_defunct:
135+
c.handle_write(None, 0)
136+
mock_defunct.assert_not_called()
137+
138+
def test_handle_write_peer_disconnect_closes_cleanly(self):
139+
"""
140+
If send() raises ECONNRESET (peer disconnect), handle_write()
141+
should call close() instead of defunct().
142+
"""
143+
c = self._make_connection()
144+
c.deque.append(b"data")
145+
c._socket.send.side_effect = socket.error(errno.ECONNRESET, "Connection reset by peer")
146+
147+
with patch.object(c, 'defunct') as mock_defunct, \
148+
patch.object(c, 'close') as mock_close:
149+
c.handle_write(None, 0)
150+
mock_defunct.assert_not_called()
151+
mock_close.assert_called_once()
152+
153+
# ------------------------------------------------------------------
154+
# handle_read guards
155+
# ------------------------------------------------------------------
156+
157+
def test_handle_read_returns_immediately_when_closed(self):
158+
"""
159+
handle_read() must be a no-op if is_closed is already True.
160+
This prevents EBADF when the watcher fires after close().
161+
"""
162+
c = self._make_connection()
163+
c.is_closed = True
164+
165+
# Should not raise and should not call recv()
166+
c.handle_read(None, 0)
167+
c._socket.recv.assert_not_called()
168+
169+
def test_handle_read_returns_immediately_when_defunct(self):
170+
"""
171+
handle_read() must be a no-op if is_defunct is already True.
172+
"""
173+
c = self._make_connection()
174+
c.is_defunct = True
175+
176+
c.handle_read(None, 0)
177+
c._socket.recv.assert_not_called()
178+
179+
def test_handle_read_ebadf_after_close_does_not_defunct(self):
180+
"""
181+
If close() races with handle_read() and recv() raises EBADF,
182+
the handler should return without calling defunct().
183+
184+
Simulates close() happening between watcher dispatch and the
185+
error handler check: is_closed is False when the handler starts
186+
(passes top guard) but becomes True when recv() is called.
187+
"""
188+
c = self._make_connection()
189+
190+
def recv_sets_closed_and_raises(bufsize):
191+
c.is_closed = True
192+
raise socket.error(errno.EBADF, "Bad file descriptor")
193+
194+
c._socket.recv.side_effect = recv_sets_closed_and_raises
195+
196+
with patch.object(c, 'defunct') as mock_defunct:
197+
c.handle_read(None, 0)
198+
mock_defunct.assert_not_called()
199+
200+
def test_handle_read_peer_disconnect_closes_cleanly(self):
201+
"""
202+
If recv() raises ECONNRESET (peer disconnect), handle_read()
203+
should set last_error and call close() instead of defunct().
204+
"""
205+
c = self._make_connection()
206+
c._socket.recv.side_effect = socket.error(errno.ECONNRESET, "Connection reset by peer")
207+
208+
with patch.object(c, 'defunct') as mock_defunct, \
209+
patch.object(c, 'close') as mock_close:
210+
c.handle_read(None, 0)
211+
mock_defunct.assert_not_called()
212+
mock_close.assert_called_once()
213+
# last_error should be set before close()
214+
self.assertIsNotNone(c.last_error)
215+
self.assertIn("closed by peer", str(c.last_error))
216+
217+
def test_handle_read_eof_sets_last_error_before_close(self):
218+
"""
219+
When recv() returns empty bytes (EOF / server closed connection),
220+
last_error must be set before close() is called so that
221+
factory() can detect the dead connection.
222+
"""
223+
c = self._make_connection()
224+
c._socket.recv.return_value = b''
225+
226+
with patch.object(c, 'close') as mock_close:
227+
c.handle_read(None, 0)
228+
mock_close.assert_called_once()
229+
self.assertIsNotNone(c.last_error)
230+
self.assertIn("closed by server", str(c.last_error))
231+
232+
# ------------------------------------------------------------------
233+
# close() preserves last_error for factory()
234+
# ------------------------------------------------------------------
235+
236+
def test_close_sets_last_error_when_connected_event_not_set(self):
237+
"""
238+
When close() is called before connected_event is set (i.e. the
239+
connection was never fully established), last_error must be
240+
populated so factory() doesn't return a dead connection.
241+
"""
242+
c = self._make_connection()
243+
# connected_event is not set by default in a fresh connection
244+
c.connected_event.clear()
245+
246+
c.close()
247+
248+
self.assertIsNotNone(c.last_error)
249+
self.assertIn("was closed", str(c.last_error))
250+
251+
252+
class LibevPeerDisconnectErrnos(unittest.TestCase):
253+
"""Verify _PEER_DISCONNECT_ERRNOS contains expected values."""
254+
255+
def setUp(self):
256+
_skip_if_unavailable(self)
257+
258+
def test_contains_ebadf(self):
259+
self.assertIn(errno.EBADF, _PEER_DISCONNECT_ERRNOS)
260+
261+
def test_contains_econnreset(self):
262+
self.assertIn(errno.ECONNRESET, _PEER_DISCONNECT_ERRNOS)
263+
264+
def test_contains_econnaborted(self):
265+
self.assertIn(errno.ECONNABORTED, _PEER_DISCONNECT_ERRNOS)
266+
267+
def test_contains_enotconn(self):
268+
self.assertIn(errno.ENOTCONN, _PEER_DISCONNECT_ERRNOS)
269+
270+
def test_contains_eshutdown(self):
271+
self.assertIn(errno.ESHUTDOWN, _PEER_DISCONNECT_ERRNOS)
272+
273+
def test_contains_epipe(self):
274+
self.assertIn(errno.EPIPE, _PEER_DISCONNECT_ERRNOS)

0 commit comments

Comments
 (0)