Skip to content

Commit a2a26ca

Browse files
committed
control-connection: preserve policy state on endpoint swap
1 parent 9f957e3 commit a2a26ca

2 files changed

Lines changed: 93 additions & 10 deletions

File tree

cassandra/cluster.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,7 +2549,8 @@ def _start_reconnector(self, host, is_host_addition, expected_down_epoch=None,
25492549
def on_down_potentially_blocking(
25502550
self, host: Host, is_host_addition: bool,
25512551
down_epoch: Optional[int] = None,
2552-
expected_endpoint: Optional[EndPoint] = None) -> Any:
2552+
expected_endpoint: Optional[EndPoint] = None,
2553+
profile_manager_already_notified: bool = False) -> Any:
25532554
pending_up_epoch = None
25542555
try:
25552556
down_endpoint = None
@@ -2577,7 +2578,8 @@ def on_down_potentially_blocking(
25772578
host, down_endpoint)
25782579
endpoint_matches = False
25792580
else:
2580-
self.profile_manager.on_down(host)
2581+
if not profile_manager_already_notified:
2582+
self.profile_manager.on_down(host)
25812583
self.control_connection.on_down(host)
25822584
else:
25832585
log.debug("Not signalling down for stale down handling on node %s; endpoint changed from %s",
@@ -2638,7 +2640,7 @@ def on_down_potentially_blocking(
26382640
self._handle_pending_node_up(host, pending_up_epoch)
26392641

26402642
def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
2641-
expected_endpoint=None):
2643+
expected_endpoint=None, profile_manager_already_notified=False):
26422644
"""
26432645
Intended for internal use only.
26442646
"""
@@ -2722,7 +2724,8 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
27222724
log.warning("Host %s has been marked down", host)
27232725

27242726
future = self.on_down_potentially_blocking(
2725-
host, is_host_addition, down_epoch, expected_endpoint)
2727+
host, is_host_addition, down_epoch, expected_endpoint,
2728+
profile_manager_already_notified)
27262729
if future is None:
27272730
pending_down = None
27282731
pending_up_epoch = None
@@ -5023,17 +5026,19 @@ def _refresh_node_list_and_token_map(self, connection, preloaded_results=None,
50235026
old_endpoint = host.endpoint
50245027
self._cluster.on_down(
50255028
host, is_host_addition=False, expect_host_to_be_down=True,
5026-
expected_endpoint=old_endpoint)
5029+
expected_endpoint=old_endpoint,
5030+
profile_manager_already_notified=True)
50275031

50285032
with host.lock:
50295033
if not self._cluster._endpoints_match(host.endpoint, old_endpoint):
50305034
log.debug("[control connection] Not updating host ip from %s to %s for (%s); "
50315035
"endpoint changed to %s",
50325036
old_endpoint, endpoint, host_id, host.endpoint)
50335037
continue
5038+
self._cluster.profile_manager.on_down(host)
50345039
host.endpoint = endpoint
50355040
self._cluster.metadata.update_host(host, old_endpoint)
5036-
self._cluster.on_up(host)
5041+
self._cluster.on_up(host, expected_endpoint=endpoint)
50375042

50385043
if host is None:
50395044
log.debug("[control connection] Found new host to connect to: %s", endpoint)

tests/unit/test_control_connection.py

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import unittest
1616
import uuid
1717

18-
from concurrent.futures import ThreadPoolExecutor
18+
from concurrent.futures import Future, ThreadPoolExecutor
1919
from unittest.mock import Mock, ANY, call
2020

2121
from cassandra import OperationTimedOut, SchemaTargetType, SchemaChangeType
@@ -134,7 +134,7 @@ def on_up(self, host, expected_endpoint=None):
134134
pass
135135

136136
def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
137-
expected_endpoint=None):
137+
expected_endpoint=None, profile_manager_already_notified=False):
138138
self.down_host = host
139139
self.down_expected_endpoint = expected_endpoint
140140

@@ -154,6 +154,45 @@ def _node_meta_results(local_results, peer_results):
154154
return peer_response, local_response
155155

156156

157+
class RunOnResultFuture(Future):
    """A ``Future`` that runs its callable lazily, in the calling thread.

    Nothing is executed at construction time. The stored callable runs the
    first time ``result()`` or ``_run_once()`` is invoked, and its return
    value (or raised ``Exception``) is recorded on the future.
    """

    def __init__(self, fn, args, kwargs):
        """Store ``fn`` with its positional ``args`` and keyword ``kwargs``."""
        super().__init__()
        self._fn = fn
        self._args = args
        self._kwargs = kwargs

    def _run_once(self):
        """Invoke the stored callable unless the future is already done."""
        if not self.done():
            try:
                outcome = self._fn(*self._args, **self._kwargs)
                self.set_result(outcome)
            except Exception as error:
                self.set_exception(error)

    def result(self, timeout=None):
        """Run the callable if still pending, then return the stored result."""
        self._run_once()
        return super().result(timeout)
176+
177+
178+
class RunOnResultExecutor(object):
    """Executor stand-in that queues work instead of running it.

    ``submit`` only records a ``RunOnResultFuture``; the queued callables run
    when ``run_all`` (or ``shutdown``) is called, or when a caller asks an
    individual future for its result.
    """

    def __init__(self):
        # Every future handed out by submit(), in submission order.
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` and return its deferred future."""
        deferred = RunOnResultFuture(fn, args, kwargs)
        self.futures.append(deferred)
        return deferred

    def run_all(self):
        """Run every future queued so far.

        Iterates over a snapshot, so futures submitted while this pass is
        running are not executed by this pass.
        """
        snapshot = tuple(self.futures)
        for pending in snapshot:
            pending._run_once()

    def shutdown(self):
        """Flush the queue by running all recorded futures."""
        self.run_all()
194+
195+
157196
class MockConnection(object):
158197

159198
is_defunct = False
@@ -423,8 +462,47 @@ def test_change_client_route_endpoint_when_only_port_changes(self):
423462
assert Cluster._endpoints_match(host.endpoint, new_endpoint)
424463
self.cluster.on_down.assert_called_once_with(
425464
host, is_host_addition=False, expect_host_to_be_down=True,
426-
expected_endpoint=old_endpoint)
427-
self.cluster.on_up.assert_called_once_with(host)
465+
expected_endpoint=old_endpoint,
466+
profile_manager_already_notified=True)
467+
self.cluster.on_up.assert_called_once_with(
468+
host, expected_endpoint=new_endpoint)
469+
470+
def test_endpoint_change_preserves_live_policy_hosts_when_down_handler_runs_late(self):
    """A host whose endpoint changes must still be live in the policy.

    The cluster's executor is replaced with ``RunOnResultExecutor`` so that
    submitted work is queued rather than run by a thread pool; the queue is
    flushed explicitly after the node-list refresh. The test then checks that
    the round-robin policy's live hosts are exactly the one known host.
    """
    host_id = uuid.uuid4()
    old_endpoint = DefaultEndPoint("127.0.0.1")
    new_endpoint = DefaultEndPoint("127.0.0.2")
    policy = RoundRobinPolicy()
    cluster = Cluster(contact_points=[], load_balancing_policy=policy)

    # Keep the real executor so it can be restored before cluster.shutdown().
    real_executor = cluster.executor
    cluster.executor = RunOnResultExecutor()
    cluster._start_reconnector = Mock()

    try:
        # Register one host, marked up, at the old endpoint.
        host = Host(
            old_endpoint, SimpleConvictionPolicy,
            datacenter="dc1", rack="rack1", host_id=host_id)
        host.set_up()
        cluster.metadata.add_or_return_host(host)
        cluster.profile_manager.populate(cluster, [host])

        # Every endpoint the factory builds resolves to the new endpoint.
        cluster.endpoint_factory = Mock()
        cluster.endpoint_factory.create.return_value = new_endpoint
        cluster.control_connection._token_meta_enabled = False

        # One peers row: same host_id, but reported at 127.0.0.2.
        peer_columns = ["rpc_address", "peer", "data_center", "rack", "host_id"]
        peer_rows = [["127.0.0.2", "127.0.0.2", "dc1", "rack1", host_id]]
        node_rows = _node_meta_results(
            local_results=([], []),
            peer_results=(peer_columns, peer_rows))

        cluster.control_connection._refresh_node_list_and_token_map(
            Mock(), preloaded_results=node_rows)
        # Flush whatever the refresh queued on the deferred executor.
        cluster.executor.run_all()

        assert list(policy._live_hosts) == [host]
        assert host in policy._live_hosts
    finally:
        cluster.executor = real_executor
        cluster.shutdown()
428506

429507
def test_stale_control_connection_failure_is_endpoint_fenced(self):
430508
host_id = uuid.uuid4()

0 commit comments

Comments
 (0)