Skip to content

Commit a1c83c5

Browse files
committed
cluster: preserve successful add after listener errors
1 parent 14d9330 commit a1c83c5

2 files changed

Lines changed: 57 additions & 35 deletions

File tree

cassandra/cluster.py

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,6 +2077,7 @@ def on_add(self, host, refresh_nodes=True):
20772077
have_future = False
20782078
add_aborted = False
20792079
futures = set()
2080+
finalize_add = None
20802081
try:
20812082
self.profile_manager.on_add(host)
20822083
self.control_connection.on_add(host, refresh_nodes)
@@ -2089,59 +2090,61 @@ def on_add(self, host, refresh_nodes=True):
20892090
if distance == HostDistance.IGNORED:
20902091
log.debug("Not adding connection pool for new host %r because the "
20912092
"load balancing policy has marked it as IGNORED", host)
2092-
self._finalize_add(host, set_up=False)
2093-
return
2094-
2095-
futures_lock = Lock()
2096-
futures_results = []
2093+
finalize_add = False
2094+
else:
2095+
futures_lock = Lock()
2096+
futures_results = []
20972097

2098-
def future_completed(future):
2099-
with futures_lock:
2100-
futures.discard(future)
2098+
def future_completed(future):
2099+
with futures_lock:
2100+
futures.discard(future)
21012101

2102-
if add_aborted:
2103-
return
2102+
if add_aborted:
2103+
return
21042104

2105-
try:
2106-
futures_results.append(future.result())
2107-
except Exception as exc:
2108-
futures_results.append(exc)
2105+
try:
2106+
futures_results.append(future.result())
2107+
except Exception as exc:
2108+
futures_results.append(exc)
21092109

2110-
if futures:
2111-
return
2110+
if futures:
2111+
return
21122112

2113-
log.debug('All futures have completed for added host %s', host)
2113+
log.debug('All futures have completed for added host %s', host)
21142114

2115-
for exc in [f for f in futures_results if isinstance(f, Exception)]:
2116-
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
2117-
self._cleanup_failed_on_add_handling(host)
2118-
return
2115+
for exc in [f for f in futures_results if isinstance(f, Exception)]:
2116+
log.error("Unexpected failure while adding node %s, will not mark up:", host, exc_info=exc)
2117+
self._cleanup_failed_on_add_handling(host)
2118+
return
21192119

2120-
if not all(futures_results):
2121-
log.warning("Connection pool could not be created, not marking node %s up", host)
2122-
self._cleanup_failed_on_add_handling(host)
2123-
return
2120+
if not all(futures_results):
2121+
log.warning("Connection pool could not be created, not marking node %s up", host)
2122+
self._cleanup_failed_on_add_handling(host)
2123+
return
21242124

2125-
self._finalize_add(host)
2125+
self._finalize_add(host)
21262126

2127-
for session in tuple(self.sessions):
2128-
future = session.add_or_renew_pool(host, is_host_addition=True)
2129-
if future is not None:
2130-
have_future = True
2131-
futures.add(future)
2127+
for session in tuple(self.sessions):
2128+
future = session.add_or_renew_pool(host, is_host_addition=True)
2129+
if future is not None:
2130+
have_future = True
2131+
futures.add(future)
21322132

2133-
for future in tuple(futures):
2134-
future.add_done_callback(future_completed)
2133+
for future in tuple(futures):
2134+
future.add_done_callback(future_completed)
21352135

2136-
if not have_future:
2137-
self._finalize_add(host)
2136+
if not have_future:
2137+
finalize_add = True
21382138
except Exception:
21392139
add_aborted = True
21402140
for future in tuple(futures):
21412141
future.cancel()
21422142
self._cleanup_failed_on_add_handling(host)
21432143
raise
21442144

2145+
if finalize_add is not None:
2146+
self._finalize_add(host, set_up=finalize_add)
2147+
21452148
def _finalize_add(self, host, set_up=True):
21462149
try:
21472150
if set_up:

tests/unit/test_cluster.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,25 @@ def reentrant_add_while_cleanup_removes_host(cleanup_host):
177177
finally:
178178
cluster.shutdown()
179179

180+
def test_on_add_listener_failure_does_not_mark_successful_add_down(self):
181+
cluster = Cluster(protocol_version=4)
182+
host = Host("127.0.0.1", SimpleConvictionPolicy, datacenter="dc1", rack="rack1", host_id=uuid.uuid4())
183+
listener = Mock()
184+
listener.on_add.side_effect = RuntimeError("listener failed")
185+
cluster.register_listener(listener)
186+
187+
try:
188+
with pytest.raises(RuntimeError):
189+
cluster.on_add(host, refresh_nodes=False)
190+
191+
load_balancer = cluster.profile_manager.default.load_balancing_policy
192+
assert host.is_up is True
193+
assert host in list(load_balancer.make_query_plan())
194+
assert not host.is_currently_reconnecting()
195+
assert not host._currently_handling_node_addition
196+
finally:
197+
cluster.shutdown()
198+
180199
def test_on_add_waits_for_all_session_pool_futures_before_marking_host_up(self):
181200
cluster = Cluster(protocol_version=4)
182201
host = Host("127.0.0.1", SimpleConvictionPolicy, host_id=uuid.uuid4())

0 commit comments

Comments
 (0)