Skip to content

Commit 880b0a6

Browse files
committed
cluster: clean up failed host additions
1 parent 87a9792 commit 880b0a6

2 files changed

Lines changed: 38 additions & 7 deletions

File tree

cassandra/cluster.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,6 +1870,18 @@ def _cleanup_failed_on_up_handling(self, host):
18701870

18711871
self._start_reconnector(host, is_host_addition=False)
18721872

1873+
def _cleanup_failed_on_add_handling(self, host):
1874+
with host.lock:
1875+
host.set_down()
1876+
host._currently_handling_node_addition = False
1877+
1878+
self.profile_manager.on_down(host)
1879+
self.control_connection.on_down(host)
1880+
for session in tuple(self.sessions):
1881+
session.remove_pool(host)
1882+
1883+
self._start_reconnector(host, is_host_addition=True)
1884+
18731885
def _on_up_future_completed(self, host, futures, results, lock, finished_future):
18741886
with lock:
18751887
futures.discard(finished_future)
@@ -2100,14 +2112,12 @@ def future_completed(future):
21002112

21012113
for exc in [f for f in futures_results if isinstance(f, Exception)]:
21022114
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
2103-
with host.lock:
2104-
host._currently_handling_node_addition = False
2115+
self._cleanup_failed_on_add_handling(host)
21052116
return
21062117

21072118
if not all(futures_results):
21082119
log.warning("Connection pool could not be created, not marking node %s up", host)
2109-
with host.lock:
2110-
host._currently_handling_node_addition = False
2120+
self._cleanup_failed_on_add_handling(host)
21112121
return
21122122

21132123
self._finalize_add(host)
@@ -2127,8 +2137,7 @@ def future_completed(future):
21272137
add_aborted = True
21282138
for future in tuple(futures):
21292139
future.cancel()
2130-
with host.lock:
2131-
host._currently_handling_node_addition = False
2140+
self._cleanup_failed_on_add_handling(host)
21322141
raise
21332142

21342143
def _finalize_add(self, host, set_up=True):

tests/unit/test_cluster.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def test_tuple_for_contact_points(self):
103103

104104
def test_on_add_clears_in_progress_flag_when_later_session_add_fails(self):
105105
cluster = Cluster(protocol_version=4)
106-
host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())
106+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
107107
successful_session = Mock()
108108
successful_session.add_or_renew_pool.return_value = Future()
109109
successful_session.update_created_pools.return_value = set()
@@ -116,6 +116,8 @@ def test_on_add_clears_in_progress_flag_when_later_session_add_fails(self):
116116
cluster.on_add(host, refresh_nodes=False)
117117

118118
assert not host._currently_handling_node_addition
119+
load_balancer = cluster.profile_manager.default.load_balancing_policy
120+
assert host not in list(load_balancer.make_query_plan())
119121

120122
with pytest.raises(RuntimeError):
121123
cluster.on_add(host, refresh_nodes=False)
@@ -124,6 +126,26 @@ def test_on_add_clears_in_progress_flag_when_later_session_add_fails(self):
124126
finally:
125127
cluster.shutdown()
126128

129+
def test_on_add_excludes_host_from_query_plan_when_pool_future_fails(self):
130+
cluster = Cluster(protocol_version=4)
131+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
132+
failed_future = Future()
133+
session = Mock()
134+
session.add_or_renew_pool.return_value = failed_future
135+
session.update_created_pools.return_value = set()
136+
cluster.sessions = [session]
137+
138+
try:
139+
cluster.on_add(host, refresh_nodes=False)
140+
141+
failed_future.set_result(False)
142+
143+
load_balancer = cluster.profile_manager.default.load_balancing_policy
144+
assert host not in list(load_balancer.make_query_plan())
145+
assert host.is_up is False
146+
finally:
147+
cluster.shutdown()
148+
127149
def test_on_add_waits_for_all_session_pool_futures_before_marking_host_up(self):
128150
cluster = Cluster(protocol_version=4)
129151
host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())

0 commit comments

Comments
 (0)