Skip to content

Commit 9f957e3

Browse files
committed
cluster: release host lock before down listeners
1 parent 9c441bc commit 9f957e3

2 files changed

Lines changed: 36 additions & 4 deletions

File tree

cassandra/cluster.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2592,19 +2592,21 @@ def on_down_potentially_blocking(
25922592
else:
25932593
session.on_down(host, expected_endpoint=expected_endpoint)
25942594

2595+
notify_listeners = False
25952596
if endpoint_matches:
25962597
if expected_endpoint is None:
2597-
for listener in self.listeners:
2598-
listener.on_down(host)
2598+
notify_listeners = True
25992599
else:
26002600
with host.lock:
26012601
if not self._endpoints_match(host.endpoint, expected_endpoint):
26022602
log.debug("Ignoring stale down handling for host %s; endpoint changed from %s",
26032603
host, expected_endpoint)
26042604
endpoint_matches = False
26052605
else:
2606-
for listener in self.listeners:
2607-
listener.on_down(host)
2606+
notify_listeners = True
2607+
if notify_listeners:
2608+
for listener in self.listeners:
2609+
listener.on_down(host)
26082610

26092611
with host.lock:
26102612
start_reconnector = (endpoint_matches and

tests/unit/test_cluster.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,36 @@ def test_reserved_down_handling_after_endpoint_swap_removes_stale_pool(self):
13011301
cluster._start_reconnector.assert_not_called()
13021302
assert self._state(cluster, host).down_epoch is None
13031303

1304+
def test_expected_endpoint_down_listener_is_not_called_under_host_lock(self):
1305+
session = Mock()
1306+
cluster = self._make_cluster(session=session)
1307+
host = self._make_host()
1308+
host.set_up()
1309+
expected_endpoint = host.endpoint
1310+
down_epoch = self._reserve_down_handling(cluster, host)
1311+
blocked_on_host = []
1312+
1313+
class Listener(object):
1314+
1315+
def on_down(self, _host):
1316+
def try_host_lock():
1317+
acquired = host.lock.acquire(timeout=0.2)
1318+
blocked_on_host.append(not acquired)
1319+
if acquired:
1320+
host.lock.release()
1321+
1322+
worker = Thread(target=try_host_lock)
1323+
worker.start()
1324+
worker.join(1)
1325+
1326+
cluster._listeners = set([Listener()])
1327+
1328+
Cluster.on_down_potentially_blocking(
1329+
cluster, host, is_host_addition=False, down_epoch=down_epoch,
1330+
expected_endpoint=expected_endpoint)
1331+
1332+
self.assertEqual(blocked_on_host, [False])
1333+
13041334
def test_endpoint_match_preserves_endpoint_specific_identity(self):
13051335
proxy_endpoint = SniEndPoint("proxy.example.com", "node-a", port=9042)
13061336
other_proxy_endpoint = SniEndPoint("proxy.example.com", "node-b", port=9042)

0 commit comments

Comments
 (0)