Skip to content

Commit 8b9b80b

Browse files
committed
cluster: remove late pools after aborted host add
1 parent a1c83c5 commit 8b9b80b

2 files changed

Lines changed: 51 additions & 8 deletions

File tree

cassandra/cluster.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2076,7 +2076,9 @@ def on_add(self, host, refresh_nodes=True):
20762076

20772077
have_future = False
20782078
add_aborted = False
2079-
futures = set()
2079+
futures = {}
2080+
futures_lock = Lock()
2081+
futures_results = []
20802082
finalize_add = None
20812083
try:
20822084
self.profile_manager.on_add(host)
@@ -2092,12 +2094,9 @@ def on_add(self, host, refresh_nodes=True):
20922094
"load balancing policy has marked it as IGNORED", host)
20932095
finalize_add = False
20942096
else:
2095-
futures_lock = Lock()
2096-
futures_results = []
2097-
20982097
def future_completed(future):
20992098
with futures_lock:
2100-
futures.discard(future)
2099+
futures.pop(future, None)
21012100

21022101
if add_aborted:
21032102
return
@@ -2128,7 +2127,7 @@ def future_completed(future):
21282127
future = session.add_or_renew_pool(host, is_host_addition=True)
21292128
if future is not None:
21302129
have_future = True
2131-
futures.add(future)
2130+
futures[future] = session
21322131

21332132
for future in tuple(futures):
21342133
future.add_done_callback(future_completed)
@@ -2137,8 +2136,9 @@ def future_completed(future):
21372136
finalize_add = True
21382137
except Exception:
21392138
add_aborted = True
2140-
for future in tuple(futures):
2141-
future.cancel()
2139+
for future, session in tuple(futures.items()):
2140+
if not future.cancel():
2141+
future.add_done_callback(lambda f, session=session: session.remove_pool(host))
21422142
self._cleanup_failed_on_add_handling(host)
21432143
raise
21442144

tests/unit/test_cluster.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,49 @@ def test_on_add_clears_in_progress_flag_when_later_session_add_fails(self):
126126
finally:
127127
cluster.shutdown()
128128

129+
def test_on_add_removes_pool_created_by_running_future_after_add_aborts(self):
130+
cluster = Cluster(protocol_version=4)
131+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
132+
running_future = Future()
133+
running_future.set_running_or_notify_cancel()
134+
135+
class RunningPoolSession(object):
136+
137+
def __init__(self):
138+
self.pool_created = False
139+
self.remove_pool_calls = 0
140+
self.update_created_pools = Mock(return_value=set())
141+
142+
def add_or_renew_pool(self, add_host, is_host_addition):
143+
return running_future
144+
145+
def remove_pool(self, remove_host):
146+
self.remove_pool_calls += 1
147+
if remove_host is host and self.pool_created:
148+
self.pool_created = False
149+
150+
def shutdown(self):
151+
pass
152+
153+
running_session = RunningPoolSession()
154+
failing_session = Mock()
155+
failing_session.add_or_renew_pool.side_effect = RuntimeError("pool add failed")
156+
failing_session.update_created_pools.return_value = set()
157+
cluster.sessions = [running_session, failing_session]
158+
159+
try:
160+
with pytest.raises(RuntimeError):
161+
cluster.on_add(host, refresh_nodes=False)
162+
163+
assert running_session.remove_pool_calls == 1
164+
165+
running_session.pool_created = True
166+
running_future.set_result(True)
167+
168+
assert running_session.pool_created is False
169+
finally:
170+
cluster.shutdown()
171+
129172
def test_on_add_excludes_host_from_query_plan_when_pool_future_fails(self):
130173
cluster = Cluster(protocol_version=4)
131174
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())

0 commit comments

Comments
 (0)