Skip to content

Commit 65fedb7

Browse files
committed
cluster: fence stale host add futures
1 parent 8b9b80b commit 65fedb7

2 files changed

Lines changed: 145 additions & 14 deletions

File tree

cassandra/cluster.py

Lines changed: 45 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1870,8 +1870,11 @@ def _cleanup_failed_on_up_handling(self, host):
18701870

18711871
self._start_reconnector(host, is_host_addition=False)
18721872

1873-
def _cleanup_failed_on_add_handling(self, host):
1873+
def _cleanup_failed_on_add_handling(self, host, addition_generation=None):
18741874
with host.lock:
1875+
if (addition_generation is not None and
1876+
getattr(host, "_node_addition_generation", None) is not addition_generation):
1877+
return
18751878
host.set_down()
18761879

18771880
self.profile_manager.on_down(host)
@@ -1880,7 +1883,10 @@ def _cleanup_failed_on_add_handling(self, host):
18801883
session.remove_pool(host)
18811884

18821885
with host.lock:
1883-
host._currently_handling_node_addition = False
1886+
if (addition_generation is None or
1887+
getattr(host, "_node_addition_generation", None) is addition_generation):
1888+
host._currently_handling_node_addition = False
1889+
host._node_addition_generation = None
18841890

18851891
self._start_reconnector(host, is_host_addition=True)
18861892

@@ -2068,11 +2074,13 @@ def on_add(self, host, refresh_nodes=True):
20682074
log.debug("Handling new host %r and notifying listeners", host)
20692075

20702076
# Keep refresh-time pool rebuilds from racing this host's pool creation.
2077+
addition_generation = object()
20712078
with host.lock:
20722079
if getattr(host, "_currently_handling_node_addition", False):
20732080
log.debug("Another thread is already handling add status of node %s", host)
20742081
return
20752082
host._currently_handling_node_addition = True
2083+
host._node_addition_generation = addition_generation
20762084

20772085
have_future = False
20782086
add_aborted = False
@@ -2113,15 +2121,15 @@ def future_completed(future):
21132121

21142122
for exc in [f for f in futures_results if isinstance(f, Exception)]:
21152123
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
2116-
self._cleanup_failed_on_add_handling(host)
2124+
self._cleanup_failed_on_add_handling(host, addition_generation)
21172125
return
21182126

21192127
if not all(futures_results):
21202128
log.warning("Connection pool could not be created, not marking node %s up", host)
2121-
self._cleanup_failed_on_add_handling(host)
2129+
self._cleanup_failed_on_add_handling(host, addition_generation)
21222130
return
21232131

2124-
self._finalize_add(host)
2132+
self._finalize_add(host, addition_generation=addition_generation)
21252133

21262134
for session in tuple(self.sessions):
21272135
future = session.add_or_renew_pool(host, is_host_addition=True)
@@ -2137,15 +2145,14 @@ def future_completed(future):
21372145
except Exception:
21382146
add_aborted = True
21392147
for future, session in tuple(futures.items()):
2140-
if not future.cancel():
2141-
future.add_done_callback(lambda f, session=session: session.remove_pool(host))
2142-
self._cleanup_failed_on_add_handling(host)
2148+
future.cancel()
2149+
self._cleanup_failed_on_add_handling(host, addition_generation)
21432150
raise
21442151

21452152
if finalize_add is not None:
2146-
self._finalize_add(host, set_up=finalize_add)
2153+
self._finalize_add(host, set_up=finalize_add, addition_generation=addition_generation)
21472154

2148-
def _finalize_add(self, host, set_up=True):
2155+
def _finalize_add(self, host, set_up=True, addition_generation=None):
21492156
try:
21502157
if set_up:
21512158
host.set_up()
@@ -2158,7 +2165,10 @@ def _finalize_add(self, host, set_up=True):
21582165
session.update_created_pools()
21592166
finally:
21602167
with host.lock:
2161-
host._currently_handling_node_addition = False
2168+
if (addition_generation is None or
2169+
getattr(host, "_node_addition_generation", None) is addition_generation):
2170+
host._currently_handling_node_addition = False
2171+
host._node_addition_generation = None
21622172

21632173
def on_remove(self, host):
21642174
if self.is_shutdown:
@@ -3294,15 +3304,27 @@ def add_or_renew_pool(self, host, is_host_addition):
32943304
distance = self._profile_manager.distance(host)
32953305
if distance == HostDistance.IGNORED:
32963306
return None
3307+
addition_generation = getattr(host, "_node_addition_generation", None) if is_host_addition else None
3308+
3309+
def is_stale_addition():
3310+
return (is_host_addition and
3311+
getattr(host, "_node_addition_generation", None) is not addition_generation)
32973312

32983313
def run_add_or_renew_pool():
3314+
if is_stale_addition():
3315+
return False
3316+
32993317
try:
3300-
new_pool = HostConnection(host, distance, self)
3318+
new_pool = HostConnection(host, distance, self)
33013319
except AuthenticationFailed as auth_exc:
3320+
if is_stale_addition():
3321+
return False
33023322
conn_exc = ConnectionException(str(auth_exc), endpoint=host)
33033323
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
33043324
return False
33053325
except Exception as conn_exc:
3326+
if is_stale_addition():
3327+
return False
33063328
log.warning("Failed to create connection pool for new host %s:",
33073329
host, exc_info=conn_exc)
33083330
# the host itself will still be marked down, so we need to pass
@@ -3311,6 +3333,10 @@ def run_add_or_renew_pool():
33113333
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
33123334
return False
33133335

3336+
if is_stale_addition():
3337+
new_pool.shutdown()
3338+
return False
3339+
33143340
previous = self._pools.get(host)
33153341
with self._lock:
33163342
while new_pool._keyspace != self.keyspace:
@@ -3325,12 +3351,19 @@ def callback(pool, errors):
33253351
new_pool._set_keyspace_for_all_conns(self.keyspace, callback)
33263352
set_keyspace_event.wait(self.cluster.connect_timeout)
33273353
if not set_keyspace_event.is_set() or errors_returned:
3354+
if is_stale_addition():
3355+
new_pool.shutdown()
3356+
self._lock.acquire()
3357+
return False
33283358
log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
33293359
self.cluster.on_down(host, is_host_addition)
33303360
new_pool.shutdown()
33313361
self._lock.acquire()
33323362
return False
33333363
self._lock.acquire()
3364+
if is_stale_addition():
3365+
new_pool.shutdown()
3366+
return False
33343367
self._pools[host] = new_pool
33353368

33363369
log.debug("Added pool for host %s to session", host)

tests/unit/test_cluster.py

Lines changed: 100 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -137,11 +137,19 @@ class RunningPoolSession(object):
137137
def __init__(self):
138138
self.pool_created = False
139139
self.remove_pool_calls = 0
140+
self.generation = None
140141
self.update_created_pools = Mock(return_value=set())
141142

142143
def add_or_renew_pool(self, add_host, is_host_addition):
144+
self.generation = getattr(add_host, "_node_addition_generation", None)
143145
return running_future
144146

147+
def complete_running_pool_creation(self, add_host):
148+
current_generation = getattr(add_host, "_node_addition_generation", None)
149+
if self.generation is None or current_generation is self.generation:
150+
self.pool_created = True
151+
running_future.set_result(True)
152+
145153
def remove_pool(self, remove_host):
146154
self.remove_pool_calls += 1
147155
if remove_host is host and self.pool_created:
@@ -162,13 +170,103 @@ def shutdown(self):
162170

163171
assert running_session.remove_pool_calls == 1
164172

165-
running_session.pool_created = True
166-
running_future.set_result(True)
173+
running_session.complete_running_pool_creation(host)
167174

168175
assert running_session.pool_created is False
169176
finally:
170177
cluster.shutdown()
171178

179+
def test_on_add_aborted_future_does_not_remove_newer_successful_pool(self):
180+
cluster = Cluster(protocol_version=4)
181+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
182+
183+
class FencedPoolSession(object):
184+
185+
def __init__(self):
186+
self.pool = None
187+
self.add_calls = 0
188+
self.stale_future = Future()
189+
self.stale_future.set_running_or_notify_cancel()
190+
self.stale_generation = None
191+
self.update_created_pools = Mock(return_value=set())
192+
193+
def add_or_renew_pool(self, add_host, is_host_addition):
194+
self.add_calls += 1
195+
if self.add_calls == 1:
196+
self.stale_generation = getattr(add_host, "_node_addition_generation", None)
197+
return self.stale_future
198+
199+
self.pool = "fresh-pool"
200+
completed_future = Future()
201+
completed_future.set_result(True)
202+
return completed_future
203+
204+
def complete_stale_pool_creation(self, add_host):
205+
current_generation = getattr(add_host, "_node_addition_generation", None)
206+
if self.stale_generation is None or current_generation is self.stale_generation:
207+
self.pool = "stale-pool"
208+
self.stale_future.set_result(True)
209+
210+
def remove_pool(self, remove_host):
211+
if remove_host is host:
212+
self.pool = None
213+
214+
def shutdown(self):
215+
pass
216+
217+
pool_session = FencedPoolSession()
218+
recovered_future = Future()
219+
recovered_future.set_result(True)
220+
failing_session = Mock()
221+
failing_session.add_or_renew_pool.side_effect = [RuntimeError("pool add failed"), recovered_future]
222+
failing_session.update_created_pools.return_value = set()
223+
cluster.sessions = [pool_session, failing_session]
224+
225+
try:
226+
with pytest.raises(RuntimeError):
227+
cluster.on_add(host, refresh_nodes=False)
228+
229+
cluster.on_add(host, refresh_nodes=False)
230+
assert pool_session.pool == "fresh-pool"
231+
232+
pool_session.complete_stale_pool_creation(host)
233+
234+
assert pool_session.pool == "fresh-pool"
235+
finally:
236+
cluster.shutdown()
237+
238+
def test_add_or_renew_pool_does_not_signal_stale_addition_failure(self):
239+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
240+
host._node_addition_generation = object()
241+
242+
session = object.__new__(Session)
243+
session.cluster = Mock(connect_timeout=1)
244+
session.cluster.signal_connection_failure = Mock()
245+
session._profile_manager = Mock()
246+
session._profile_manager.distance.return_value = HostDistance.LOCAL
247+
session.keyspace = None
248+
session._pools = {}
249+
250+
def submit(fn, *args, **kwargs):
251+
future = Future()
252+
try:
253+
future.set_result(fn(*args, **kwargs))
254+
except Exception as exc:
255+
future.set_exception(exc)
256+
return future
257+
258+
session.submit = submit
259+
260+
def raise_after_add_aborts(*args, **kwargs):
261+
host._node_addition_generation = None
262+
raise RuntimeError("stale connect failure")
263+
264+
with patch("cassandra.cluster.HostConnection", side_effect=raise_after_add_aborts):
265+
future = session.add_or_renew_pool(host, is_host_addition=True)
266+
267+
assert future.result() is False
268+
session.cluster.signal_connection_failure.assert_not_called()
269+
172270
def test_on_add_excludes_host_from_query_plan_when_pool_future_fails(self):
173271
cluster = Cluster(protocol_version=4)
174272
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())

0 commit comments

Comments (0)