Skip to content

Commit 28be334

Browse files
committed
cluster: fix host state pool refresh races
1 parent 0842348 commit 28be334

3 files changed

Lines changed: 87 additions & 10 deletions

File tree

cassandra/cluster.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1975,6 +1975,10 @@ def on_up(self, host):
19751975
with host.lock:
19761976
host.set_up()
19771977
host._currently_handling_node_up = False
1978+
for listener in self.listeners:
1979+
listener.on_up(host)
1980+
for session in tuple(self.sessions):
1981+
session.update_created_pools()
19781982

19791983
# for testing purposes
19801984
return futures
@@ -2020,7 +2024,7 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20202024
Intended for internal use only.
20212025
"""
20222026
if self.is_shutdown:
2023-
return
2027+
return False
20242028

20252029
with host.lock:
20262030
was_up = host.is_up
@@ -2035,14 +2039,15 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False):
20352039
if pool_state:
20362040
connected |= pool_state['open_count'] > 0
20372041
if connected:
2038-
return
2042+
return False
20392043

20402044
host.set_down()
20412045
if (not was_up and not expect_host_to_be_down) or host.is_currently_reconnecting():
2042-
return
2046+
return False
20432047
log.warning("Host %s has been marked down", host)
20442048

20452049
self.on_down_potentially_blocking(host, is_host_addition)
2050+
return True
20462051

20472052
def on_add(self, host, refresh_nodes=True):
20482053
if self.is_shutdown:
@@ -2134,8 +2139,8 @@ def on_remove(self, host):
21342139
def signal_connection_failure(self, host, connection_exc, is_host_addition, expect_host_to_be_down=False):
21352140
is_down = host.signal_connection_failure(connection_exc)
21362141
if is_down:
2137-
self.on_down(host, is_host_addition, expect_host_to_be_down)
2138-
return is_down
2142+
return self.on_down(host, is_host_addition, expect_host_to_be_down)
2143+
return False
21392144

21402145
def add_host(self, endpoint, datacenter=None, rack=None, signal=True, refresh_nodes=True, host_id=None):
21412146
"""
@@ -3315,7 +3320,9 @@ def update_created_pools(self):
33153320
# we don't eagerly set is_up on previously ignored hosts. None is included here
33163321
# to allow us to attempt connections to hosts that have gone from ignored to something
33173322
# else.
3318-
if distance != HostDistance.IGNORED and host.is_up in (True, None):
3323+
if (distance != HostDistance.IGNORED and
3324+
host.is_up in (True, None) and
3325+
not getattr(host, '_currently_handling_node_up', False)):
33193326
future = self.add_or_renew_pool(host, False)
33203327
elif distance != pool.host_distance:
33213328
# the distance has changed
@@ -4226,9 +4233,10 @@ def _signal_error(self):
42264233
# host may be None if it's already been removed, but that indicates
42274234
# that errors have already been reported, so we're fine
42284235
if host:
4229-
self._cluster.signal_connection_failure(
4236+
is_down = self._cluster.signal_connection_failure(
42304237
host, self._connection.last_error, is_host_addition=False)
4231-
return
4238+
if is_down:
4239+
return
42324240

42334241
# if the connection is not defunct or the host already left, reconnect
42344242
# manually

tests/unit/test_cluster.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import logging
1717
import socket
18+
from concurrent.futures import Future
1819

1920
from unittest.mock import patch, Mock
2021
import uuid
@@ -229,6 +230,27 @@ def test_connection_factory_passes_compression_kwarg(self):
229230
assert factory.call_args.kwargs['compression'] == expected
230231
assert cluster.compression == expected
231232

233+
def test_on_up_without_pool_futures_notifies_listeners(self):
234+
cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), protocol_version=4)
235+
self.addCleanup(cluster.shutdown)
236+
237+
host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())
238+
host.set_down()
239+
cluster.metadata.add_or_return_host(host)
240+
241+
session = Mock()
242+
session.add_or_renew_pool.return_value = None
243+
cluster.sessions.add(session)
244+
245+
listener = Mock()
246+
cluster.register_listener(listener)
247+
248+
cluster.on_up(host)
249+
250+
assert host.is_up is True
251+
listener.on_up.assert_called_once_with(host)
252+
session.update_created_pools.assert_called_once_with()
253+
232254

233255
class SchedulerTest(unittest.TestCase):
234256
# TODO: this suite could be expanded; for now just adding a test covering a ticket
@@ -339,6 +361,28 @@ def test_set_keyspace_escapes_quotes(self, *_):
339361
assert query == 'USE simple_ks', (
340362
"Simple keyspace names should not be quoted, got: %r" % query)
341363

364+
def test_update_created_pools_skips_host_with_node_up_in_progress(self):
365+
cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), protocol_version=4)
366+
self.addCleanup(cluster.shutdown)
367+
368+
host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())
369+
cluster.metadata.add_or_return_host(host)
370+
cluster.profile_manager.populate(cluster, [host])
371+
cluster.profile_manager.on_up(host)
372+
373+
completed = Future()
374+
completed.set_result(True)
375+
376+
with patch.object(Session, "add_or_renew_pool", return_value=completed) as add_or_renew_pool:
377+
session = Session(cluster, [host])
378+
add_or_renew_pool.reset_mock()
379+
380+
session._pools = {}
381+
host._currently_handling_node_up = True
382+
383+
assert session.update_created_pools() == set()
384+
add_or_renew_pool.assert_not_called()
385+
342386
class ProtocolVersionTests(unittest.TestCase):
343387

344388
def test_protocol_downgrade_test(self):

tests/unit/test_control_connection.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,16 @@
1313
# limitations under the License.
1414

1515
import unittest
16+
import uuid
1617

1718
from concurrent.futures import ThreadPoolExecutor
1819
from unittest.mock import Mock, ANY, call
1920

2021
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
2122
from cassandra.protocol import ResultMessage, RESULT_KIND_ROWS
22-
from cassandra.cluster import ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
23+
from cassandra.cluster import Cluster, ControlConnection, _Scheduler, ProfileManager, EXEC_PROFILE_DEFAULT, ExecutionProfile
2324
from cassandra.pool import Host
24-
from cassandra.connection import EndPoint, DefaultEndPoint, DefaultEndPointFactory
25+
from cassandra.connection import EndPoint, DefaultEndPoint, DefaultEndPointFactory, ConnectionException
2526
from cassandra.policies import (SimpleConvictionPolicy, RoundRobinPolicy,
2627
ConstantReconnectionPolicy, IdentityTranslator)
2728

@@ -301,6 +302,30 @@ def test_wait_for_schema_agreement_none_timeout(self):
301302
cc._time = self.time
302303
assert cc.wait_for_schema_agreement()
303304

305+
def test_signal_error_reconnects_when_host_down_signal_is_discounted(self):
306+
cluster = Cluster(load_balancing_policy=RoundRobinPolicy(), protocol_version=4)
307+
self.addCleanup(cluster.shutdown)
308+
309+
host = Host(DefaultEndPoint("127.0.0.1"), SimpleConvictionPolicy, host_id=uuid.uuid4())
310+
host.set_up()
311+
cluster.metadata.add_or_return_host(host)
312+
313+
session = Mock()
314+
session.get_pool_state.return_value = {host: {"open_count": 1}}
315+
cluster.sessions.add(session)
316+
317+
connection_error = ConnectionException("control connection failed", endpoint=host.endpoint)
318+
cluster.control_connection._connection = Mock(
319+
endpoint=host.endpoint,
320+
is_defunct=True,
321+
last_error=connection_error)
322+
cluster.control_connection.reconnect = Mock()
323+
324+
cluster.control_connection._signal_error()
325+
326+
assert host.is_up is True
327+
cluster.control_connection.reconnect.assert_called_once_with()
328+
304329
def test_refresh_nodes_and_tokens(self):
305330
self.control_connection.refresh_node_list_and_token_map()
306331
meta = self.cluster.metadata

0 commit comments

Comments
 (0)