Skip to content

Commit e8be8e5

Browse files
committed
control-connection: reconnect after late endpoint swap
1 parent 37a8146 commit e8be8e5

3 files changed

Lines changed: 55 additions & 9 deletions

File tree

cassandra/cluster.py

Lines changed: 12 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2550,7 +2550,8 @@ def on_down_potentially_blocking(
25502550
self, host: Host, is_host_addition: bool,
25512551
down_epoch: Optional[int] = None,
25522552
expected_endpoint: Optional[EndPoint] = None,
2553-
profile_manager_already_notified: bool = False) -> Any:
2553+
profile_manager_already_notified: bool = False,
2554+
control_connection_already_notified: bool = False) -> Any:
25542555
pending_up_epoch = None
25552556
try:
25562557
down_endpoint = None
@@ -2580,7 +2581,8 @@ def on_down_potentially_blocking(
25802581
else:
25812582
if not profile_manager_already_notified:
25822583
self.profile_manager.on_down(host)
2583-
self.control_connection.on_down(host)
2584+
if not control_connection_already_notified:
2585+
self.control_connection.on_down(host)
25842586
else:
25852587
log.debug("Not signalling down for stale down handling on node %s; endpoint changed from %s",
25862588
host, expected_endpoint)
@@ -2640,7 +2642,8 @@ def on_down_potentially_blocking(
26402642
self._handle_pending_node_up(host, pending_up_epoch)
26412643

26422644
def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
2643-
expected_endpoint=None, profile_manager_already_notified=False):
2645+
expected_endpoint=None, profile_manager_already_notified=False,
2646+
control_connection_already_notified=False):
26442647
"""
26452648
Intended for internal use only.
26462649
"""
@@ -2725,7 +2728,8 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
27252728

27262729
future = self.on_down_potentially_blocking(
27272730
host, is_host_addition, down_epoch, expected_endpoint,
2728-
profile_manager_already_notified)
2731+
profile_manager_already_notified,
2732+
control_connection_already_notified)
27292733
if future is None:
27302734
pending_down = None
27312735
pending_up_epoch = None
@@ -5024,18 +5028,20 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
50245028
reconnector.cancel()
50255029
with host.lock:
50265030
old_endpoint = host.endpoint
5031+
self._cluster.profile_manager.on_down(host)
5032+
self.on_down(host)
50275033
self._cluster.on_down(
50285034
host, is_host_addition=False, expect_host_to_be_down=True,
50295035
expected_endpoint=old_endpoint,
5030-
profile_manager_already_notified=True)
5036+
profile_manager_already_notified=True,
5037+
control_connection_already_notified=True)
50315038

50325039
with host.lock:
50335040
if not self._cluster._endpoints_match(host.endpoint, old_endpoint):
50345041
log.debug("[control connection] Not updating host ip from %s to %s for (%s); "
50355042
"endpoint changed to %s",
50365043
old_endpoint, endpoint, host_id, host.endpoint)
50375044
continue
5038-
self._cluster.profile_manager.on_down(host)
50395045
host.endpoint = endpoint
50405046
self._cluster.metadata.update_host(host, old_endpoint)
50415047
self._cluster.on_up(host, expected_endpoint=endpoint)

tests/unit/test_cluster.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1199,7 +1199,7 @@ def test_forced_down_is_not_discounted_by_connected_pool(self):
11991199

12001200
assert not host.is_up
12011201
cluster.on_down_potentially_blocking.assert_called_once_with(
1202-
host, False, ANY, endpoint)
1202+
host, False, ANY, endpoint, False, False)
12031203

12041204
@staticmethod
12051205
def _state(cluster, host):

tests/unit/test_control_connection.py

Lines changed: 42 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -134,7 +134,8 @@ def on_up(self, host, expected_endpoint=None):
134134
pass
135135

136136
def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
137-
expected_endpoint=None, profile_manager_already_notified=False):
137+
expected_endpoint=None, profile_manager_already_notified=False,
138+
control_connection_already_notified=False):
138139
self.down_host = host
139140
self.down_expected_endpoint = expected_endpoint
140141

@@ -463,7 +464,8 @@ def test_change_client_route_endpoint_when_only_port_changes(self):
463464
self.cluster.on_down.assert_called_once_with(
464465
host, is_host_addition=False, expect_host_to_be_down=True,
465466
expected_endpoint=old_endpoint,
466-
profile_manager_already_notified=True)
467+
profile_manager_already_notified=True,
468+
control_connection_already_notified=True)
467469
self.cluster.on_up.assert_called_once_with(
468470
host, expected_endpoint=new_endpoint)
469471

@@ -504,6 +506,44 @@ def test_endpoint_change_preserves_live_policy_hosts_when_down_handler_runs_late
504506
cluster.executor = original_executor
505507
cluster.shutdown()
506508

509+
def test_endpoint_change_reconnects_control_connection_when_down_handler_runs_late(self):
510+
old_endpoint = DefaultEndPoint("127.0.0.1")
511+
new_endpoint = DefaultEndPoint("127.0.0.2")
512+
host_id = uuid.uuid4()
513+
cluster = Cluster(contact_points=[])
514+
original_executor = cluster.executor
515+
cluster.executor = RunOnResultExecutor()
516+
cluster._start_reconnector = Mock()
517+
cluster.control_connection._connection = Mock(endpoint=old_endpoint)
518+
cluster.control_connection.reconnect = Mock()
519+
520+
try:
521+
host = Host(
522+
old_endpoint, SimpleConvictionPolicy,
523+
datacenter="dc1", rack="rack1", host_id=host_id)
524+
host.set_up()
525+
cluster.metadata.add_or_return_host(host)
526+
cluster.profile_manager.populate(cluster, [host])
527+
cluster.endpoint_factory = Mock()
528+
cluster.endpoint_factory.create.return_value = new_endpoint
529+
cluster.control_connection._token_meta_enabled = False
530+
531+
preloaded_results = _node_meta_results(
532+
local_results=([], []),
533+
peer_results=(
534+
["rpc_address", "peer", "data_center", "rack", "host_id"],
535+
[["127.0.0.2", "127.0.0.2", "dc1", "rack1", host_id]]))
536+
537+
cluster.control_connection._refresh_node_list_and_token_map(
538+
Mock(), preloaded_results=preloaded_results)
539+
cluster.executor.run_all()
540+
541+
cluster.control_connection.reconnect.assert_called_once_with()
542+
finally:
543+
cluster.control_connection._connection = None
544+
cluster.executor = original_executor
545+
cluster.shutdown()
546+
507547
def test_stale_control_connection_failure_is_endpoint_fenced(self):
508548
host_id = uuid.uuid4()
509549
old_endpoint = ClientRoutesEndPoint(

0 commit comments

Comments
 (0)