Skip to content

Commit 725ae74

Browse files
committed
Fix on_up() destroying healthy pool after replace-with-same-IP
When a node is replaced with the same IP, the driver receives both TOPOLOGY_CHANGE NEW_NODE and STATUS_CHANGE UP events. The NEW_NODE handler runs first, replacing the old host and establishing a new pool. The STATUS_CHANGE UP handler fires later with a stale reference to the old host object. Because Host.__eq__/__hash__ are endpoint-based, the stale on_up() tears down the new host's pool, causing a brief window where queries fail with NoHostAvailable. Add guards in on_up(): 1. If the host has been replaced in metadata (different object, same endpoint, new host already up), skip processing entirely. 2. Per-session: if a session already has a healthy (non-shutdown) pool for this host, skip remove/rebuild for that session only. The rest of on_up() bookkeeping (reconnector cancel, query preparation, LBP and control connection notifications) still runs unconditionally. The stale-host guard resets _currently_handling_node_up under host.lock before returning and does not mark the stale host up. The per-session guard performs no cleanup of its own: on_up() continues to its existing completion path, which marks the host up and resets _currently_handling_node_up. Refs: SCYLLADB-833
1 parent e6f9e9f commit 725ae74

2 files changed

Lines changed: 130 additions & 0 deletions

File tree

cassandra/cluster.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,16 @@ def on_up(self, host):
19301930
have_future = False
19311931
futures = set()
19321932
try:
1933+
# Guard against stale on_up destroying a healthy pool.
1934+
# Case 1: Host was replaced in metadata (different object, same endpoint).
1935+
current_host = self.metadata.get_host(host.endpoint)
1936+
if current_host is not None and current_host is not host and current_host.is_up:
1937+
log.debug("Host %s has been replaced by %s which is already up; "
1938+
"skipping stale on_up handling", host, current_host)
1939+
with host.lock:
1940+
host._currently_handling_node_up = False
1941+
return
1942+
19331943
log.info("Host %s may be up; will prepare queries and open connection pool", host)
19341944

19351945
reconnector = host.get_and_set_reconnection_handler(None)
@@ -1942,6 +1952,11 @@ def on_up(self, host):
19421952
log.debug("Done preparing all queries for host %s, ", host)
19431953

19441954
for session in tuple(self.sessions):
1955+
pool = session._pools.get(host)
1956+
if pool and not pool.is_shutdown:
1957+
log.debug("Session %s already has a healthy pool for host %s; "
1958+
"skipping remove/rebuild", session, host)
1959+
continue
19451960
session.remove_pool(host)
19461961

19471962
log.debug("Signalling to load balancing policies that host %s is up", host)
@@ -1955,6 +1970,9 @@ def on_up(self, host):
19551970
futures_results = []
19561971
callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock)
19571972
for session in tuple(self.sessions):
1973+
pool = session._pools.get(host)
1974+
if pool and not pool.is_shutdown:
1975+
continue
19581976
future = session.add_or_renew_pool(host, is_host_addition=False)
19591977
if future is not None:
19601978
have_future = True

tests/unit/test_cluster.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,3 +719,115 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
719719
)
720720

721721
patched_logger.warning.assert_not_called()
722+
723+
724+
class TestOnUpStaleHost(unittest.TestCase):
    """
    Tests for on_up() guards after a replace-with-same-IP scenario.

    Verifies that:
    - A stale host reference (replaced in metadata) triggers a full skip.
    - Per-session healthy pools are preserved (no teardown/rebuild), while
      the rest of on_up() bookkeeping (LBP, control connection) still runs.
    - Sessions missing a pool still get the normal rebuild path.
    """

    def _make_cluster(self, sessions=None):
        """
        Create a minimal Cluster object without connecting.

        ``object.__new__`` bypasses ``Cluster.__init__``, so only the
        attributes assigned here are set on the instance; collaborators
        are replaced with Mocks so calls made by on_up() can be asserted.
        """
        from threading import Lock
        cluster = object.__new__(Cluster)
        cluster.is_shutdown = False
        cluster.metadata = Mock()
        cluster.sessions = sessions or set()
        cluster.profile_manager = Mock()
        cluster.control_connection = Mock()
        cluster._listeners = set()
        cluster._listener_lock = Lock()
        return cluster

    def _make_host(self, is_up):
        """
        Build a Host on 127.0.0.1 with a fresh random host_id.

        Every host built here shares the same endpoint address, which is
        what makes a "replaced with the same IP" pair in these tests.
        """
        from cassandra.connection import DefaultEndPoint
        host = Host(DefaultEndPoint('127.0.0.1'),
                    conviction_policy_factory=Mock(),
                    host_id=uuid.uuid4())
        host.is_up = is_up
        host._currently_handling_node_up = False
        return host

    def test_on_up_skips_when_host_replaced_in_metadata(self):
        """
        If a NEW_NODE event already replaced the old host with a new one
        (same endpoint, different host_id), on_up(old_host) should bail out.
        """
        old_host = self._make_host(is_up=False)
        new_host = self._make_host(is_up=True)

        cluster = self._make_cluster()
        cluster.metadata.get_host = Mock(return_value=new_host)

        cluster.on_up(old_host)

        self.assertFalse(old_host._currently_handling_node_up)
        # The flag alone cannot distinguish a skip from a normal run: the
        # non-skipped path also leaves it False. These assertions prove
        # that on_up() really returned before doing any bookkeeping.
        cluster.profile_manager.on_up.assert_not_called()
        cluster.control_connection.on_up.assert_not_called()
        # The stale object must not be marked up by the skipped handler.
        self.assertFalse(old_host.is_up)

    def test_on_up_skips_rebuild_when_session_has_healthy_pool(self):
        """
        If on_add already created a healthy pool in a session, a subsequent
        on_up should not tear it down and rebuild, but should still run
        the rest of the on_up bookkeeping (LBP, control connection).
        """
        host = self._make_host(is_up=False)

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        mock_session = Mock()
        mock_session._pools = {host: mock_pool}

        cluster = self._make_cluster(sessions={mock_session})
        # Same object in metadata, so the stale-host guard does not fire.
        cluster.metadata.get_host = Mock(return_value=host)
        cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED)

        cluster.on_up(host)

        mock_session.remove_pool.assert_not_called()
        mock_session.add_or_renew_pool.assert_not_called()
        cluster.profile_manager.on_up.assert_called_once_with(host)
        cluster.control_connection.on_up.assert_called_once_with(host)
        self.assertTrue(host.is_up)
        self.assertFalse(host._currently_handling_node_up)

    def test_on_up_only_rebuilds_sessions_missing_pool(self):
        """
        If only some sessions have a pool, on_up should only tear down/rebuild
        for sessions that lack a healthy pool.
        """
        host = self._make_host(is_up=False)

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        session_with_pool = Mock()
        session_with_pool._pools = {host: mock_pool}

        session_without_pool = Mock()
        session_without_pool._pools = {}
        # Returning None means no pool-creation future is produced.
        session_without_pool.add_or_renew_pool = Mock(return_value=None)

        cluster = self._make_cluster(sessions={session_with_pool, session_without_pool})
        cluster.metadata.get_host = Mock(return_value=host)
        cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED)

        cluster.on_up(host)

        # Session with healthy pool should NOT be torn down
        session_with_pool.remove_pool.assert_not_called()
        session_with_pool.add_or_renew_pool.assert_not_called()
        # Session without pool should get remove + rebuild
        session_without_pool.remove_pool.assert_called_once_with(host)
        session_without_pool.add_or_renew_pool.assert_called_once_with(host, is_host_addition=False)
        # Skipping one session must not suppress the shared bookkeeping.
        cluster.profile_manager.on_up.assert_called_once_with(host)
        cluster.control_connection.on_up.assert_called_once_with(host)

0 commit comments

Comments
 (0)