Skip to content

Commit 3df9e28

Browse files
committed
Fix on_up() destroying healthy pool after replace-with-same-IP
When a node is replaced with the same IP, the driver receives both TOPOLOGY_CHANGE NEW_NODE and STATUS_CHANGE UP events. The NEW_NODE handler runs first, replacing the old host and establishing a new pool. The STATUS_CHANGE UP handler fires later with a stale reference to the old host object. Because Host.__eq__/__hash__ are endpoint-based, the stale on_up() tears down the new host's pool, causing a brief window where queries fail with NoHostAvailable. Add two guards at the top of on_up(): 1. If the host has been replaced in metadata (different object, same endpoint, new host already up), skip processing. 2. If all sessions already have a healthy (non-shutdown) pool for this host, call set_up() and skip the teardown/rebuild cycle. Both guards reset _currently_handling_node_up under host.lock; the second guard additionally calls host.set_up() (which resets the conviction policy), consistent with the existing cleanup paths. The first guard deliberately leaves the stale host's up/down state untouched. Refs: SCYLLADB-833
1 parent e6f9e9f commit 3df9e28

2 files changed

Lines changed: 121 additions & 0 deletions

File tree

cassandra/cluster.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,6 +1930,28 @@ def on_up(self, host):
19301930
have_future = False
19311931
futures = set()
19321932
try:
1933+
# Guard against stale on_up destroying a healthy pool.
1934+
# Case 1: Host was replaced in metadata (different object, same endpoint).
1935+
current_host = self.metadata.get_host(host.endpoint)
1936+
if current_host is not None and current_host is not host and current_host.is_up:
1937+
log.debug("Host %s has been replaced by %s which is already up; "
1938+
"skipping stale on_up handling", host, current_host)
1939+
with host.lock:
1940+
host._currently_handling_node_up = False
1941+
return
1942+
1943+
# Case 2: All sessions already have a healthy pool for this host.
1944+
sessions = tuple(self.sessions)
1945+
if sessions and all(
1946+
session._pools.get(host) and not session._pools[host].is_shutdown
1947+
for session in sessions):
1948+
log.debug("Host %s already has healthy pools in all sessions; "
1949+
"skipping on_up pool teardown/rebuild", host)
1950+
with host.lock:
1951+
host.set_up()
1952+
host._currently_handling_node_up = False
1953+
return
1954+
19331955
log.info("Host %s may be up; will prepare queries and open connection pool", host)
19341956

19351957
reconnector = host.get_and_set_reconnection_handler(None)

tests/unit/test_cluster.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,3 +719,102 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
719719
)
720720

721721
patched_logger.warning.assert_not_called()
722+
723+
724+
class TestOnUpStaleHost(unittest.TestCase):
    """
    Tests for on_up() not destroying a healthy pool when called with a stale
    host reference after a replace-with-same-IP.

    Every test drives ``Cluster.on_up`` against a bare ``Cluster`` instance
    whose collaborators (metadata, profile manager, control connection,
    sessions) are mocks, so no network connection is ever attempted.
    """

    def _make_cluster(self, sessions=None):
        """Create a minimal Cluster object without connecting.

        ``object.__new__`` bypasses ``Cluster.__init__``; only the attributes
        assigned below exist on the returned instance.

        :param sessions: optional set of (mock) sessions to attach; an empty
            set is used when omitted or empty.
        :return: the partially initialised ``Cluster``.
        """
        from threading import Lock
        cluster = object.__new__(Cluster)
        cluster.is_shutdown = False
        cluster.metadata = Mock()
        cluster.sessions = sessions or set()
        cluster.profile_manager = Mock()
        cluster.control_connection = Mock()
        cluster._listeners = set()
        cluster._listener_lock = Lock()
        return cluster

    def _make_host(self, endpoint, is_up):
        """Build a Host on *endpoint* with a fresh host_id and the given up state.

        ``_currently_handling_node_up`` is explicitly cleared so that on_up()
        is not short-circuited by the "another thread is handling it" check.
        """
        host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
        host.is_up = is_up
        host._currently_handling_node_up = False
        return host

    def test_on_up_skips_when_host_replaced_in_metadata(self):
        """
        If a NEW_NODE event already replaced the old host with a new one
        (same endpoint, different host_id), on_up(old_host) should bail out.
        """
        from cassandra.connection import DefaultEndPoint
        endpoint = DefaultEndPoint('127.0.0.1')

        old_host = self._make_host(endpoint, is_up=False)
        new_host = self._make_host(endpoint, is_up=True)

        cluster = self._make_cluster()
        # Metadata now resolves the endpoint to the replacement host object.
        cluster.metadata.get_host = Mock(return_value=new_host)

        cluster.on_up(old_host)

        self.assertFalse(old_host._currently_handling_node_up)
        # The flag alone is also reset by the normal path, so additionally
        # verify that the handler really bailed out: the stale host is not
        # marked up and nobody was notified about it.
        self.assertFalse(old_host.is_up)
        cluster.profile_manager.on_up.assert_not_called()
        cluster.control_connection.on_up.assert_not_called()

    def test_on_up_skips_when_all_sessions_have_healthy_pool(self):
        """
        If on_add already created a healthy pool in all sessions, a subsequent
        on_up should not tear them down and rebuild.
        """
        from cassandra.connection import DefaultEndPoint
        endpoint = DefaultEndPoint('127.0.0.1')

        host = self._make_host(endpoint, is_up=False)

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        mock_session = Mock()
        mock_session._pools = {host: mock_pool}

        cluster = self._make_cluster(sessions={mock_session})
        # Same object in metadata, so the "replaced host" guard does not apply.
        cluster.metadata.get_host = Mock(return_value=host)

        cluster.on_up(host)

        # Neither half of the teardown/rebuild cycle may run.
        mock_session.remove_pool.assert_not_called()
        mock_session.add_or_renew_pool.assert_not_called()
        self.assertTrue(host.is_up)
        self.assertFalse(host._currently_handling_node_up)

    def test_on_up_proceeds_when_some_sessions_missing_pool(self):
        """
        If only some sessions have a pool, on_up should proceed normally
        (not short-circuit) so all sessions get reconciled.
        """
        from cassandra.connection import DefaultEndPoint
        endpoint = DefaultEndPoint('127.0.0.1')

        host = self._make_host(endpoint, is_up=False)

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        session_with_pool = Mock()
        session_with_pool._pools = {host: mock_pool}

        session_without_pool = Mock()
        session_without_pool._pools = {}
        session_without_pool.add_or_renew_pool = Mock(return_value=None)

        cluster = self._make_cluster(sessions={session_with_pool, session_without_pool})
        cluster.metadata.get_host = Mock(return_value=host)
        # An IGNORED distance makes on_up skip preparing queries on the host.
        cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED)

        cluster.on_up(host)

        # Should have proceeded to remove_pool (normal path)
        session_with_pool.remove_pool.assert_called_once_with(host)
        session_without_pool.remove_pool.assert_called_once_with(host)

0 commit comments

Comments
 (0)