Skip to content

Commit 788e878

Browse files
committed
cluster: preserve endpoint for queued down handling
1 parent e8be8e5 commit 788e878

2 files changed

Lines changed: 65 additions & 20 deletions

File tree

cassandra/cluster.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2686,11 +2686,11 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
26862686

26872687
was_up = host.is_up
26882688
state = self._get_host_liveness_state(host)
2689+
down_endpoint = expected_endpoint if expected_endpoint is not None else host.endpoint
26892690
pending_down_endpoint = None
26902691
if state.down_endpoint is not None:
2691-
target_endpoint = expected_endpoint if expected_endpoint is not None else host.endpoint
2692-
if not self._endpoints_match(state.down_endpoint, target_endpoint):
2693-
pending_down_endpoint = target_endpoint
2692+
if not self._endpoints_match(state.down_endpoint, down_endpoint):
2693+
pending_down_endpoint = down_endpoint
26942694

26952695
if pending_down_endpoint is not None:
26962696
state.advance()
@@ -2723,11 +2723,11 @@ def on_down(self, host, is_host_addition, expect_host_to_be_down=False,
27232723
state.up_epoch is None):
27242724
return
27252725
state.down_epoch = down_epoch
2726-
state.down_endpoint = expected_endpoint
2726+
state.down_endpoint = down_endpoint
27272727
log.warning("Host %s has been marked down", host)
27282728

27292729
future = self.on_down_potentially_blocking(
2730-
host, is_host_addition, down_epoch, expected_endpoint,
2730+
host, is_host_addition, down_epoch, down_endpoint,
27312731
profile_manager_already_notified,
27322732
control_connection_already_notified)
27332733
if future is None:

tests/unit/test_cluster.py

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,6 +1237,31 @@ def test_stale_down_handling_is_ignored_after_host_is_up(self):
12371237
listener.on_down.assert_not_called()
12381238
cluster._start_reconnector.assert_not_called()
12391239

1240+
def test_stale_generic_down_handling_uses_original_endpoint_after_endpoint_swap(self):
1241+
executor = _QueuedExecutor()
1242+
session = Mock()
1243+
listener = Mock()
1244+
cluster = self._make_cluster(session=session, listener=listener)
1245+
cluster.executor = executor
1246+
cluster.profile_manager.distance.return_value = HostDistance.LOCAL
1247+
host = self._make_host()
1248+
host.set_up()
1249+
old_endpoint = host.endpoint
1250+
new_endpoint = DefaultEndPoint("127.0.0.2")
1251+
1252+
Cluster.on_down(cluster, host, is_host_addition=False)
1253+
host.endpoint = new_endpoint
1254+
1255+
executor.run_next()
1256+
1257+
cluster.profile_manager.on_down.assert_not_called()
1258+
cluster.control_connection.on_down.assert_not_called()
1259+
session.on_down.assert_called_once_with(
1260+
host, expected_endpoint=old_endpoint)
1261+
listener.on_down.assert_not_called()
1262+
cluster._start_reconnector.assert_not_called()
1263+
assert self._state(cluster, host).down_epoch is None
1264+
12401265
def test_unreserved_down_handling_is_ignored_during_host_up_handling(self):
12411266
session = Mock()
12421267
cluster = self._make_cluster(session=session)
@@ -1435,7 +1460,9 @@ def test_newer_forced_down_during_up_handling_is_preserved(self):
14351460
session.on_down.assert_called_once_with(
14361461
host, expected_endpoint=host.endpoint)
14371462
listener.on_down.assert_called_once_with(host)
1438-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1463+
cluster._start_reconnector.assert_called_once_with(
1464+
host, False, expected_down_epoch=ANY,
1465+
expected_endpoint=host.endpoint)
14391466
assert state.epoch > first_up_epoch
14401467
assert state.up_epoch == first_up_epoch
14411468
assert not host.is_up
@@ -1469,7 +1496,9 @@ def test_stale_failed_up_callback_does_not_cleanup_newer_down(self):
14691496
session.on_down.assert_called_once_with(
14701497
host, expected_endpoint=host.endpoint)
14711498
listener.on_down.assert_called_once_with(host)
1472-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1499+
cluster._start_reconnector.assert_called_once_with(
1500+
host, False, expected_down_epoch=ANY,
1501+
expected_endpoint=host.endpoint)
14731502
listener.on_up.assert_not_called()
14741503
assert not host.is_up
14751504
assert self._state(cluster, host).up_epoch is None
@@ -1503,7 +1532,8 @@ def force_down_before_cleanup(message, *args, **kwargs):
15031532
host, expected_endpoint=host.endpoint)
15041533
listener.on_down.assert_called_once_with(host)
15051534
cluster._start_reconnector.assert_called_once_with(
1506-
host, False, expected_down_epoch=ANY)
1535+
host, False, expected_down_epoch=ANY,
1536+
expected_endpoint=host.endpoint)
15071537
assert session.remove_pool.call_count == 1
15081538
listener.on_up.assert_not_called()
15091539
assert not host.is_up
@@ -1561,7 +1591,9 @@ def force_down_before_reconnector_is_cleared(h, up_epoch, **kwargs):
15611591
session.on_down.assert_called_once_with(
15621592
host, expected_endpoint=host.endpoint)
15631593
listener.on_down.assert_called_once_with(host)
1564-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1594+
cluster._start_reconnector.assert_called_once_with(
1595+
host, False, expected_down_epoch=ANY,
1596+
expected_endpoint=host.endpoint)
15651597
cluster.profile_manager.on_up.assert_not_called()
15661598
cluster.control_connection.on_up.assert_not_called()
15671599
old_reconnector.cancel.assert_called_once_with()
@@ -1585,7 +1617,9 @@ def test_forced_down_while_reconnecting_runs_new_down_handling(self):
15851617
session.on_down.assert_called_once_with(
15861618
host, expected_endpoint=host.endpoint)
15871619
listener.on_down.assert_called_once_with(host)
1588-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1620+
cluster._start_reconnector.assert_called_once_with(
1621+
host, False, expected_down_epoch=ANY,
1622+
expected_endpoint=host.endpoint)
15891623
assert self._state(cluster, host).down_epoch is None
15901624

15911625
def test_newer_down_before_up_side_effects_suppresses_stale_up(self):
@@ -1612,7 +1646,9 @@ def force_down_before_first_superseded_check(h, up_epoch):
16121646
cluster.control_connection.on_down.assert_called_once_with(host)
16131647
cluster.profile_manager.on_up.assert_not_called()
16141648
cluster.control_connection.on_up.assert_not_called()
1615-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1649+
cluster._start_reconnector.assert_called_once_with(
1650+
host, False, expected_down_epoch=ANY,
1651+
expected_endpoint=host.endpoint)
16161652
assert not host.is_up
16171653
assert self._state(cluster, host).up_epoch is None
16181654
assert self._state(cluster, host).down_epoch is None
@@ -1820,7 +1856,9 @@ def test_down_during_up_listener_is_handled(self):
18201856
session.on_down.assert_called_once_with(
18211857
host, expected_endpoint=host.endpoint)
18221858
listener.on_down.assert_called_once_with(host)
1823-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1859+
cluster._start_reconnector.assert_called_once_with(
1860+
host, False, expected_down_epoch=ANY,
1861+
expected_endpoint=host.endpoint)
18241862
assert not host.is_up
18251863
assert self._state(cluster, host).up_epoch is None
18261864
assert self._state(cluster, host).down_epoch is None
@@ -1944,7 +1982,9 @@ def test_on_up_queues_after_down_is_submitted_before_worker_runs(self):
19441982
session.on_down.assert_called_once_with(
19451983
host, expected_endpoint=host.endpoint)
19461984
listener.on_down.assert_called_once_with(host)
1947-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
1985+
cluster._start_reconnector.assert_called_once_with(
1986+
host, False, expected_down_epoch=ANY,
1987+
expected_endpoint=host.endpoint)
19481988
cluster.profile_manager.on_up.assert_called_once_with(host)
19491989
cluster.control_connection.on_up.assert_called_once_with(host)
19501990
assert host.is_up
@@ -1966,6 +2006,7 @@ def test_on_up_stays_queued_after_endpoint_update_before_down_worker_runs(self):
19662006
Cluster.on_down(
19672007
cluster, host, is_host_addition=False, expect_host_to_be_down=True)
19682008
state = self._state(cluster, host)
2009+
old_endpoint = host.endpoint
19692010

19702011
host.endpoint = DefaultEndPoint("127.0.0.2")
19712012

@@ -1980,12 +2021,12 @@ def test_on_up_stays_queued_after_endpoint_update_before_down_worker_runs(self):
19802021

19812022
executor.run_next()
19822023

1983-
cluster.profile_manager.on_down.assert_called_once_with(host)
1984-
cluster.control_connection.on_down.assert_called_once_with(host)
2024+
cluster.profile_manager.on_down.assert_not_called()
2025+
cluster.control_connection.on_down.assert_not_called()
19852026
session.on_down.assert_called_once_with(
1986-
host, expected_endpoint=host.endpoint)
1987-
listener.on_down.assert_called_once_with(host)
1988-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
2027+
host, expected_endpoint=old_endpoint)
2028+
listener.on_down.assert_not_called()
2029+
cluster._start_reconnector.assert_not_called()
19892030
cluster.profile_manager.on_up.assert_called_once_with(host)
19902031
cluster.control_connection.on_up.assert_called_once_with(host)
19912032
assert host.is_up
@@ -2397,7 +2438,9 @@ def test_real_down_for_unknown_host_marks_host_down(self):
23972438
assert host.is_up is False
23982439
cluster.profile_manager.on_down.assert_called_once_with(host)
23992440
cluster.control_connection.on_down.assert_called_once_with(host)
2400-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
2441+
cluster._start_reconnector.assert_called_once_with(
2442+
host, False, expected_down_epoch=ANY,
2443+
expected_endpoint=host.endpoint)
24012444

24022445
def test_expected_down_for_unknown_host_marks_host_down(self):
24032446
cluster = self._make_cluster()
@@ -2409,7 +2452,9 @@ def test_expected_down_for_unknown_host_marks_host_down(self):
24092452
assert host.is_up is False
24102453
cluster.profile_manager.on_down.assert_called_once_with(host)
24112454
cluster.control_connection.on_down.assert_called_once_with(host)
2412-
cluster._start_reconnector.assert_called_once_with(host, False, expected_down_epoch=ANY)
2455+
cluster._start_reconnector.assert_called_once_with(
2456+
host, False, expected_down_epoch=ANY,
2457+
expected_endpoint=host.endpoint)
24132458

24142459

24152460
class SessionTest(unittest.TestCase):

0 commit comments

Comments
 (0)